diff --git a/example/stovepipe/orchestrator/server/BUILD.bazel b/example/stovepipe/orchestrator/server/BUILD.bazel index 7d7a494e..420e0319 100644 --- a/example/stovepipe/orchestrator/server/BUILD.bazel +++ b/example/stovepipe/orchestrator/server/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//stovepipe/core/topickey", "//stovepipe/orchestrator/controller", "//stovepipe/orchestrator/controller/start", + "//stovepipe/orchestrator/controller/validate", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_uber_go_tally//:tally", "@org_golang_google_grpc//:grpc", diff --git a/example/stovepipe/orchestrator/server/main.go b/example/stovepipe/orchestrator/server/main.go index c47b5c8b..b0e24976 100644 --- a/example/stovepipe/orchestrator/server/main.go +++ b/example/stovepipe/orchestrator/server/main.go @@ -38,6 +38,7 @@ import ( "github.com/uber/submitqueue/stovepipe/core/topickey" "github.com/uber/submitqueue/stovepipe/orchestrator/controller" "github.com/uber/submitqueue/stovepipe/orchestrator/controller/start" + "github.com/uber/submitqueue/stovepipe/orchestrator/controller/validate" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/reflection" @@ -161,7 +162,18 @@ func run() error { if err := primaryConsumer.Register(startController); err != nil { return fmt.Errorf("failed to register start controller: %w", err) } - logger.Info("controllers registered", zap.Int("primary", 1)) + + validateController := validate.NewController(validate.Params{ + Logger: logger.Sugar(), + Scope: scope, + Registry: registry, + TopicKey: topickey.TopicKeyValidate, + ConsumerGroup: "orchestrator-validate", + }) + if err := primaryConsumer.Register(validateController); err != nil { + return fmt.Errorf("failed to register validate controller: %w", err) + } + logger.Info("controllers registered", zap.Int("primary", 2)) if err := primaryConsumer.Start(ctx); err != nil { return fmt.Errorf("failed to start primary consumer: %w", err) @@ -243,5 +255,13 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe subscriberName, "orchestrator-validate", ), }, + { + Key: topickey.TopicKeyBatch, + Name: "batch", + Queue: q, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "orchestrator-batch", + ), + }, }) } diff --git a/stovepipe/core/topickey/topickey.go b/stovepipe/core/topickey/topickey.go index 2e5f035a..ccab7f3f 100644 --- a/stovepipe/core/topickey/topickey.go +++ b/stovepipe/core/topickey/topickey.go @@ -25,4 +25,7 @@ const ( TopicKeyStart TopicKey = "start" // TopicKeyValidate is the pipeline stage where commits are published for metadata resolution. TopicKeyValidate TopicKey = "validate" + // TopicKeyBatch is the pipeline stage where validated commits are aggregated, since the last + // known green, into a contiguous validation batch. + TopicKeyBatch TopicKey = "batch" ) diff --git a/stovepipe/orchestrator/controller/validate/BUILD.bazel b/stovepipe/orchestrator/controller/validate/BUILD.bazel new file mode 100644 index 00000000..40dc4141 --- /dev/null +++ b/stovepipe/orchestrator/controller/validate/BUILD.bazel @@ -0,0 +1,36 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "validate", + srcs = ["validate.go"], + importpath = "github.com/uber/submitqueue/stovepipe/orchestrator/controller/validate", + visibility = ["//visibility:public"], + deps = [ + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/metrics", + "//stovepipe/core/topickey", + "//stovepipe/entity", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "validate_test", + srcs = ["validate_test.go"], + embed = [":validate"], + deps = [ + "//platform/base/change", + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/extension/messagequeue/mock", + "//stovepipe/core/topickey", + "//stovepipe/entity", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/stovepipe/orchestrator/controller/validate/validate.go b/stovepipe/orchestrator/controller/validate/validate.go new file mode 100644 index 00000000..cb5e8f46 --- /dev/null +++ b/stovepipe/orchestrator/controller/validate/validate.go @@ -0,0 +1,159 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package validate + +import ( + "context" + "fmt" + + "github.com/uber-go/tally" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/metrics" + "github.com/uber/submitqueue/stovepipe/core/topickey" + entity "github.com/uber/submitqueue/stovepipe/entity" + "go.uber.org/zap" +) + +// Controller handles validate queue messages. It consumes the validate topic and +// forwards the ingest request to the batch stage, propagating the envelope partition +// key for ordering. +// +// This step will include any validation activities prior to adding the commit to a batch. +// +// The ordering key is decided once at ingestion and carried through the pipeline. +// +// Currently a forwarding stub. + +var _ consumer.Controller = (*Controller)(nil) + +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + registry consumer.TopicRegistry + topicKey consumer.TopicKey + consumerGroup string +} + +// Params are the parameters for creating a new validate controller. +type Params struct { + Registry consumer.TopicRegistry + TopicKey consumer.TopicKey + ConsumerGroup string + + Scope tally.Scope + Logger *zap.SugaredLogger +} + +// NewController creates a new validate controller for the orchestrator. +func NewController(p Params) *Controller { + return &Controller{ + logger: p.Logger.Named("validate_controller"), + metricsScope: p.Scope.SubScope("validate_controller"), + registry: p.Registry, + topicKey: p.TopicKey, + consumerGroup: p.ConsumerGroup, + } +} + +// Process validates the ingest request and forwards it to the batch stage. +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + + request, err := entity.IngestRequestFromBytes(msg.Payload) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) + // Non-retryable: malformed messages will never succeed regardless of retry count. + return fmt.Errorf("failed to deserialize ingest request: %w", err) + } + + // The ordering key lives on the message envelope, stamped by the gateway at + // ingestion; the controller propagates it verbatim to the next stage. + partitionKey := msg.PartitionKey + if partitionKey == "" { + metrics.NamedCounter(c.metricsScope, opName, "missing_partition_key", 1) + return fmt.Errorf("ingest request %s is missing a partition key (must be stamped by the producer)", request.ID) + } + + c.logger.Infow("received ingest request", + "spid", request.ID, + "queue", request.Queue, + "change_uris", request.Change.URIs, + "change_count", len(request.Change.URIs), + "attempt", delivery.Attempt(), + "partition_key", partitionKey, + ) + + // Core logic to be added here: + // - Validation before publishing to batch + // - Emit status + log events + + if err := c.publish(ctx, topickey.TopicKeyBatch, request, partitionKey); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) + return fmt.Errorf("failed to publish to batch: %w", err) + } + + c.logger.Infow("published ingest request to batch", + "spid", request.ID, + "topic_key", topickey.TopicKeyBatch, + ) + + return nil +} + +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.IngestRequest, partitionKey string) error { + payload, err := request.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize ingest request: %w", err) + } + + msg := entityqueue.NewMessage(request.ID, payload, partitionKey, nil) + + q, ok := c.registry.Queue(key) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", key) + } + + topicName, ok := c.registry.TopicName(key) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", key) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *Controller) Name() string { + return "validate" +} + +// TopicKey returns the topic key this controller subscribes to. +func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *Controller) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/stovepipe/orchestrator/controller/validate/validate_test.go b/stovepipe/orchestrator/controller/validate/validate_test.go new file mode 100644 index 00000000..7b342b1a --- /dev/null +++ b/stovepipe/orchestrator/controller/validate/validate_test.go @@ -0,0 +1,166 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package validate + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + "github.com/uber/submitqueue/platform/base/change" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + queuemock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" + "github.com/uber/submitqueue/stovepipe/core/topickey" + entity "github.com/uber/submitqueue/stovepipe/entity" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +const ( + testURI = "git://git.example.com/uber/monorepo/refs%2Fheads%2Fmain/abcdef0123456789abcdef0123456789abcdef01" + testSPID = "stovepipe-monorepo/1" + testQueue = "stovepipe-monorepo" + testPartitionKey = "stovepipe-monorepo" +) + +// captureRegistry builds a topic registry whose batch publisher records the +// last message it received into captured (when non-nil) and returns publishErr. +func captureRegistry(t *testing.T, ctrl *gomock.Controller, publishErr error, captured *entityqueue.Message) consumer.TopicRegistry { + t.Helper() + + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, _ string, msg entityqueue.Message) error { + if captured != nil { + *captured = msg + } + return publishErr + }, + ).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: topickey.TopicKeyBatch, Name: "batch", Queue: mockQ}, + }) + require.NoError(t, err) + return registry +} + +func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error, captured *entityqueue.Message) *Controller { + t.Helper() + + return NewController(Params{ + Logger: zaptest.NewLogger(t).Sugar(), + Scope: tally.NoopScope, + Registry: captureRegistry(t, ctrl, publishErr, captured), + TopicKey: topickey.TopicKeyValidate, + ConsumerGroup: "orchestrator-validate", + }) +} + +// makeDelivery builds a delivery whose envelope carries partitionKey, the +// ordering key the producer stamps at ingestion. +func makeDelivery(t *testing.T, ctrl *gomock.Controller, payload []byte, partitionKey string) *queuemock.MockDelivery { + t.Helper() + + msg := entityqueue.NewMessage(testSPID, payload, partitionKey, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + return delivery +} + +// validPayload is an ingest request as the upstream stage forwards it: identity, +// queue, and the change URIs. The ordering key rides on the message envelope, not +// the payload. +func validPayload(t *testing.T) []byte { + t.Helper() + payload, err := entity.IngestRequest{ + ID: testSPID, + Queue: testQueue, + Change: change.Change{URIs: []string{testURI}}, + }.ToBytes() + require.NoError(t, err) + return payload +} + +func TestNewController(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t, ctrl, nil, nil) + + require.NotNil(t, controller) + assert.Equal(t, topickey.TopicKeyValidate, controller.TopicKey()) + assert.Equal(t, "orchestrator-validate", controller.ConsumerGroup()) + assert.Equal(t, "validate", controller.Name()) +} + +func TestController_Process_PublishesToBatch(t *testing.T) { + ctrl := gomock.NewController(t) + + var captured entityqueue.Message + controller := newTestController(t, ctrl, nil, &captured) + delivery := makeDelivery(t, ctrl, validPayload(t), testPartitionKey) + + require.NoError(t, controller.Process(context.Background(), delivery)) + + // validate forwards the request to batch, keyed by spid for idempotency and + // propagating the envelope partition key verbatim to the next hop. + assert.Equal(t, testSPID, captured.ID) + assert.Equal(t, testPartitionKey, captured.PartitionKey) + + forwarded, err := entity.IngestRequestFromBytes(captured.Payload) + require.NoError(t, err) + assert.Equal(t, testSPID, forwarded.ID) + assert.Equal(t, []string{testURI}, forwarded.Change.URIs) +} + +func TestController_Process_Errors(t *testing.T) { + tests := []struct { + name string + payload []byte + partitionKey string + }{ + {name: "invalid json", payload: []byte(`{"invalid": json"}`), partitionKey: testPartitionKey}, + {name: "missing id", payload: []byte(`{"queue":"q","change":{"uris":["git://x/y/z/sha"]}}`), partitionKey: testPartitionKey}, + {name: "no uris", payload: []byte(`{"id":"q/1","queue":"q","change":{"uris":[]}}`), partitionKey: testPartitionKey}, + // Valid request, but the producer failed to stamp an envelope partition key. + {name: "missing partition key", payload: validPayload(t), partitionKey: ""}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + controller := newTestController(t, ctrl, nil, nil) + delivery := makeDelivery(t, ctrl, tt.payload, tt.partitionKey) + + require.Error(t, controller.Process(context.Background(), delivery)) + }) + } +} + +func TestController_Process_PublishError(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t, ctrl, assert.AnError, nil) + delivery := makeDelivery(t, ctrl, validPayload(t), testPartitionKey) + + require.Error(t, controller.Process(context.Background(), delivery)) +}