diff --git a/pom.xml b/pom.xml index 0a15802..a4873b4 100644 --- a/pom.xml +++ b/pom.xml @@ -51,9 +51,43 @@ UTF-8 UTF-8 5.10.3 + 1.45.0 + + + + io.opentelemetry + opentelemetry-bom + ${opentelemetry.version} + pom + import + + + + + + + io.opentelemetry + opentelemetry-api + true + + + io.opentelemetry + opentelemetry-sdk + test + + + io.opentelemetry + opentelemetry-sdk-testing + test + + org.junit.jupiter junit-jupiter
diff --git a/src/main/java/com/babelqueue/otel/Sender.java b/src/main/java/com/babelqueue/otel/Sender.java
new file mode 100644
index 0000000..db466bd
--- /dev/null
+++ b/src/main/java/com/babelqueue/otel/Sender.java
@@ -0,0 +1,20 @@
+package com.babelqueue.otel;
+
+import com.babelqueue.Envelope;
+
+/**
+ * Performs the actual transport write for {@link Tracing#publish}. The core is codec-only, so
+ * the producer helper builds the envelope (stamping the active trace's id into its
+ * {@code trace_id}) and hands it to a {@code Sender} that writes it to a broker.
+ */
+@FunctionalInterface
+public interface Sender {
+
+    /**
+     * Sends one built envelope to its destination; the producer span is current during the call.
+     *
+     * @param envelope the envelope to publish
+     * @throws Exception to signal a failed publish (recorded on the producer span and re-thrown)
+     */
+    void send(Envelope envelope) throws Exception;
+}
diff --git a/src/main/java/com/babelqueue/otel/Tracing.java b/src/main/java/com/babelqueue/otel/Tracing.java
new file mode 100644
index 0000000..aa1e589
--- /dev/null
+++ b/src/main/java/com/babelqueue/otel/Tracing.java
@@ -0,0 +1,222 @@
+package com.babelqueue.otel;
+
+import com.babelqueue.Envelope;
+import com.babelqueue.EnvelopeCodec;
+import com.babelqueue.Meta;
+import com.babelqueue.idempotency.Handler;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.TraceFlags;
+import io.opentelemetry.api.trace.TraceState;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Locale;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Optional OpenTelemetry tracing for a babelqueue producer or consumer (ADR-0025) — the Java
+ * mirror of the Go {@code babelqueue-go/otel}, Python {@code babelqueue.otel} and Node
+ * {@code @babelqueue/core/otel} helpers.
+ *
+ *

<p>Produce/consume spans are correlated across every hop and SDK through the envelope's
+ * {@code trace_id} — a UUID, which maps 1:1 to a 32-hex OpenTelemetry trace id ({@link #traceIdOf}
+ * / {@link #uuidOf}). A non-UUID (or all-zero) {@code trace_id} is hashed deterministically
+ * (SHA-256).
+ *
+ *

+ * + *

Every hop that shares a {@code trace_id} shares one OTel trace. Exact cross-hop span + * parent-child linkage (W3C {@code traceparent} as a transport header) is a documented follow-up. + */ +public final class Tracing { + + private static final String SYSTEM = "babelqueue"; + private static final String DEFAULT_QUEUE = "default"; + private static final String INVALID_TRACE_ID = "00000000000000000000000000000000"; + private static final String INVALID_SPAN_ID = "0000000000000000"; + private static final Pattern HEX_32 = Pattern.compile("[0-9a-f]{32}"); + + private Tracing() { + } + + /** + * Maps an envelope {@code trace_id} to a deterministic 32-hex OTel trace id: a UUID maps to + * its hex bytes; any other string is hashed (SHA-256, first 16 bytes). Never the all-zero + * (invalid) trace id. The inverse of {@link #uuidOf} for the UUID case. + * + * @param traceId the envelope {@code trace_id} + * @return a valid 32-hex OTel trace id + */ + public static String traceIdOf(String traceId) { + String hex = normalizeHex(traceId); + if (HEX_32.matcher(hex).matches() && !hex.equals(INVALID_TRACE_ID)) { + return hex; + } + return toHex(sha256(traceId == null ? "" : traceId), 16); + } + + /** + * Formats a 32-hex OTel trace id as a canonical UUID string — the form a producer stamps into + * the message's {@code trace_id} so a consumer can recover the same trace id via + * {@link #traceIdOf}. 
+ * + * @param traceIdHex a 32-hex OTel trace id + * @return the canonical UUID string + */ + public static String uuidOf(String traceIdHex) { + StringBuilder h = new StringBuilder(normalizeHex(traceIdHex)); + while (h.length() < 32) { + h.insert(0, '0'); + } + String s = h.substring(0, 32); + return s.substring(0, 8) + "-" + s.substring(8, 12) + "-" + s.substring(12, 16) + + "-" + s.substring(16, 20) + "-" + s.substring(20, 32); + } + + /** + * Wraps a consume handler to emit a {@code CONSUMER} span per message, in the OTel trace + * derived from the envelope's {@code trace_id}, recording the handler's error/status. The + * handler receives the full {@link Envelope} as before. + * + * @param tracer the OTel tracer + * @param handler the handler to instrument + * @return the wrapped handler + */ + public static Handler wrapHandler(Tracer tracer, Handler handler) { + return envelope -> { + Span span = tracer.spanBuilder("process " + nullToEmpty(envelope.job())) + .setSpanKind(SpanKind.CONSUMER) + .setParent(parentContext(envelope.traceId())) + .setAllAttributes(consumeAttributes(envelope)) + .startSpan(); + try (Scope scope = span.makeCurrent()) { + handler.handle(envelope); + } catch (Exception e) { + span.recordException(e); + span.setStatus(StatusCode.ERROR, nullToEmpty(e.getMessage())); + throw e; + } finally { + span.end(); + } + }; + } + + /** + * Publishes {@code (urn, data)} to the {@code "default"} queue under a {@code PRODUCER} span. + * See {@link #publish(Tracer, String, Map, String, Sender)}. 
+ * + * @param tracer the OTel tracer + * @param urn the message URN + * @param data the message data + * @param send the transport write + * @return the published message id ({@code meta.id}) + * @throws Exception if {@code send} fails (recorded on the span and re-thrown) + */ + public static String publish(Tracer tracer, String urn, Map data, Sender send) + throws Exception { + return publish(tracer, urn, data, DEFAULT_QUEUE, send); + } + + /** + * Emits a {@code PRODUCER} span {@code publish }, carrying the active trace's id into the + * built envelope's {@code trace_id} so the downstream consumer recovers the same trace, then + * writes the envelope via {@code send}. + * + * @param tracer the OTel tracer + * @param urn the message URN + * @param data the message data + * @param queue the destination queue + * @param send the transport write + * @return the published message id ({@code meta.id}) + * @throws Exception if {@code send} fails (recorded on the span and re-thrown) + */ + public static String publish( + Tracer tracer, String urn, Map data, String queue, Sender send) + throws Exception { + Span span = tracer.spanBuilder("publish " + urn) + .setSpanKind(SpanKind.PRODUCER) + .setAttribute("messaging.system", SYSTEM) + .setAttribute("messaging.operation", "publish") + .setAttribute("messaging.destination.name", urn) + .startSpan(); + try (Scope scope = span.makeCurrent()) { + String traceId = uuidOf(span.getSpanContext().getTraceId()); + Envelope envelope = EnvelopeCodec.make(urn, data, queue, traceId); + send.send(envelope); + String id = envelope.meta().id(); + span.setAttribute("messaging.message.id", id); + return id; + } catch (Exception e) { + span.recordException(e); + span.setStatus(StatusCode.ERROR, nullToEmpty(e.getMessage())); + throw e; + } finally { + span.end(); + } + } + + /** A context carrying a remote parent in the {@code trace_id}-derived trace. 
*/ + private static Context parentContext(String traceId) { + SpanContext sc = SpanContext.createFromRemoteParent( + traceIdOf(traceId), spanIdOf(traceId), TraceFlags.getSampled(), TraceState.getDefault()); + return Context.root().with(Span.wrap(sc)); + } + + private static Attributes consumeAttributes(Envelope envelope) { + Meta meta = envelope.meta(); + return Attributes.builder() + .put("messaging.system", SYSTEM) + .put("messaging.operation", "process") + .put("messaging.destination.name", meta == null ? "" : nullToEmpty(meta.queue())) + .put("messaging.message.id", meta == null ? "" : nullToEmpty(meta.id())) + .put("messaging.message.conversation_id", nullToEmpty(envelope.traceId())) + .put("messaging.babelqueue.attempts", (long) envelope.attempts()) + .build(); + } + + /** Deterministic, non-zero 16-hex span id so the remote parent context is valid. */ + private static String spanIdOf(String traceId) { + String sid = toHex(sha256("babelqueue-span:" + (traceId == null ? "" : traceId)), 8); + return sid.equals(INVALID_SPAN_ID) ? "0000000000000001" : sid; + } + + private static String normalizeHex(String s) { + return s == null ? "" : s.replace("-", "").toLowerCase(Locale.ROOT); + } + + private static String nullToEmpty(String s) { + return s == null ? 
"" : s; + } + + private static byte[] sha256(String s) { + try { + return MessageDigest.getInstance("SHA-256").digest(s.getBytes(StandardCharsets.UTF_8)); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException("SHA-256 is required but unavailable", e); + } + } + + private static String toHex(byte[] bytes, int n) { + StringBuilder sb = new StringBuilder(n * 2); + for (int i = 0; i < n; i++) { + sb.append(Character.forDigit((bytes[i] >> 4) & 0xF, 16)); + sb.append(Character.forDigit(bytes[i] & 0xF, 16)); + } + return sb.toString(); + } +} diff --git a/src/main/java/com/babelqueue/otel/package-info.java b/src/main/java/com/babelqueue/otel/package-info.java new file mode 100644 index 0000000..6234418 --- /dev/null +++ b/src/main/java/com/babelqueue/otel/package-info.java @@ -0,0 +1,14 @@ +/** + * Optional OpenTelemetry tracing for babelqueue (ADR-0025) — the Java mirror of the Go + * {@code babelqueue-go/otel} module. + * + *

<p>{@link com.babelqueue.otel.Tracing} emits a {@code CONSUMER} span per handled message and
+ * a {@code PRODUCER} span per publish, correlating them across every hop and SDK through the
+ * envelope's {@code trace_id} — a UUID, which maps 1:1 to a 32-hex OpenTelemetry trace id. The
+ * wire envelope is untouched, and {@code io.opentelemetry:opentelemetry-api} is declared as an
+ * optional dependency, so the core stays zero-dependency for users who do not opt in.
+ *
+ *

Every hop that shares a {@code trace_id} shares one OTel trace. Exact cross-hop span + * parent-child linkage (W3C {@code traceparent} as a transport header) is a documented follow-up. + */ +package com.babelqueue.otel; diff --git a/src/test/java/com/babelqueue/otel/TracingTest.java b/src/test/java/com/babelqueue/otel/TracingTest.java new file mode 100644 index 0000000..ae4a4ad --- /dev/null +++ b/src/test/java/com/babelqueue/otel/TracingTest.java @@ -0,0 +1,119 @@ +package com.babelqueue.otel; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.babelqueue.Envelope; +import com.babelqueue.EnvelopeCodec; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class TracingTest { + + private static final String TRACE_ID = "7b3f9c2a-e41d-4f88-9b2a-1c0d5e6f7a8b"; + private static final String INVALID = "00000000000000000000000000000000"; + + private InMemorySpanExporter exporter; + private Tracer tracer; + + @BeforeEach + void setUp() { + exporter = InMemorySpanExporter.create(); + SdkTracerProvider provider = SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(exporter)) + .build(); + tracer = provider.get("test"); + } + + private SpanData onlySpan() { + List spans = 
exporter.getFinishedSpanItems(); + assertEquals(1, spans.size()); + return spans.get(0); + } + + @Test + void traceIdRoundTripsAndHashesNonUuid() { + String hex = Tracing.traceIdOf(TRACE_ID); + assertTrue(hex.matches("[0-9a-f]{32}")); + assertEquals(TRACE_ID, Tracing.uuidOf(hex)); + + // a non-uuid maps deterministically to a valid, distinct trace id + assertEquals(Tracing.traceIdOf("not-a-uuid"), Tracing.traceIdOf("not-a-uuid")); + assertNotEquals(hex, Tracing.traceIdOf("not-a-uuid")); + assertTrue(Tracing.traceIdOf("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz").matches("[0-9a-f]{32}")); + assertNotEquals(INVALID, Tracing.traceIdOf(null)); + } + + @Test + void wrapHandlerEmitsConsumerSpanInTraceIdTrace() throws Exception { + boolean[] called = {false}; + Envelope env = EnvelopeCodec.make("urn:babel:orders:created", Map.of("order_id", 1), "orders", TRACE_ID); + + Tracing.wrapHandler(tracer, e -> called[0] = true).handle(env); + + assertTrue(called[0]); + SpanData span = onlySpan(); + assertEquals("process urn:babel:orders:created", span.getName()); + assertEquals(SpanKind.CONSUMER, span.getKind()); + assertEquals(Tracing.traceIdOf(TRACE_ID), span.getSpanContext().getTraceId()); + assertEquals(TRACE_ID, span.getAttributes().get(AttributeKey.stringKey("messaging.message.conversation_id"))); + assertEquals("orders", span.getAttributes().get(AttributeKey.stringKey("messaging.destination.name"))); + assertEquals(0L, span.getAttributes().get(AttributeKey.longKey("messaging.babelqueue.attempts"))); + } + + @Test + void wrapHandlerRecordsErrorAndRethrows() { + Envelope env = EnvelopeCodec.make("urn:babel:orders:created", Map.of(), "orders", TRACE_ID); + + IllegalStateException boom = assertThrows(IllegalStateException.class, () -> + Tracing.wrapHandler(tracer, e -> { + throw new IllegalStateException("boom"); + }).handle(env)); + assertEquals("boom", boom.getMessage()); + + SpanData span = onlySpan(); + assertEquals(StatusCode.ERROR, span.getStatus().getStatusCode()); + 
assertFalse(span.getEvents().isEmpty()); + } + + @Test + void publishEmitsProducerSpanAndStampsTraceId() throws Exception { + Envelope[] sent = {null}; + + String id = Tracing.publish(tracer, "urn:babel:orders:created", Map.of("order_id", 7), e -> sent[0] = e); + + SpanData span = onlySpan(); + assertEquals(SpanKind.PRODUCER, span.getKind()); + assertEquals("publish urn:babel:orders:created", span.getName()); + assertEquals(id, span.getAttributes().get(AttributeKey.stringKey("messaging.message.id"))); + // the published trace_id encodes the producer span's trace, so a consumer recovers it + assertEquals(Tracing.uuidOf(span.getSpanContext().getTraceId()), sent[0].traceId()); + assertEquals(span.getSpanContext().getTraceId(), Tracing.traceIdOf(sent[0].traceId())); + } + + @Test + void publishRecordsAFailingSend() { + Exception boom = assertThrows(IllegalStateException.class, () -> + Tracing.publish(tracer, "urn:babel:orders:created", Map.of(), e -> { + throw new IllegalStateException("send failed"); + })); + assertEquals("send failed", boom.getMessage()); + + SpanData span = onlySpan(); + assertEquals(SpanKind.PRODUCER, span.getKind()); + assertEquals(StatusCode.ERROR, span.getStatus().getStatusCode()); + } +}