From d152470c046dac24249b36441c3d1800bb622cc2 Mon Sep 17 00:00:00 2001 From: mnoah1 Date: Tue, 16 Jun 2026 22:05:55 +0000 Subject: [PATCH] feat(stovepipe): add validate controller forwarding commits to batch Mirror the start controller's shape for the validate pipeline stage: the validate controller consumes the validate topic, guards the carried partition key, and forwards the change event to the batch stage. Adds the batch topic key, wires the controller into the example orchestrator, and registers the batch topic in the registry. Currently a forwarding stub; placeholder comments describe the eventual commit-metadata resolution per the workflow RFC. --- .../stovepipe/orchestrator/server/BUILD.bazel | 1 + example/stovepipe/orchestrator/server/main.go | 22 ++- stovepipe/core/topickey/topickey.go | 3 + .../controller/validate/BUILD.bazel | 36 ++++ .../controller/validate/validate.go | 159 +++++++++++++++++ .../controller/validate/validate_test.go | 166 ++++++++++++++++++ 6 files changed, 386 insertions(+), 1 deletion(-) create mode 100644 stovepipe/orchestrator/controller/validate/BUILD.bazel create mode 100644 stovepipe/orchestrator/controller/validate/validate.go create mode 100644 stovepipe/orchestrator/controller/validate/validate_test.go diff --git a/example/stovepipe/orchestrator/server/BUILD.bazel b/example/stovepipe/orchestrator/server/BUILD.bazel index 7d7a494e..420e0319 100644 --- a/example/stovepipe/orchestrator/server/BUILD.bazel +++ b/example/stovepipe/orchestrator/server/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//stovepipe/core/topickey", "//stovepipe/orchestrator/controller", "//stovepipe/orchestrator/controller/start", + "//stovepipe/orchestrator/controller/validate", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_uber_go_tally//:tally", "@org_golang_google_grpc//:grpc", diff --git a/example/stovepipe/orchestrator/server/main.go b/example/stovepipe/orchestrator/server/main.go index c47b5c8b..b0e24976 100644 --- a/example/stovepipe/orchestrator/server/main.go +++ b/example/stovepipe/orchestrator/server/main.go @@ -38,6 +38,7 @@ import ( "github.com/uber/submitqueue/stovepipe/core/topickey" "github.com/uber/submitqueue/stovepipe/orchestrator/controller" "github.com/uber/submitqueue/stovepipe/orchestrator/controller/start" + "github.com/uber/submitqueue/stovepipe/orchestrator/controller/validate" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/reflection" @@ -161,7 +162,18 @@ func run() error { if err := primaryConsumer.Register(startController); err != nil { return fmt.Errorf("failed to register start controller: %w", err) } - logger.Info("controllers registered", zap.Int("primary", 1)) + + validateController := validate.NewController(validate.Params{ + Logger: logger.Sugar(), + Scope: scope, + Registry: registry, + TopicKey: topickey.TopicKeyValidate, + ConsumerGroup: "orchestrator-validate", + }) + if err := primaryConsumer.Register(validateController); err != nil { + return fmt.Errorf("failed to register validate controller: %w", err) + } + logger.Info("controllers registered", zap.Int("primary", 2)) if err := primaryConsumer.Start(ctx); err != nil { return fmt.Errorf("failed to start primary consumer: %w", err) @@ -243,5 +255,13 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe subscriberName, "orchestrator-validate", ), }, + { + Key: topickey.TopicKeyBatch, + Name: "batch", + Queue: q, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "orchestrator-batch", + ), + }, }) } diff --git a/stovepipe/core/topickey/topickey.go b/stovepipe/core/topickey/topickey.go index 2e5f035a..ccab7f3f 100644 --- a/stovepipe/core/topickey/topickey.go +++ b/stovepipe/core/topickey/topickey.go @@ -25,4 +25,7 @@ const ( TopicKeyStart TopicKey = "start" // TopicKeyValidate is the pipeline stage where commits are published for metadata resolution. TopicKeyValidate TopicKey = "validate" + // TopicKeyBatch is the pipeline stage where validated commits are aggregated, since the last + // known green, into a contiguous validation batch. + TopicKeyBatch TopicKey = "batch" ) diff --git a/stovepipe/orchestrator/controller/validate/BUILD.bazel b/stovepipe/orchestrator/controller/validate/BUILD.bazel new file mode 100644 index 00000000..40dc4141 --- /dev/null +++ b/stovepipe/orchestrator/controller/validate/BUILD.bazel @@ -0,0 +1,36 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "validate", + srcs = ["validate.go"], + importpath = "github.com/uber/submitqueue/stovepipe/orchestrator/controller/validate", + visibility = ["//visibility:public"], + deps = [ + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/metrics", + "//stovepipe/core/topickey", + "//stovepipe/entity", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "validate_test", + srcs = ["validate_test.go"], + embed = [":validate"], + deps = [ + "//platform/base/change", + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/extension/messagequeue/mock", + "//stovepipe/core/topickey", + "//stovepipe/entity", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/stovepipe/orchestrator/controller/validate/validate.go b/stovepipe/orchestrator/controller/validate/validate.go new file mode 100644 index 00000000..cb5e8f46 --- /dev/null +++ b/stovepipe/orchestrator/controller/validate/validate.go @@ -0,0 +1,159 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package validate + +import ( + "context" + "fmt" + + "github.com/uber-go/tally" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/metrics" + "github.com/uber/submitqueue/stovepipe/core/topickey" + entity "github.com/uber/submitqueue/stovepipe/entity" + "go.uber.org/zap" +) + +// Controller handles validate queue messages. It consumes the validate topic and +// forwards the ingest request to the batch stage, propagating the envelope partition +// key for ordering. +// +// This step will include any validation activities prior to adding the commit to a batch. +// +// The ordering key is decided once at ingestion and carried through the pipeline. +// +// Currently a forwarding stub. + +var _ consumer.Controller = (*Controller)(nil) + +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + registry consumer.TopicRegistry + topicKey consumer.TopicKey + consumerGroup string +} + +// Params are the parameters for creating a new validate controller. +type Params struct { + Registry consumer.TopicRegistry + TopicKey consumer.TopicKey + ConsumerGroup string + + Scope tally.Scope + Logger *zap.SugaredLogger +} + +// NewController creates a new validate controller for the orchestrator. +func NewController(p Params) *Controller { + return &Controller{ + logger: p.Logger.Named("validate_controller"), + metricsScope: p.Scope.SubScope("validate_controller"), + registry: p.Registry, + topicKey: p.TopicKey, + consumerGroup: p.ConsumerGroup, + } +} + +// Process validates the ingest request and forwards it to the batch stage. +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + + request, err := entity.IngestRequestFromBytes(msg.Payload) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) + // Non-retryable: malformed messages will never succeed regardless of retry count. + return fmt.Errorf("failed to deserialize ingest request: %w", err) + } + + // The ordering key lives on the message envelope, stamped by the gateway at + // ingestion; the controller propagates it verbatim to the next stage. + partitionKey := msg.PartitionKey + if partitionKey == "" { + metrics.NamedCounter(c.metricsScope, opName, "missing_partition_key", 1) + return fmt.Errorf("ingest request %s is missing a partition key (must be stamped by the producer)", request.ID) + } + + c.logger.Infow("received ingest request", + "spid", request.ID, + "queue", request.Queue, + "change_uris", request.Change.URIs, + "change_count", len(request.Change.URIs), + "attempt", delivery.Attempt(), + "partition_key", partitionKey, + ) + + // Core logic to be added here: + // - Validation before publishing to batch + // - Emit status + log events + + if err := c.publish(ctx, topickey.TopicKeyBatch, request, partitionKey); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) + return fmt.Errorf("failed to publish to batch: %w", err) + } + + c.logger.Infow("published ingest request to batch", + "spid", request.ID, + "topic_key", topickey.TopicKeyBatch, + ) + + return nil +} + +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.IngestRequest, partitionKey string) error { + payload, err := request.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize ingest request: %w", err) + } + + msg := entityqueue.NewMessage(request.ID, payload, partitionKey, nil) + + q, ok := c.registry.Queue(key) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", key) + } + + topicName, ok := c.registry.TopicName(key) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", key) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *Controller) Name() string { + return "validate" +} + +// TopicKey returns the topic key this controller subscribes to. +func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *Controller) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/stovepipe/orchestrator/controller/validate/validate_test.go b/stovepipe/orchestrator/controller/validate/validate_test.go new file mode 100644 index 00000000..7b342b1a --- /dev/null +++ b/stovepipe/orchestrator/controller/validate/validate_test.go @@ -0,0 +1,166 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package validate + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + "github.com/uber/submitqueue/platform/base/change" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + queuemock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" + "github.com/uber/submitqueue/stovepipe/core/topickey" + entity "github.com/uber/submitqueue/stovepipe/entity" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +const ( + testURI = "git://git.example.com/uber/monorepo/refs%2Fheads%2Fmain/abcdef0123456789abcdef0123456789abcdef01" + testSPID = "stovepipe-monorepo/1" + testQueue = "stovepipe-monorepo" + testPartitionKey = "stovepipe-monorepo" +) + +// captureRegistry builds a topic registry whose batch publisher records the +// last message it received into captured (when non-nil) and returns publishErr. +func captureRegistry(t *testing.T, ctrl *gomock.Controller, publishErr error, captured *entityqueue.Message) consumer.TopicRegistry { + t.Helper() + + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, _ string, msg entityqueue.Message) error { + if captured != nil { + *captured = msg + } + return publishErr + }, + ).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: topickey.TopicKeyBatch, Name: "batch", Queue: mockQ}, + }) + require.NoError(t, err) + return registry +} + +func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error, captured *entityqueue.Message) *Controller { + t.Helper() + + return NewController(Params{ + Logger: zaptest.NewLogger(t).Sugar(), + Scope: tally.NoopScope, + Registry: captureRegistry(t, ctrl, publishErr, captured), + TopicKey: topickey.TopicKeyValidate, + ConsumerGroup: "orchestrator-validate", + }) +} + +// makeDelivery builds a delivery whose envelope carries partitionKey, the +// ordering key the producer stamps at ingestion. +func makeDelivery(t *testing.T, ctrl *gomock.Controller, payload []byte, partitionKey string) *queuemock.MockDelivery { + t.Helper() + + msg := entityqueue.NewMessage(testSPID, payload, partitionKey, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + return delivery +} + +// validPayload is an ingest request as the upstream stage forwards it: identity, +// queue, and the change URIs. The ordering key rides on the message envelope, not +// the payload. +func validPayload(t *testing.T) []byte { + t.Helper() + payload, err := entity.IngestRequest{ + ID: testSPID, + Queue: testQueue, + Change: change.Change{URIs: []string{testURI}}, + }.ToBytes() + require.NoError(t, err) + return payload +} + +func TestNewController(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t, ctrl, nil, nil) + + require.NotNil(t, controller) + assert.Equal(t, topickey.TopicKeyValidate, controller.TopicKey()) + assert.Equal(t, "orchestrator-validate", controller.ConsumerGroup()) + assert.Equal(t, "validate", controller.Name()) +} + +func TestController_Process_PublishesToBatch(t *testing.T) { + ctrl := gomock.NewController(t) + + var captured entityqueue.Message + controller := newTestController(t, ctrl, nil, &captured) + delivery := makeDelivery(t, ctrl, validPayload(t), testPartitionKey) + + require.NoError(t, controller.Process(context.Background(), delivery)) + + // validate forwards the request to batch, keyed by spid for idempotency and + // propagating the envelope partition key verbatim to the next hop. + assert.Equal(t, testSPID, captured.ID) + assert.Equal(t, testPartitionKey, captured.PartitionKey) + + forwarded, err := entity.IngestRequestFromBytes(captured.Payload) + require.NoError(t, err) + assert.Equal(t, testSPID, forwarded.ID) + assert.Equal(t, []string{testURI}, forwarded.Change.URIs) +} + +func TestController_Process_Errors(t *testing.T) { + tests := []struct { + name string + payload []byte + partitionKey string + }{ + {name: "invalid json", payload: []byte(`{"invalid": json"}`), partitionKey: testPartitionKey}, + {name: "missing id", payload: []byte(`{"queue":"q","change":{"uris":["git://x/y/z/sha"]}}`), partitionKey: testPartitionKey}, + {name: "no uris", payload: []byte(`{"id":"q/1","queue":"q","change":{"uris":[]}}`), partitionKey: testPartitionKey}, + // Valid request, but the producer failed to stamp an envelope partition key. + {name: "missing partition key", payload: validPayload(t), partitionKey: ""}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + controller := newTestController(t, ctrl, nil, nil) + delivery := makeDelivery(t, ctrl, tt.payload, tt.partitionKey) + + require.Error(t, controller.Process(context.Background(), delivery)) + }) + } +} + +func TestController_Process_PublishError(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t, ctrl, assert.AnError, nil) + delivery := makeDelivery(t, ctrl, validPayload(t), testPartitionKey) + + require.Error(t, controller.Process(context.Background(), delivery)) +}