Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ STOVEPIPE_STACK_COMPOSE_FILE = example/stovepipe/docker-compose.yml
# Fixed project name for local manual testing (tests use unique random names)
STOVEPIPE_LOCAL_PROJECT = stovepipe

# Runway compose files
RUNWAY_ORCHESTRATOR_COMPOSE_FILE = example/runway/orchestrator/server/docker-compose.yml

# Fixed project name for local manual testing (tests use unique random names)
RUNWAY_LOCAL_PROJECT = runway

# yamlfmt version for YAML formatting (override with: make fmt YAMLFMT_VERSION=v0.16.0)
YAMLFMT_VERSION ?= v0.16.0

Expand Down Expand Up @@ -47,7 +53,7 @@ define assert_clean
fi
endef

.PHONY: build build-all-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-submitqueue-consumer integration-test-extensions integration-test-submitqueue-gateway integration-test-submitqueue-orchestrator license-fix lint lint-fmt lint-license local-submitqueue-clean local-submitqueue-gateway-start local-submitqueue-gateway-stop local-init-submitqueue-schemas local-init-stovepipe-queue-schema local-submitqueue-logs local-submitqueue-orchestrator-start local-submitqueue-orchestrator-stop local-submitqueue-ps local-submitqueue-restart local-submitqueue-start local-stop local-stovepipe-gateway-start local-stovepipe-orchestrator-start local-stovepipe-start mocks proto query-deps query-targets run-client-submitqueue-gateway run-client-submitqueue-orchestrator run-client-stovepipe-gateway run-client-stovepipe-orchestrator run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help
.PHONY: build build-all-linux build-runway-orchestrator-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-submitqueue-consumer integration-test-extensions integration-test-submitqueue-gateway integration-test-submitqueue-orchestrator license-fix lint lint-fmt lint-license local-init-runway-queue-schema local-init-stovepipe-queue-schema local-init-submitqueue-schemas local-runway-orchestrator-start local-stop local-stovepipe-gateway-start local-stovepipe-orchestrator-start local-stovepipe-start local-submitqueue-clean local-submitqueue-gateway-start local-submitqueue-gateway-stop local-submitqueue-logs local-submitqueue-orchestrator-start local-submitqueue-orchestrator-stop local-submitqueue-ps local-submitqueue-restart local-submitqueue-start mocks proto query-deps query-targets run-client-submitqueue-gateway run-client-submitqueue-orchestrator run-client-stovepipe-gateway run-client-stovepipe-orchestrator run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help


build: ## Build all services and examples
Expand All @@ -56,7 +62,7 @@ build: ## Build all services and examples
@echo "Build complete!"

# Build Linux binaries required for Docker containers
build-all-linux: build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux ## Build all Linux binaries for Docker
build-all-linux: build-runway-orchestrator-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux ## Build all Linux binaries for Docker
@echo "All Linux binaries ready for Docker"

build-submitqueue-gateway-linux: ## Build Gateway Linux binary for Docker
Expand All @@ -75,6 +81,14 @@ build-submitqueue-orchestrator-linux: ## Build Orchestrator Linux binary for Doc
cp -f bazel-bin/example/submitqueue/orchestrator/server/orchestrator .docker-bin/orchestrator
@echo "Orchestrator Linux binary ready at .docker-bin/orchestrator"

build-runway-orchestrator-linux: ## Build Runway orchestrator Linux binary for Docker
@echo "Building Runway orchestrator Linux binary for Docker..."
@$(BAZEL) build --platforms=@rules_go//go/toolchain:linux_amd64 //example/runway/orchestrator/server:orchestrator
@mkdir -p .docker-bin
@cp -f bazel-bin/example/runway/orchestrator/server/orchestrator_/orchestrator .docker-bin/runway-orchestrator 2>/dev/null || \
cp -f bazel-bin/example/runway/orchestrator/server/orchestrator .docker-bin/runway-orchestrator
@echo "Runway orchestrator Linux binary ready at .docker-bin/runway-orchestrator"

build-stovepipe-gateway-linux: ## Build Stovepipe gateway Linux binary for Docker
@echo "Building Stovepipe gateway Linux binary for Docker..."
@$(BAZEL) build --platforms=@rules_go//go/toolchain:linux_amd64 //example/stovepipe/gateway/server:gateway
Expand Down Expand Up @@ -209,6 +223,14 @@ local-init-submitqueue-schemas: ## Manually apply all database schemas
done
@echo "✅ All schemas applied successfully"

local-init-runway-queue-schema: ## Apply queue schema only (mysql-queue) for Runway compose stacks
@echo "Applying queue schema to mysql-queue (Runway; stateless — no app schema)..."
@for file in platform/extension/messagequeue/mysql/schema/*.sql; do \
echo " - Applying $$(basename $$file)..."; \
docker exec -i $(RUNWAY_LOCAL_PROJECT)-mysql-queue-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \
done
@echo "Runway queue schema applied successfully"

local-init-stovepipe-queue-schema: ## Apply queue schema only (mysql-queue) for Stovepipe compose stacks
@echo "Applying queue schema to mysql-queue (Stovepipe; no app storage/counter schema yet)..."
@for file in platform/extension/messagequeue/mysql/schema/*.sql; do \
Expand Down Expand Up @@ -278,10 +300,23 @@ local-submitqueue-start: build-all-linux ## Start full stack (Gateway + Orchestr
@echo ""
@make local-submitqueue-ps

local-runway-orchestrator-start: build-runway-orchestrator-linux ## Start Runway orchestrator locally (orchestrator + MySQL queue)
@echo "Starting Runway orchestrator with compose..."
@$(COMPOSE) -f $(RUNWAY_ORCHESTRATOR_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) up -d --build --wait
@echo "Applying queue schema to mysql-queue..."
@$(MAKE) -s local-init-runway-queue-schema
@echo ""
@echo "Runway orchestrator is running!"
@echo ""
@$(COMPOSE) -f $(RUNWAY_ORCHESTRATOR_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) ps
@echo ""
@echo "MySQL Queue port: $$(docker port $(RUNWAY_LOCAL_PROJECT)-mysql-queue-1 3306 2>/dev/null | cut -d: -f2 || echo 'unknown')"

local-stop: ## Stop all services (keep data)
@echo "Stopping all services..."
@$(COMPOSE) -f $(COMPOSE_FILE) -p $(SUBMITQUEUE_LOCAL_PROJECT) down
@$(COMPOSE) -f $(STOVEPIPE_STACK_COMPOSE_FILE) -p $(STOVEPIPE_LOCAL_PROJECT) down
@$(COMPOSE) -f $(RUNWAY_ORCHESTRATOR_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) down
@echo "Services stopped. Data volumes preserved."

local-stovepipe-logs: ## View logs from all running Stovepipe services
Expand Down
30 changes: 30 additions & 0 deletions example/runway/orchestrator/server/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
load("@rules_go//go:def.bzl", "go_binary", "go_library")

go_library(
name = "server_lib",
srcs = ["main.go"],
importpath = "github.com/uber/submitqueue/example/runway/orchestrator/server",
visibility = ["//visibility:private"],
deps = [
"//platform/consumer",
"//platform/errs",
"//platform/errs/generic",
"//platform/errs/mysql",
"//platform/extension/messagequeue",
"//platform/extension/messagequeue/mysql",
"//runway/core/topickey",
"//runway/extension/vcs",
"//runway/extension/vcs/noop",
"//runway/orchestrator/controller/merge",
"//runway/orchestrator/controller/mergeconflictcheck",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_uber_go_tally//:tally",
"@org_uber_go_zap//:zap",
],
)

go_binary(
name = "orchestrator",
embed = [":server_lib"],
visibility = ["//visibility:public"],
)
9 changes: 9 additions & 0 deletions example/runway/orchestrator/server/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
FROM debian:bookworm-slim

RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*
WORKDIR /root/

# Built via: make build-runway-orchestrator-linux
COPY .docker-bin/runway-orchestrator ./orchestrator

CMD ["./orchestrator"]
40 changes: 40 additions & 0 deletions example/runway/orchestrator/server/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Docker Compose for Runway orchestrator manual testing
#
#
# IMPORTANT: Before running compose, build the Linux binary:
# make build-runway-orchestrator-linux
# OR
# bazel build --platforms=@rules_go//go/toolchain:linux_amd64 //example/runway/orchestrator/server:orchestrator
#
# Quick start:
# make local-runway-orchestrator-start
#
# After `up`, only the queue schema is applied (`local-init-runway-queue-schema`).
# Runway is stateless — no application database is needed.

services:
# Queue Database - Messaging infrastructure (messages, offsets, partition leases)
mysql-queue:
image: mysql:8.0
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: submitqueue
ports:
- "3306" # Random ephemeral port to avoid conflicts
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-proot"]
interval: 5s
timeout: 5s
retries: 10

orchestrator-service:
build:
context: ${REPO_ROOT}
dockerfile: example/runway/orchestrator/server/Dockerfile
environment:
# Queue infrastructure connection (runway is stateless — no app DB)
- QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true
- HOSTNAME=orchestrator-dev
depends_on:
mysql-queue:
condition: service_healthy
221 changes: 221 additions & 0 deletions example/runway/orchestrator/server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"database/sql"
"errors"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/uber-go/tally"
"github.com/uber/submitqueue/platform/consumer"
"github.com/uber/submitqueue/platform/errs"
genericerrs "github.com/uber/submitqueue/platform/errs/generic"
mysqlerrs "github.com/uber/submitqueue/platform/errs/mysql"
extqueue "github.com/uber/submitqueue/platform/extension/messagequeue"
queueMySQL "github.com/uber/submitqueue/platform/extension/messagequeue/mysql"
"github.com/uber/submitqueue/runway/core/topickey"
"github.com/uber/submitqueue/runway/extension/vcs"
"github.com/uber/submitqueue/runway/extension/vcs/noop"
"github.com/uber/submitqueue/runway/orchestrator/controller/merge"
"github.com/uber/submitqueue/runway/orchestrator/controller/mergeconflictcheck"
"go.uber.org/zap"
)

// noopVCSFactory adapts the noop VCS into the vcs.Factory interface.
type noopVCSFactory struct {
instance *noop.VCS
}

func (f *noopVCSFactory) For(_ vcs.Config) (vcs.VCS, error) {
return f.instance, nil
}

func main() {
code := 0
if err := run(); err != nil {
if errors.Is(err, context.Canceled) {
fmt.Println("Runway orchestrator server stopped by signal")
code = 128 + int(syscall.SIGTERM)
} else {
fmt.Fprintf(os.Stderr, "Runway orchestrator server failure: %v\n", err)
code = 1
}
}
os.Exit(code)
}

func run() error {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

logger, err := zap.NewDevelopment()
if err != nil {
return fmt.Errorf("failed to create logger: %w", err)
}
defer logger.Sync()

scope := tally.NewTestScope("runway_orchestrator", nil)
metricsStopCh := make(chan any, 1)
metricsWgDone := sync.WaitGroup{}
metricsWgDone.Add(1)
go func() {
defer metricsWgDone.Done()

ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-metricsStopCh:
return
case <-ticker.C:
snapshot := scope.Snapshot()
logger.Info("metrics snapshot",
zap.Any("counters", snapshot.Counters()),
zap.Any("gauges", snapshot.Gauges()),
zap.Any("timers", snapshot.Timers()),
)
}
}
}()

defer func() {
close(metricsStopCh)
metricsWgDone.Wait()
}()

queueDSN := os.Getenv("QUEUE_MYSQL_DSN")
if queueDSN == "" {
return fmt.Errorf("QUEUE_MYSQL_DSN environment variable is required")
}
queueDB, err := sql.Open("mysql", queueDSN)
if err != nil {
return fmt.Errorf("failed to open queue database: %w", err)
}
defer queueDB.Close()

mysqlQueue, err := queueMySQL.NewQueue(queueMySQL.Params{
DB: queueDB,
Logger: logger,
MetricsScope: scope.SubScope("queue"),
})
if err != nil {
return fmt.Errorf("failed to create queue: %w", err)
}
defer mysqlQueue.Close()

logger.Info("initialized queue", zap.String("dsn", queueDSN))

subscriberName := os.Getenv("HOSTNAME")
if subscriberName == "" {
subscriberName = fmt.Sprintf("runway-orchestrator-%d", time.Now().Unix())
}

registry, err := newTopicRegistry(mysqlQueue, subscriberName)
if err != nil {
return fmt.Errorf("failed to create topic registry: %w", err)
}

vcsFactory := &noopVCSFactory{instance: noop.New()}

primaryConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry,
errs.NewClassifierProcessor(
genericerrs.Classifier,
mysqlerrs.Classifier,
),
)

checkController := mergeconflictcheck.NewController(mergeconflictcheck.Params{
Logger: logger.Sugar(),
Scope: scope,
Registry: registry,
VCSFactory: vcsFactory,
TopicKey: topickey.TopicKeyMergeConflictCheck,
ConsumerGroup: "runway-merge-conflict-check",
})
if err := primaryConsumer.Register(checkController); err != nil {
return fmt.Errorf("failed to register merge-conflict-check controller: %w", err)
}

mergeController := merge.NewController(merge.Params{
Logger: logger.Sugar(),
Scope: scope,
Registry: registry,
VCSFactory: vcsFactory,
TopicKey: topickey.TopicKeyMerge,
ConsumerGroup: "runway-merge",
})
if err := primaryConsumer.Register(mergeController); err != nil {
return fmt.Errorf("failed to register merge controller: %w", err)
}

logger.Info("controllers registered", zap.Int("primary", 2))

if err := primaryConsumer.Start(ctx); err != nil {
return fmt.Errorf("failed to start primary consumer: %w", err)
}

fmt.Println("Runway orchestrator server is running (consumer-only, no gRPC)")
fmt.Println("Press Ctrl+C to stop, or send a SIGTERM.")

<-ctx.Done()
fmt.Println("Shutting down runway orchestrator server due to interruption signal...")

stopErr := primaryConsumer.Stop(30000)
if stopErr != nil {
return fmt.Errorf("failed to stop consumer: %w", stopErr)
}

return ctx.Err()
}

func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRegistry, error) {
return consumer.NewTopicRegistry([]consumer.TopicConfig{
{
Key: topickey.TopicKeyMergeConflictCheck,
Name: "merge-conflict-checker",
Queue: q,
Subscription: extqueue.DefaultSubscriptionConfig(
subscriberName, "runway-merge-conflict-check",
),
},
{
Key: topickey.TopicKeyMerge,
Name: "merger",
Queue: q,
Subscription: extqueue.DefaultSubscriptionConfig(
subscriberName, "runway-merge",
),
},
{
Key: topickey.TopicKeyMergeConflictCheckSignal,
Name: "merge-conflict-checker-signal",
Queue: q,
},
{
Key: topickey.TopicKeyMergeSignal,
Name: "merger-signal",
Queue: q,
},
})
}