From 5d0b710b2ee3a5d002fde00c43a36b30b837d357 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muhammet=20=C5=9Eafak?= Date: Fri, 19 Jun 2026 06:19:08 +0300 Subject: [PATCH] feat(otel): optional OpenTelemetry tracing via the @babelqueue/core/otel subpath (ADR-0025) Mirrors the Go/Python reference: a new src/otel.ts (reached only via the @babelqueue/core/otel subpath, so importing the core stays dependency-free) emitting produce/consume spans correlated across hops via trace_id<->32-hex OTel TraceID. wrapHandler (consumer span) + publish (producer-span wrapper around a send callback). @opentelemetry/api is an optional peer dependency. Envelope untouched (GR-1); opt-in. --- package-lock.json | 81 +++++++++++++++++++++++ package.json | 22 ++++++- src/otel.ts | 162 ++++++++++++++++++++++++++++++++++++++++++++++ test/otel.test.ts | 118 +++++++++++++++++++++++++++++++++ tsup.config.ts | 2 +- 5 files changed, 383 insertions(+), 2 deletions(-) create mode 100644 src/otel.ts create mode 100644 test/otel.test.ts diff --git a/package-lock.json b/package-lock.json index 61cbec8..ebc28c3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,6 +10,8 @@ "license": "MIT", "devDependencies": { "@eslint/js": "^10.0.1", + "@opentelemetry/api": "^1.9.1", + "@opentelemetry/sdk-trace-base": "^2.8.0", "@types/node": "^22", "c8": "^11.0.0", "eslint": "^10.4.1", @@ -20,6 +22,14 @@ }, "engines": { "node": ">=18" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.9.1" + }, + "peerDependenciesMeta": { + "@opentelemetry/api": { + "optional": true + } } }, "node_modules/@bcoe/v8-coverage": { @@ -717,6 +727,77 @@ "@jridgewell/sourcemap-codec": "^1.4.14" } }, + "node_modules/@opentelemetry/api": { + "version": "1.9.1", + "resolved": "https://registry.npmjs.org/@opentelemetry/api/-/api-1.9.1.tgz", + "integrity": "sha512-gLyJlPHPZYdAk1JENA9LeHejZe1Ti77/pTeFm/nMXmQH/HFZlcS/O2XJB+L8fkbrNSqhdtlvjBVjxwUYanNH5Q==", + "dev": true, + "license": "Apache-2.0", + "engines": { + "node": ">=8.0.0" + } + }, + "node_modules/@opentelemetry/core": { + "version": "2.8.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/core/-/core-2.8.0.tgz", + "integrity": "sha512-hd1Lfh8p545nNz+jq1Ejfz+Mn1hyLuxYn1YzTfFNrxr8urEWMNQLPf1Th8kjOH+HxwawCrtgBp8JpBUR4ZSgww==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "@opentelemetry/semantic-conventions": "^1.29.0" + }, + "engines": { + "node": "^18.19.0 || >=20.6.0" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.10.0" + } + }, + "node_modules/@opentelemetry/resources": { + "version": "2.8.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/resources/-/resources-2.8.0.tgz", + "integrity": "sha512-qmXQ27ilDbUK/vGMqwL8D4/rhn76C+sherM4wTbjlfknR8Nvfc/hCxjRJPhkzZzUsPiNg16SA31NxMabwttRjg==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "@opentelemetry/core": "2.8.0", + "@opentelemetry/semantic-conventions": "^1.29.0" + }, + "engines": { + "node": "^18.19.0 || >=20.6.0" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.3.0 <1.10.0" + } + }, + "node_modules/@opentelemetry/sdk-trace-base": { + "version": "2.8.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/sdk-trace-base/-/sdk-trace-base-2.8.0.tgz", + "integrity": "sha512-mhU4jp+vW0mGbFRd+GeXHvmfA4aDqWjBjLC3pE5XMpLs0IE2ryYb019Ts2AQrOq67gaTF25D91+fgvEHDZEnuQ==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "@opentelemetry/core": "2.8.0", + "@opentelemetry/resources": "2.8.0", + "@opentelemetry/semantic-conventions": "^1.29.0" + }, + "engines": { + "node": "^18.19.0 || >=20.6.0" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.3.0 <1.10.0" + } + }, + "node_modules/@opentelemetry/semantic-conventions": { + "version": "1.41.1", + "resolved": "https://registry.npmjs.org/@opentelemetry/semantic-conventions/-/semantic-conventions-1.41.1.tgz", + "integrity": "sha512-/UhIkaZgPutTFmQ7RnIJGgDXZmtEJ7Dvi86xNTFWcnRxVRNk/aotsqDJYeEvDP+FSMB2SdW+pQzNMcWP0rwuNA==", + "dev": true, + "license": "Apache-2.0", + "engines": { + "node": ">=14" + } + }, "node_modules/@rollup/rollup-android-arm-eabi": { "version": "4.61.1", "resolved": "https://registry.npmjs.org/@rollup/rollup-android-arm-eabi/-/rollup-android-arm-eabi-4.61.1.tgz", diff --git a/package.json b/package.json index c6d195d..31c0dbd 100644 --- a/package.json +++ b/package.json @@ -33,6 +33,16 @@ "types": "./dist/index.d.cts", "default": "./dist/index.cjs" } + }, + "./otel": { + "import": { + "types": "./dist/otel.d.ts", + "default": "./dist/otel.js" + }, + "require": { + "types": "./dist/otel.d.cts", + "default": "./dist/otel.cjs" + } } }, "main": "./dist/index.cjs", @@ -49,12 +59,14 @@ "build": "tsup", "typecheck": "tsc --noEmit", "lint": "eslint src test", - "test": "node --import tsx --test test/codec.test.ts test/dead-letter.test.ts test/conformance.test.ts test/overhead.test.ts test/idempotency.test.ts test/schema.test.ts", + "test": "node --import tsx --test test/codec.test.ts test/dead-letter.test.ts test/conformance.test.ts test/overhead.test.ts test/idempotency.test.ts test/schema.test.ts test/otel.test.ts", "coverage": "c8 --check-coverage --lines 90 --functions 90 --branches 85 --reporter=text npm test", "prepublishOnly": "npm run build" }, "devDependencies": { "@eslint/js": "^10.0.1", + "@opentelemetry/api": "^1.9.1", + "@opentelemetry/sdk-trace-base": "^2.8.0", "@types/node": "^22", "c8": "^11.0.0", "eslint": "^10.4.1", @@ -65,5 +77,13 @@ }, "publishConfig": { "access": "public" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.9.1" + }, + "peerDependenciesMeta": { + "@opentelemetry/api": { + "optional": true + } } } diff --git a/src/otel.ts b/src/otel.ts new file mode 100644 index 0000000..1d31b85 --- /dev/null +++ b/src/otel.ts @@ -0,0 +1,162 @@ +/** + * Optional OpenTelemetry tracing (ADR-0025) — the Node mirror of `babelqueue-go/otel`. + * + * Emits a CONSUMER span per handled message and a PRODUCER span per publish, correlating them + * across every hop and SDK through the envelope's `trace_id` — a UUID, which maps 1:1 to a + * 32-hex OTel trace id. The wire envelope is untouched (GR-1) and the zero-dependency core + * never imports OpenTelemetry: this module pulls `@opentelemetry/api` as an **optional peer + * dependency** and is reached only via the `@babelqueue/core/otel` subpath, so importing the + * core itself stays dependency-free. + * + * ```ts + * import { trace } from "@opentelemetry/api"; + * import { wrapHandler, publish } from "@babelqueue/core/otel"; + * + * const tracer = trace.getTracer("orders"); + * const traced = wrapHandler(tracer, async (env) => { ... }); // consumer + * await publish(tracer, "urn:babel:orders:created", { order_id: 1 }, // producer + * (env) => myTransport.send(env)); + * ``` + * + * Every hop that shares a `trace_id` shares one OTel trace. Exact cross-hop *span* parent-child + * linkage (W3C `traceparent` as a transport header) is a documented follow-up. + */ + +import { createHash } from "node:crypto"; + +import { + SpanKind, + SpanStatusCode, + TraceFlags, + context as otelContext, + trace, + type Attributes, + type Context, + type Tracer, +} from "@opentelemetry/api"; + +import { EnvelopeCodec, type Envelope, type MakeOptions } from "./codec.js"; +import type { Handler } from "./idempotency.js"; + +const SYSTEM = "babelqueue"; +const INVALID_TRACE_ID = "00000000000000000000000000000000"; +const INVALID_SPAN_ID = "0000000000000000"; + +/** + * Map an envelope `trace_id` to a deterministic 32-hex OTel trace id: a UUID maps to its + * hex bytes; any other string is hashed (SHA-256, first 16 bytes). The inverse of {@link uuidOf} + * for the UUID case. + */ +export function traceIdOf(traceId: string): string { + const hex = traceId.replace(/-/g, "").toLowerCase(); + if (/^[0-9a-f]{32}$/.test(hex) && hex !== INVALID_TRACE_ID) { + return hex; + } + return createHash("sha256").update(traceId).digest("hex").slice(0, 32); +} + +/** + * Format a 32-hex OTel trace id as a canonical UUID string — the form a producer stamps into + * the message's `trace_id` so a consumer can recover the same trace id via {@link traceIdOf}. + */ +export function uuidOf(traceIdHex: string): string { + const h = traceIdHex.replace(/-/g, "").toLowerCase().padStart(32, "0").slice(0, 32); + return `${h.slice(0, 8)}-${h.slice(8, 12)}-${h.slice(12, 16)}-${h.slice(16, 20)}-${h.slice(20, 32)}`; +} + +/** Deterministic, non-zero 16-hex span id so the remote parent context is valid. */ +function spanIdOf(traceId: string): string { + const sid = createHash("sha256").update(`babelqueue-span:${traceId}`).digest("hex").slice(0, 16); + return sid === INVALID_SPAN_ID ? "0000000000000001" : sid; +} + +/** A context carrying a remote parent in the `trace_id`-derived trace. */ +function parentContext(traceId: string): Context { + return trace.setSpanContext(otelContext.active(), { + traceId: traceIdOf(traceId), + spanId: spanIdOf(traceId), + traceFlags: TraceFlags.SAMPLED, + isRemote: true, + }); +} + +function consumeAttributes(env: Envelope): Attributes { + return { + "messaging.system": SYSTEM, + "messaging.operation": "process", + "messaging.destination.name": env.meta?.queue ?? "", + "messaging.message.id": env.meta?.id ?? "", + "messaging.message.conversation_id": env.trace_id, + "messaging.babelqueue.attempts": env.attempts ?? 0, + }; +} + +/** + * Wrap a consume handler to emit a CONSUMER span per message, in the OTel trace derived from + * the envelope's `trace_id`, recording the handler's error/status. The handler receives the + * full {@link Envelope} as before. + */ +export function wrapHandler( + tracer: Tracer, + handler: Handler, +): (env: Envelope) => Promise { + return (env: Envelope): Promise => { + const ctx = parentContext(env.trace_id); + return tracer.startActiveSpan( + `process ${env.job ?? ""}`, + { kind: SpanKind.CONSUMER, attributes: consumeAttributes(env) }, + ctx, + async (span) => { + try { + await handler(env); + } catch (err) { + span.recordException(err as Error); + span.setStatus({ code: SpanStatusCode.ERROR, message: (err as Error).message }); + throw err; + } finally { + span.end(); + } + }, + ); + }; +} + +/** + * Run a publish under a PRODUCER span `publish `, carrying the active trace's id into the + * built envelope's `trace_id` so the downstream consumer recovers the same trace. `send` + * performs the real transport write and its result is returned. + */ +export function publish( + tracer: Tracer, + urn: string, + data: Record, + send: (envelope: Envelope) => R | Promise, + options: MakeOptions = {}, +): Promise { + return tracer.startActiveSpan( + `publish ${urn}`, + { + kind: SpanKind.PRODUCER, + attributes: { + "messaging.system": SYSTEM, + "messaging.operation": "publish", + "messaging.destination.name": urn, + }, + }, + async (span) => { + try { + const traceId = uuidOf(span.spanContext().traceId); + const envelope = EnvelopeCodec.make(urn, data, { ...options, traceId }); + const result = await send(envelope); + span.setAttribute("messaging.message.id", envelope.meta.id); + return result; + } catch (err) { + span.recordException(err as Error); + span.setStatus({ code: SpanStatusCode.ERROR, message: (err as Error).message }); + throw err; + } finally { + span.end(); + } + }, + ); +} diff --git a/test/otel.test.ts b/test/otel.test.ts new file mode 100644 index 0000000..38357e7 --- /dev/null +++ b/test/otel.test.ts @@ -0,0 +1,118 @@ +import assert from "node:assert/strict"; +import { test } from "node:test"; + +import { SpanKind, SpanStatusCode, type Tracer } from "@opentelemetry/api"; +import { + BasicTracerProvider, + InMemorySpanExporter, + SimpleSpanProcessor, +} from "@opentelemetry/sdk-trace-base"; + +import { EnvelopeCodec, type Envelope } from "../src/codec.js"; +import { publish, traceIdOf, uuidOf, wrapHandler } from "../src/otel.js"; + +const TRACE_ID = "7b3f9c2a-e41d-4f88-9b2a-1c0d5e6f7a8b"; + +function recorder(): { tracer: Tracer; exporter: InMemorySpanExporter } { + const exporter = new InMemorySpanExporter(); + const provider = new BasicTracerProvider({ + spanProcessors: [new SimpleSpanProcessor(exporter)], + }); + return { tracer: provider.getTracer("test"), exporter }; +} + +test("traceId <-> UUID round-trips, and a non-uuid is hashed", () => { + const hex = traceIdOf(TRACE_ID); + assert.match(hex, /^[0-9a-f]{32}$/); + assert.equal(uuidOf(hex), TRACE_ID); + assert.equal(traceIdOf("not-a-uuid"), traceIdOf("not-a-uuid")); + assert.notEqual(traceIdOf("not-a-uuid"), hex); + assert.match(traceIdOf("z".repeat(32)), /^[0-9a-f]{32}$/); // 32 chars, not hex -> hashed +}); + +test("wrapHandler emits a CONSUMER span in the trace_id-derived trace", async () => { + const { tracer, exporter } = recorder(); + let called = false; + const env = EnvelopeCodec.make("urn:babel:orders:created", { order_id: 1 }, { queue: "orders" }); + + await wrapHandler(tracer, async () => { + called = true; + })(env); + + assert.ok(called); + const spans = exporter.getFinishedSpans(); + assert.equal(spans.length, 1); + const span = spans[0]; + assert.equal(span.name, "process urn:babel:orders:created"); + assert.equal(span.kind, SpanKind.CONSUMER); + assert.equal(span.spanContext().traceId, traceIdOf(env.trace_id)); + assert.equal(span.attributes["messaging.message.conversation_id"], env.trace_id); + assert.equal(span.attributes["messaging.destination.name"], "orders"); +}); + +test("wrapHandler tolerates an envelope missing optional meta/attempts", async () => { + const { tracer, exporter } = recorder(); + const partial = { job: "urn:babel:orders:created", trace_id: TRACE_ID } as Envelope; + + await wrapHandler(tracer, async () => {})(partial); + + const span = exporter.getFinishedSpans()[0]; + assert.equal(span.attributes["messaging.destination.name"], ""); + assert.equal(span.attributes["messaging.message.id"], ""); + assert.equal(span.attributes["messaging.babelqueue.attempts"], 0); +}); + +test("wrapHandler records the handler's error and re-throws", async () => { + const { tracer, exporter } = recorder(); + const boom = new Error("boom"); + + await assert.rejects( + wrapHandler(tracer, () => { + throw boom; + })(EnvelopeCodec.make("urn:babel:orders:created", {})), + /boom/, + ); + + const span = exporter.getFinishedSpans()[0]; + assert.equal(span.status.code, SpanStatusCode.ERROR); + assert.ok(span.events.length >= 1); // recorded exception +}); + +test("publish emits a PRODUCER span and stamps trace_id from it", async () => { + const { tracer, exporter } = recorder(); + let sent: Envelope | undefined; + + const id = await publish( + tracer, + "urn:babel:orders:created", + { order_id: 7 }, + (env) => { + sent = env; + return env.meta.id; + }, + ); + + const span = exporter.getFinishedSpans()[0]; + assert.equal(span.kind, SpanKind.PRODUCER); + assert.equal(span.attributes["messaging.message.id"], id); + assert.ok(sent); + // the published trace_id encodes the producer span's trace, so a consumer recovers it + assert.equal(sent.trace_id, uuidOf(span.spanContext().traceId)); + assert.equal(traceIdOf(sent.trace_id), span.spanContext().traceId); +}); + +test("publish records a failing send on the span and re-throws", async () => { + const { tracer, exporter } = recorder(); + const boom = new Error("send failed"); + + await assert.rejects( + publish(tracer, "urn:babel:orders:created", { order_id: 7 }, () => { + throw boom; + }), + /send failed/, + ); + + const span = exporter.getFinishedSpans()[0]; + assert.equal(span.kind, SpanKind.PRODUCER); + assert.equal(span.status.code, SpanStatusCode.ERROR); +}); diff --git a/tsup.config.ts b/tsup.config.ts index 42a760a..31fe7bf 100644 --- a/tsup.config.ts +++ b/tsup.config.ts @@ -1,7 +1,7 @@ import { defineConfig } from "tsup"; export default defineConfig({ - entry: ["src/index.ts"], + entry: ["src/index.ts", "src/otel.ts"], format: ["esm", "cjs"], dts: true, clean: true,