From 4cf8cb4589368bd05ee8af3e92fff98b85106700 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muhammet=20=C5=9Eafak?= Date: Fri, 19 Jun 2026 08:31:23 +0300 Subject: [PATCH] =?UTF-8?q?feat(redrive):=20DLQ=20redrive=20tooling=20?= =?UTF-8?q?=E2=80=94=20safe=20replay=20(ADR-0026)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BabelQueue\Redrive: the PHP mirror of the Go redrive reference, adapted to PHP's publish-only Transport. Redrive::reset() (pure: strip dead_letter, attempts->0, preserve job/trace_id/data/meta) + Redrive::run(RedriveIO, dlq, opts) over a small reserve/ack/publish seam — re-publishes each dead-lettered message to its dead_letter.original_queue or a sandbox toQueue. Options: toQueue, max, dryRun, select. Drains-then-processes; acks only after a successful re-publish; restores undecodable bodies and on publish failure. No new dependency; envelope frozen (GR-1). Replay-Bypass header documented as phase two. --- src/Redrive/Redrive.php | 181 ++++++++++++++++++++++++++++++++ src/Redrive/RedriveIO.php | 32 ++++++ src/Redrive/RedriveItem.php | 22 ++++ src/Redrive/RedriveOptions.php | 34 ++++++ src/Redrive/RedriveResult.php | 21 ++++ tests/Redrive/RedriveTest.php | 184 +++++++++++++++++++++++++++++++++ 6 files changed, 474 insertions(+) create mode 100644 src/Redrive/Redrive.php create mode 100644 src/Redrive/RedriveIO.php create mode 100644 src/Redrive/RedriveItem.php create mode 100644 src/Redrive/RedriveOptions.php create mode 100644 src/Redrive/RedriveResult.php create mode 100644 tests/Redrive/RedriveTest.php diff --git a/src/Redrive/Redrive.php b/src/Redrive/Redrive.php new file mode 100644 index 0000000..1bed1a1 --- /dev/null +++ b/src/Redrive/Redrive.php @@ -0,0 +1,181 @@ + $envelope + * @return array + */ + public static function reset(array $envelope): array + { + unset($envelope['dead_letter']); + $envelope['attempts'] = 0; + + return $envelope; + } + + /** + * Drain the dlq queue and re-publish selected messages, reset, to their source (or + * $options->toQueue). Messages are drained first and then processed, so restored messages + * (skipped, dry-run, undecodable) are never re-encountered in the same run. A DLQ message is + * acknowledged only after its re-publish succeeds; an undecodable body is restored, not + * dropped. On a publish failure the message is restored to the DLQ and the error re-thrown. + */ + public static function run(RedriveIO $io, string $dlq, ?RedriveOptions $options = null): RedriveResult + { + $options ??= new RedriveOptions(); + + /** @var list, decoded: bool}> $batch */ + $batch = []; + while ($options->max === 0 || count($batch) < $options->max) { + $reserved = $io->pop($dlq); + if ($reserved === null) { + break; + } + $env = EnvelopeCodec::decode($reserved['body']); + $batch[] = [ + 'body' => $reserved['body'], + 'handle' => $reserved['handle'], + 'env' => $env, + 'decoded' => $env !== [], + ]; + } + + $redriven = 0; + $skipped = 0; + /** @var list $items */ + $items = []; + + foreach ($batch as $p) { + if (! $p['decoded']) { + $io->publish($dlq, $p['body']); // restore the poison body; never drop it + $io->ack($p['handle']); + $skipped++; + $items[] = new RedriveItem('', '', '', '', $dlq, '', false); + + continue; + } + + $env = $p['env']; + $messageId = self::metaString($env, 'id'); + $traceId = self::stringField($env, 'trace_id'); + $urn = EnvelopeCodec::urn($env); + $reason = self::deadLetterString($env, 'reason'); + + $select = $options->select; + if ($select !== null && ! $select($env)) { + $io->publish($dlq, $p['body']); // not selected: restore unchanged + $io->ack($p['handle']); + $skipped++; + $items[] = new RedriveItem($messageId, $traceId, $urn, $reason, $dlq, '', false); + + continue; + } + + $target = $options->toQueue ?? self::sourceQueueOf($env); + + if ($options->dryRun) { + $io->publish($dlq, $p['body']); // report the plan; restore unchanged + $io->ack($p['handle']); + $skipped++; + $items[] = new RedriveItem($messageId, $traceId, $urn, $reason, $dlq, $target, false); + + continue; + } + + $body = EnvelopeCodec::encode(self::reset($env)); + try { + $io->publish($target, $body); + } catch (Throwable $e) { + $io->publish($dlq, $p['body']); // restore on a publish failure, then surface it + $io->ack($p['handle']); + + throw $e; + } + $io->ack($p['handle']); + $redriven++; + $items[] = new RedriveItem($messageId, $traceId, $urn, $reason, $dlq, $target, true); + } + + return new RedriveResult($redriven, $skipped, $items); + } + + /** + * Where a redriven message goes by default: its dead_letter.original_queue, else meta.queue. + * + * @param array $env + */ + private static function sourceQueueOf(array $env): string + { + $original = self::deadLetterString($env, 'original_queue'); + + return $original !== '' ? $original : self::metaString($env, 'queue'); + } + + /** + * @param array $env + */ + private static function stringField(array $env, string $key): string + { + return isset($env[$key]) && is_string($env[$key]) ? $env[$key] : ''; + } + + /** + * @param array $env + */ + private static function metaString(array $env, string $key): string + { + $meta = $env['meta'] ?? null; + if (is_array($meta) && isset($meta[$key]) && is_string($meta[$key])) { + return $meta[$key]; + } + + return ''; + } + + /** + * @param array $env + */ + private static function deadLetterString(array $env, string $key): string + { + $dead = $env['dead_letter'] ?? null; + if (is_array($dead) && isset($dead[$key]) && is_string($dead[$key])) { + return $dead[$key]; + } + + return ''; + } +} diff --git a/src/Redrive/RedriveIO.php b/src/Redrive/RedriveIO.php new file mode 100644 index 0000000..5b374a6 --- /dev/null +++ b/src/Redrive/RedriveIO.php @@ -0,0 +1,32 @@ +): bool)|null */ + public readonly ?Closure $select; + + /** + * @param string|null $toQueue Override the target: when null, each message goes back to + * its own dead_letter.original_queue; set a sandbox queue to + * replay safely. + * @param int $max Cap how many messages are pulled from the DLQ (0 = all available). + * @param bool $dryRun Inspect and report the plan, restoring every message unchanged. + * @param (callable(array): bool)|null $select Pick which messages to + * redrive (e.g. by reason or URN); unselected are restored. + */ + public function __construct( + public readonly ?string $toQueue = null, + public readonly int $max = 0, + public readonly bool $dryRun = false, + ?callable $select = null, + ) { + $this->select = $select === null ? null : Closure::fromCallable($select); + } +} diff --git a/src/Redrive/RedriveResult.php b/src/Redrive/RedriveResult.php new file mode 100644 index 0000000..05804a0 --- /dev/null +++ b/src/Redrive/RedriveResult.php @@ -0,0 +1,21 @@ + $items + */ + public function __construct( + public readonly int $redriven, + public readonly int $skipped, + public readonly array $items, + ) { + } +} diff --git a/tests/Redrive/RedriveTest.php b/tests/Redrive/RedriveTest.php new file mode 100644 index 0000000..49e2ee1 --- /dev/null +++ b/tests/Redrive/RedriveTest.php @@ -0,0 +1,184 @@ +> */ + public array $queues = []; + + public ?string $failQueue = null; + + public function pop(string $queue): ?array + { + if (empty($this->queues[$queue])) { + return null; + } + + return ['body' => array_shift($this->queues[$queue]), 'handle' => null]; + } + + public function ack(mixed $handle): void + { + // no-op: pop already removed the message + } + + public function publish(string $queue, string $body): void + { + if ($queue === $this->failQueue) { + throw new RuntimeException('publish refused'); + } + $this->queues[$queue][] = $body; + } +} + +final class RedriveTest extends TestCase +{ + /** + * @param array $data + * @return array + */ + private function deadLettered(string $urn, string $originalQueue, array $data = []): array + { + $env = EnvelopeCodec::make($urn, $data, $originalQueue); + + return DeadLetter::annotate($env, 'failed', new RuntimeException('boom'), $originalQueue, 3); + } + + public function testRedriveToSourceResetsAndPreservesIdentity(): void + { + $io = new FakeIO(); + $dl = $this->deadLettered('urn:babel:orders:created', 'orders', ['order_id' => 1]); + $traceId = $dl['trace_id']; + $io->publish('orders.dlq', EnvelopeCodec::encode($dl)); + + $res = Redrive::run($io, 'orders.dlq'); + + $this->assertSame(1, $res->redriven); + $this->assertSame(0, $res->skipped); + $this->assertEmpty($io->queues['orders.dlq'] ?? []); + $this->assertCount(1, $io->queues['orders']); + + $back = EnvelopeCodec::decode($io->queues['orders'][0]); + $this->assertArrayNotHasKey('dead_letter', $back); + $this->assertSame(0, $back['attempts']); + $this->assertSame($traceId, $back['trace_id']); + $this->assertSame('urn:babel:orders:created', $back['job']); + } + + public function testRedriveToSandboxLeavesSourceUntouched(): void + { + $io = new FakeIO(); + $io->publish('orders.dlq', EnvelopeCodec::encode($this->deadLettered('urn:babel:orders:created', 'orders'))); + + $res = Redrive::run($io, 'orders.dlq', new RedriveOptions(toQueue: 'sandbox')); + + $this->assertSame(1, $res->redriven); + $this->assertEmpty($io->queues['orders'] ?? []); + $this->assertCount(1, $io->queues['sandbox']); + } + + public function testDryRunReportsPlanAndLeavesDlqUnchanged(): void + { + $io = new FakeIO(); + $io->publish('orders.dlq', EnvelopeCodec::encode($this->deadLettered('urn:babel:orders:created', 'orders'))); + + $res = Redrive::run($io, 'orders.dlq', new RedriveOptions(dryRun: true)); + + $this->assertSame(0, $res->redriven); + $this->assertSame(1, $res->skipped); + $this->assertCount(1, $res->items); + $this->assertSame('orders', $res->items[0]->to); + $this->assertFalse($res->items[0]->redriven); + $this->assertEmpty($io->queues['orders'] ?? []); + $this->assertCount(1, $io->queues['orders.dlq']); + $this->assertArrayHasKey('dead_letter', EnvelopeCodec::decode($io->queues['orders.dlq'][0])); + } + + public function testSelectRedrivesOnlyMatchesAndRestoresTheRest(): void + { + $io = new FakeIO(); + $io->publish('dlq', EnvelopeCodec::encode($this->deadLettered('urn:babel:orders:created', 'orders'))); + $io->publish('dlq', EnvelopeCodec::encode($this->deadLettered('urn:babel:emails:welcome', 'emails'))); + + $res = Redrive::run($io, 'dlq', new RedriveOptions( + select: static fn (array $e): bool => EnvelopeCodec::urn($e) === 'urn:babel:orders:created', + )); + + $this->assertSame(1, $res->redriven); + $this->assertSame(1, $res->skipped); + $this->assertCount(1, $io->queues['orders']); + $this->assertEmpty($io->queues['emails'] ?? []); + $this->assertCount(1, $io->queues['dlq']); // unselected restored + } + + public function testMaxCapsHowManyArePulled(): void + { + $io = new FakeIO(); + for ($i = 0; $i < 3; $i++) { + $io->publish('dlq', EnvelopeCodec::encode($this->deadLettered('urn:babel:orders:created', 'orders'))); + } + + $res = Redrive::run($io, 'dlq', new RedriveOptions(max: 2)); + + $this->assertSame(2, $res->redriven); + $this->assertCount(1, $io->queues['dlq']); + } + + public function testPublishFailureRestoresToDlq(): void + { + $io = new FakeIO(); + $io->publish('dlq', EnvelopeCodec::encode($this->deadLettered('urn:babel:orders:created', 'orders'))); + $io->failQueue = 'orders'; + + try { + Redrive::run($io, 'dlq'); + $this->fail('expected the publish error to surface'); + } catch (RuntimeException $e) { + $this->assertSame('publish refused', $e->getMessage()); + } + + $this->assertCount(1, $io->queues['dlq']); // restored, not lost + $this->assertEmpty($io->queues['orders'] ?? []); + } + + public function testUndecodableBodyIsRestored(): void + { + $io = new FakeIO(); + $io->publish('dlq', 'not-json{{{'); + + $res = Redrive::run($io, 'dlq'); + + $this->assertSame(0, $res->redriven); + $this->assertSame(1, $res->skipped); + $this->assertCount(1, $io->queues['dlq']); + $this->assertSame('not-json{{{', $io->queues['dlq'][0]); + } + + public function testResetIsPureAndStripsDeadLetter(): void + { + $dl = $this->deadLettered('urn:babel:orders:created', 'orders', ['x' => 1]); + $dl['attempts'] = 5; + + $reset = Redrive::reset($dl); + + $this->assertArrayNotHasKey('dead_letter', $reset); + $this->assertSame(0, $reset['attempts']); + $this->assertSame($dl['trace_id'], $reset['trace_id']); + $this->assertArrayHasKey('dead_letter', $dl); // original argument unchanged (pure) + } +}