From b69adb2fec3c1ac4be3d7a38ca83c9c95f9b8e77 Mon Sep 17 00:00:00 2001 From: "kevin.new" Date: Thu, 18 Jun 2026 22:21:44 +0000 Subject: [PATCH] feat(runway): add example orchestrator wiring and Makefile targets MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Consumer-only wiring with noop VCS factory, docker-compose (mysql-queue only, no app DB), and Makefile targets for build/start/stop. Runway is stateless — no application database needed. Co-Authored-By: Claude Opus 4.6 (1M context) --- Makefile | 39 +++- .../runway/orchestrator/server/BUILD.bazel | 30 +++ example/runway/orchestrator/server/Dockerfile | 9 + .../orchestrator/server/docker-compose.yml | 40 ++++ example/runway/orchestrator/server/main.go | 221 ++++++++++++++++++ 5 files changed, 337 insertions(+), 2 deletions(-) create mode 100644 example/runway/orchestrator/server/BUILD.bazel create mode 100644 example/runway/orchestrator/server/Dockerfile create mode 100644 example/runway/orchestrator/server/docker-compose.yml create mode 100644 example/runway/orchestrator/server/main.go diff --git a/Makefile b/Makefile index 49c110bf..54b44735 100644 --- a/Makefile +++ b/Makefile @@ -20,6 +20,12 @@ STOVEPIPE_STACK_COMPOSE_FILE = example/stovepipe/docker-compose.yml # Fixed project name for local manual testing (tests use unique random names) STOVEPIPE_LOCAL_PROJECT = stovepipe +# Runway compose files +RUNWAY_ORCHESTRATOR_COMPOSE_FILE = example/runway/orchestrator/server/docker-compose.yml + +# Fixed project name for local manual testing (tests use unique random names) +RUNWAY_LOCAL_PROJECT = runway + # yamlfmt version for YAML formatting (override with: make fmt YAMLFMT_VERSION=v0.16.0) YAMLFMT_VERSION ?= v0.16.0 @@ -47,7 +53,7 @@ define assert_clean fi endef -.PHONY: build build-all-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-submitqueue-consumer integration-test-extensions integration-test-submitqueue-gateway integration-test-submitqueue-orchestrator license-fix lint lint-fmt lint-license local-submitqueue-clean local-submitqueue-gateway-start local-submitqueue-gateway-stop local-init-submitqueue-schemas local-init-stovepipe-queue-schema local-submitqueue-logs local-submitqueue-orchestrator-start local-submitqueue-orchestrator-stop local-submitqueue-ps local-submitqueue-restart local-submitqueue-start local-stop local-stovepipe-gateway-start local-stovepipe-orchestrator-start local-stovepipe-start mocks proto query-deps query-targets run-client-submitqueue-gateway run-client-submitqueue-orchestrator run-client-stovepipe-gateway run-client-stovepipe-orchestrator run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help +.PHONY: build build-all-linux build-runway-orchestrator-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-submitqueue-consumer integration-test-extensions integration-test-submitqueue-gateway integration-test-submitqueue-orchestrator license-fix lint lint-fmt lint-license local-init-runway-queue-schema local-init-stovepipe-queue-schema local-init-submitqueue-schemas local-runway-orchestrator-start local-stop local-stovepipe-gateway-start local-stovepipe-orchestrator-start local-stovepipe-start local-submitqueue-clean local-submitqueue-gateway-start local-submitqueue-gateway-stop local-submitqueue-logs local-submitqueue-orchestrator-start local-submitqueue-orchestrator-stop local-submitqueue-ps local-submitqueue-restart local-submitqueue-start mocks proto query-deps query-targets run-client-submitqueue-gateway run-client-submitqueue-orchestrator run-client-stovepipe-gateway run-client-stovepipe-orchestrator run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help build: ## Build all services and examples @@ -56,7 +62,7 @@ build: ## Build all services and examples @echo "Build complete!" # Build Linux binaries required for Docker containers -build-all-linux: build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux ## Build all Linux binaries for Docker +build-all-linux: build-runway-orchestrator-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux ## Build all Linux binaries for Docker @echo "All Linux binaries ready for Docker" build-submitqueue-gateway-linux: ## Build Gateway Linux binary for Docker @@ -75,6 +81,14 @@ build-submitqueue-orchestrator-linux: ## Build Orchestrator Linux binary for Doc cp -f bazel-bin/example/submitqueue/orchestrator/server/orchestrator .docker-bin/orchestrator @echo "Orchestrator Linux binary ready at .docker-bin/orchestrator" +build-runway-orchestrator-linux: ## Build Runway orchestrator Linux binary for Docker + @echo "Building Runway orchestrator Linux binary for Docker..." + @$(BAZEL) build --platforms=@rules_go//go/toolchain:linux_amd64 //example/runway/orchestrator/server:orchestrator + @mkdir -p .docker-bin + @cp -f bazel-bin/example/runway/orchestrator/server/orchestrator_/orchestrator .docker-bin/runway-orchestrator 2>/dev/null || \ + cp -f bazel-bin/example/runway/orchestrator/server/orchestrator .docker-bin/runway-orchestrator + @echo "Runway orchestrator Linux binary ready at .docker-bin/runway-orchestrator" + build-stovepipe-gateway-linux: ## Build Stovepipe gateway Linux binary for Docker @echo "Building Stovepipe gateway Linux binary for Docker..." @$(BAZEL) build --platforms=@rules_go//go/toolchain:linux_amd64 //example/stovepipe/gateway/server:gateway @@ -209,6 +223,14 @@ local-init-submitqueue-schemas: ## Manually apply all database schemas done @echo "✅ All schemas applied successfully" +local-init-runway-queue-schema: ## Apply queue schema only (mysql-queue) for Runway compose stacks + @echo "Applying queue schema to mysql-queue (Runway; stateless — no app schema)..." + @for file in platform/extension/messagequeue/mysql/schema/*.sql; do \ + echo " - Applying $$(basename $$file)..."; \ + docker exec -i $(RUNWAY_LOCAL_PROJECT)-mysql-queue-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \ + done + @echo "Runway queue schema applied successfully" + local-init-stovepipe-queue-schema: ## Apply queue schema only (mysql-queue) for Stovepipe compose stacks @echo "Applying queue schema to mysql-queue (Stovepipe; no app storage/counter schema yet)..." @for file in platform/extension/messagequeue/mysql/schema/*.sql; do \ @@ -278,10 +300,23 @@ local-submitqueue-start: build-all-linux ## Start full stack (Gateway + Orchestr @echo "" @make local-submitqueue-ps +local-runway-orchestrator-start: build-runway-orchestrator-linux ## Start Runway orchestrator locally (orchestrator + MySQL queue) + @echo "Starting Runway orchestrator with compose..." + @$(COMPOSE) -f $(RUNWAY_ORCHESTRATOR_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) up -d --build --wait + @echo "Applying queue schema to mysql-queue..." + @$(MAKE) -s local-init-runway-queue-schema + @echo "" + @echo "Runway orchestrator is running!" + @echo "" + @$(COMPOSE) -f $(RUNWAY_ORCHESTRATOR_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) ps + @echo "" + @echo "MySQL Queue port: $$(docker port $(RUNWAY_LOCAL_PROJECT)-mysql-queue-1 3306 2>/dev/null | cut -d: -f2 || echo 'unknown')" + local-stop: ## Stop all services (keep data) @echo "Stopping all services..." @$(COMPOSE) -f $(COMPOSE_FILE) -p $(SUBMITQUEUE_LOCAL_PROJECT) down @$(COMPOSE) -f $(STOVEPIPE_STACK_COMPOSE_FILE) -p $(STOVEPIPE_LOCAL_PROJECT) down + @$(COMPOSE) -f $(RUNWAY_ORCHESTRATOR_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) down @echo "Services stopped. Data volumes preserved." local-stovepipe-logs: ## View logs from all running Stovepipe services diff --git a/example/runway/orchestrator/server/BUILD.bazel b/example/runway/orchestrator/server/BUILD.bazel new file mode 100644 index 00000000..aa52001d --- /dev/null +++ b/example/runway/orchestrator/server/BUILD.bazel @@ -0,0 +1,30 @@ +load("@rules_go//go:def.bzl", "go_binary", "go_library") + +go_library( + name = "server_lib", + srcs = ["main.go"], + importpath = "github.com/uber/submitqueue/example/runway/orchestrator/server", + visibility = ["//visibility:private"], + deps = [ + "//platform/consumer", + "//platform/errs", + "//platform/errs/generic", + "//platform/errs/mysql", + "//platform/extension/messagequeue", + "//platform/extension/messagequeue/mysql", + "//runway/core/topickey", + "//runway/extension/vcs", + "//runway/extension/vcs/noop", + "//runway/orchestrator/controller/merge", + "//runway/orchestrator/controller/mergeconflictcheck", + "@com_github_go_sql_driver_mysql//:mysql", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_binary( + name = "orchestrator", + embed = [":server_lib"], + visibility = ["//visibility:public"], +) diff --git a/example/runway/orchestrator/server/Dockerfile b/example/runway/orchestrator/server/Dockerfile new file mode 100644 index 00000000..4691dad2 --- /dev/null +++ b/example/runway/orchestrator/server/Dockerfile @@ -0,0 +1,9 @@ +FROM debian:bookworm-slim + +RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* +WORKDIR /root/ + +# Built via: make build-runway-orchestrator-linux +COPY .docker-bin/runway-orchestrator ./orchestrator + +CMD ["./orchestrator"] diff --git a/example/runway/orchestrator/server/docker-compose.yml b/example/runway/orchestrator/server/docker-compose.yml new file mode 100644 index 00000000..ba3288cb --- /dev/null +++ b/example/runway/orchestrator/server/docker-compose.yml @@ -0,0 +1,40 @@ +# Docker Compose for Runway orchestrator manual testing +# +# +# IMPORTANT: Before running compose, build the Linux binary: +# make build-runway-orchestrator-linux +# OR +# bazel build --platforms=@rules_go//go/toolchain:linux_amd64 //example/runway/orchestrator/server:orchestrator +# +# Quick start: +# make local-runway-orchestrator-start +# +# After `up`, only the queue schema is applied (`local-init-runway-queue-schema`). +# Runway is stateless — no application database is needed. + +services: + # Queue Database - Messaging infrastructure (messages, offsets, partition leases) + mysql-queue: + image: mysql:8.0 + environment: + MYSQL_ROOT_PASSWORD: root + MYSQL_DATABASE: submitqueue + ports: + - "3306" # Random ephemeral port to avoid conflicts + healthcheck: + test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-proot"] + interval: 5s + timeout: 5s + retries: 10 + + orchestrator-service: + build: + context: ${REPO_ROOT} + dockerfile: example/runway/orchestrator/server/Dockerfile + environment: + # Queue infrastructure connection (runway is stateless — no app DB) + - QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true + - HOSTNAME=orchestrator-dev + depends_on: + mysql-queue: + condition: service_healthy diff --git a/example/runway/orchestrator/server/main.go b/example/runway/orchestrator/server/main.go new file mode 100644 index 00000000..64840f6c --- /dev/null +++ b/example/runway/orchestrator/server/main.go @@ -0,0 +1,221 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "database/sql" + "errors" + "fmt" + "os" + "os/signal" + "sync" + "syscall" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/uber-go/tally" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/errs" + genericerrs "github.com/uber/submitqueue/platform/errs/generic" + mysqlerrs "github.com/uber/submitqueue/platform/errs/mysql" + extqueue "github.com/uber/submitqueue/platform/extension/messagequeue" + queueMySQL "github.com/uber/submitqueue/platform/extension/messagequeue/mysql" + "github.com/uber/submitqueue/runway/core/topickey" + "github.com/uber/submitqueue/runway/extension/vcs" + "github.com/uber/submitqueue/runway/extension/vcs/noop" + "github.com/uber/submitqueue/runway/orchestrator/controller/merge" + "github.com/uber/submitqueue/runway/orchestrator/controller/mergeconflictcheck" + "go.uber.org/zap" +) + +// noopVCSFactory adapts the noop VCS into the vcs.Factory interface. +type noopVCSFactory struct { + instance *noop.VCS +} + +func (f *noopVCSFactory) For(_ vcs.Config) (vcs.VCS, error) { + return f.instance, nil +} + +func main() { + code := 0 + if err := run(); err != nil { + if errors.Is(err, context.Canceled) { + fmt.Println("Runway orchestrator server stopped by signal") + code = 128 + int(syscall.SIGTERM) + } else { + fmt.Fprintf(os.Stderr, "Runway orchestrator server failure: %v\n", err) + code = 1 + } + } + os.Exit(code) +} + +func run() error { + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + logger, err := zap.NewDevelopment() + if err != nil { + return fmt.Errorf("failed to create logger: %w", err) + } + defer logger.Sync() + + scope := tally.NewTestScope("runway_orchestrator", nil) + metricsStopCh := make(chan any, 1) + metricsWgDone := sync.WaitGroup{} + metricsWgDone.Add(1) + go func() { + defer metricsWgDone.Done() + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-metricsStopCh: + return + case <-ticker.C: + snapshot := scope.Snapshot() + logger.Info("metrics snapshot", + zap.Any("counters", snapshot.Counters()), + zap.Any("gauges", snapshot.Gauges()), + zap.Any("timers", snapshot.Timers()), + ) + } + } + }() + + defer func() { + close(metricsStopCh) + metricsWgDone.Wait() + }() + + queueDSN := os.Getenv("QUEUE_MYSQL_DSN") + if queueDSN == "" { + return fmt.Errorf("QUEUE_MYSQL_DSN environment variable is required") + } + queueDB, err := sql.Open("mysql", queueDSN) + if err != nil { + return fmt.Errorf("failed to open queue database: %w", err) + } + defer queueDB.Close() + + mysqlQueue, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: queueDB, + Logger: logger, + MetricsScope: scope.SubScope("queue"), + }) + if err != nil { + return fmt.Errorf("failed to create queue: %w", err) + } + defer mysqlQueue.Close() + + logger.Info("initialized queue", zap.String("dsn", queueDSN)) + + subscriberName := os.Getenv("HOSTNAME") + if subscriberName == "" { + subscriberName = fmt.Sprintf("runway-orchestrator-%d", time.Now().Unix()) + } + + registry, err := newTopicRegistry(mysqlQueue, subscriberName) + if err != nil { + return fmt.Errorf("failed to create topic registry: %w", err) + } + + vcsFactory := &noopVCSFactory{instance: noop.New()} + + primaryConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry, + errs.NewClassifierProcessor( + genericerrs.Classifier, + mysqlerrs.Classifier, + ), + ) + + checkController := mergeconflictcheck.NewController(mergeconflictcheck.Params{ + Logger: logger.Sugar(), + Scope: scope, + Registry: registry, + VCSFactory: vcsFactory, + TopicKey: topickey.TopicKeyMergeConflictCheck, + ConsumerGroup: "runway-merge-conflict-check", + }) + if err := primaryConsumer.Register(checkController); err != nil { + return fmt.Errorf("failed to register merge-conflict-check controller: %w", err) + } + + mergeController := merge.NewController(merge.Params{ + Logger: logger.Sugar(), + Scope: scope, + Registry: registry, + VCSFactory: vcsFactory, + TopicKey: topickey.TopicKeyMerge, + ConsumerGroup: "runway-merge", + }) + if err := primaryConsumer.Register(mergeController); err != nil { + return fmt.Errorf("failed to register merge controller: %w", err) + } + + logger.Info("controllers registered", zap.Int("primary", 2)) + + if err := primaryConsumer.Start(ctx); err != nil { + return fmt.Errorf("failed to start primary consumer: %w", err) + } + + fmt.Println("Runway orchestrator server is running (consumer-only, no gRPC)") + fmt.Println("Press Ctrl+C to stop, or send a SIGTERM.") + + <-ctx.Done() + fmt.Println("Shutting down runway orchestrator server due to interruption signal...") + + stopErr := primaryConsumer.Stop(30000) + if stopErr != nil { + return fmt.Errorf("failed to stop consumer: %w", stopErr) + } + + return ctx.Err() +} + +func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRegistry, error) { + return consumer.NewTopicRegistry([]consumer.TopicConfig{ + { + Key: topickey.TopicKeyMergeConflictCheck, + Name: "merge-conflict-checker", + Queue: q, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "runway-merge-conflict-check", + ), + }, + { + Key: topickey.TopicKeyMerge, + Name: "merger", + Queue: q, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "runway-merge", + ), + }, + { + Key: topickey.TopicKeyMergeConflictCheckSignal, + Name: "merge-conflict-checker-signal", + Queue: q, + }, + { + Key: topickey.TopicKeyMergeSignal, + Name: "merger-signal", + Queue: q, + }, + }) +}