diff --git a/BUILD.bazel b/BUILD.bazel index f217e569..2774adc9 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -9,6 +9,8 @@ load("@gazelle//:def.bzl", "gazelle") # Resolve protobuf import ambiguities - use the actual protopb packages, not the proto aliases # gazelle:resolve go github.com/uber/submitqueue/api/base/change/protopb //api/base/change/protopb # gazelle:resolve go github.com/uber/submitqueue/api/base/mergestrategy/protopb //api/base/mergestrategy/protopb +# gazelle:resolve go github.com/uber/submitqueue/api/runway/messagequeue/protopb //api/runway/messagequeue/protopb +# gazelle:resolve go github.com/uber/submitqueue/api/runway/orchestrator/protopb //api/runway/orchestrator/protopb # gazelle:resolve go github.com/uber/submitqueue/api/submitqueue/gateway/protopb //api/submitqueue/gateway/protopb # gazelle:resolve go github.com/uber/submitqueue/api/submitqueue/orchestrator/protopb //api/submitqueue/orchestrator/protopb # gazelle:resolve go github.com/uber/submitqueue/api/stovepipe/gateway/protopb //api/stovepipe/gateway/protopb diff --git a/Makefile b/Makefile index 1038e61d..c7719e81 100644 --- a/Makefile +++ b/Makefile @@ -20,6 +20,12 @@ STOVEPIPE_STACK_COMPOSE_FILE = example/stovepipe/docker-compose.yml # Fixed project name for local manual testing (tests use unique random names) STOVEPIPE_LOCAL_PROJECT = stovepipe +# Runway compose files +RUNWAY_ORCHESTRATOR_COMPOSE_FILE = example/runway/orchestrator/server/docker-compose.yml + +# Fixed project name for local manual testing (tests use unique random names) +RUNWAY_LOCAL_PROJECT = runway + # yamlfmt version for YAML formatting (override with: make fmt YAMLFMT_VERSION=v0.16.0) YAMLFMT_VERSION ?= v0.16.0 @@ -31,7 +37,7 @@ GOIMPORTS_VERSION ?= v0.33.0 # (the out_dir convention in tool/proto/BUILD.bazel) and copied back here. A # package may hold multiple .proto files (e.g. an RPC contract plus messagequeue # contracts); all generated stubs land in the same protopb/ dir. -PROTO_PACKAGES = api/base/change api/base/mergestrategy api/base/messagequeue api/runway/messagequeue api/submitqueue/gateway api/submitqueue/orchestrator api/stovepipe/gateway api/stovepipe/orchestrator +PROTO_PACKAGES = api/base/change api/base/mergestrategy api/base/messagequeue api/runway/messagequeue api/runway/orchestrator api/submitqueue/gateway api/submitqueue/orchestrator api/stovepipe/gateway api/stovepipe/orchestrator # Set REPO_ROOT for docker-compose export REPO_ROOT := $(shell pwd) @@ -47,7 +53,7 @@ define assert_clean fi endef -.PHONY: build build-all-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-submitqueue-consumer integration-test-extensions integration-test-submitqueue-gateway integration-test-submitqueue-orchestrator license-fix lint lint-fmt lint-license local-submitqueue-clean local-submitqueue-gateway-start local-submitqueue-gateway-stop local-init-submitqueue-schemas local-init-stovepipe-queue-schema local-submitqueue-logs local-submitqueue-orchestrator-start local-submitqueue-orchestrator-stop local-submitqueue-ps local-submitqueue-restart local-submitqueue-start local-stop local-stovepipe-gateway-start local-stovepipe-orchestrator-start local-stovepipe-start mocks proto query-deps query-targets run-client-submitqueue-gateway run-client-submitqueue-orchestrator run-client-stovepipe-gateway run-client-stovepipe-orchestrator run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help +.PHONY: build build-all-linux build-runway-orchestrator-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-submitqueue-consumer integration-test-extensions integration-test-submitqueue-gateway integration-test-submitqueue-orchestrator license-fix lint lint-fmt lint-license local-init-runway-queue-schema local-runway-orchestrator-start local-runway-orchestrator-stop local-submitqueue-clean local-submitqueue-gateway-start local-submitqueue-gateway-stop local-init-submitqueue-schemas local-init-stovepipe-queue-schema local-submitqueue-logs local-submitqueue-orchestrator-start local-submitqueue-orchestrator-stop local-submitqueue-ps local-submitqueue-restart local-submitqueue-start local-stop local-stovepipe-gateway-start local-stovepipe-orchestrator-start local-stovepipe-start mocks proto query-deps query-targets run-client-runway-orchestrator run-client-submitqueue-gateway run-client-submitqueue-orchestrator run-client-stovepipe-gateway run-client-stovepipe-orchestrator run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help build: ## Build all services and examples @@ -56,9 +62,17 @@ build: ## Build all services and examples @echo "Build complete!" # Build Linux binaries required for Docker containers -build-all-linux: build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux ## Build all Linux binaries for Docker +build-all-linux: build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux build-runway-orchestrator-linux ## Build all Linux binaries for Docker @echo "All Linux binaries ready for Docker" +build-runway-orchestrator-linux: ## Build Runway orchestrator Linux binary for Docker + @echo "Building Runway orchestrator Linux binary for Docker..." + @$(BAZEL) build --platforms=@rules_go//go/toolchain:linux_amd64 //example/runway/orchestrator/server:orchestrator + @mkdir -p .docker-bin + @cp -f bazel-bin/example/runway/orchestrator/server/orchestrator_/orchestrator .docker-bin/runway-orchestrator 2>/dev/null || \ + cp -f bazel-bin/example/runway/orchestrator/server/orchestrator .docker-bin/runway-orchestrator + @echo "Runway orchestrator Linux binary ready at .docker-bin/runway-orchestrator" + build-submitqueue-gateway-linux: ## Build Gateway Linux binary for Docker @echo "Building Gateway Linux binary for Docker..." @$(BAZEL) build --platforms=@rules_go//go/toolchain:linux_amd64 //example/submitqueue/gateway/server:gateway @@ -217,6 +231,32 @@ local-init-stovepipe-queue-schema: ## Apply queue schema only (mysql-queue) for done @echo "✅ Stovepipe queue schema applied successfully" +local-init-runway-queue-schema: ## Apply queue schema only (mysql-queue) for Runway compose stacks + @echo "Applying queue schema to mysql-queue (Runway; consumes the merge queues)..." + @for file in platform/extension/messagequeue/mysql/schema/*.sql; do \ + echo " - Applying $$(basename $$file)..."; \ + docker exec -i $(RUNWAY_LOCAL_PROJECT)-mysql-queue-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \ + done + @echo "✅ Runway queue schema applied successfully" + +local-runway-orchestrator-start: build-runway-orchestrator-linux ## Start Runway orchestrator locally (orchestrator + MySQL queue) + @echo "Starting Runway orchestrator with compose..." + @$(COMPOSE) -f $(RUNWAY_ORCHESTRATOR_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) up -d --build --wait + @echo "Applying queue schema to mysql-queue (no Runway app schema)..." + @$(MAKE) -s local-init-runway-queue-schema + @echo "" + @echo "✅ Runway orchestrator is running!" + @echo "" + @$(COMPOSE) -f $(RUNWAY_ORCHESTRATOR_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) ps + @echo "" + @echo "Runway orchestrator gRPC port: $$(docker port $(RUNWAY_LOCAL_PROJECT)-orchestrator-service-1 8080 2>/dev/null | cut -d: -f2 || echo 'unknown')" + @echo "MySQL Queue port: $$(docker port $(RUNWAY_LOCAL_PROJECT)-mysql-queue-1 3306 2>/dev/null | cut -d: -f2 || echo 'unknown')" + +local-runway-orchestrator-stop: ## Stop Runway orchestrator service + @echo "Stopping Runway orchestrator services..." + @$(COMPOSE) -f $(RUNWAY_ORCHESTRATOR_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) down + @echo "Runway orchestrator services stopped." + local-submitqueue-logs: ## View logs from all running services @$(COMPOSE) -f $(COMPOSE_FILE) -p $(SUBMITQUEUE_LOCAL_PROJECT) logs -f @@ -282,6 +322,7 @@ local-stop: ## Stop all services (keep data) @echo "Stopping all services..." @$(COMPOSE) -f $(COMPOSE_FILE) -p $(SUBMITQUEUE_LOCAL_PROJECT) down @$(COMPOSE) -f $(STOVEPIPE_STACK_COMPOSE_FILE) -p $(STOVEPIPE_LOCAL_PROJECT) down + @$(COMPOSE) -f $(RUNWAY_ORCHESTRATOR_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) down @echo "Services stopped. Data volumes preserved." local-stovepipe-logs: ## View logs from all running Stovepipe services @@ -378,6 +419,10 @@ run-client-stovepipe-gateway: run-client-stovepipe-orchestrator: @$(BAZEL) run //example/stovepipe/orchestrator/client:orchestrator -- -addr $(or $(SERVER_ADDR),localhost:8084) -message "$(or $(MESSAGE),ping)" +# Run runway orchestrator client (connects to any running runway orchestrator service) +run-client-runway-orchestrator: + @$(BAZEL) run //example/runway/orchestrator/client:orchestrator -- -addr $(or $(SERVER_ADDR),localhost:8086) -message "$(or $(MESSAGE),ping)" + run-queue-admin: ## Run queue-admin CLI (use ARGS to pass arguments, e.g. make run-queue-admin ARGS="list-topics") @$(BAZEL) run //platform/extension/messagequeue/mysql/ctl -- $(ARGS) diff --git a/api/runway/messagequeue/BUILD.bazel b/api/runway/messagequeue/BUILD.bazel index 301a0fb9..2b790a2d 100644 --- a/api/runway/messagequeue/BUILD.bazel +++ b/api/runway/messagequeue/BUILD.bazel @@ -24,6 +24,7 @@ go_test( deps = [ "//api/base/change/protopb", "//api/base/mergestrategy/protopb", + "//api/runway/messagequeue/protopb", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@org_golang_google_protobuf//proto", diff --git a/api/runway/orchestrator/proto/BUILD.bazel b/api/runway/orchestrator/proto/BUILD.bazel new file mode 100644 index 00000000..a22fa9f4 --- /dev/null +++ b/api/runway/orchestrator/proto/BUILD.bazel @@ -0,0 +1,40 @@ +load("@rules_go//go:def.bzl", "go_library") +load("@rules_go//proto:def.bzl", "go_proto_library") +load("@rules_proto//proto:defs.bzl", "proto_library") + +exports_files( + ["orchestrator.proto"], + visibility = ["//tool/proto:__pkg__"], +) + +proto_library( + name = "orchestratorpb_proto", + srcs = ["orchestrator.proto"], + visibility = ["//visibility:public"], +) + +# keep +go_proto_library( + name = "orchestratorpb_go_proto", + compilers = [ + "@rules_go//proto:go_proto", + "@rules_go//proto:go_grpc_v2", + ], + importpath = "github.com/uber/submitqueue/api/runway/orchestrator/proto", + proto = ":orchestratorpb_proto", + visibility = ["//visibility:public"], +) + +go_library( + name = "proto", + embed = [":orchestratorpb_go_proto"], + importpath = "github.com/uber/submitqueue/api/runway/orchestrator/proto", + visibility = ["//visibility:public"], +) + +go_library( + name = "protopb", + embed = [":orchestratorpb_go_proto"], + importpath = "github.com/uber/submitqueue/api/runway/orchestrator/protopb", + visibility = ["//visibility:public"], +) diff --git a/api/runway/orchestrator/proto/orchestrator.proto b/api/runway/orchestrator/proto/orchestrator.proto new file mode 100644 index 00000000..28301b75 --- /dev/null +++ b/api/runway/orchestrator/proto/orchestrator.proto @@ -0,0 +1,46 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package uber.runway.orchestrator; + +option go_package = "github.com/uber/submitqueue/api/runway/orchestrator/protopb"; +option java_multiple_files = true; +option java_outer_classname = "OrchestratorProto"; +option java_package = "com.uber.submitqueue.runway.orchestrator"; + +// PingRequest is the request for the Ping method +message PingRequest { + // Optional message to include in the ping + string message = 1; +} + +// PingResponse is the response for the Ping method +message PingResponse { + // The response message + string message = 1; + // The service name that handled the request + string service_name = 2; + // Timestamp of when the ping was received + int64 timestamp = 3; + // Hostname of the server that handled the request + string hostname = 4; +} + +// RunwayOrchestrator provides the Runway orchestrator API. +service RunwayOrchestrator { + // Ping returns a response indicating the service is alive + rpc Ping(PingRequest) returns (PingResponse) {} +} diff --git a/api/runway/orchestrator/protopb/BUILD.bazel b/api/runway/orchestrator/protopb/BUILD.bazel new file mode 100644 index 00000000..b6e7b6d2 --- /dev/null +++ b/api/runway/orchestrator/protopb/BUILD.bazel @@ -0,0 +1,26 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "protopb", + srcs = [ + "orchestrator.pb.go", + "orchestrator.pb.yarpc.go", + "orchestrator_grpc.pb.go", + ], + importpath = "github.com/uber/submitqueue/api/runway/orchestrator/protopb", + visibility = ["//visibility:public"], + deps = [ + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//status", + "@org_golang_google_protobuf//proto", + "@org_golang_google_protobuf//reflect/protoreflect", + "@org_golang_google_protobuf//runtime/protoimpl", + "@org_uber_go_fx//:fx", + "@org_uber_go_yarpc//:yarpc", + "@org_uber_go_yarpc//api/transport", + "@org_uber_go_yarpc//api/x/restriction", + "@org_uber_go_yarpc//encoding/protobuf/reflection", + "@org_uber_go_yarpc//encoding/protobuf/v2:protobuf", + ], +) diff --git a/api/runway/orchestrator/protopb/orchestrator.pb.go b/api/runway/orchestrator/protopb/orchestrator.pb.go new file mode 100644 index 00000000..52492096 --- /dev/null +++ b/api/runway/orchestrator/protopb/orchestrator.pb.go @@ -0,0 +1,223 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.10 +// protoc v5.29.3 +// source: orchestrator.proto + +package protopb + +import ( + reflect "reflect" + sync "sync" + unsafe "unsafe" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// PingRequest is the request for the Ping method +type PingRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Optional message to include in the ping + Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PingRequest) Reset() { + *x = PingRequest{} + mi := &file_orchestrator_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PingRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PingRequest) ProtoMessage() {} + +func (x *PingRequest) ProtoReflect() protoreflect.Message { + mi := &file_orchestrator_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PingRequest.ProtoReflect.Descriptor instead. +func (*PingRequest) Descriptor() ([]byte, []int) { + return file_orchestrator_proto_rawDescGZIP(), []int{0} +} + +func (x *PingRequest) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +// PingResponse is the response for the Ping method +type PingResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + // The response message + Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` + // The service name that handled the request + ServiceName string `protobuf:"bytes,2,opt,name=service_name,json=serviceName,proto3" json:"service_name,omitempty"` + // Timestamp of when the ping was received + Timestamp int64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + // Hostname of the server that handled the request + Hostname string `protobuf:"bytes,4,opt,name=hostname,proto3" json:"hostname,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PingResponse) Reset() { + *x = PingResponse{} + mi := &file_orchestrator_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PingResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PingResponse) ProtoMessage() {} + +func (x *PingResponse) ProtoReflect() protoreflect.Message { + mi := &file_orchestrator_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PingResponse.ProtoReflect.Descriptor instead. +func (*PingResponse) Descriptor() ([]byte, []int) { + return file_orchestrator_proto_rawDescGZIP(), []int{1} +} + +func (x *PingResponse) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +func (x *PingResponse) GetServiceName() string { + if x != nil { + return x.ServiceName + } + return "" +} + +func (x *PingResponse) GetTimestamp() int64 { + if x != nil { + return x.Timestamp + } + return 0 +} + +func (x *PingResponse) GetHostname() string { + if x != nil { + return x.Hostname + } + return "" +} + +var File_orchestrator_proto protoreflect.FileDescriptor + +const file_orchestrator_proto_rawDesc = "" + + "\n" + + "\x12orchestrator.proto\x12\x18uber.runway.orchestrator\"'\n" + + "\vPingRequest\x12\x18\n" + + "\amessage\x18\x01 \x01(\tR\amessage\"\x85\x01\n" + + "\fPingResponse\x12\x18\n" + + "\amessage\x18\x01 \x01(\tR\amessage\x12!\n" + + "\fservice_name\x18\x02 \x01(\tR\vserviceName\x12\x1c\n" + + "\ttimestamp\x18\x03 \x01(\x03R\ttimestamp\x12\x1a\n" + + "\bhostname\x18\x04 \x01(\tR\bhostname2m\n" + + "\x12RunwayOrchestrator\x12W\n" + + "\x04Ping\x12%.uber.runway.orchestrator.PingRequest\x1a&.uber.runway.orchestrator.PingResponse\"\x00B|\n" + + "(com.uber.submitqueue.runway.orchestratorB\x11OrchestratorProtoP\x01Z;github.com/uber/submitqueue/api/runway/orchestrator/protopbb\x06proto3" + +var ( + file_orchestrator_proto_rawDescOnce sync.Once + file_orchestrator_proto_rawDescData []byte +) + +func file_orchestrator_proto_rawDescGZIP() []byte { + file_orchestrator_proto_rawDescOnce.Do(func() { + file_orchestrator_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_orchestrator_proto_rawDesc), len(file_orchestrator_proto_rawDesc))) + }) + return file_orchestrator_proto_rawDescData +} + +var file_orchestrator_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_orchestrator_proto_goTypes = []any{ + (*PingRequest)(nil), // 0: uber.runway.orchestrator.PingRequest + (*PingResponse)(nil), // 1: uber.runway.orchestrator.PingResponse +} +var file_orchestrator_proto_depIdxs = []int32{ + 0, // 0: uber.runway.orchestrator.RunwayOrchestrator.Ping:input_type -> uber.runway.orchestrator.PingRequest + 1, // 1: uber.runway.orchestrator.RunwayOrchestrator.Ping:output_type -> uber.runway.orchestrator.PingResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_orchestrator_proto_init() } +func file_orchestrator_proto_init() { + if File_orchestrator_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_orchestrator_proto_rawDesc), len(file_orchestrator_proto_rawDesc)), + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_orchestrator_proto_goTypes, + DependencyIndexes: file_orchestrator_proto_depIdxs, + MessageInfos: file_orchestrator_proto_msgTypes, + }.Build() + File_orchestrator_proto = out.File + file_orchestrator_proto_goTypes = nil + file_orchestrator_proto_depIdxs = nil +} diff --git a/api/runway/orchestrator/protopb/orchestrator.pb.yarpc.go b/api/runway/orchestrator/protopb/orchestrator.pb.yarpc.go new file mode 100644 index 00000000..f2174e13 --- /dev/null +++ b/api/runway/orchestrator/protopb/orchestrator.pb.yarpc.go @@ -0,0 +1,255 @@ +// Code generated by protoc-gen-yarpc-go. DO NOT EDIT. +// source: orchestrator.proto + +package protopb + +import ( + "context" + "io/ioutil" + "reflect" + + "go.uber.org/fx" + "go.uber.org/yarpc" + "go.uber.org/yarpc/api/transport" + "go.uber.org/yarpc/api/x/restriction" + "go.uber.org/yarpc/encoding/protobuf/reflection" + v2 "go.uber.org/yarpc/encoding/protobuf/v2" + "google.golang.org/protobuf/proto" +) + +var _ = ioutil.NopCloser + +// RunwayOrchestratorYARPCClient is the YARPC client-side interface for the RunwayOrchestrator service. +type RunwayOrchestratorYARPCClient interface { + Ping(context.Context, *PingRequest, ...yarpc.CallOption) (*PingResponse, error) +} + +func newRunwayOrchestratorYARPCClient(clientConfig transport.ClientConfig, anyResolver v2.AnyResolver, options ...v2.ClientOption) RunwayOrchestratorYARPCClient { + return &_RunwayOrchestratorYARPCCaller{v2.NewStreamClient( + v2.ClientParams{ + ServiceName: "uber.runway.orchestrator.RunwayOrchestrator", + ClientConfig: clientConfig, + AnyResolver: anyResolver, + Options: options, + }, + )} +} + +// NewRunwayOrchestratorYARPCClient builds a new YARPC client for the RunwayOrchestrator service. +func NewRunwayOrchestratorYARPCClient(clientConfig transport.ClientConfig, options ...v2.ClientOption) RunwayOrchestratorYARPCClient { + return newRunwayOrchestratorYARPCClient(clientConfig, nil, options...) +} + +// RunwayOrchestratorYARPCServer is the YARPC server-side interface for the RunwayOrchestrator service. +type RunwayOrchestratorYARPCServer interface { + Ping(context.Context, *PingRequest) (*PingResponse, error) +} + +type buildRunwayOrchestratorYARPCProceduresParams struct { + Server RunwayOrchestratorYARPCServer + AnyResolver v2.AnyResolver +} + +func buildRunwayOrchestratorYARPCProcedures(params buildRunwayOrchestratorYARPCProceduresParams) []transport.Procedure { + handler := &_RunwayOrchestratorYARPCHandler{params.Server} + return v2.BuildProcedures( + v2.BuildProceduresParams{ + ServiceName: "uber.runway.orchestrator.RunwayOrchestrator", + UnaryHandlerParams: []v2.BuildProceduresUnaryHandlerParams{ + { + MethodName: "Ping", + Handler: v2.NewUnaryHandler( + v2.UnaryHandlerParams{ + Handle: handler.Ping, + NewRequest: newRunwayOrchestratorServicePingYARPCRequest, + AnyResolver: params.AnyResolver, + }, + ), + }, + }, + OnewayHandlerParams: []v2.BuildProceduresOnewayHandlerParams{}, + StreamHandlerParams: []v2.BuildProceduresStreamHandlerParams{}, + }, + ) +} + +// BuildRunwayOrchestratorYARPCProcedures prepares an implementation of the RunwayOrchestrator service for YARPC registration. +func BuildRunwayOrchestratorYARPCProcedures(server RunwayOrchestratorYARPCServer) []transport.Procedure { + return buildRunwayOrchestratorYARPCProcedures(buildRunwayOrchestratorYARPCProceduresParams{Server: server}) +} + +// FxRunwayOrchestratorYARPCClientParams defines the input +// for NewFxRunwayOrchestratorYARPCClient. It provides the +// paramaters to get a RunwayOrchestratorYARPCClient in an +// Fx application. +type FxRunwayOrchestratorYARPCClientParams struct { + fx.In + + Provider yarpc.ClientConfig + AnyResolver v2.AnyResolver `name:"yarpcfx" optional:"true"` + Restriction restriction.Checker `optional:"true"` +} + +// FxRunwayOrchestratorYARPCClientResult defines the output +// of NewFxRunwayOrchestratorYARPCClient. It provides a +// RunwayOrchestratorYARPCClient to an Fx application. +type FxRunwayOrchestratorYARPCClientResult struct { + fx.Out + + Client RunwayOrchestratorYARPCClient + + // We are using an fx.Out struct here instead of just returning a client + // so that we can add more values or add named versions of the client in + // the future without breaking any existing code. +} + +// NewFxRunwayOrchestratorYARPCClient provides a RunwayOrchestratorYARPCClient +// to an Fx application using the given name for routing. +// +// fx.Provide( +// protopb.NewFxRunwayOrchestratorYARPCClient("service-name"), +// ... +// ) +func NewFxRunwayOrchestratorYARPCClient(name string, options ...v2.ClientOption) interface{} { + return func(params FxRunwayOrchestratorYARPCClientParams) FxRunwayOrchestratorYARPCClientResult { + cc := params.Provider.ClientConfig(name) + + if params.Restriction != nil { + if namer, ok := cc.GetUnaryOutbound().(transport.Namer); ok { + if err := params.Restriction.Check(v2.Encoding, namer.TransportName()); err != nil { + panic(err.Error()) + } + } + } + + return FxRunwayOrchestratorYARPCClientResult{ + Client: newRunwayOrchestratorYARPCClient(cc, params.AnyResolver, options...), + } + } +} + +// FxRunwayOrchestratorYARPCProceduresParams defines the input +// for NewFxRunwayOrchestratorYARPCProcedures. It provides the +// paramaters to get RunwayOrchestratorYARPCServer procedures in an +// Fx application. +type FxRunwayOrchestratorYARPCProceduresParams struct { + fx.In + + Server RunwayOrchestratorYARPCServer + AnyResolver v2.AnyResolver `name:"yarpcfx" optional:"true"` +} + +// FxRunwayOrchestratorYARPCProceduresResult defines the output +// of NewFxRunwayOrchestratorYARPCProcedures. It provides +// RunwayOrchestratorYARPCServer procedures to an Fx application. +// +// The procedures are provided to the "yarpcfx" value group. +// Dig 1.2 or newer must be used for this feature to work. +type FxRunwayOrchestratorYARPCProceduresResult struct { + fx.Out + + Procedures []transport.Procedure `group:"yarpcfx"` + ReflectionMeta reflection.ServerMeta `group:"yarpcfx"` +} + +// NewFxRunwayOrchestratorYARPCProcedures provides RunwayOrchestratorYARPCServer procedures to an Fx application. +// It expects a RunwayOrchestratorYARPCServer to be present in the container. +// +// fx.Provide( +// protopb.NewFxRunwayOrchestratorYARPCProcedures(), +// ... +// ) +func NewFxRunwayOrchestratorYARPCProcedures() interface{} { + return func(params FxRunwayOrchestratorYARPCProceduresParams) FxRunwayOrchestratorYARPCProceduresResult { + return FxRunwayOrchestratorYARPCProceduresResult{ + Procedures: buildRunwayOrchestratorYARPCProcedures(buildRunwayOrchestratorYARPCProceduresParams{ + Server: params.Server, + AnyResolver: params.AnyResolver, + }), + ReflectionMeta: reflection.ServerMeta{ + ServiceName: "uber.runway.orchestrator.RunwayOrchestrator", + FileDescriptors: yarpcFileDescriptorClosure96b6e6782baaa298, + }, + } + } +} + +type _RunwayOrchestratorYARPCCaller struct { + streamClient v2.StreamClient +} + +func (c *_RunwayOrchestratorYARPCCaller) Ping(ctx context.Context, request *PingRequest, options ...yarpc.CallOption) (*PingResponse, error) { + responseMessage, err := c.streamClient.Call(ctx, "Ping", request, newRunwayOrchestratorServicePingYARPCResponse, options...) + if responseMessage == nil { + return nil, err + } + response, ok := responseMessage.(*PingResponse) + if !ok { + return nil, v2.CastError(emptyRunwayOrchestratorServicePingYARPCResponse, responseMessage) + } + return response, err +} + +type _RunwayOrchestratorYARPCHandler struct { + server RunwayOrchestratorYARPCServer +} + +func (h *_RunwayOrchestratorYARPCHandler) Ping(ctx context.Context, requestMessage proto.Message) (proto.Message, error) { + var request *PingRequest + var ok bool + if requestMessage != nil { + request, ok = requestMessage.(*PingRequest) + if !ok { + return nil, v2.CastError(emptyRunwayOrchestratorServicePingYARPCRequest, requestMessage) + } + } + response, err := h.server.Ping(ctx, request) + if response == nil { + return nil, err + } + return response, err +} + +func newRunwayOrchestratorServicePingYARPCRequest() proto.Message { + return &PingRequest{} +} + +func newRunwayOrchestratorServicePingYARPCResponse() proto.Message { + return &PingResponse{} +} + +var ( + emptyRunwayOrchestratorServicePingYARPCRequest = &PingRequest{} + emptyRunwayOrchestratorServicePingYARPCResponse = &PingResponse{} +) + +var yarpcFileDescriptorClosure96b6e6782baaa298 = [][]byte{ + // orchestrator.proto + []byte{ + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x91, 0x3f, 0x4f, 0xc3, 0x30, + 0x10, 0xc5, 0x09, 0xad, 0x80, 0x5e, 0xbb, 0xe0, 0x29, 0xaa, 0x18, 0x4a, 0x24, 0x20, 0x93, 0x2d, + 0xc1, 0xc8, 0xd6, 0x0f, 0x00, 0x51, 0x16, 0x24, 0x16, 0xe4, 0x44, 0xa7, 0xc4, 0x83, 0x63, 0xd7, + 0x67, 0x83, 0x90, 0x58, 0xf9, 0xde, 0x28, 0x0e, 0x7f, 0x3c, 0x50, 0x75, 0xf3, 0xdd, 0xbd, 0xf7, + 0x74, 0x3f, 0x1f, 0x30, 0xe3, 0xda, 0x1e, 0xc9, 0x3b, 0xe9, 0x8d, 0xe3, 0xd6, 0x19, 0x6f, 0x58, + 0x1e, 0x1a, 0x74, 0xdc, 0x85, 0xe1, 0x4d, 0xbe, 0xf3, 0x74, 0x5e, 0xdc, 0xc0, 0xb2, 0x52, 0x43, + 0x57, 0xe3, 0x2e, 0x20, 0x79, 0x96, 0xc3, 0xa9, 0x46, 0x22, 0xd9, 0x61, 0x9e, 0x6d, 0xb2, 0x72, + 0x51, 0xff, 0x94, 0xc5, 0x67, 0x06, 0xab, 0x49, 0x49, 0xd6, 0x0c, 0x84, 0xfb, 0xa5, 0xec, 0x12, + 0x56, 0x84, 0xee, 0x55, 0xb5, 0xf8, 0x32, 0x48, 0x8d, 0xf9, 0x71, 0x1c, 0x2f, 0xbf, 0x7b, 0x0f, + 0x52, 0x23, 0xbb, 0x80, 0x85, 0x57, 0x1a, 0xc9, 0x4b, 0x6d, 0xf3, 0xd9, 0x26, 0x2b, 0x67, 0xf5, + 0x5f, 0x83, 0xad, 0xe1, 0xac, 0x37, 0xe4, 0xa3, 0x79, 0x1e, 0xcd, 0xbf, 0xf5, 0xad, 0x06, 0x56, + 0x47, 0x8e, 0xc7, 0x04, 0x83, 0x3d, 0xc1, 0x7c, 0x5c, 0x8e, 0x5d, 0xf1, 0x7d, 0xa4, 0x3c, 0xc1, + 0x5c, 0x5f, 0x1f, 0x92, 0x4d, 0x8c, 0xc5, 0xd1, 0xf6, 0x03, 0xca, 0xd6, 0xe8, 0x49, 0x4e, 0xa1, + 0xd1, 0xca, 0xef, 0x02, 0x06, 0xfc, 0xcf, 0xba, 0x3d, 0x4f, 0x57, 0xaa, 0xc6, 0x8f, 0xaf, 0xb2, + 0xe7, 0xfb, 0x4e, 0xf9, 0x3e, 0x34, 0xbc, 0x35, 0x5a, 0x8c, 0x29, 0x22, 0x49, 0x11, 0xd2, 0x2a, + 0x31, 0x25, 0x89, 0x34, 0x49, 0xc4, 0xab, 0xd9, 0xa6, 0x39, 0x89, 0x8f, 0xbb, 0xaf, 0x00, 0x00, + 0x00, 0xff, 0xff, 0x54, 0xc6, 0xb6, 0x89, 0xd4, 0x01, 0x00, 0x00, + }, +} + +func init() { + yarpc.RegisterClientBuilder( + func(clientConfig transport.ClientConfig, structField reflect.StructField) RunwayOrchestratorYARPCClient { + return NewRunwayOrchestratorYARPCClient(clientConfig, v2.ClientBuilderOptions(clientConfig, structField)...) + }, + ) +} diff --git a/api/runway/orchestrator/protopb/orchestrator_grpc.pb.go b/api/runway/orchestrator/protopb/orchestrator_grpc.pb.go new file mode 100644 index 00000000..bb6c5fe5 --- /dev/null +++ b/api/runway/orchestrator/protopb/orchestrator_grpc.pb.go @@ -0,0 +1,142 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v5.29.3 +// source: orchestrator.proto + +package protopb + +import ( + context "context" + + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + RunwayOrchestrator_Ping_FullMethodName = "/uber.runway.orchestrator.RunwayOrchestrator/Ping" +) + +// RunwayOrchestratorClient is the client API for RunwayOrchestrator service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +// +// RunwayOrchestrator provides the Runway orchestrator API. +type RunwayOrchestratorClient interface { + // Ping returns a response indicating the service is alive + Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) +} + +type runwayOrchestratorClient struct { + cc grpc.ClientConnInterface +} + +func NewRunwayOrchestratorClient(cc grpc.ClientConnInterface) RunwayOrchestratorClient { + return &runwayOrchestratorClient{cc} +} + +func (c *runwayOrchestratorClient) Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(PingResponse) + err := c.cc.Invoke(ctx, RunwayOrchestrator_Ping_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// RunwayOrchestratorServer is the server API for RunwayOrchestrator service. +// All implementations must embed UnimplementedRunwayOrchestratorServer +// for forward compatibility. +// +// RunwayOrchestrator provides the Runway orchestrator API. +type RunwayOrchestratorServer interface { + // Ping returns a response indicating the service is alive + Ping(context.Context, *PingRequest) (*PingResponse, error) + mustEmbedUnimplementedRunwayOrchestratorServer() +} + +// UnimplementedRunwayOrchestratorServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedRunwayOrchestratorServer struct{} + +func (UnimplementedRunwayOrchestratorServer) Ping(context.Context, *PingRequest) (*PingResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") +} +func (UnimplementedRunwayOrchestratorServer) mustEmbedUnimplementedRunwayOrchestratorServer() {} +func (UnimplementedRunwayOrchestratorServer) testEmbeddedByValue() {} + +// UnsafeRunwayOrchestratorServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to RunwayOrchestratorServer will +// result in compilation errors. +type UnsafeRunwayOrchestratorServer interface { + mustEmbedUnimplementedRunwayOrchestratorServer() +} + +func RegisterRunwayOrchestratorServer(s grpc.ServiceRegistrar, srv RunwayOrchestratorServer) { + // If the following call pancis, it indicates UnimplementedRunwayOrchestratorServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&RunwayOrchestrator_ServiceDesc, srv) +} + +func _RunwayOrchestrator_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PingRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RunwayOrchestratorServer).Ping(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: RunwayOrchestrator_Ping_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RunwayOrchestratorServer).Ping(ctx, req.(*PingRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// RunwayOrchestrator_ServiceDesc is the grpc.ServiceDesc for RunwayOrchestrator service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var RunwayOrchestrator_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "uber.runway.orchestrator.RunwayOrchestrator", + HandlerType: (*RunwayOrchestratorServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Ping", + Handler: _RunwayOrchestrator_Ping_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "orchestrator.proto", +} diff --git a/example/runway/orchestrator/client/BUILD.bazel b/example/runway/orchestrator/client/BUILD.bazel new file mode 100644 index 00000000..4322429c --- /dev/null +++ b/example/runway/orchestrator/client/BUILD.bazel @@ -0,0 +1,19 @@ +load("@rules_go//go:def.bzl", "go_binary", "go_library") + +go_library( + name = "client_lib", + srcs = ["main.go"], + importpath = "github.com/uber/submitqueue/example/runway/orchestrator/client", + visibility = ["//visibility:private"], + deps = [ + "//api/runway/orchestrator/protopb", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//credentials/insecure", + ], +) + +go_binary( + name = "client", + embed = [":client_lib"], + visibility = ["//visibility:public"], +) diff --git a/example/runway/orchestrator/client/main.go b/example/runway/orchestrator/client/main.go new file mode 100644 index 00000000..730e642f --- /dev/null +++ b/example/runway/orchestrator/client/main.go @@ -0,0 +1,78 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "flag" + "fmt" + "os" + "time" + + pb "github.com/uber/submitqueue/api/runway/orchestrator/protopb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func main() { + addr := flag.String("addr", "localhost:8086", "orchestrator server address") + message := flag.String("message", "", "message to send in ping request") + timeout := flag.Duration("timeout", 5*time.Second, "request timeout") + flag.Parse() + + if err := run(*addr, *message, *timeout); err != nil { + fmt.Fprintf(os.Stderr, "Error: %v\n", err) + os.Exit(1) + } +} + +func run(addr, message string, timeout time.Duration) error { + // Create a gRPC connection + conn, err := grpc.NewClient( + addr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return fmt.Errorf("failed to connect: %w", err) + } + defer conn.Close() + + // Create a client + client := pb.NewRunwayOrchestratorClient(conn) + + // Create context with timeout + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + // Make the ping request + req := &pb.PingRequest{ + Message: message, + } + + fmt.Printf("Sending ping to orchestrator at %s...\n", addr) + resp, err := client.Ping(ctx, req) + if err != nil { + return fmt.Errorf("ping failed: %w", err) + } + + // Print the response + fmt.Printf("\nResponse:\n") + fmt.Printf(" Message: %s\n", resp.Message) + fmt.Printf(" Service Name: %s\n", resp.ServiceName) + fmt.Printf(" Timestamp: %d (%s)\n", resp.Timestamp, time.Unix(resp.Timestamp, 0)) + fmt.Printf(" Hostname: %s\n", resp.Hostname) + + return nil +} diff --git a/example/runway/orchestrator/server/BUILD.bazel b/example/runway/orchestrator/server/BUILD.bazel new file mode 100644 index 00000000..9f5f658f --- /dev/null +++ b/example/runway/orchestrator/server/BUILD.bazel @@ -0,0 +1,32 @@ +load("@rules_go//go:def.bzl", "go_binary", "go_library") + +go_library( + name = "server_lib", + srcs = ["main.go"], + importpath = "github.com/uber/submitqueue/example/runway/orchestrator/server", + visibility = ["//visibility:private"], + deps = [ + "//api/runway/messagequeue", + "//api/runway/orchestrator/protopb", + "//platform/consumer", + "//platform/errs", + "//platform/errs/generic", + "//platform/errs/mysql", + "//platform/extension/messagequeue", + "//platform/extension/messagequeue/mysql", + "//runway/orchestrator/controller", + "//runway/orchestrator/controller/merge", + "//runway/orchestrator/controller/mergeconflictcheck", + "@com_github_go_sql_driver_mysql//:mysql", + "@com_github_uber_go_tally//:tally", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//reflection", + "@org_uber_go_zap//:zap", + ], +) + +go_binary( + name = "server", + embed = [":server_lib"], + visibility = ["//visibility:public"], +) diff --git a/example/runway/orchestrator/server/Dockerfile b/example/runway/orchestrator/server/Dockerfile new file mode 100644 index 00000000..dd982f9e --- /dev/null +++ b/example/runway/orchestrator/server/Dockerfile @@ -0,0 +1,11 @@ +FROM debian:bookworm-slim + +RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* +WORKDIR /root/ + +# Built via: make build-runway-orchestrator-linux +COPY .docker-bin/runway-orchestrator ./orchestrator + +EXPOSE 8080 + +CMD ["./orchestrator"] diff --git a/example/runway/orchestrator/server/docker-compose.yml b/example/runway/orchestrator/server/docker-compose.yml new file mode 100644 index 00000000..3864d95b --- /dev/null +++ b/example/runway/orchestrator/server/docker-compose.yml @@ -0,0 +1,46 @@ +# Docker Compose for Runway orchestrator manual testing +# +# +# IMPORTANT: Before running compose, build the Linux binary: +# make build-runway-orchestrator-linux +# OR +# bazel build --platforms=@rules_go//go/toolchain:linux_amd64 //example/runway/orchestrator/server:orchestrator +# +# Quick start: +# make local-runway-orchestrator-start +# +# After `up`, only the queue schema is applied (`local-init-runway-queue-schema`). + +services: + # Queue Database - Messaging infrastructure (messages, offsets, partition leases). + # Runway consumes the merge-conflict-check and merge queues from here. + mysql-queue: + image: mysql:8.0 + environment: + MYSQL_ROOT_PASSWORD: root + MYSQL_DATABASE: submitqueue + ports: + - "3306" # Random ephemeral port to avoid conflicts + healthcheck: + # Use 127.0.0.1 (TCP) instead of localhost (Unix socket). MySQL treats + # "localhost" as a socket connection, which can be ready before the TCP + # listener — causing dependent services that connect over TCP to fail. + test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-proot"] + interval: 5s + timeout: 5s + retries: 10 + + orchestrator-service: + build: + context: ${REPO_ROOT} + dockerfile: example/runway/orchestrator/server/Dockerfile + ports: + - "8080" # Random ephemeral port to avoid conflicts + environment: + - PORT=:8080 + # Queue infrastructure connection + - QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true + - HOSTNAME=runway-orchestrator-dev + depends_on: + mysql-queue: + condition: service_healthy diff --git a/example/runway/orchestrator/server/main.go b/example/runway/orchestrator/server/main.go new file mode 100644 index 00000000..db3d0798 --- /dev/null +++ b/example/runway/orchestrator/server/main.go @@ -0,0 +1,261 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "database/sql" + "errors" + "fmt" + "net" + "os" + "os/signal" + "sync" + "syscall" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/uber-go/tally" + runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + pb "github.com/uber/submitqueue/api/runway/orchestrator/protopb" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/errs" + genericerrs "github.com/uber/submitqueue/platform/errs/generic" + mysqlerrs "github.com/uber/submitqueue/platform/errs/mysql" + extqueue "github.com/uber/submitqueue/platform/extension/messagequeue" + queueMySQL "github.com/uber/submitqueue/platform/extension/messagequeue/mysql" + "github.com/uber/submitqueue/runway/orchestrator/controller" + "github.com/uber/submitqueue/runway/orchestrator/controller/merge" + "github.com/uber/submitqueue/runway/orchestrator/controller/mergeconflictcheck" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" +) + +// OrchestratorServer wraps the controller and implements the gRPC service interface. +type OrchestratorServer struct { + pb.UnimplementedRunwayOrchestratorServer + pingController *controller.PingController +} + +// Ping delegates to the controller. +func (s *OrchestratorServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) { + return s.pingController.Ping(ctx, req) +} + +func main() { + code := 0 + if err := run(); err != nil { + if errors.Is(err, context.Canceled) { + fmt.Println("Runway orchestrator server stopped by signal") + + // Return 143 (128 + SIGTERM) as per POSIX standard if the application receives any termination signal from the OS. Ideally we should return 128+SIGINT for SIGINT and 128+SIGTERM for SIGTERM, + // but it will require a special processing not yet available in the standard library. + code = 128 + int(syscall.SIGTERM) + } else { + fmt.Fprintf(os.Stderr, "Runway orchestrator server failure: %v\n", err) + // TODO: classify errors and implement a binary protocol for exit codes, so far 1 for everything + code = 1 + } + } + os.Exit(code) +} + +func run() error { + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + logger, err := zap.NewDevelopment() + if err != nil { + return fmt.Errorf("failed to create logger: %w", err) + } + defer logger.Sync() + + scope := tally.NewTestScope("runway_orchestrator", nil) + metricsStopCh := make(chan interface{}, 1) + metricsWgDone := sync.WaitGroup{} + metricsWgDone.Add(1) + go func() { + defer metricsWgDone.Done() + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-metricsStopCh: + return + case <-ticker.C: + snapshot := scope.Snapshot() + logger.Info("metrics snapshot", + zap.Any("counters", snapshot.Counters()), + zap.Any("gauges", snapshot.Gauges()), + zap.Any("timers", snapshot.Timers()), + ) + } + } + }() + + defer func() { + close(metricsStopCh) + metricsWgDone.Wait() + }() + + queueDSN := os.Getenv("QUEUE_MYSQL_DSN") + if queueDSN == "" { + return fmt.Errorf("QUEUE_MYSQL_DSN environment variable is required") + } + queueDB, err := sql.Open("mysql", queueDSN) + if err != nil { + return fmt.Errorf("failed to open queue database: %w", err) + } + defer queueDB.Close() + + mysqlQueue, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: queueDB, + Logger: logger, + MetricsScope: scope.SubScope("queue"), + }) + if err != nil { + return fmt.Errorf("failed to create queue: %w", err) + } + defer mysqlQueue.Close() + + logger.Info("initialized queue", zap.String("dsn", queueDSN)) + + subscriberName := os.Getenv("HOSTNAME") + if subscriberName == "" { + subscriberName = fmt.Sprintf("runway-orchestrator-%d", time.Now().Unix()) + } + + registry, err := newTopicRegistry(mysqlQueue, subscriberName) + if err != nil { + return fmt.Errorf("failed to create topic registry: %w", err) + } + + primaryConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry, + errs.NewClassifierProcessor( + genericerrs.Classifier, + mysqlerrs.Classifier, + ), + ) + + mergeConflictCheckController := mergeconflictcheck.NewController(mergeconflictcheck.Params{ + Logger: logger.Sugar(), + Scope: scope, + TopicKey: runwaymq.TopicKeyMergeConflictCheck, + ConsumerGroup: "runway-mergeconflictcheck", + }) + if err := primaryConsumer.Register(mergeConflictCheckController); err != nil { + return fmt.Errorf("failed to register merge-conflict-check controller: %w", err) + } + + mergeController := merge.NewController(merge.Params{ + Logger: logger.Sugar(), + Scope: scope, + TopicKey: runwaymq.TopicKeyMerge, + ConsumerGroup: "runway-merge", + }) + if err := primaryConsumer.Register(mergeController); err != nil { + return fmt.Errorf("failed to register merge controller: %w", err) + } + logger.Info("controllers registered", zap.Int("primary", 2)) + + if err := primaryConsumer.Start(ctx); err != nil { + return fmt.Errorf("failed to start primary consumer: %w", err) + } + logger.Info("consumer started") + + grpcServer := grpc.NewServer() + + pingController := controller.NewPingController(logger, scope) + srv := &OrchestratorServer{ + pingController: pingController, + } + pb.RegisterRunwayOrchestratorServer(grpcServer, srv) + + reflection.Register(grpcServer) + + port := os.Getenv("PORT") + if port == "" { + port = ":8086" + } + listener, err := net.Listen("tcp", port) + if err != nil { + return fmt.Errorf("failed to listen on port %s: %w", port, err) + } + + fmt.Printf("Runway orchestrator gRPC server is running on %s\n", port) + fmt.Println("Press Ctrl+C to stop, or send a SIGTERM.") + + serverErrCh := make(chan error, 1) + go func() { + serverErrCh <- grpcServer.Serve(listener) + }() + + var serverErr error + select { + case <-ctx.Done(): + fmt.Println("Shutting down runway orchestrator server due to interruption signal...") + + err = ctx.Err() + + grpcServer.GracefulStop() + serverErr = <-serverErrCh + case serverErr = <-serverErrCh: + fmt.Println("Shutting down runway orchestrator server due to critical GRPC server error...") + cancel() + } + + if serverErr != nil { + serverErr = fmt.Errorf("GRPC server exited with error: %w", serverErr) + } + + primaryStopErr := primaryConsumer.Stop(30000) + if primaryStopErr != nil { + primaryStopErr = fmt.Errorf("failed to stop consumer: %w", primaryStopErr) + } + + if primaryStopErr != nil || serverErr != nil { + err = errors.Join(primaryStopErr, serverErr) + } + + return err +} + +// newTopicRegistry builds the TopicRegistry for Runway's consumed merge queues. +// Runway is the consumer of the merge-conflict-check and merge queues; each is +// registered with a consuming subscription. The corresponding signal queues +// (where results are published) are not wired yet. +func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRegistry, error) { + return consumer.NewTopicRegistry([]consumer.TopicConfig{ + { + Key: runwaymq.TopicKeyMergeConflictCheck, + Name: "merge-conflict-check", + Queue: q, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "runway-mergeconflictcheck", + ), + }, + { + Key: runwaymq.TopicKeyMerge, + Name: "merge", + Queue: q, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "runway-merge", + ), + }, + }) +} diff --git a/runway/README.md b/runway/README.md new file mode 100644 index 00000000..294c8585 --- /dev/null +++ b/runway/README.md @@ -0,0 +1,10 @@ +# Runway + +Runway owns the merge queues defined by the external contract in +[`api/runway/messagequeue`](../api/runway/messagequeue): it consumes merge-conflict-check and merge +requests, performs the work, and (eventually) publishes the result to the corresponding signal queue. +SubmitQueue is a client of these queues. + +Runway service layout: + +- `orchestrator/` — Orchestrator service: consumes the merge-conflict-check and merge queues. diff --git a/runway/orchestrator/README.md b/runway/orchestrator/README.md new file mode 100644 index 00000000..f85fb180 --- /dev/null +++ b/runway/orchestrator/README.md @@ -0,0 +1,9 @@ +# Runway Orchestrator + +Consumes Runway's merge queues (defined in [`api/runway/messagequeue`](../../api/runway/messagequeue)): + +- `merge-conflict-check` — dry-run check that an ordered sequence of merge steps applies cleanly, without committing. +- `merge` — committing merge: apply and commit the ordered steps. + +Both controllers currently deserialize the `MergeRequest` off the queue and log it; performing the +merge and publishing a `MergeResult` to the corresponding signal queue is not wired yet. diff --git a/runway/orchestrator/controller/BUILD.bazel b/runway/orchestrator/controller/BUILD.bazel new file mode 100644 index 00000000..4824beb7 --- /dev/null +++ b/runway/orchestrator/controller/BUILD.bazel @@ -0,0 +1,27 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "controller", + srcs = ["ping.go"], + importpath = "github.com/uber/submitqueue/runway/orchestrator/controller", + visibility = ["//visibility:public"], + deps = [ + "//api/runway/orchestrator/protopb", + "//platform/metrics", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "controller_test", + srcs = ["ping_test.go"], + embed = [":controller"], + deps = [ + "//api/runway/orchestrator/protopb", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_zap//:zap", + ], +) diff --git a/runway/orchestrator/controller/merge/BUILD.bazel b/runway/orchestrator/controller/merge/BUILD.bazel new file mode 100644 index 00000000..96558ac2 --- /dev/null +++ b/runway/orchestrator/controller/merge/BUILD.bazel @@ -0,0 +1,31 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "merge", + srcs = ["merge.go"], + importpath = "github.com/uber/submitqueue/runway/orchestrator/controller/merge", + visibility = ["//visibility:public"], + deps = [ + "//api/runway/messagequeue", + "//platform/consumer", + "//platform/metrics", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "merge_test", + srcs = ["merge_test.go"], + embed = [":merge"], + deps = [ + "//api/runway/messagequeue", + "//platform/base/messagequeue", + "//platform/extension/messagequeue/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/runway/orchestrator/controller/merge/merge.go b/runway/orchestrator/controller/merge/merge.go new file mode 100644 index 00000000..9de51680 --- /dev/null +++ b/runway/orchestrator/controller/merge/merge.go @@ -0,0 +1,110 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package merge consumes committing merge requests from Runway's merge queue. A +// request asks Runway to apply an ordered sequence of merge steps onto the target +// branch and commit the result. +// +// Currently a parse-and-log stub: it deserializes the MergeRequest off the queue +// and logs it. The real merge (apply and commit the steps, then publish a +// MergeResult with the produced revisions to the merge-signal queue) is not wired +// yet. +package merge + +import ( + "context" + "fmt" + + "github.com/uber-go/tally" + runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/metrics" + "go.uber.org/zap" +) + +// Verify Controller implements consumer.Controller interface at compile time. +var _ consumer.Controller = (*Controller)(nil) + +// Controller handles merge queue messages. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + topicKey consumer.TopicKey + consumerGroup string +} + +// Params are the parameters for creating a new merge controller. +type Params struct { + TopicKey consumer.TopicKey + ConsumerGroup string + + Scope tally.Scope + Logger *zap.SugaredLogger +} + +// NewController creates a new merge controller for the orchestrator. +func NewController(p Params) *Controller { + return &Controller{ + logger: p.Logger.Named("merge_controller"), + metricsScope: p.Scope.SubScope("merge_controller"), + topicKey: p.TopicKey, + consumerGroup: p.ConsumerGroup, + } +} + +// Process deserializes the merge request and logs it. Returns nil to ack, or an +// error to nack. +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + + request := &runwaymq.MergeRequest{} + if err := runwaymq.Unmarshal(msg.Payload, request); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) + // Non-retryable: a malformed payload will never deserialize on retry. + return fmt.Errorf("failed to deserialize merge request: %w", err) + } + + // TODO: apply and commit the ordered merge steps and publish a MergeResult + // with the produced revisions to the merge-signal queue. For now the request + // is only logged after parsing. + c.logger.Infow("received merge request", + "id", request.Id, + "queue_name", request.QueueName, + "step_count", len(request.Steps), + "attempt", delivery.Attempt(), + "partition_key", msg.PartitionKey, + ) + + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *Controller) Name() string { + return "merge" +} + +// TopicKey returns the topic key this controller subscribes to. +func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *Controller) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/runway/orchestrator/controller/merge/merge_test.go b/runway/orchestrator/controller/merge/merge_test.go new file mode 100644 index 00000000..c74135eb --- /dev/null +++ b/runway/orchestrator/controller/merge/merge_test.go @@ -0,0 +1,92 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package merge + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + queuemock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +const ( + testID = "test-queue/1" + testQueue = "test-queue" + testPartitionKey = "test-queue" +) + +func newController(t *testing.T) *Controller { + t.Helper() + return NewController(Params{ + Logger: zaptest.NewLogger(t).Sugar(), + Scope: tally.NoopScope, + TopicKey: runwaymq.TopicKeyMerge, + ConsumerGroup: "runway-merge", + }) +} + +func newDelivery(t *testing.T, ctrl *gomock.Controller, payload []byte) *queuemock.MockDelivery { + t.Helper() + msg := entityqueue.NewMessage(testID, payload, testPartitionKey, nil) + d := queuemock.NewMockDelivery(ctrl) + d.EXPECT().Message().Return(msg).AnyTimes() + d.EXPECT().Attempt().Return(1).AnyTimes() + return d +} + +func requestPayload(t *testing.T, req runwaymq.MergeRequest) []byte { + t.Helper() + payload, err := runwaymq.Marshal(&req) + require.NoError(t, err) + return payload +} + +func TestNewController(t *testing.T) { + controller := newController(t) + require.NotNil(t, controller) + assert.Equal(t, runwaymq.TopicKeyMerge, controller.TopicKey()) + assert.Equal(t, "runway-merge", controller.ConsumerGroup()) + assert.Equal(t, "merge", controller.Name()) +} + +func TestProcess_LogsParsedRequest(t *testing.T) { + ctrl := gomock.NewController(t) + controller := newController(t) + + req := runwaymq.MergeRequest{ + Id: testID, + QueueName: testQueue, + Steps: []*runwaymq.MergeStep{{StepId: "step-1"}}, + } + delivery := newDelivery(t, ctrl, requestPayload(t, req)) + + require.NoError(t, controller.Process(context.Background(), delivery)) +} + +func TestProcess_DeserializeError(t *testing.T) { + ctrl := gomock.NewController(t) + controller := newController(t) + + delivery := newDelivery(t, ctrl, []byte(`{"id": not json}`)) + + require.Error(t, controller.Process(context.Background(), delivery)) +} diff --git a/runway/orchestrator/controller/mergeconflictcheck/BUILD.bazel b/runway/orchestrator/controller/mergeconflictcheck/BUILD.bazel new file mode 100644 index 00000000..3e810ab6 --- /dev/null +++ b/runway/orchestrator/controller/mergeconflictcheck/BUILD.bazel @@ -0,0 +1,31 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "mergeconflictcheck", + srcs = ["mergeconflictcheck.go"], + importpath = "github.com/uber/submitqueue/runway/orchestrator/controller/mergeconflictcheck", + visibility = ["//visibility:public"], + deps = [ + "//api/runway/messagequeue", + "//platform/consumer", + "//platform/metrics", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "mergeconflictcheck_test", + srcs = ["mergeconflictcheck_test.go"], + embed = [":mergeconflictcheck"], + deps = [ + "//api/runway/messagequeue", + "//platform/base/messagequeue", + "//platform/extension/messagequeue/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck.go b/runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck.go new file mode 100644 index 00000000..73ec4736 --- /dev/null +++ b/runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck.go @@ -0,0 +1,109 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package mergeconflictcheck consumes dry-run merge-conflict check requests from +// Runway's merge-conflict-check queue. A request asks whether an ordered sequence +// of merge steps applies cleanly onto the target branch without committing. +// +// Currently a parse-and-log stub: it deserializes the MergeRequest off the queue +// and logs it. The real check (attempt the merge without committing and publish a +// MergeResult to the merge-conflict-check-signal queue) is not wired yet. +package mergeconflictcheck + +import ( + "context" + "fmt" + + "github.com/uber-go/tally" + runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/metrics" + "go.uber.org/zap" +) + +// Verify Controller implements consumer.Controller interface at compile time. +var _ consumer.Controller = (*Controller)(nil) + +// Controller handles merge-conflict-check queue messages. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + topicKey consumer.TopicKey + consumerGroup string +} + +// Params are the parameters for creating a new merge-conflict-check controller. +type Params struct { + TopicKey consumer.TopicKey + ConsumerGroup string + + Scope tally.Scope + Logger *zap.SugaredLogger +} + +// NewController creates a new merge-conflict-check controller for the orchestrator. +func NewController(p Params) *Controller { + return &Controller{ + logger: p.Logger.Named("mergeconflictcheck_controller"), + metricsScope: p.Scope.SubScope("mergeconflictcheck_controller"), + topicKey: p.TopicKey, + consumerGroup: p.ConsumerGroup, + } +} + +// Process deserializes the merge request and logs it. Returns nil to ack, or an +// error to nack. +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + + request := &runwaymq.MergeRequest{} + if err := runwaymq.Unmarshal(msg.Payload, request); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) + // Non-retryable: a malformed payload will never deserialize on retry. + return fmt.Errorf("failed to deserialize merge request: %w", err) + } + + // TODO: attempt the ordered merge steps without committing and publish a + // MergeResult to the merge-conflict-check-signal queue. For now the request + // is only logged after parsing. + c.logger.Infow("received merge-conflict-check request", + "id", request.Id, + "queue_name", request.QueueName, + "step_count", len(request.Steps), + "attempt", delivery.Attempt(), + "partition_key", msg.PartitionKey, + ) + + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *Controller) Name() string { + return "merge-conflict-check" +} + +// TopicKey returns the topic key this controller subscribes to. +func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *Controller) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck_test.go b/runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck_test.go new file mode 100644 index 00000000..70fd41ad --- /dev/null +++ b/runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck_test.go @@ -0,0 +1,92 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mergeconflictcheck + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + queuemock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +const ( + testID = "test-queue/1" + testQueue = "test-queue" + testPartitionKey = "test-queue" +) + +func newController(t *testing.T) *Controller { + t.Helper() + return NewController(Params{ + Logger: zaptest.NewLogger(t).Sugar(), + Scope: tally.NoopScope, + TopicKey: runwaymq.TopicKeyMergeConflictCheck, + ConsumerGroup: "runway-mergeconflictcheck", + }) +} + +func newDelivery(t *testing.T, ctrl *gomock.Controller, payload []byte) *queuemock.MockDelivery { + t.Helper() + msg := entityqueue.NewMessage(testID, payload, testPartitionKey, nil) + d := queuemock.NewMockDelivery(ctrl) + d.EXPECT().Message().Return(msg).AnyTimes() + d.EXPECT().Attempt().Return(1).AnyTimes() + return d +} + +func requestPayload(t *testing.T, req runwaymq.MergeRequest) []byte { + t.Helper() + payload, err := runwaymq.Marshal(&req) + require.NoError(t, err) + return payload +} + +func TestNewController(t *testing.T) { + controller := newController(t) + require.NotNil(t, controller) + assert.Equal(t, runwaymq.TopicKeyMergeConflictCheck, controller.TopicKey()) + assert.Equal(t, "runway-mergeconflictcheck", controller.ConsumerGroup()) + assert.Equal(t, "merge-conflict-check", controller.Name()) +} + +func TestProcess_LogsParsedRequest(t *testing.T) { + ctrl := gomock.NewController(t) + controller := newController(t) + + req := runwaymq.MergeRequest{ + Id: testID, + QueueName: testQueue, + Steps: []*runwaymq.MergeStep{{StepId: "step-1"}}, + } + delivery := newDelivery(t, ctrl, requestPayload(t, req)) + + require.NoError(t, controller.Process(context.Background(), delivery)) +} + +func TestProcess_DeserializeError(t *testing.T) { + ctrl := gomock.NewController(t) + controller := newController(t) + + delivery := newDelivery(t, ctrl, []byte(`{"id": not json}`)) + + require.Error(t, controller.Process(context.Background(), delivery)) +} diff --git a/runway/orchestrator/controller/ping.go b/runway/orchestrator/controller/ping.go new file mode 100644 index 00000000..8a913f19 --- /dev/null +++ b/runway/orchestrator/controller/ping.go @@ -0,0 +1,71 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "os" + "time" + + "github.com/uber-go/tally" + pb "github.com/uber/submitqueue/api/runway/orchestrator/protopb" + "github.com/uber/submitqueue/platform/metrics" + "go.uber.org/zap" +) + +// PingController handles ping business logic for the Runway orchestrator. +type PingController struct { + logger *zap.Logger + metricsScope tally.Scope +} + +// NewPingController creates a new instance of the Runway orchestrator ping controller. +func NewPingController(logger *zap.Logger, scope tally.Scope) *PingController { + return &PingController{ + logger: logger, + metricsScope: scope, + } +} + +// Ping handles the ping request and returns a response. +func (c *PingController) Ping(ctx context.Context, req *pb.PingRequest) (resp *pb.PingResponse, retErr error) { + const opName = "ping" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + message := "pong!" + isEcho := false + if req.Message != "" { + message = "echo: " + req.Message + isEcho = true + metrics.NamedCounter(c.metricsScope, opName, "echo_requests", 1) + } + + hostname, _ := os.Hostname() + + c.logger.Info("ping request received", + zap.String("message", req.Message), + zap.Bool("is_echo", isEcho), + zap.String("hostname", hostname), + ) + + return &pb.PingResponse{ + Message: message, + ServiceName: "runway-orchestrator", + Timestamp: time.Now().Unix(), + Hostname: hostname, + }, nil +} diff --git a/runway/orchestrator/controller/ping_test.go b/runway/orchestrator/controller/ping_test.go new file mode 100644 index 00000000..e2950804 --- /dev/null +++ b/runway/orchestrator/controller/ping_test.go @@ -0,0 +1,68 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + pb "github.com/uber/submitqueue/api/runway/orchestrator/protopb" + "go.uber.org/zap" +) + +func TestNewPingController(t *testing.T) { + ctrl := NewPingController(zap.NewNop(), tally.NoopScope) + require.NotNil(t, ctrl) +} + +func TestPing_DefaultMessage(t *testing.T) { + ctrl := NewPingController(zap.NewNop(), tally.NoopScope) + ctx := context.Background() + + req := &pb.PingRequest{} + resp, err := ctrl.Ping(ctx, req) + + require.NoError(t, err) + assert.Equal(t, "pong!", resp.Message) +} + +func TestPing_ServiceName(t *testing.T) { + ctrl := NewPingController(zap.NewNop(), tally.NoopScope) + ctx := context.Background() + + req := &pb.PingRequest{} + resp, err := ctrl.Ping(ctx, req) + + require.NoError(t, err) + assert.Equal(t, "runway-orchestrator", resp.ServiceName) +} + +func TestPing_Timestamp(t *testing.T) { + ctrl := NewPingController(zap.NewNop(), tally.NoopScope) + ctx := context.Background() + + before := time.Now().Unix() + req := &pb.PingRequest{} + resp, err := ctrl.Ping(ctx, req) + after := time.Now().Unix() + + require.NoError(t, err) + assert.GreaterOrEqual(t, resp.Timestamp, before) + assert.LessOrEqual(t, resp.Timestamp, after) +} diff --git a/tool/proto/BUILD.bazel b/tool/proto/BUILD.bazel index 9567ce10..8ef56571 100644 --- a/tool/proto/BUILD.bazel +++ b/tool/proto/BUILD.bazel @@ -35,6 +35,12 @@ go_proto_generated_files( out_dir = "api_runway_messagequeue", ) +go_proto_generated_files( + name = "api_runway_orchestrator", + srcs = ["//api/runway/orchestrator/proto:orchestrator.proto"], + out_dir = "api_runway_orchestrator", +) + go_proto_generated_files( name = "api_submitqueue_gateway", srcs = ["//api/submitqueue/gateway/proto:gateway.proto"], @@ -71,6 +77,7 @@ filegroup( ":api_base_mergestrategy", ":api_base_messagequeue", ":api_runway_messagequeue", + ":api_runway_orchestrator", ":api_stovepipe_gateway", ":api_stovepipe_orchestrator", ":api_submitqueue_gateway",