Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/babelqueue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@

from __future__ import annotations

from . import dead_letter
from . import dead_letter, idempotency
from .app import BabelQueue
from .codec import SCHEMA_VERSION, SOURCE_LANG, EnvelopeCodec
from .contracts import HasTraceId, PolyglotMessage
from .idempotency import IdempotencyStore, InMemoryStore
from .exceptions import BabelQueueError, UnknownUrnError
from .routing import UnknownUrnStrategy
from .transport import InMemoryTransport, ReceivedMessage, Transport
Expand All @@ -35,5 +36,8 @@
"BabelQueueError",
"UnknownUrnError",
"dead_letter",
"idempotency",
"IdempotencyStore",
"InMemoryStore",
"__version__",
]
14 changes: 14 additions & 0 deletions src/babelqueue/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,17 @@ class BabelQueueError(Exception):

class UnknownUrnError(BabelQueueError):
"""A consumed message carries a URN with no mapped handler (strategy "fail")."""


class InvalidPayloadError(BabelQueueError):
"""A message's ``data`` does not match the JSON Schema registered for its URN (ADR-0024).

Raised by the producer-side :func:`babelqueue.schema.validate` and the consumer-side
:func:`babelqueue.schema.wrap`; the latter lets the runtime redeliver (and eventually
dead-letter) a poison message.
"""

def __init__(self, urn: str, violation: str) -> None:
super().__init__(f"data for {urn!r} does not match its URN schema: {violation}")
self.urn = urn
self.violation = violation
95 changes: 95 additions & 0 deletions src/babelqueue/idempotency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""Optional idempotency helper (ADR-0022): dedupe a consume handler on ``meta.id``.

The Python mirror of the PHP ``BabelQueue\\Idempotency`` and Go ``idempotency``
helpers. It wraps a handler so a message whose ``meta.id`` was already processed
successfully is skipped instead of run again, composing with the runtime's
ack-on-return / redeliver-on-raise contract::

from babelqueue import BabelQueue
from babelqueue.idempotency import InMemoryStore, wrap

app = BabelQueue("redis://localhost:6379/0", queue="orders")
store = InMemoryStore()
app.register("urn:babel:orders:created", wrap(store, on_order_created))

A previously-seen id returns early (the runtime acks it, so the broker stops
redelivering); a raising handler leaves the id unmarked so a redelivery runs it again
(retry / dead-letter still apply); a message with no usable ``meta.id`` runs unchanged.
This is "seen-set" post-success dedupe — not exactly-once and not in-flight concurrency
locking; a transactional / outbox mode is a documented future direction.
"""

from __future__ import annotations

import functools
import threading
from typing import Any, Callable, Mapping, Protocol, runtime_checkable

Handler = Callable[..., None]


@runtime_checkable
class IdempotencyStore(Protocol):
"""A record of message ids already processed, keyed on ``meta.id``."""

def seen(self, message_id: str) -> bool:
"""Whether this id has already been processed (remembered)."""

def remember(self, message_id: str) -> None:
"""Record this id as processed."""

def forget(self, message_id: str) -> None:
"""Drop an id from the store (manual eviction; a backend may also expire ids)."""


class InMemoryStore:
"""Process-local, thread-safe :class:`IdempotencyStore`.

For tests and single-process consumers; it is not shared across workers and not
persistent — use a Redis- or database-backed store for production fleets.
"""

def __init__(self) -> None:
self._seen: set[str] = set()
self._lock = threading.Lock()

def seen(self, message_id: str) -> bool:
with self._lock:
return message_id in self._seen

def remember(self, message_id: str) -> None:
with self._lock:
self._seen.add(message_id)

def forget(self, message_id: str) -> None:
with self._lock:
self._seen.discard(message_id)


def wrap(store: IdempotencyStore, handler: Handler) -> Handler:
"""Wrap ``handler`` so a message whose ``meta.id`` was already processed is skipped.

The returned callable keeps ``handler``'s signature (via :func:`functools.wraps`),
so the runtime's introspection still passes it the right number of positional args
(``data, meta`` or ``data, meta, envelope``).
"""

@functools.wraps(handler)
def wrapped(*args: Any) -> None:
meta = args[1] if len(args) > 1 and isinstance(args[1], Mapping) else {}
message_id = meta.get("id")

# No usable id → cannot dedupe; run the handler unchanged.
if not isinstance(message_id, str) or message_id == "":
handler(*args)
return

# Already processed on an earlier delivery: return so the runtime acks it.
if store.seen(message_id):
return

# First success wins; a raise here leaves the id unmarked → retry/DLQ apply.
handler(*args)
store.remember(message_id)

return wrapped
261 changes: 261 additions & 0 deletions src/babelqueue/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
"""Optional per-URN payload schema validation (ADR-0024).

The Python mirror of the Go ``schema`` package and PHP ``BabelQueue\\Schema``. A
:class:`SchemaProvider` supplies a JSON Schema for a message URN — typically read from a
babelqueue-registry ``registry.json`` — and the message's ``data`` is validated against it.
It is opt-in: a URN with no registered schema is never validated.

- **Producer-side (recommended):** call :func:`validate` before publishing so invalid data
never enters the queue, or :func:`check` to branch without raising::

from babelqueue.schema import MapProvider, validate

provider = MapProvider.from_json({"urn:babel:orders:created": ORDERS_SCHEMA_JSON})
validate(provider, "urn:babel:orders:created", {"order_id": 7}) # raises on mismatch

- **Consumer-side (safety net):** wrap a handler with :func:`wrap`. Because a Python handler
receives ``data`` (and ``meta``, ``envelope``) positionally rather than a message object,
the URN is passed explicitly — usually the same URN you register under::

app.register(URN, wrap(provider, URN, on_order_created))

Invalid data raises :class:`~babelqueue.exceptions.InvalidPayloadError`, so the runtime
redelivers (and eventually dead-letters) the poison message; a URN with no schema runs the
handler unchanged.

The validator is a small subset of JSON Schema (draft-07) whose verdicts match the Go and
PHP validators and babelqueue-registry's ``compat`` linter: ``type``, ``required``,
``properties``, ``additionalProperties``, ``items``, ``enum``, ``const``, ``minLength``,
``minimum``. Unknown keywords are ignored. Zero dependencies (stdlib only).
"""

from __future__ import annotations

import functools
import json
import os
import threading
from typing import Any, Callable, Mapping, Optional, Protocol, runtime_checkable

from .exceptions import InvalidPayloadError

Handler = Callable[..., None]


@runtime_checkable
class SchemaProvider(Protocol):
"""A source of per-URN ``data`` schemas, keyed on the message URN."""

def schema_for(self, urn: str) -> Optional[Mapping[str, Any]]:
"""The JSON Schema registered for ``urn``, or None when none is registered."""


class MapProvider:
"""In-memory :class:`SchemaProvider`, for tests and for embedding schemas in code."""

def __init__(self, schemas: Mapping[str, Mapping[str, Any]]) -> None:
self._schemas: dict[str, Mapping[str, Any]] = dict(schemas)

@classmethod
def from_json(cls, raw: Mapping[str, str]) -> "MapProvider":
"""Build a provider from URN -> raw JSON Schema strings, decoding each."""
schemas: dict[str, Mapping[str, Any]] = {}
for urn, body in raw.items():
decoded = json.loads(body)
if not isinstance(decoded, dict):
raise ValueError(f"schema: invalid JSON schema for {urn!r}")
schemas[urn] = decoded
return cls(schemas)

def schema_for(self, urn: str) -> Optional[Mapping[str, Any]]:
return self._schemas.get(urn)


class DirProvider:
"""Reads schemas from a babelqueue-registry manifest (``registry.json``): a list of
``{urn, schema}`` entries mapping each URN to a schema file for its ``data`` block. The
bridge that makes the registry's governed schemas enforceable at runtime. Schema files
are read and decoded lazily and cached (thread-safe). A URN absent from the manifest
returns None (skip); a URN whose schema file is missing raises (config/IO error)."""

def __init__(self, manifest_path: str) -> None:
with open(manifest_path, "r", encoding="utf-8") as fh:
manifest = json.load(fh)
if not isinstance(manifest, dict):
raise ValueError(f"schema: invalid registry manifest {manifest_path!r}")

self._dir = os.path.dirname(manifest_path)
self._files: dict[str, str] = {}
self._cache: dict[str, Mapping[str, Any]] = {}
self._lock = threading.Lock()

entries = manifest.get("schemas") or []
if isinstance(entries, list):
for entry in entries:
if not isinstance(entry, Mapping):
continue
urn = entry.get("urn")
file = entry.get("schema")
if isinstance(urn, str) and urn and isinstance(file, str) and file:
self._files[urn] = file

def schema_for(self, urn: str) -> Optional[Mapping[str, Any]]:
with self._lock:
cached = self._cache.get(urn)
if cached is not None:
return cached
file = self._files.get(urn)
if file is None:
return None
path = file if os.path.isabs(file) else os.path.join(self._dir, file)
with open(path, "r", encoding="utf-8") as fh:
decoded = json.load(fh)
if not isinstance(decoded, dict):
raise ValueError(f"schema: invalid schema for {urn!r} ({file})")
self._cache[urn] = decoded
return decoded


def check(provider: SchemaProvider, urn: str, data: Mapping[str, Any]) -> Optional[str]:
"""The first ``data`` violation for ``(urn, data)``, or None when it is valid or when no
schema is registered for the URN (opt-in). Non-raising; for producer-side branching."""
schema = provider.schema_for(urn)
if schema is None:
return None
return validate_schema(schema, dict(data))


def validate(provider: SchemaProvider, urn: str, data: Mapping[str, Any]) -> None:
"""Validate ``(urn, data)`` against its registered schema, raising otherwise. The
producer-side guard; call it before publishing.

:raises InvalidPayloadError: when the data does not match the URN's schema.
"""
violation = check(provider, urn, data)
if violation is not None:
raise InvalidPayloadError(urn, violation)


def wrap(provider: SchemaProvider, urn: str, handler: Handler) -> Handler:
"""Wrap a consume handler so each message's ``data`` is validated against ``urn``'s
schema before the handler runs. The returned callable keeps ``handler``'s signature (via
:func:`functools.wraps`), so the runtime still passes it the right number of positional
args (``data, meta`` or ``data, meta, envelope``)."""

@functools.wraps(handler)
def wrapped(*args: Any) -> None:
data = args[0] if args and isinstance(args[0], Mapping) else {}
validate(provider, urn, data)
handler(*args)

return wrapped


def validate_schema(schema: Mapping[str, Any], value: Any, path: str = "") -> Optional[str]:
"""The first violation of ``value`` against a (subset) JSON Schema node, or None."""
if "const" in schema and not _equal(value, schema["const"]):
return _violation(path, "wrong_const")

enum = schema.get("enum")
if isinstance(enum, list) and not any(_equal(value, item) for item in enum):
return _violation(path, "not_in_enum")

typ = schema.get("type")
if typ == "object":
return _check_object(schema, value, path)
if typ == "array":
return _check_array(schema, value, path)
if typ == "string":
if not isinstance(value, str):
return _violation(path, "not_a_string")
min_len = schema.get("minLength")
if isinstance(min_len, (int, float)) and len(value) < int(min_len):
return _violation(path, "below_min_length")
return None
if typ == "integer":
if not _is_integer(value):
return _violation(path, "not_an_integer")
return _check_minimum(schema, value, path)
if typ == "number":
if not _is_number(value):
return _violation(path, "not_a_number")
return _check_minimum(schema, value, path)
if typ == "boolean":
return None if isinstance(value, bool) else _violation(path, "not_a_boolean")
if typ == "null":
return None if value is None else _violation(path, "not_null")
return None


def _check_object(schema: Mapping[str, Any], value: Any, path: str) -> Optional[str]:
if not isinstance(value, Mapping):
return _violation(path, "not_an_object")

required = schema.get("required")
if isinstance(required, list):
for key in required:
if isinstance(key, str) and key not in value:
return _violation(_join(path, key), "missing_required")

properties = schema.get("properties")
properties = properties if isinstance(properties, Mapping) else {}
additional_allowed = schema.get("additionalProperties") is not False

for key, item in value.items():
name = str(key)
prop = properties.get(name)
if isinstance(prop, Mapping):
violation = validate_schema(prop, item, _join(path, name))
if violation is not None:
return violation
continue
if not additional_allowed:
return _violation(_join(path, name), "additional_not_allowed")

return None


def _check_array(schema: Mapping[str, Any], value: Any, path: str) -> Optional[str]:
if not isinstance(value, list):
return _violation(path, "not_an_array")
items = schema.get("items")
if not isinstance(items, Mapping):
return None
for i, item in enumerate(value):
violation = validate_schema(items, item, f"{path}[{i}]")
if violation is not None:
return violation
return None


def _check_minimum(schema: Mapping[str, Any], value: Any, path: str) -> Optional[str]:
minimum = schema.get("minimum")
if isinstance(minimum, (int, float)) and not isinstance(minimum, bool) and float(value) < float(minimum):
return _violation(path, "below_minimum")
return None


def _is_integer(value: Any) -> bool:
if isinstance(value, bool):
return False
if isinstance(value, int):
return True
return isinstance(value, float) and value.is_integer()


def _is_number(value: Any) -> bool:
return isinstance(value, (int, float)) and not isinstance(value, bool)


def _equal(a: Any, b: Any) -> bool:
# Type-aware equality so True != 1 and an integer const never matches a float value,
# matching the strict comparisons in the Go and PHP validators.
return type(a) is type(b) and a == b


def _violation(path: str, reason: str) -> str:
return f"{path or '<root>'}: {reason}"


def _join(path: str, key: str) -> str:
return key if path == "" else f"{path}.{key}"
Loading