Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/babelqueue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from __future__ import annotations

from . import dead_letter, idempotency
from . import dead_letter, idempotency, redrive
from .app import BabelQueue
from .codec import SCHEMA_VERSION, SOURCE_LANG, EnvelopeCodec
from .contracts import HasTraceId, PolyglotMessage
Expand All @@ -37,6 +37,7 @@
"UnknownUrnError",
"dead_letter",
"idempotency",
"redrive",
"IdempotencyStore",
"InMemoryStore",
"__version__",
Expand Down
165 changes: 165 additions & 0 deletions src/babelqueue/redrive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
"""Optional DLQ redrive tooling (ADR-0026): safe replay off the dead-letter queue.

The Python mirror of the Go ``Redrive``. It reads dead-lettered messages off a DLQ and
re-publishes each to its source queue (its ``dead_letter.original_queue``) or a chosen
``to_queue``, **reset for reprocessing**: the ``dead_letter`` block is removed and ``attempts``
reset to 0, while ``job``, ``trace_id``, ``data`` and ``meta`` are preserved verbatim. It is
the operator-side counterpart to the runtime's dead-letter routing — the contract leaves
redrive to tooling, and this is that tool.

from babelqueue import BabelQueue
from babelqueue.redrive import redrive

app = BabelQueue("redis://localhost:6379/0")
result = redrive(app.transport, "orders.dlq") # back to each source
result = redrive(app.transport, "orders.dlq", to_queue="sandbox") # safe sandbox replay
plan = redrive(app.transport, "orders.dlq", dry_run=True) # inspect, change nothing

Messages are drained from the DLQ first and then processed, so restored messages (skipped,
dry-run, or undecodable) are never re-encountered in the same run; a DLQ message is
acknowledged only after a successful re-publish, and an undecodable body is restored, not
dropped.

Replay safety today is sandbox routing (``to_queue``) + ``dry_run``. The **Replay-Bypass**
guard — a ``bq-replay-bypass`` transport header surfaced to handlers so a replay can skip
external side-effects (don't re-charge, don't re-email) — is a documented phase two: like the
OpenTelemetry ``traceparent`` follow-up, it carries out-of-band metadata as a transport header
and so touches the runtime + every transport binding. Until then, sandbox routing is the
safe-replay answer.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple

from .codec import EnvelopeCodec
from .transport import ReceivedMessage, Transport

# A decoded message envelope: a string-keyed mapping. Keys this module touches are
# ``job``, ``trace_id``, ``attempts``, ``meta`` and ``dead_letter``.
Envelope = Mapping[str, Any]
# Caller-supplied filter for ``redrive``: return True to replay the envelope, False to have
# it restored to the DLQ unchanged.
Select = Callable[[Envelope], bool]


@dataclass
class RedriveItem:
    """Per-message record of a redrive run: which message it was and where it went."""

    # ``meta.id`` of the envelope, stringified ("" when absent or the body was undecodable).
    message_id: str
    # The envelope's ``trace_id``, stringified ("" when absent).
    trace_id: str
    # The job URN as reported by ``EnvelopeCodec.urn``.
    urn: str
    # ``dead_letter.reason`` of the envelope ("" when absent).
    reason: str
    # The DLQ the message was read from.
    from_queue: str
    # Target queue: the plan, even on a dry run; "" when skipped/undecodable.
    to: str
    # True only when the message was actually re-published to ``to``.
    redriven: bool


@dataclass
class RedriveResult:
    """Outcome totals for one redrive run, plus the per-message detail."""

    # How many messages were actually re-published.
    redriven: int = 0
    # How many messages were put back on the DLQ instead of being replayed.
    skipped: int = 0
    # One ``RedriveItem`` per message handled, in processing order.
    items: List[RedriveItem] = field(default_factory=list)


def _restore(transport: Transport, dlq: str, message: ReceivedMessage) -> None:
    """Put ``message`` back on ``dlq`` with its body unchanged, then ack the popped copy."""
    transport.publish(dlq, message.body)
    transport.ack(message)


def _redrive_one(
    transport: Transport,
    dlq: str,
    message: ReceivedMessage,
    envelope: Optional[Dict[str, Any]],
    to_queue: Optional[str],
    dry_run: bool,
    select: Optional[Select],
) -> RedriveItem:
    """Replay or restore one drained message and return the record of what happened to it."""
    if envelope is None:
        _restore(transport, dlq, message)  # restore the poison body; never drop it
        return RedriveItem("", "", "", "", dlq, "", False)

    meta_raw = envelope.get("meta")
    meta: Mapping[str, Any] = meta_raw if isinstance(meta_raw, Mapping) else {}
    dl_raw = envelope.get("dead_letter")
    dead_letter: Mapping[str, Any] = dl_raw if isinstance(dl_raw, Mapping) else {}
    item = RedriveItem(
        message_id=str(meta.get("id", "")),
        trace_id=str(envelope.get("trace_id", "")),
        urn=EnvelopeCodec.urn(envelope),
        reason=str(dead_letter.get("reason", "")),
        from_queue=dlq,
        to="",
        redriven=False,
    )

    if select is not None and not select(envelope):
        _restore(transport, dlq, message)  # not selected: restore unchanged
        return item

    target = to_queue or _source_queue_of(envelope)
    item.to = target

    # A dry run only reports the plan. An empty target means neither ``to_queue`` nor the
    # envelope names a destination; publishing to a queue called "" would take the message
    # off the DLQ without delivering it anywhere useful, so restore it instead.
    if dry_run or not target:
        _restore(transport, dlq, message)
        return item

    # Reset for reprocessing: drop the dead-letter annotation and zero the attempt counter;
    # every other envelope key is carried over as-is.
    reset = dict(envelope)
    reset.pop("dead_letter", None)
    reset["attempts"] = 0
    try:
        transport.publish(target, EnvelopeCodec.encode(reset))
    except Exception:
        _restore(transport, dlq, message)  # restore on a publish failure, then surface
        raise
    # Ack the DLQ copy only after the re-publish succeeded.
    transport.ack(message)
    item.redriven = True
    return item


def redrive(
    transport: Transport,
    dlq: str,
    *,
    to_queue: Optional[str] = None,
    max: int = 0,
    dry_run: bool = False,
    select: Optional[Select] = None,
    timeout: float = 1.0,
) -> RedriveResult:
    """Move dead-lettered messages off ``dlq`` and replay them; see the module docstring.

    Messages are drained first and processed afterwards, so anything restored to ``dlq``
    during processing is not popped again in the same run.

    Args:
        transport: Transport used to pop from ``dlq``, publish and ack.
        dlq: Name of the dead-letter queue to read from.
        to_queue: Replay every message to this queue. When omitted (or empty) each message
            goes to the queue reported by ``_source_queue_of``.
        max: Upper bound on messages drained in this run; ``0`` drains until ``pop`` returns
            ``None``, and a negative value drains nothing. The name shadows the builtin but
            is part of the public keyword interface.
        dry_run: When True, record the planned target for each message but restore it to
            ``dlq`` instead of re-publishing.
        select: Optional predicate over the decoded envelope; messages for which it returns
            a falsy value are restored to ``dlq`` unchanged.
        timeout: Passed unchanged to every ``transport.pop`` call.

    Returns:
        A ``RedriveResult``. ``redriven`` counts messages actually re-published; ``skipped``
        counts messages restored to ``dlq`` (undecodable, not selected, dry run, or no
        target queue could be determined).

    Raises:
        Exception: Whatever the transport, codec or ``select`` raises. Before it propagates,
            every drained message that had not yet been processed is restored to ``dlq``.
    """
    batch: List[ReceivedMessage] = []
    result = RedriveResult()
    # Number of drained messages handed to ``_redrive_one``; those manage their own
    # restore/ack. Everything after this index is still untouched.
    settled = 0
    try:
        # Drain up to ``max`` messages (or all available) before processing any of them.
        while max == 0 or len(batch) < max:
            message = transport.pop(dlq, timeout)
            if message is None:
                break
            batch.append(message)

        for message in batch:
            # Decode before counting the message as settled, so a decode failure still
            # leaves it in the to-be-restored tail.
            envelope = _decoded(message.body)
            settled += 1
            item = _redrive_one(transport, dlq, message, envelope, to_queue, dry_run, select)
            if item.redriven:
                result.redriven += 1
            else:
                result.skipped += 1
            result.items.append(item)
    except Exception:
        # Without this the untouched remainder would stay popped but never restored or
        # acked; what a transport does with such messages is outside this module.
        for leftover in batch[settled:]:
            _restore(transport, dlq, leftover)
        raise

    return result


def _decoded(body: str) -> Optional[Dict[str, Any]]:
    """Return the envelope decoded from a DLQ body, or None if it cannot be redriven.

    A body is redrivable only when ``EnvelopeCodec.decode`` yields a non-empty object
    (it returns ``{}`` for malformed/non-object input) whose ``job`` is a string.
    """
    candidate = EnvelopeCodec.decode(body)
    if candidate and isinstance(candidate.get("job"), str):
        return candidate
    return None


def _source_queue_of(envelope: Envelope) -> str:
"""Default redrive target: ``dead_letter.original_queue``, falling back to ``meta.queue``."""
dead_letter = envelope.get("dead_letter")
if isinstance(dead_letter, Mapping):
original = dead_letter.get("original_queue")
if isinstance(original, str) and original:
return original
meta = envelope.get("meta")
if isinstance(meta, Mapping):
queue = meta.get("queue")
if isinstance(queue, str):
return queue
return ""
139 changes: 139 additions & 0 deletions tests/test_redrive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""Tests for the optional DLQ redrive tooling (ADR-0026)."""

import pytest

from babelqueue import dead_letter
from babelqueue.codec import EnvelopeCodec
from babelqueue.redrive import redrive
from babelqueue.transport import InMemoryTransport


def _dead_letter(transport, dlq, urn, original_queue, data=None):
    """Publish a dead-letter-annotated envelope for ``urn`` onto ``dlq`` and return it."""
    payload = data or {}
    envelope = EnvelopeCodec.make(urn, payload, queue=original_queue)
    dead = dead_letter.annotate(envelope, "failed", original_queue, 3, error="boom")
    encoded = EnvelopeCodec.encode(dead)
    transport.publish(dlq, encoded)
    return dead


def _drain(transport, queue):
    """Pop, decode and ack messages from ``queue`` until ``pop`` returns None."""
    envelopes = []
    message = transport.pop(queue, 0)
    while message is not None:
        envelopes.append(EnvelopeCodec.decode(message.body))
        transport.ack(message)
        message = transport.pop(queue, 0)
    return envelopes


def test_redrive_to_source():
    """By default a message is replayed to its source queue, reset for reprocessing."""
    transport = InMemoryTransport()
    original = _dead_letter(
        transport, "orders.dlq", "urn:babel:orders:created", "orders", {"order_id": 1}
    )

    outcome = redrive(transport, "orders.dlq")

    assert outcome.redriven == 1
    assert outcome.skipped == 0
    replayed = _drain(transport, "orders")
    assert len(replayed) == 1
    envelope = replayed[0]
    # The dead-letter annotation is gone and the attempt counter is back to zero...
    assert "dead_letter" not in envelope
    assert envelope["attempts"] == 0
    # ...while identity and payload are untouched.
    assert envelope["trace_id"] == original["trace_id"]
    assert envelope["data"] == {"order_id": 1}
    assert EnvelopeCodec.urn(envelope) == "urn:babel:orders:created"
    assert _drain(transport, "orders.dlq") == []


def test_redrive_to_sandbox():
    """``to_queue`` overrides the source queue as the replay target."""
    transport = InMemoryTransport()
    _dead_letter(transport, "orders.dlq", "urn:babel:orders:created", "orders")

    outcome = redrive(transport, "orders.dlq", to_queue="sandbox")

    assert outcome.redriven == 1
    assert _drain(transport, "orders") == []
    sandboxed = _drain(transport, "sandbox")
    assert len(sandboxed) == 1


def test_redrive_dry_run():
    """A dry run reports the planned target but leaves the message on the DLQ."""
    transport = InMemoryTransport()
    _dead_letter(transport, "orders.dlq", "urn:babel:orders:created", "orders")

    outcome = redrive(transport, "orders.dlq", dry_run=True)

    assert outcome.redriven == 0
    assert outcome.skipped == 1
    planned = outcome.items[0]
    assert planned.to == "orders"
    assert planned.redriven is False
    assert _drain(transport, "orders") == []
    remaining = _drain(transport, "orders.dlq")
    assert len(remaining) == 1
    assert "dead_letter" in remaining[0]


def test_redrive_select():
    """Only envelopes accepted by ``select`` are replayed; the rest are restored."""
    transport = InMemoryTransport()
    _dead_letter(transport, "dlq", "urn:babel:orders:created", "orders")
    _dead_letter(transport, "dlq", "urn:babel:emails:welcome", "emails")

    def only_orders(envelope):
        return EnvelopeCodec.urn(envelope) == "urn:babel:orders:created"

    outcome = redrive(transport, "dlq", select=only_orders)

    assert outcome.redriven == 1
    assert outcome.skipped == 1
    assert len(_drain(transport, "orders")) == 1
    assert _drain(transport, "emails") == []
    # The unselected message is back on the DLQ.
    assert len(_drain(transport, "dlq")) == 1


def test_redrive_max():
    """``max`` caps how many messages a single run takes off the DLQ."""
    transport = InMemoryTransport()
    published = 0
    while published < 3:
        _dead_letter(transport, "dlq", "urn:babel:orders:created", "orders")
        published += 1

    outcome = redrive(transport, "dlq", max=2)

    assert outcome.redriven == 2
    # The third message was never drained, so it is still on the DLQ.
    left_behind = _drain(transport, "dlq")
    assert len(left_behind) == 1


def test_redrive_no_dead_letter_falls_back_to_meta_queue():
    """Without a ``dead_letter`` block the target falls back to ``meta.queue``."""
    transport = InMemoryTransport()
    # A plain envelope that was never dead-lettered, placed directly on the DLQ.
    plain = EnvelopeCodec.make("urn:babel:orders:created", {}, queue="orders")
    transport.publish("dlq", EnvelopeCodec.encode(plain))

    outcome = redrive(transport, "dlq")

    assert outcome.redriven == 1
    delivered = _drain(transport, "orders")
    assert len(delivered) == 1


class _FailOnTarget(InMemoryTransport):
    """In-memory transport whose ``publish`` raises for one designated queue."""

    def __init__(self, fail_queue):
        """Remember ``fail_queue``, the only queue that rejects publishes."""
        super().__init__()
        self._fail_queue = fail_queue

    def publish(self, queue, body):
        """Publish normally, except raise ``RuntimeError`` for the failing queue."""
        refused = queue == self._fail_queue
        if refused:
            raise RuntimeError("publish refused")
        super().publish(queue, body)


def test_redrive_publish_failure_restores():
    """A failed re-publish surfaces the error and puts the message back on the DLQ."""
    transport = _FailOnTarget("orders")
    _dead_letter(transport, "dlq", "urn:babel:orders:created", "orders")

    with pytest.raises(RuntimeError):
        redrive(transport, "dlq")

    # Restored to the DLQ, not lost.
    restored = _drain(transport, "dlq")
    assert len(restored) == 1
    # Nothing reached the source queue.
    assert _drain(transport, "orders") == []


def test_redrive_undecodable_restored():
    """An undecodable body is counted as skipped and restored byte-for-byte."""
    transport = InMemoryTransport()
    transport.publish("dlq", "not-json{{{")

    outcome = redrive(transport, "dlq")

    assert outcome.redriven == 0
    assert outcome.skipped == 1
    restored = transport.pop("dlq", 0)
    assert restored is not None
    assert restored.body == "not-json{{{"
Loading