From 6819b08cdcba2bfc797350612a5ab124712a9382 Mon Sep 17 00:00:00 2001 From: Pavel Khotulev Date: Fri, 20 Mar 2026 12:11:29 +0000 Subject: [PATCH] Introduce scheduler router --- docs/property-router-scheduler-plan.md | 408 ++++++++++++++++++ nativelink-config/src/cas_server.rs | 8 +- nativelink-config/src/schedulers.rs | 18 + nativelink-config/src/serde_utils.rs | 6 +- nativelink-scheduler/BUILD.bazel | 2 + .../src/api_worker_scheduler.rs | 78 ++-- .../src/cache_lookup_scheduler.rs | 2 +- .../src/default_scheduler_factory.rs | 33 ++ nativelink-scheduler/src/grpc_scheduler.rs | 2 +- nativelink-scheduler/src/lib.rs | 1 + .../src/memory_awaited_action_db.rs | 28 +- nativelink-scheduler/src/mock_scheduler.rs | 2 +- .../src/property_modifier_scheduler.rs | 2 +- .../src/property_router_scheduler.rs | 129 ++++++ nativelink-scheduler/src/simple_scheduler.rs | 55 +-- .../src/simple_scheduler_state_manager.rs | 49 ++- nativelink-scheduler/src/worker.rs | 6 +- .../tests/property_router_scheduler_test.rs | 288 +++++++++++++ nativelink-service/src/execution_server.rs | 2 +- nativelink-store/src/default_store_factory.rs | 5 +- nativelink-store/src/fast_slow_store.rs | 2 +- nativelink-store/src/lib.rs | 2 +- nativelink-store/src/metrics_store.rs | 22 +- nativelink-store/src/mongo_store.rs | 2 +- nativelink-util/src/metrics.rs | 24 +- .../src/operation_state_manager.rs | 2 +- nativelink-util/src/telemetry.rs | 15 +- nativelink-worker/src/local_worker.rs | 14 +- .../src/running_actions_manager.rs | 5 +- .../tests/utils/local_worker_test_utils.rs | 13 +- src/bin/nativelink.rs | 35 +- 31 files changed, 1092 insertions(+), 168 deletions(-) create mode 100644 docs/property-router-scheduler-plan.md create mode 100644 nativelink-scheduler/src/property_router_scheduler.rs create mode 100644 nativelink-scheduler/tests/property_router_scheduler_test.rs diff --git a/docs/property-router-scheduler-plan.md b/docs/property-router-scheduler-plan.md new file mode 100644 index 000000000..c99dae7bc 
--- /dev/null +++ b/docs/property-router-scheduler-plan.md @@ -0,0 +1,408 @@ +# Plan: PropertyRouterScheduler + +Routes incoming actions to different backend schedulers based on a +platform property value (e.g. `container-image`), so the client always +talks to one endpoint and knows nothing about the internal topology. + +## Architecture + +``` +Bazel Client + │ + │ ExecuteRequest + ▼ +Front NativeLink Process + ├── ExecutionServer + │ │ + │ │ add_action(action_info) + │ ▼ + │ PropertyRouterScheduler + │ │ + │ │ reads action_info.platform_properties["container-image"] + │ │ + │ ├── "compile" / "test-env" / "test-fat-env" + │ │ └── GrpcScheduler ──► Scheduler Process 1 + │ │ │ + │ │ Workers (compile, test) + │ │ + │ └── anything else (default) + │ └── GrpcScheduler ──► Scheduler Process 2 + │ │ + │ Workers (default) + │ + └── worker_api (not exposed on front process — managed by backend processes) +``` + +## Files Changed: 6 total (3 new, 3 modified) + +### New files + +| File | Description | +|------|-------------| +| `nativelink-scheduler/src/property_router_scheduler.rs` | Core implementation | +| `nativelink-scheduler/tests/property_router_scheduler_test.rs` | Unit tests | +| `docs/property-router-scheduler-plan.md` | This file | + +### Modified files + +| File | Change | +|------|--------| +| `nativelink-config/src/schedulers.rs` | Add `PropertyRouterSpec` struct and `SchedulerSpec::PropertyRouter` variant | +| `nativelink-scheduler/src/lib.rs` | Register `property_router_scheduler` module | +| `nativelink-scheduler/src/default_scheduler_factory.rs` | Add match arm for `PropertyRouter` | + +--- + +## Step 1 — Config + +**File:** `nativelink-config/src/schedulers.rs` + +Add after `PropertyModifierSpec`: + +```rust +/// Routes actions to different schedulers based on a platform property value. +/// Actions whose property value matches a key in `routes` go to that scheduler. +/// All other actions (missing property or unmatched value) go to `default_scheduler`. 
+#[derive(Deserialize, Serialize, Debug)] +#[serde(deny_unknown_fields)] +#[cfg_attr(feature = "dev-schema", derive(JsonSchema))] +pub struct PropertyRouterSpec { + /// The platform property key to match on (e.g. "container-image"). + #[serde(deserialize_with = "convert_string_with_shellexpand")] + pub property_name: String, + + /// Map of property value -> nested scheduler spec. + pub routes: HashMap, + + /// Scheduler to use when the property is absent or its value does not match any route. + pub default_scheduler: Box, +} +``` + +Add variant to `SchedulerSpec`: + +```rust +pub enum SchedulerSpec { + Simple(SimpleSpec), + Grpc(GrpcSpec), + CacheLookup(CacheLookupSpec), + PropertyModifier(PropertyModifierSpec), + PropertyRouter(PropertyRouterSpec), // <-- new +} +``` + +--- + +## Step 2 — Core Implementation + +**File:** `nativelink-scheduler/src/property_router_scheduler.rs` + +Follows the exact same pattern as `property_modifier_scheduler.rs`. + +### Struct + +```rust +#[derive(MetricsComponent)] +pub struct PropertyRouterScheduler { + property_name: String, + #[metric(group = "routes")] + routes: HashMap>, + #[metric(group = "default_scheduler")] + default_scheduler: Arc, +} + +impl PropertyRouterScheduler { + pub fn new( + property_name: &str, + routes: HashMap>, + default_scheduler: Arc, + ) -> Self { + Self { + property_name: property_name.to_string(), + routes, + default_scheduler, + } + } +} +``` + +### `add_action` — the core routing logic + +Reads the property value from `action_info.platform_properties` +(`HashMap`), looks it up in `routes`, falls back to +`default_scheduler`: + +```rust +async fn inner_add_action( + &self, + client_operation_id: OperationId, + action_info: Arc, +) -> Result, Error> { + let scheduler = action_info + .platform_properties + .get(&self.property_name) + .and_then(|value| self.routes.get(value)) + .unwrap_or(&self.default_scheduler); + + scheduler.add_action(client_operation_id, action_info).await +} +``` + +### 
`filter_operations` — fan-out to all schedulers + +The caller (e.g. `WaitExecution`) does not know which backend scheduler +holds the operation, so the router must query all of them and merge: + +```rust +async fn inner_filter_operations( + &self, + filter: OperationFilter, +) -> Result, Error> { + let mut streams = Vec::with_capacity(self.routes.len() + 1); + for scheduler in self.routes.values() { + streams.push(scheduler.filter_operations(filter.clone()).await?); + } + streams.push(self.default_scheduler.filter_operations(filter).await?); + Ok(Box::pin(futures::stream::select_all(streams))) +} +``` + +`OperationFilter` is already `Clone` (derives it at line 67 of +`nativelink-util/src/operation_state_manager.rs`). + +### `KnownPlatformPropertyProvider` — union of all nested schedulers + +```rust +async fn inner_get_known_properties( + &self, + instance_name: &str, +) -> Result, Error> { + let mut all_props = HashSet::new(); + for scheduler in self.routes.values() { + if let Some(p) = scheduler.as_known_platform_property_provider() { + for prop in p.get_known_properties(instance_name).await? { + all_props.insert(prop); + } + } + } + if let Some(p) = self.default_scheduler.as_known_platform_property_provider() { + for prop in p.get_known_properties(instance_name).await? { + all_props.insert(prop); + } + } + Ok(all_props.into_iter().collect()) +} +``` + +### Trait impls + +Implements `ClientStateManager`, `KnownPlatformPropertyProvider`, +`RootMetricsComponent`. Does **not** implement `WorkerScheduler` — the +router never manages workers directly. 
+ +--- + +## Step 3 — Register Module + +**File:** `nativelink-scheduler/src/lib.rs` + +Add: + +```rust +pub mod property_router_scheduler; +``` + +--- + +## Step 4 — Factory + +**File:** `nativelink-scheduler/src/default_scheduler_factory.rs` + +Add import at the top: + +```rust +use crate::property_router_scheduler::PropertyRouterScheduler; +``` + +Add match arm in `inner_scheduler_factory` after `PropertyModifier`: + +```rust +SchedulerSpec::PropertyRouter(spec) => { + let mut routes = HashMap::with_capacity(spec.routes.len()); + for (value, nested_spec) in &spec.routes { + let (action_scheduler, _) = Box::pin(inner_scheduler_factory( + nested_spec, + store_manager, + maybe_origin_event_tx, + )) + .await + .err_tip(|| format!("In nested PropertyRouterScheduler route '{value}'"))?; + routes.insert( + value.clone(), + action_scheduler.err_tip(|| { + format!("Nested route '{value}' is not an action scheduler") + })?, + ); + } + let (default_action_scheduler, _) = Box::pin(inner_scheduler_factory( + &spec.default_scheduler, + store_manager, + maybe_origin_event_tx, + )) + .await + .err_tip(|| "In PropertyRouterScheduler default_scheduler")?; + let router = Arc::new(PropertyRouterScheduler::new( + &spec.property_name, + routes, + default_action_scheduler + .err_tip(|| "Default scheduler is not an action scheduler")?, + )); + (Some(router), None) +} +``` + +--- + +## Step 5 — Tests + +**File:** `nativelink-scheduler/tests/property_router_scheduler_test.rs` + +Uses `MockActionScheduler` — same pattern as `property_modifier_scheduler_test.rs`. 
+ +### Test fixture + +```rust +struct TestContext { + compile_scheduler: Arc, + default_scheduler: Arc, + router: PropertyRouterScheduler, +} + +fn make_router() -> TestContext { + let compile_scheduler = Arc::new(MockActionScheduler::new()); + let default_scheduler = Arc::new(MockActionScheduler::new()); + let mut routes = HashMap::new(); + routes.insert( + "compile".to_string(), + compile_scheduler.clone() as Arc, + ); + let router = PropertyRouterScheduler::new( + "container-image", + routes, + default_scheduler.clone() as Arc, + ); + TestContext { compile_scheduler, default_scheduler, router } +} +``` + +### Tests + +| # | Name | Scenario | Expected | +|---|------|----------|----------| +| 1 | `routes_to_matching_scheduler` | `container-image=compile` | `compile_scheduler.expect_add_action` fires, `default_scheduler` idle | +| 2 | `routes_to_default_when_no_match` | `container-image=other` | `default_scheduler.expect_add_action` fires, `compile_scheduler` idle | +| 3 | `routes_to_default_when_property_missing` | No `container-image` key | `default_scheduler.expect_add_action` fires | +| 4 | `routes_multiple_values` | Two actions: `compile` then `other` | Each routed to correct scheduler | +| 5 | `filter_operations_fans_out_to_all` | `filter_operations` called | Both `compile_scheduler` and `default_scheduler` receive the same filter | +| 6 | `known_properties_unions_all_schedulers` | `get_known_properties` called | Returns union of props from both schedulers | +| 7 | `error_from_nested_scheduler_propagates` | `compile_scheduler` returns `Err` | Router propagates the error | + +### Example test (test #1) + +```rust +#[nativelink_test] +async fn routes_to_matching_scheduler() -> Result<(), Error> { + let ctx = make_router(); + let mut action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest()) + .as_ref() + .clone(); + action_info + .platform_properties + .insert("container-image".to_string(), "compile".to_string()); + let action_info = 
Arc::new(action_info); + + let (_tx, rx) = watch::channel(Arc::new(ActionState { + client_operation_id: OperationId::default(), + stage: ActionStage::Queued, + action_digest: action_info.unique_qualifier.digest(), + last_transition_timestamp: SystemTime::now(), + })); + let client_operation_id = OperationId::default(); + + let (_, (received_op_id, received_action)) = join!( + ctx.router.add_action(client_operation_id.clone(), action_info.clone()), + ctx.compile_scheduler.expect_add_action(Ok(Box::new( + TokioWatchActionStateResult::new(client_operation_id.clone(), action_info, rx) + ))), + ); + assert_eq!(client_operation_id, received_op_id); + assert_eq!( + Some(&"compile".to_string()), + received_action.platform_properties.get("container-image") + ); + Ok(()) +} +``` + +--- + +## Example Production Config + +```json5 +// scheduler.json5 (front process — one endpoint for all clients) +{ + stores: [ + { + name: "CAS_STORE", + grpc: { + instance_name: "main", + endpoints: [{ address: "grpc://cas-node:50051" }], + store_type: "cas", + }, + }, + ], + schedulers: [ + { + name: "MAIN_SCHEDULER", + property_router: { + property_name: "container-image", + routes: { + "compile": { grpc: { endpoint: { address: "grpc://sched-compile:50052" } } }, + "test-env": { grpc: { endpoint: { address: "grpc://sched-compile:50052" } } }, + "test-fat-env": { grpc: { endpoint: { address: "grpc://sched-compile:50052" } } }, + }, + default_scheduler: { grpc: { endpoint: { address: "grpc://sched-default:50052" } } }, + }, + }, + ], + servers: [ + { + listener: { http: { socket_address: "0.0.0.0:50052" } }, + services: { + execution: [ + { instance_name: "", cas_store: "CAS_STORE", scheduler: "MAIN_SCHEDULER" }, + { instance_name: "main", cas_store: "CAS_STORE", scheduler: "MAIN_SCHEDULER" }, + ], + capabilities: [ + { instance_name: "", remote_execution: { scheduler: "MAIN_SCHEDULER" } }, + { instance_name: "main", remote_execution: { scheduler: "MAIN_SCHEDULER" } }, + ], + health: {}, + }, 
+ }, + ], +} +``` + +--- + +## Notes + +- `WorkerScheduler` is **not** implemented by the router — worker management + stays entirely in the backend scheduler processes. +- The router does not cache the routing decision. This is intentional: + `add_action` only performs `HashMap` lookups — O(1) on average, negligible cost. +- `filter_operations` fan-out is necessary because `WaitExecution` uses it + and does not know which backend scheduler owns the operation. + With N backend schedulers this is N sequential gRPC calls — acceptable since + it's used for status polling, not hot-path action dispatch. diff --git a/nativelink-config/src/cas_server.rs b/nativelink-config/src/cas_server.rs index c7f9f4882..f7d2e5555 100644 --- a/nativelink-config/src/cas_server.rs +++ b/nativelink-config/src/cas_server.rs @@ -22,9 +22,9 @@ use serde::{Deserialize, Serialize}; use crate::schedulers::SchedulerSpec; use crate::serde_utils::{ convert_data_size_with_shellexpand, convert_duration_with_shellexpand, - convert_numeric_with_shellexpand, convert_optional_numeric_with_shellexpand, - convert_optional_string_with_shellexpand, convert_string_with_shellexpand, - convert_vec_string_with_shellexpand, convert_enum_with_shellexpand, + convert_enum_with_shellexpand, convert_numeric_with_shellexpand, + convert_optional_numeric_with_shellexpand, convert_optional_string_with_shellexpand, + convert_string_with_shellexpand, convert_vec_string_with_shellexpand, }; use crate::stores::{ClientTlsConfig, ConfigDigestHashFunction, StoreRefName, StoreSpec}; @@ -854,7 +854,7 @@ pub struct LocalWorkerConfig { /// Default: None (directory cache disabled) pub directory_cache: Option, - #[serde(deserialize_with = "convert_enum_with_shellexpand")] + #[serde(default, deserialize_with = "convert_enum_with_shellexpand")] pub execution_completion_behaviour: ExecutionCompletionBehaviour, } diff --git a/nativelink-config/src/schedulers.rs b/nativelink-config/src/schedulers.rs index 1dc4723bd..c2001817e 100644 --- 
a/nativelink-config/src/schedulers.rs +++ b/nativelink-config/src/schedulers.rs @@ -32,6 +32,7 @@ pub enum SchedulerSpec { Grpc(GrpcSpec), CacheLookup(CacheLookupSpec), PropertyModifier(PropertyModifierSpec), + PropertyRouter(PropertyRouterSpec), } /// When the scheduler matches tasks to workers that are capable of running @@ -323,3 +324,20 @@ pub struct PropertyModifierSpec { /// The nested scheduler to use after modifying the properties. pub scheduler: Box, } + +/// Routes actions to different schedulers based on a platform property value. +/// Actions whose property value matches a key in `routes` go to that scheduler. +/// All other actions (missing property or unmatched value) go to `default_scheduler`. +#[derive(Deserialize, Serialize, Debug)] +#[serde(deny_unknown_fields)] +#[cfg_attr(feature = "dev-schema", derive(JsonSchema))] +pub struct PropertyRouterSpec { + /// The platform property key to match on (e.g. "container-image"). + pub property_name: String, + + /// Map of property value -> nested scheduler spec. + pub routes: HashMap, + + /// Scheduler to use when the property is absent or its value does not match any route. 
+ pub default_scheduler: Box, +} diff --git a/nativelink-config/src/serde_utils.rs b/nativelink-config/src/serde_utils.rs index 5ebdf0e11..375b3272e 100644 --- a/nativelink-config/src/serde_utils.rs +++ b/nativelink-config/src/serde_utils.rs @@ -486,10 +486,8 @@ where T: DeserializeOwned, { let s = String::deserialize(deserializer)?; - let expanded = shellexpand::env(&s) - .map_err(de::Error::custom)?; + let expanded = shellexpand::env(&s).map_err(de::Error::custom)?; let quoted = format!("\"{}\"", expanded); - serde_json5::from_str("ed) - .map_err(de::Error::custom) + serde_json5::from_str("ed).map_err(de::Error::custom) } diff --git a/nativelink-scheduler/BUILD.bazel b/nativelink-scheduler/BUILD.bazel index 74133d42b..a0653adb3 100644 --- a/nativelink-scheduler/BUILD.bazel +++ b/nativelink-scheduler/BUILD.bazel @@ -21,6 +21,7 @@ rust_library( "src/mock_scheduler.rs", "src/platform_property_manager.rs", "src/property_modifier_scheduler.rs", + "src/property_router_scheduler.rs", "src/simple_scheduler.rs", "src/simple_scheduler_state_manager.rs", "src/store_awaited_action_db.rs", @@ -66,6 +67,7 @@ rust_test_suite( "tests/action_messages_test.rs", "tests/cache_lookup_scheduler_test.rs", "tests/property_modifier_scheduler_test.rs", + "tests/property_router_scheduler_test.rs", "tests/redis_store_awaited_action_db_test.rs", "tests/simple_scheduler_state_manager_test.rs", "tests/simple_scheduler_test.rs", diff --git a/nativelink-scheduler/src/api_worker_scheduler.rs b/nativelink-scheduler/src/api_worker_scheduler.rs index 665467f4e..22647a0a9 100644 --- a/nativelink-scheduler/src/api_worker_scheduler.rs +++ b/nativelink-scheduler/src/api_worker_scheduler.rs @@ -22,13 +22,13 @@ use std::time::{Instant, UNIX_EPOCH}; use async_lock::RwLock; use lru::LruCache; use nativelink_config::schedulers::WorkerAllocationStrategy; -use nativelink_error::{error_if, make_err, make_input_err, Code, Error, ResultExt}; +use nativelink_error::{Code, Error, ResultExt, error_if, make_err, 
make_input_err}; use nativelink_metric::{ - group, MetricFieldData, MetricKind, MetricPublishKnownKindData, - MetricsComponent, RootMetricsComponent, + MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent, + RootMetricsComponent, group, }; use nativelink_util::action_messages::{OperationId, WorkerId}; -use nativelink_util::metrics::{WORKER_POOL_METRICS, WORKER_POOL_INSTANCE, WorkerPoolMetricAttrs}; +use nativelink_util::metrics::{WORKER_POOL_INSTANCE, WORKER_POOL_METRICS, WorkerPoolMetricAttrs}; use nativelink_util::operation_state_manager::{UpdateOperationType, WorkerStateManager}; use nativelink_util::platform_properties::PlatformProperties; use nativelink_util::shutdown_guard::ShutdownGuard; @@ -63,7 +63,10 @@ pub struct SchedulerMetrics { } use crate::platform_property_manager::PlatformPropertyManager; -use crate::worker::{reduce_platform_properties, Worker, ActionInfoWithProps, WorkerState, WorkerTimestamp, WorkerUpdate}; +use crate::worker::{ + ActionInfoWithProps, Worker, WorkerState, WorkerTimestamp, WorkerUpdate, + reduce_platform_properties, +}; use crate::worker_capability_index::WorkerCapabilityIndex; use crate::worker_registry::SharedWorkerRegistry; use crate::worker_scheduler::WorkerScheduler; @@ -96,11 +99,15 @@ impl WorkerSchedulerMetrics { } pub fn record_worker_removed(&self) { - WORKER_POOL_METRICS.worker_events.add(1, self.attrs.removed()); + WORKER_POOL_METRICS + .worker_events + .add(1, self.attrs.removed()); } pub fn record_worker_timeout(&self) { - WORKER_POOL_METRICS.worker_events.add(1, self.attrs.timeout()); + WORKER_POOL_METRICS + .worker_events + .add(1, self.attrs.timeout()); } pub fn record_worker_connection_failed(&self) { @@ -385,7 +392,7 @@ impl ApiWorkerSchedulerImpl { /// Batch finds workers for multiple actions in a single pass. /// This reduces lock contention by acquiring the lock once for all actions. - /// Returns a map of (action_index, worker_id) pairs for successful matches. 
+ /// Returns a map of (`action_index`, `worker_id`) pairs for successful matches. fn inner_batch_find_workers_for_actions( &self, actions: &[&PlatformProperties], @@ -409,16 +416,23 @@ impl ApiWorkerSchedulerImpl { } if !workers_platform_properties.contains_key(&worker_id) { - workers_platform_properties.insert(worker_id.clone(), worker.platform_properties.clone()); + workers_platform_properties + .insert(worker_id.clone(), worker.platform_properties.clone()); } - if !platform_properties.is_satisfied_by(&workers_platform_properties[&worker_id], full_worker_logging) { + if !platform_properties.is_satisfied_by( + &workers_platform_properties[&worker_id], + full_worker_logging, + ) { continue; } - reduce_platform_properties(workers_platform_properties.get_mut(&worker_id).unwrap(), platform_properties); + reduce_platform_properties( + workers_platform_properties.get_mut(&worker_id).unwrap(), + platform_properties, + ); - results.insert(idx, worker_id.clone()); + results.insert(idx, worker_id.clone()); break; } } @@ -660,7 +674,10 @@ impl ApiWorkerSchedulerImpl { } fn count_running_actions(&self) -> usize { - self.workers.iter().map(|(_, w)| w.running_action_infos.len()).sum() + self.workers + .iter() + .map(|(_, w)| w.running_action_infos.len()) + .sum() } } @@ -720,7 +737,7 @@ impl ApiWorkerScheduler { /// Returns a reference to the worker scheduler metrics for recording OTEL metrics. 
#[must_use] - pub fn workerMetrics(&self) -> &WorkerSchedulerMetrics { + pub const fn workerMetrics(&self) -> &WorkerSchedulerMetrics { &self.worker_scheduler_metrics } @@ -744,7 +761,8 @@ impl ApiWorkerScheduler { } else { self.worker_scheduler_metrics.record_dispatch_failure(); } - self.worker_scheduler_metrics.record_running_actions_count(inner.count_running_actions()); + self.worker_scheduler_metrics + .record_running_actions_count(inner.count_running_actions()); result } @@ -764,7 +782,9 @@ impl ApiWorkerScheduler { .fetch_add(count as u64, Ordering::Relaxed); let mut inner = self.inner.write().await; - let results = inner.inner_batch_worker_notify_run_action(assignments).await; + let results = inner + .inner_batch_worker_notify_run_action(assignments) + .await; // Record metrics let successes = results.iter().filter(|r| r.is_ok()).count(); @@ -832,7 +852,7 @@ impl ApiWorkerScheduler { /// This reduces lock contention compared to calling `find_worker_for_action` /// for each action individually. /// - /// Returns a vector of (action_index, worker_id) pairs for successful matches. + /// Returns a vector of (`action_index`, `worker_id`) pairs for successful matches. /// Actions that couldn't be matched to a worker are not included in the result. 
pub async fn batch_find_workers_for_actions( &self, @@ -846,8 +866,7 @@ impl ApiWorkerScheduler { let inner = self.inner.read().await; let worker_count = inner.workers.len() as u64; - let results = - inner.inner_batch_find_workers_for_actions(actions, full_worker_logging); + let results = inner.inner_batch_find_workers_for_actions(actions, full_worker_logging); // Track metrics self.metrics @@ -917,7 +936,8 @@ impl WorkerScheduler for ApiWorkerScheduler { .add_worker(worker) .err_tip(|| "Error while adding worker, removing from pool"); if let Err(err) = &result { - self.worker_scheduler_metrics.record_worker_connection_failed(); + self.worker_scheduler_metrics + .record_worker_connection_failed(); return Result::<(), _>::Err(err.clone()).merge( inner .immediate_evict_worker(&worker_id, err.clone(), false) @@ -930,7 +950,8 @@ impl WorkerScheduler for ApiWorkerScheduler { self.metrics.workers_added.fetch_add(1, Ordering::Relaxed); self.worker_scheduler_metrics.record_worker_added(); - self.worker_scheduler_metrics.record_worker_count(inner.workers.len()); + self.worker_scheduler_metrics + .record_worker_count(inner.workers.len()); Ok(()) } @@ -955,7 +976,8 @@ impl WorkerScheduler for ApiWorkerScheduler { if result.is_ok() && is_completion { self.worker_scheduler_metrics.record_action_completed(); } - self.worker_scheduler_metrics.record_running_actions_count(inner.count_running_actions()); + self.worker_scheduler_metrics + .record_running_actions_count(inner.count_running_actions()); result } @@ -992,7 +1014,8 @@ impl WorkerScheduler for ApiWorkerScheduler { // Record worker removal self.worker_scheduler_metrics.record_worker_removed(); - self.worker_scheduler_metrics.record_worker_count(inner.workers.len()); + self.worker_scheduler_metrics + .record_worker_count(inner.workers.len()); result } @@ -1086,8 +1109,10 @@ impl WorkerScheduler for ApiWorkerScheduler { self.worker_scheduler_metrics.record_worker_timeout(); } - 
self.worker_scheduler_metrics.record_running_actions_count(inner.count_running_actions()); - self.worker_scheduler_metrics.record_worker_count(inner.workers.len()); + self.worker_scheduler_metrics + .record_running_actions_count(inner.count_running_actions()); + self.worker_scheduler_metrics + .record_worker_count(inner.workers.len()); result } @@ -1095,7 +1120,8 @@ impl WorkerScheduler for ApiWorkerScheduler { async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> { let mut inner = self.inner.write().await; inner.set_drain_worker(worker_id, is_draining).await?; - self.worker_scheduler_metrics.record_worker_count(inner.workers.len()); + self.worker_scheduler_metrics + .record_worker_count(inner.workers.len()); Ok(()) } } diff --git a/nativelink-scheduler/src/cache_lookup_scheduler.rs b/nativelink-scheduler/src/cache_lookup_scheduler.rs index c11321771..7f8816634 100644 --- a/nativelink-scheduler/src/cache_lookup_scheduler.rs +++ b/nativelink-scheduler/src/cache_lookup_scheduler.rs @@ -382,7 +382,7 @@ impl ClientStateManager for CacheLookupScheduler { self.action_scheduler.as_known_platform_property_provider() } - fn as_any(&self) -> &dyn std::any::Any { + fn as_any(&self) -> &dyn core::any::Any { self } } diff --git a/nativelink-scheduler/src/default_scheduler_factory.rs b/nativelink-scheduler/src/default_scheduler_factory.rs index 711e34f67..2228bf9ee 100644 --- a/nativelink-scheduler/src/default_scheduler_factory.rs +++ b/nativelink-scheduler/src/default_scheduler_factory.rs @@ -32,6 +32,7 @@ use crate::cache_lookup_scheduler::CacheLookupScheduler; use crate::grpc_scheduler::GrpcScheduler; use crate::memory_awaited_action_db::MemoryAwaitedActionDb; use crate::property_modifier_scheduler::PropertyModifierScheduler; +use crate::property_router_scheduler::PropertyRouterScheduler; use crate::simple_scheduler::SimpleScheduler; use crate::store_awaited_action_db::StoreAwaitedActionDb; use crate::worker_scheduler::WorkerScheduler; 
@@ -95,6 +96,38 @@ async fn inner_scheduler_factory( )); (Some(property_modifier_scheduler), worker_scheduler) } + SchedulerSpec::PropertyRouter(spec) => { + use std::collections::HashMap; + let mut routes = HashMap::with_capacity(spec.routes.len()); + for (value, nested_spec) in &spec.routes { + let (action_scheduler, _) = Box::pin(inner_scheduler_factory( + nested_spec, + store_manager, + maybe_origin_event_tx, + )) + .await + .err_tip(|| format!("In nested PropertyRouterScheduler route '{value}'"))?; + routes.insert( + value.clone(), + action_scheduler + .err_tip(|| format!("Nested route '{value}' is not an action scheduler"))?, + ); + } + let (default_action_scheduler, _) = Box::pin(inner_scheduler_factory( + &spec.default_scheduler, + store_manager, + maybe_origin_event_tx, + )) + .await + .err_tip(|| "In PropertyRouterScheduler default_scheduler")?; + let router = Arc::new(PropertyRouterScheduler::new( + &spec.property_name, + routes, + default_action_scheduler + .err_tip(|| "Default scheduler is not an action scheduler")?, + )); + (Some(router), None) + } }; Ok(scheduler) diff --git a/nativelink-scheduler/src/grpc_scheduler.rs b/nativelink-scheduler/src/grpc_scheduler.rs index f4de0b0d3..65de39dce 100644 --- a/nativelink-scheduler/src/grpc_scheduler.rs +++ b/nativelink-scheduler/src/grpc_scheduler.rs @@ -355,7 +355,7 @@ impl ClientStateManager for GrpcScheduler { Some(self) } - fn as_any(&self) -> &dyn std::any::Any { + fn as_any(&self) -> &dyn core::any::Any { self } } diff --git a/nativelink-scheduler/src/lib.rs b/nativelink-scheduler/src/lib.rs index b5d38cb13..cc11ffe27 100644 --- a/nativelink-scheduler/src/lib.rs +++ b/nativelink-scheduler/src/lib.rs @@ -21,6 +21,7 @@ pub mod memory_awaited_action_db; pub mod mock_scheduler; pub mod platform_property_manager; pub mod property_modifier_scheduler; +pub mod property_router_scheduler; pub mod simple_scheduler; pub mod simple_scheduler_state_manager; pub mod store_awaited_action_db; diff --git 
a/nativelink-scheduler/src/memory_awaited_action_db.rs b/nativelink-scheduler/src/memory_awaited_action_db.rs index faae5f8e5..024eb1ca2 100644 --- a/nativelink-scheduler/src/memory_awaited_action_db.rs +++ b/nativelink-scheduler/src/memory_awaited_action_db.rs @@ -680,24 +680,20 @@ impl I + Clone + Send + Sync> AwaitedActionDbI // Record completion metrics with action digest for failure tracking let action_digest = old_awaited_action.action_info().digest().to_string(); if let ActionStage::Completed(action_result) = new_stage { - let result_attrs = vec![ - opentelemetry::KeyValue::new( - nativelink_util::metrics::EXECUTION_RESULT, - if action_result.exit_code == 0 { - ExecutionResult::Success - } else { - ExecutionResult::Failure - }, - ), - ]; + let result_attrs = vec![opentelemetry::KeyValue::new( + nativelink_util::metrics::EXECUTION_RESULT, + if action_result.exit_code == 0 { + ExecutionResult::Success + } else { + ExecutionResult::Failure + }, + )]; metrics.execution_completed_count.add(1, &result_attrs); } else if let ActionStage::CompletedFromCache(_) = new_stage { - let result_attrs = vec![ - opentelemetry::KeyValue::new( - nativelink_util::metrics::EXECUTION_RESULT, - ExecutionResult::CacheHit, - ), - ]; + let result_attrs = vec![opentelemetry::KeyValue::new( + nativelink_util::metrics::EXECUTION_RESULT, + ExecutionResult::CacheHit, + )]; metrics.execution_completed_count.add(1, &result_attrs); } diff --git a/nativelink-scheduler/src/mock_scheduler.rs b/nativelink-scheduler/src/mock_scheduler.rs index ff9ab9f6d..f301a4553 100644 --- a/nativelink-scheduler/src/mock_scheduler.rs +++ b/nativelink-scheduler/src/mock_scheduler.rs @@ -193,7 +193,7 @@ impl ClientStateManager for MockActionScheduler { Some(self) } - fn as_any(&self) -> &dyn std::any::Any { + fn as_any(&self) -> &dyn core::any::Any { self } } diff --git a/nativelink-scheduler/src/property_modifier_scheduler.rs b/nativelink-scheduler/src/property_modifier_scheduler.rs index 5343ecb0a..b273e4836 
100644 --- a/nativelink-scheduler/src/property_modifier_scheduler.rs +++ b/nativelink-scheduler/src/property_modifier_scheduler.rs @@ -169,7 +169,7 @@ impl ClientStateManager for PropertyModifierScheduler { Some(self) } - fn as_any(&self) -> &dyn std::any::Any { + fn as_any(&self) -> &dyn core::any::Any { self } } diff --git a/nativelink-scheduler/src/property_router_scheduler.rs b/nativelink-scheduler/src/property_router_scheduler.rs new file mode 100644 index 000000000..ba56f1142 --- /dev/null +++ b/nativelink-scheduler/src/property_router_scheduler.rs @@ -0,0 +1,129 @@ +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use async_trait::async_trait; +use nativelink_error::{Error, ResultExt}; +use nativelink_metric::{MetricsComponent, RootMetricsComponent}; +use nativelink_util::action_messages::{ActionInfo, OperationId}; +use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider; +use nativelink_util::operation_state_manager::{ + ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter, +}; + +#[derive(MetricsComponent)] +pub struct PropertyRouterScheduler { + property_name: String, + #[metric(group = "routes")] + routes: HashMap>, + #[metric(group = "default_scheduler")] + default_scheduler: Arc, +} + +impl core::fmt::Debug for PropertyRouterScheduler { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("PropertyRouterScheduler") + .field("property_name", &self.property_name) + .finish_non_exhaustive() + } +} + +impl PropertyRouterScheduler { + pub fn new( + property_name: &str, + routes: HashMap>, + default_scheduler: Arc, + ) -> Self { + Self { + property_name: property_name.to_string(), + routes, + default_scheduler, + } + } + + async fn inner_add_action( + &self, + client_operation_id: OperationId, + action_info: Arc, + ) -> Result, Error> { + let scheduler = action_info + .platform_properties + .get(&self.property_name) + .and_then(|value| 
self.routes.get(value)) + .unwrap_or(&self.default_scheduler); + + scheduler.add_action(client_operation_id, action_info).await + } + + async fn inner_filter_operations( + &self, + filter: OperationFilter, + ) -> Result, Error> { + let mut streams = Vec::with_capacity(self.routes.len() + 1); + for scheduler in self.routes.values() { + streams.push(scheduler.filter_operations(filter.clone()).await?); + } + streams.push(self.default_scheduler.filter_operations(filter).await?); + Ok(Box::pin(futures::stream::select_all(streams))) + } + + async fn inner_get_known_properties(&self, instance_name: &str) -> Result, Error> { + let mut all_props = HashSet::new(); + for scheduler in self.routes.values() { + if let Some(p) = scheduler.as_known_platform_property_provider() { + for prop in p + .get_known_properties(instance_name) + .await + .err_tip(|| "In PropertyRouterScheduler::get_known_properties for route")? + { + all_props.insert(prop); + } + } + } + if let Some(p) = self.default_scheduler.as_known_platform_property_provider() { + for prop in p + .get_known_properties(instance_name) + .await + .err_tip(|| "In PropertyRouterScheduler::get_known_properties for default")? 
+ { + all_props.insert(prop); + } + } + Ok(all_props.into_iter().collect()) + } +} + +#[async_trait] +impl KnownPlatformPropertyProvider for PropertyRouterScheduler { + async fn get_known_properties(&self, instance_name: &str) -> Result, Error> { + self.inner_get_known_properties(instance_name).await + } +} + +#[async_trait] +impl ClientStateManager for PropertyRouterScheduler { + async fn add_action( + &self, + client_operation_id: OperationId, + action_info: Arc, + ) -> Result, Error> { + self.inner_add_action(client_operation_id, action_info) + .await + } + + async fn filter_operations<'a>( + &'a self, + filter: OperationFilter, + ) -> Result, Error> { + self.inner_filter_operations(filter).await + } + + fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> { + Some(self) + } + + fn as_any(&self) -> &dyn core::any::Any { + self + } +} + +impl RootMetricsComponent for PropertyRouterScheduler {} diff --git a/nativelink-scheduler/src/simple_scheduler.rs b/nativelink-scheduler/src/simple_scheduler.rs index 4d0e91b36..79a264d5a 100644 --- a/nativelink-scheduler/src/simple_scheduler.rs +++ b/nativelink-scheduler/src/simple_scheduler.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use std::time::{Instant, SystemTime}; use async_trait::async_trait; -use futures::{future, Future, StreamExt}; +use futures::{Future, StreamExt, future}; use nativelink_config::schedulers::SimpleSpec; use nativelink_error::{Code, Error, ResultExt}; use nativelink_metric::{MetricsComponent, RootMetricsComponent}; @@ -25,6 +25,7 @@ use nativelink_proto::com::github::trace_machina::nativelink::events::OriginEven use nativelink_util::action_messages::{ActionInfo, ActionState, OperationId, WorkerId}; use nativelink_util::instant_wrapper::InstantWrapper; use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider; +use nativelink_util::metrics::EXECUTION_METRICS; use nativelink_util::operation_state_manager::{ ActionStateResult, 
ActionStateResultStream, ClientStateManager, MatchingEngineStateManager, OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType, @@ -34,11 +35,12 @@ use nativelink_util::platform_properties::PlatformProperties; use nativelink_util::shutdown_guard::ShutdownGuard; use nativelink_util::spawn; use nativelink_util::task::JoinHandleDropGuard; +use opentelemetry::KeyValue; use opentelemetry::baggage::BaggageExt; use opentelemetry::context::{Context, FutureExt as OtelFutureExt}; -use opentelemetry::KeyValue; use opentelemetry_semantic_conventions::attribute::ENDUSER_ID; -use tokio::sync::{mpsc, Notify}; +use serde::Serialize; +use tokio::sync::{Notify, mpsc}; use tokio::time::Duration; use tracing::{debug, error, info, info_span, warn}; @@ -49,8 +51,6 @@ use crate::simple_scheduler_state_manager::{SchedulerStateManager, SimpleSchedul use crate::worker::{ActionInfoWithProps, ActionsState, Worker, WorkerState, WorkerTimestamp}; use crate::worker_registry::WorkerRegistry; use crate::worker_scheduler::WorkerScheduler; -use serde::Serialize; -use nativelink_util::metrics::EXECUTION_METRICS; /// Default timeout for workers in seconds. /// If this changes, remember to change the documentation in the config. 
@@ -360,7 +360,9 @@ impl SimpleScheduler { } let total_elapsed = start.elapsed(); - EXECUTION_METRICS.do_try_match_duration.record(total_elapsed.as_secs_f64(), &[]); + EXECUTION_METRICS + .do_try_match_duration + .record(total_elapsed.as_secs_f64(), &[]); if total_elapsed > Duration::from_secs(5) { warn!( total_ms = total_elapsed.as_millis(), @@ -406,20 +408,20 @@ impl SimpleScheduler { origin_metadata: OriginMetadata, } - let mut prepared_actions: Vec = Vec::with_capacity(action_state_results.len()); - let mut platform_properties_refs: Vec<&PlatformProperties> = Vec::with_capacity(action_state_results.len()); + let mut prepared_actions: Vec = + Vec::with_capacity(action_state_results.len()); + let mut platform_properties_refs: Vec<&PlatformProperties> = + Vec::with_capacity(action_state_results.len()); for action_state_result in action_state_results { - let (action_info, maybe_origin_metadata) = match action_state_result - .as_action_info() - .await - { - Ok(result) => result, - Err(err) => { - warn!(?err, "Failed to get action_info in batch mode, skipping"); - continue; - } - }; + let (action_info, maybe_origin_metadata) = + match action_state_result.as_action_info().await { + Ok(result) => result, + Err(err) => { + warn!(?err, "Failed to get action_info in batch mode, skipping"); + continue; + } + }; // TODO(palfrey) We should not compute this every time and instead store // it with the ActionInfo when we receive it. 
@@ -429,7 +431,10 @@ impl SimpleScheduler { { Ok(props) => props, Err(err) => { - warn!(?err, "Failed to make platform properties in batch mode, skipping"); + warn!( + ?err, + "Failed to make platform properties in batch mode, skipping" + ); continue; } }; @@ -517,10 +522,9 @@ impl SimpleScheduler { // Merge notification results for notify_result in notify_results { - result = result.merge( - notify_result - .err_tip(|| "Failed to run batch_worker_notify_run_action in do_try_match_batch"), - ); + result = result.merge(notify_result.err_tip( + || "Failed to run batch_worker_notify_run_action in do_try_match_batch", + )); } } @@ -719,7 +723,8 @@ impl SimpleScheduler { } }; - let res = scheduler.do_try_match_internal(full_worker_logging).await; + let res = + scheduler.do_try_match_internal(full_worker_logging).await; if full_worker_logging { let operations_stream = scheduler .matching_engine_state_manager @@ -854,7 +859,7 @@ impl ClientStateManager for SimpleScheduler { Some(self) } - fn as_any(&self) -> &dyn std::any::Any { + fn as_any(&self) -> &dyn core::any::Any { self } } diff --git a/nativelink-scheduler/src/simple_scheduler_state_manager.rs b/nativelink-scheduler/src/simple_scheduler_state_manager.rs index 2502ed9f4..41b188f61 100644 --- a/nativelink-scheduler/src/simple_scheduler_state_manager.rs +++ b/nativelink-scheduler/src/simple_scheduler_state_manager.rs @@ -12,16 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use super::awaited_action_db::{ - AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, CountableActionStage, - SortedAwaitedActionState, -}; -use async_lock::Mutex; -use async_trait::async_trait; use core::ops::Bound; use core::time::Duration; -use futures::{stream, StreamExt, TryStreamExt}; -use nativelink_error::{make_err, Code, Error, ResultExt}; +use std::collections::{BTreeMap, HashMap}; +use std::string::ToString; +use std::sync::{Arc, Weak}; +use std::{env, vec}; + +use async_lock::Mutex; +use async_trait::async_trait; +use futures::{StreamExt, TryStreamExt, stream}; +use nativelink_error::{Code, Error, ResultExt, make_err}; use nativelink_metric::MetricsComponent; use nativelink_util::action_messages::{ ActionInfo, ActionResult, ActionStage, ActionState, ActionUniqueQualifier, ExecutionMetadata, @@ -30,8 +31,8 @@ use nativelink_util::action_messages::{ use nativelink_util::instant_wrapper::InstantWrapper; use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider; use nativelink_util::metrics::{ - register_queued_actions_callback, ExecutionMetricAttrs, ExecutionResult, EXECUTION_INSTANCE, - EXECUTION_METRICS, EXECUTION_RESULT, EXECUTION_STAGE, ExecutionStage, + EXECUTION_INSTANCE, EXECUTION_METRICS, EXECUTION_RESULT, EXECUTION_STAGE, ExecutionMetricAttrs, + ExecutionResult, ExecutionStage, register_queued_actions_callback, }; use nativelink_util::operation_state_manager::{ ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager, @@ -41,11 +42,11 @@ use nativelink_util::origin_event::OriginMetadata; use opentelemetry::KeyValue; use tracing::{debug, info, trace, warn}; +use super::awaited_action_db::{ + AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, CountableActionStage, + SortedAwaitedActionState, +}; use crate::worker_registry::SharedWorkerRegistry; -use std::collections::{BTreeMap, HashMap}; -use std::string::ToString; -use std::sync::{Arc, Weak}; -use std::{env, vec}; /// Maximum 
number of times an update to the database /// can fail before giving up. @@ -164,7 +165,7 @@ impl SchedulerMetrics { } #[must_use] - pub fn result_from_stage(stage: &ActionStage) -> Option { + pub const fn result_from_stage(stage: &ActionStage) -> Option { match stage { ActionStage::Completed(result) => { if result.error.is_some() { @@ -572,7 +573,7 @@ where /// Returns a reference to the scheduler metrics for recording OTEL metrics. #[must_use] - pub fn metrics(&self) -> &SchedulerMetrics { + pub const fn metrics(&self) -> &SchedulerMetrics { &self.scheduler_metrics } @@ -587,7 +588,7 @@ where action_insert_timestamp: std::time::SystemTime, ) { // Only record if the stage actually changed - if std::mem::discriminant(previous_stage) != std::mem::discriminant(new_stage) { + if core::mem::discriminant(previous_stage) != core::mem::discriminant(new_stage) { self.record_actions_count().await; // Record the stage transition self.scheduler_metrics @@ -655,7 +656,7 @@ where EXECUTION_METRICS .execution_total_duration - .record(total_execution_duration.as_secs_f64(), &[]) + .record(total_execution_duration.as_secs_f64(), &[]); } } } @@ -1191,10 +1192,8 @@ where let priority = Some(awaited_action.action_info().priority); // Build base attributes for metrics - let mut attrs = nativelink_util::metrics::make_execution_attributes( - instance_name, - priority, - ); + let mut attrs = + nativelink_util::metrics::make_execution_attributes(instance_name, priority); // Add stage attribute let execution_stage: ExecutionStage = (&action_state.stage).into(); @@ -1235,7 +1234,7 @@ where is_retry, action_insert_timestamp, ) - .await; + .await; return Ok(()); } @@ -1274,7 +1273,7 @@ where if result.is_ok() { self.scheduler_metrics .record_stage_transition(None, ActionStage::Queued); - self.record_actions_count().await + self.record_actions_count().await; } result @@ -1508,7 +1507,7 @@ where None } - fn as_any(&self) -> &dyn std::any::Any { + fn as_any(&self) -> &dyn core::any::Any { self } 
} diff --git a/nativelink-scheduler/src/worker.rs b/nativelink-scheduler/src/worker.rs index 55f25a538..b20ac8819 100644 --- a/nativelink-scheduler/src/worker.rs +++ b/nativelink-scheduler/src/worker.rs @@ -306,7 +306,11 @@ impl Worker { WorkerState { id: self.id.clone(), platform_properties: self.platform_properties.clone(), - running_action_infos: self.running_action_infos.iter().map(|(k, v)| (k.to_string(), v.clone())).collect(), + running_action_infos: self + .running_action_infos + .iter() + .map(|(k, v)| (k.to_string(), v.clone())) + .collect(), last_update_timestamp: self.last_update_timestamp, is_paused: self.is_paused, is_draining: self.is_draining, diff --git a/nativelink-scheduler/tests/property_router_scheduler_test.rs b/nativelink-scheduler/tests/property_router_scheduler_test.rs new file mode 100644 index 000000000..1bb1eb504 --- /dev/null +++ b/nativelink-scheduler/tests/property_router_scheduler_test.rs @@ -0,0 +1,288 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + +mod utils { + pub(crate) mod scheduler_utils; +} + +use futures::{StreamExt, join}; +use nativelink_error::{Error, make_input_err}; +use nativelink_macro::nativelink_test; +use nativelink_scheduler::mock_scheduler::MockActionScheduler; +use nativelink_scheduler::property_router_scheduler::PropertyRouterScheduler; +use nativelink_util::action_messages::{ActionStage, ActionState, OperationId}; +use nativelink_util::common::DigestInfo; +use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider; +use nativelink_util::operation_state_manager::{ClientStateManager, OperationFilter}; +use pretty_assertions::assert_eq; +use tokio::sync::watch; +use utils::scheduler_utils::{TokioWatchActionStateResult, make_base_action_info}; + +struct TestContext { + compile_scheduler: Arc, + default_scheduler: Arc, + router: PropertyRouterScheduler, +} + +fn make_router() -> TestContext { + let compile_scheduler = 
Arc::new(MockActionScheduler::new()); + let default_scheduler = Arc::new(MockActionScheduler::new()); + let mut routes = HashMap::new(); + routes.insert( + "compile".to_string(), + compile_scheduler.clone() + as Arc, + ); + let router = PropertyRouterScheduler::new( + "container-image", + routes, + default_scheduler.clone() as Arc, + ); + TestContext { + compile_scheduler, + default_scheduler, + router, + } +} + +#[nativelink_test] +async fn routes_to_matching_scheduler() -> Result<(), Error> { + let ctx = make_router(); + let mut action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest()) + .as_ref() + .clone(); + action_info + .platform_properties + .insert("container-image".to_string(), "compile".to_string()); + let action_info = Arc::new(action_info); + + let (_tx, rx) = watch::channel(Arc::new(ActionState { + client_operation_id: OperationId::default(), + stage: ActionStage::Queued, + action_digest: action_info.unique_qualifier.digest(), + last_transition_timestamp: SystemTime::now(), + })); + let client_operation_id = OperationId::default(); + + let (_, (received_op_id, received_action)) = + join!( + ctx.router + .add_action(client_operation_id.clone(), action_info.clone()), + ctx.compile_scheduler.expect_add_action(Ok(Box::new( + TokioWatchActionStateResult::new(client_operation_id.clone(), action_info, rx) + ))), + ); + assert_eq!(client_operation_id, received_op_id); + assert_eq!( + Some(&"compile".to_string()), + received_action.platform_properties.get("container-image") + ); + Ok(()) +} + +#[nativelink_test] +async fn routes_to_default_when_no_match() -> Result<(), Error> { + let ctx = make_router(); + let mut action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest()) + .as_ref() + .clone(); + action_info.platform_properties.insert( + "container-image".to_string(), + "some-other-image".to_string(), + ); + let action_info = Arc::new(action_info); + + let (_tx, rx) = watch::channel(Arc::new(ActionState { + 
client_operation_id: OperationId::default(), + stage: ActionStage::Queued, + action_digest: action_info.unique_qualifier.digest(), + last_transition_timestamp: SystemTime::now(), + })); + let client_operation_id = OperationId::default(); + + let (_, (received_op_id, received_action)) = + join!( + ctx.router + .add_action(client_operation_id.clone(), action_info.clone()), + ctx.default_scheduler.expect_add_action(Ok(Box::new( + TokioWatchActionStateResult::new(client_operation_id.clone(), action_info, rx) + ))), + ); + assert_eq!(client_operation_id, received_op_id); + assert_eq!( + Some(&"some-other-image".to_string()), + received_action.platform_properties.get("container-image") + ); + Ok(()) +} + +#[nativelink_test] +async fn routes_to_default_when_property_missing() -> Result<(), Error> { + let ctx = make_router(); + let action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest()); + + let (_tx, rx) = watch::channel(Arc::new(ActionState { + client_operation_id: OperationId::default(), + stage: ActionStage::Queued, + action_digest: action_info.unique_qualifier.digest(), + last_transition_timestamp: SystemTime::now(), + })); + let client_operation_id = OperationId::default(); + + let (_, (received_op_id, received_action)) = + join!( + ctx.router + .add_action(client_operation_id.clone(), action_info.clone()), + ctx.default_scheduler.expect_add_action(Ok(Box::new( + TokioWatchActionStateResult::new(client_operation_id.clone(), action_info, rx) + ))), + ); + assert_eq!(client_operation_id, received_op_id); + assert!( + !received_action + .platform_properties + .contains_key("container-image"), + "Expected no container-image property" + ); + Ok(()) +} + +#[nativelink_test] +async fn routes_multiple_values() -> Result<(), Error> { + let ctx = make_router(); + + // First action: routes to compile_scheduler + { + let mut action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest()) + .as_ref() + .clone(); + action_info + .platform_properties + 
.insert("container-image".to_string(), "compile".to_string()); + let action_info = Arc::new(action_info); + + let (_tx, rx) = watch::channel(Arc::new(ActionState { + client_operation_id: OperationId::default(), + stage: ActionStage::Queued, + action_digest: action_info.unique_qualifier.digest(), + last_transition_timestamp: SystemTime::now(), + })); + let client_operation_id = OperationId::default(); + + let (_, (received_op_id, received_action)) = join!( + ctx.router + .add_action(client_operation_id.clone(), action_info.clone()), + ctx.compile_scheduler.expect_add_action(Ok(Box::new( + TokioWatchActionStateResult::new(client_operation_id.clone(), action_info, rx) + ))), + ); + assert_eq!(client_operation_id, received_op_id); + assert_eq!( + Some(&"compile".to_string()), + received_action.platform_properties.get("container-image") + ); + } + + // Second action: routes to default_scheduler + { + let mut action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest()) + .as_ref() + .clone(); + action_info + .platform_properties + .insert("container-image".to_string(), "default-image".to_string()); + let action_info = Arc::new(action_info); + + let (_tx, rx) = watch::channel(Arc::new(ActionState { + client_operation_id: OperationId::default(), + stage: ActionStage::Queued, + action_digest: action_info.unique_qualifier.digest(), + last_transition_timestamp: SystemTime::now(), + })); + let client_operation_id = OperationId::default(); + + let (_, (received_op_id, received_action)) = join!( + ctx.router + .add_action(client_operation_id.clone(), action_info.clone()), + ctx.default_scheduler.expect_add_action(Ok(Box::new( + TokioWatchActionStateResult::new(client_operation_id.clone(), action_info, rx) + ))), + ); + assert_eq!(client_operation_id, received_op_id); + assert_eq!( + Some(&"default-image".to_string()), + received_action.platform_properties.get("container-image") + ); + } + + Ok(()) +} + +#[nativelink_test] +async fn 
filter_operations_fans_out_to_all() -> Result<(), Error> { + let ctx = make_router(); + let filter = OperationFilter { + client_operation_id: Some(OperationId::default()), + ..Default::default() + }; + + // The router calls filter_operations sequentially on routes then default. + // Since HashMap order is arbitrary, we join both expects concurrently. + let (router_result, compile_filter, default_filter) = join!( + ctx.router.filter_operations(filter.clone()), + ctx.compile_scheduler + .expect_filter_operations(Ok(Box::pin(futures::stream::empty()))), + ctx.default_scheduler + .expect_filter_operations(Ok(Box::pin(futures::stream::empty()))), + ); + + assert!(router_result.unwrap().next().await.is_none()); + assert_eq!(filter, compile_filter); + assert_eq!(filter, default_filter); + Ok(()) +} + +#[nativelink_test] +async fn known_properties_unions_all_schedulers() -> Result<(), Error> { + let ctx = make_router(); + + let (known_props, _compile_instance, _default_instance) = join!( + ctx.router.get_known_properties("my-instance"), + ctx.compile_scheduler + .expect_get_known_properties(Ok(vec!["cpu_arch".to_string()])), + ctx.default_scheduler + .expect_get_known_properties(Ok(vec!["os".to_string(), "cpu_arch".to_string()])), + ); + + let mut props = known_props.unwrap(); + props.sort(); + assert_eq!(vec!["cpu_arch".to_string(), "os".to_string()], props); + Ok(()) +} + +#[nativelink_test] +async fn error_from_nested_scheduler_propagates() -> Result<(), Error> { + let ctx = make_router(); + let mut action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest()) + .as_ref() + .clone(); + action_info + .platform_properties + .insert("container-image".to_string(), "compile".to_string()); + let action_info = Arc::new(action_info); + + let client_operation_id = OperationId::default(); + let (result, _) = join!( + ctx.router + .add_action(client_operation_id.clone(), action_info.clone()), + ctx.compile_scheduler + .expect_add_action(Err(make_input_err!("Simulated 
scheduler error"))), + ); + + assert!( + result.is_err(), + "Expected error to propagate from nested scheduler" + ); + Ok(()) +} diff --git a/nativelink-service/src/execution_server.rs b/nativelink-service/src/execution_server.rs index 706206c74..68979333a 100644 --- a/nativelink-service/src/execution_server.rs +++ b/nativelink-service/src/execution_server.rs @@ -378,7 +378,7 @@ impl Execution for ExecutionServer { #[cfg(test)] #[test] fn test_nl_op_id_from_name() -> Result<(), Box> { - let examples = [("foo/bar", "foo"), ("a/b/c/d", "a/b/c")]; + let examples = [("foo/bar", "foo"), ("a/b/c/d", "a")]; for (input, expected) in examples { let id = NativelinkOperationId::from_name(input)?; diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs index 63e161891..ca0cc6bab 100644 --- a/nativelink-store/src/default_store_factory.rs +++ b/nativelink-store/src/default_store_factory.rs @@ -24,6 +24,7 @@ use nativelink_error::Error; use nativelink_util::health_utils::HealthRegistryBuilder; use nativelink_util::metrics::StoreType; use nativelink_util::store_trait::{Store, StoreDriver}; + use crate::completeness_checking_store::CompletenessCheckingStore; use crate::compression_store::CompressionStore; use crate::dedup_store::DedupStore; @@ -140,13 +141,13 @@ pub fn store_factory<'a>( ))) } else { Ok(store) - } + }; }) } fn should_wrap_in_metrics_store(spec: &StoreSpec) -> bool { if env::var("NL_STORE_METRICS").is_err() { - return false + return false; } matches!( diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs index b29346ac9..d5df21055 100644 --- a/nativelink-store/src/fast_slow_store.rs +++ b/nativelink-store/src/fast_slow_store.rs @@ -30,6 +30,7 @@ use nativelink_util::buf_channel::{ }; use nativelink_util::fs; use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator}; +use nativelink_util::metrics::FAST_SLOW_STORE_METRICS; use 
nativelink_util::store_trait::{ RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, StoreOptimizations, UploadSizeInfo, slow_update_store_with_file, @@ -37,7 +38,6 @@ use nativelink_util::store_trait::{ use parking_lot::Mutex; use tokio::sync::OnceCell; use tracing::{debug, trace, warn}; -use nativelink_util::metrics::FAST_SLOW_STORE_METRICS; // TODO(palfrey) This store needs to be evaluated for more efficient memory usage, // there are many copies happening internally. diff --git a/nativelink-store/src/lib.rs b/nativelink-store/src/lib.rs index 4a367ee33..fce478231 100644 --- a/nativelink-store/src/lib.rs +++ b/nativelink-store/src/lib.rs @@ -27,6 +27,7 @@ pub mod gcs_client; pub mod gcs_store; pub mod grpc_store; pub mod memory_store; +pub mod metrics_store; pub mod mongo_store; pub mod noop_store; pub mod ontap_s3_existence_cache_store; @@ -39,4 +40,3 @@ pub mod shard_store; pub mod size_partitioning_store; pub mod store_manager; pub mod verify_store; -pub mod metrics_store; diff --git a/nativelink-store/src/metrics_store.rs b/nativelink-store/src/metrics_store.rs index fe3a363f1..8abaa67a5 100644 --- a/nativelink-store/src/metrics_store.rs +++ b/nativelink-store/src/metrics_store.rs @@ -1,17 +1,19 @@ -use crate::filesystem_store::FilesystemStore; +use core::pin::Pin; +use std::borrow::Cow; +use std::sync::Arc; +use std::time::Instant; + use async_trait::async_trait; use nativelink_error::Error; use nativelink_metric::MetricsComponent; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator}; -use nativelink_util::metrics::{StoreMetricAttrs, StoreType, STORE_METRICS}; +use nativelink_util::metrics::{STORE_METRICS, StoreMetricAttrs, StoreType}; use nativelink_util::store_trait::{ RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo, }; -use std::borrow::Cow; -use std::pin::Pin; -use std::sync::Arc; -use std::time::Instant; + +use 
crate::filesystem_store::FilesystemStore; #[derive(MetricsComponent, Debug)] pub struct MetricsStore { @@ -42,7 +44,9 @@ impl MetricsStore { tracing::error!("Failed to register remove callback: {:?}", e); } - STORE_METRICS.store_size.record(fs_store.get_len(), &attrs.store_size()); + STORE_METRICS + .store_size + .record(fs_store.get_len(), &attrs.store_size()); } Arc::new(Self { @@ -109,7 +113,9 @@ impl StoreDriver for MetricsStore { } if let Some(fs_store) = self.inner.downcast_ref::(None) { - STORE_METRICS.store_size.record(fs_store.get_len(), &self.attrs.store_size()); + STORE_METRICS + .store_size + .record(fs_store.get_len(), &self.attrs.store_size()); } result diff --git a/nativelink-store/src/mongo_store.rs b/nativelink-store/src/mongo_store.rs index 2110a20b7..d68500a17 100644 --- a/nativelink-store/src/mongo_store.rs +++ b/nativelink-store/src/mongo_store.rs @@ -1101,7 +1101,7 @@ impl SchedulerStore for ExperimentalMongoStore { async fn count_by_index(&self, index: Vec) -> Result, Error> where - K: SchedulerIndexProvider + Send + K: SchedulerIndexProvider + Send, { Ok(vec![0; index.len()]) } diff --git a/nativelink-util/src/metrics.rs b/nativelink-util/src/metrics.rs index 82537194e..faf9d6b0d 100644 --- a/nativelink-util/src/metrics.rs +++ b/nativelink-util/src/metrics.rs @@ -14,11 +14,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::{Display, Formatter}; +use core::fmt::{Display, Formatter}; use std::sync::{LazyLock, OnceLock}; +use opentelemetry::{InstrumentationScope, KeyValue, Value, global, metrics}; + use crate::action_messages::ActionStage; -use opentelemetry::{global, metrics, InstrumentationScope, KeyValue, Value}; /// Callback type for observable gauges that report queued action counts. /// The callback receives an `Observer` that should be used to record values with attributes. 
@@ -700,16 +701,13 @@ pub struct ExecutionMetrics { pub execution_actions_count: metrics::Gauge, // Gauge of queued actions by platform properties pub execution_queued_actions_count: metrics::ObservableGauge, - /// Duration of do_try_match in ms + /// Duration of `do_try_match` in ms pub do_try_match_duration: metrics::Histogram, } /// Helper function to create attributes for execution metrics #[must_use] -pub fn make_execution_attributes( - instance_name: &str, - priority: Option, -) -> Vec { +pub fn make_execution_attributes(instance_name: &str, priority: Option) -> Vec { let mut attrs = vec![KeyValue::new(EXECUTION_INSTANCE, instance_name.to_string())]; if let Some(priority) = priority { @@ -1601,7 +1599,7 @@ pub enum StoreType { } impl Display for StoreType { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { match self { StoreType::Filesystem => write!(f, "filesystem"), StoreType::S3 => write!(f, "s3"), @@ -1644,7 +1642,7 @@ pub static STORE_METRICS: LazyLock = LazyLock::new(|| { // memory, a filesystem, or network storage. The current values were // determined empirically and might need adjustment. 
.with_boundaries(vec![ - 0.1, // 100μs + 0.1, // 100μs // Sub-millisecond range 0.5, // 500μs 1.0, // 1ms @@ -1654,9 +1652,9 @@ pub static STORE_METRICS: LazyLock = LazyLock::new(|| { 50.0, // 50ms 100.0, // 100ms // Higher latency range - 500.0, // 500ms - 1000.0, // 1 second - 5000.0, // 5 seconds + 500.0, // 500ms + 1000.0, // 1 second + 5000.0, // 5 seconds 10000.0, // 10 seconds ]) .build(), @@ -1696,7 +1694,6 @@ pub struct StoreMetricAttrs { write_error: Vec, eviction: Vec, store_size: Vec, - } impl StoreMetricAttrs { @@ -1727,7 +1724,6 @@ impl StoreMetricAttrs { write_error: make_attrs(CacheOperationName::Write, CacheOperationResult::Error), eviction: make_attrs(CacheOperationName::Evict, CacheOperationResult::Success), store_size: base_attrs.clone(), - } } diff --git a/nativelink-util/src/operation_state_manager.rs b/nativelink-util/src/operation_state_manager.rs index 3a4b8806e..38ef051b8 100644 --- a/nativelink-util/src/operation_state_manager.rs +++ b/nativelink-util/src/operation_state_manager.rs @@ -122,7 +122,7 @@ pub trait ClientStateManager: Sync + Send + Unpin + MetricsComponent + 'static { fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider>; /// Returns the implementation as `Any` so that it can be downcast to a concrete type. - fn as_any(&self) -> &dyn std::any::Any; + fn as_any(&self) -> &dyn core::any::Any; } /// The type of update to perform on an operation. diff --git a/nativelink-util/src/telemetry.rs b/nativelink-util/src/telemetry.rs index 18606ce1e..b7c968b7b 100644 --- a/nativelink-util/src/telemetry.rs +++ b/nativelink-util/src/telemetry.rs @@ -12,9 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use core::default::Default; +use std::env; +use std::sync::OnceLock; + use base64::Engine; use base64::prelude::BASE64_STANDARD_NO_PAD; -use core::default::Default; use ginepro::LoadBalancedChannel; use hyper::http::Response; use nativelink_error::{Code, ResultExt, make_err}; @@ -34,8 +37,6 @@ use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator}; use opentelemetry_sdk::trace::SdkTracerProvider; use opentelemetry_semantic_conventions::attribute::ENDUSER_ID; use prost::Message; -use std::env; -use std::sync::OnceLock; use tracing::debug; use tracing::metadata::LevelFilter; use tracing_opentelemetry::{MetricsLayer, layer}; @@ -209,9 +210,11 @@ const NL_OTEL_ENDPOINT: &str = "NL_OTEL_ENDPOINT"; async fn maybe_load_balanced_channel() -> Option { match env::var(NL_OTEL_ENDPOINT) { Ok(endpoint) => { - let url = Url::parse(endpoint.as_str()).map_err(|e| { - make_err!(Code::Internal, "Unable to parse endpoint {endpoint}: {e:?}") - }).unwrap(); + let url = Url::parse(endpoint.as_str()) + .map_err(|e| { + make_err!(Code::Internal, "Unable to parse endpoint {endpoint}: {e:?}") + }) + .unwrap(); let host = url .host() diff --git a/nativelink-worker/src/local_worker.rs b/nativelink-worker/src/local_worker.rs index 1f7768830..c01ac9f49 100644 --- a/nativelink-worker/src/local_worker.rs +++ b/nativelink-worker/src/local_worker.rs @@ -23,10 +23,13 @@ use std::env; use std::process::Stdio; use std::sync::{Arc, Weak}; use std::time::Instant; + use futures::future::BoxFuture; use futures::stream::FuturesUnordered; use futures::{Future, FutureExt, StreamExt, TryFutureExt, select}; -use nativelink_config::cas_server::{EnvironmentSource, ExecutionCompletionBehaviour, LocalWorkerConfig}; +use nativelink_config::cas_server::{ + EnvironmentSource, ExecutionCompletionBehaviour, LocalWorkerConfig, +}; use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err}; use 
nativelink_proto::com::github::trace_machina::nativelink::remote_execution::update_for_worker::Update; use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::worker_api_client::WorkerApiClient; @@ -38,6 +41,7 @@ use nativelink_store::fast_slow_store::FastSlowStore; use nativelink_util::action_messages::{ActionResult, ActionStage, OperationId}; use nativelink_util::common::fs; use nativelink_util::digest_hasher::DigestHasherFunc; +use nativelink_util::metrics::{LOCAL_WORKER_METRICS, WorkerMetricAttrs}; use nativelink_util::shutdown_guard::ShutdownGuard; use nativelink_util::store_trait::Store; use nativelink_util::{spawn, tls_utils}; @@ -49,7 +53,7 @@ use tokio::time::sleep; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::Streaming; use tracing::{Level, debug, error, event, info, info_span, instrument, trace, warn}; -use nativelink_util::metrics::{WorkerMetricAttrs, LOCAL_WORKER_METRICS}; + use crate::running_actions_manager::{ ExecutionConfiguration, Metrics as RunningActionManagerMetrics, RunningAction, RunningActionsManager, RunningActionsManagerArgs, RunningActionsManagerImpl, @@ -866,21 +870,21 @@ impl Metrics { } } - /// Increment the start_actions_received counter + /// Increment the `start_actions_received` counter pub fn inc_start_actions_received(&self) { LOCAL_WORKER_METRICS .start_actions_received .add(1, self.attrs.base()); } - /// Increment the disconnects_received counter + /// Increment the `disconnects_received` counter pub fn inc_disconnects_received(&self) { LOCAL_WORKER_METRICS .disconnects_received .add(1, self.attrs.base()); } - /// Increment the keep_alives_received counter + /// Increment the `keep_alives_received` counter pub fn inc_keep_alives_received(&self) { LOCAL_WORKER_METRICS .keep_alives_received diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs index 033e9e113..aedfb4d3c 100644 --- 
a/nativelink-worker/src/running_actions_manager.rs +++ b/nativelink-worker/src/running_actions_manager.rs @@ -1529,8 +1529,7 @@ impl RunningAction for RunningActionImpl { "upload_results: starting with timeout", ); let metrics = self.metrics().clone(); - let upload_fut = metrics - .wrap_upload_results(Self::inner_upload_results(self)); + let upload_fut = metrics.wrap_upload_results(Self::inner_upload_results(self)); let stall_warn_fut = async { let mut elapsed_secs = 0u64; @@ -2388,7 +2387,7 @@ impl MetricsTimer { impl Metrics { /// Create a new Metrics instance with optional attributes. - pub fn new() -> Self { + pub const fn new() -> Self { Self { attrs: Vec::new() } } diff --git a/nativelink-worker/tests/utils/local_worker_test_utils.rs b/nativelink-worker/tests/utils/local_worker_test_utils.rs index adbef171e..71ea38052 100644 --- a/nativelink-worker/tests/utils/local_worker_test_utils.rs +++ b/nativelink-worker/tests/utils/local_worker_test_utils.rs @@ -144,10 +144,7 @@ impl MockWorkerApiClient { req } - pub(crate) async fn expect_going_away( - &self, - result: Result<(), Error>, - ) -> GoingAwayRequest { + pub(crate) async fn expect_going_away(&self, result: Result<(), Error>) -> GoingAwayRequest { let mut rx_call_lock = self.rx_call.lock().await; let req = match rx_call_lock .recv() @@ -186,10 +183,10 @@ impl WorkerApiClientTrait for MockWorkerApiClient { WorkerClientApiReturns::ConnectWorker(result) => result, resp @ WorkerClientApiReturns::ExecutionResponse(_) => { panic!("connect_worker expected ConnectWorker response, received {resp:?}") - }, + } resp @ WorkerClientApiReturns::GoingAway(_) => { panic!("connect_worker expected ConnectWorker response, received {resp:?}") - }, + } } } @@ -271,7 +268,9 @@ pub(crate) async fn setup_local_worker_with_config( let (shutdown_tx_test, _) = broadcast::channel::(BROADCAST_CAPACITY); let drop_guard = spawn!("local_worker_spawn", async move { - worker.run(shutdown_tx_test.clone(), shutdown_tx_test.subscribe()).await + 
worker + .run(shutdown_tx_test.clone(), shutdown_tx_test.subscribe()) + .await }); let (tx_stream, streaming_response) = setup_grpc_stream(); diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index c9578626c..49ebf8cfd 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -196,9 +196,14 @@ async fn inner_main( let health_component_name = format!("stores/{name}"); let mut health_register_store = health_registry_lock.sub_builder(&health_component_name); - let store = store_factory(&name, &spec, &store_manager, Some(&mut health_register_store)) - .await - .err_tip(|| format!("Failed to create store '{name}'"))?; + let store = store_factory( + &name, + &spec, + &store_manager, + Some(&mut health_register_store), + ) + .await + .err_tip(|| format!("Failed to create store '{name}'"))?; store_manager.add_store(&name, store); } } @@ -681,16 +686,20 @@ async fn inner_main( let worker_name = name.clone(); let fut = trace_span!("worker_ctx", worker_name = %name) .in_scope(|| local_worker.run(shutdown_tx.clone(), shutdown_rx)); - spawn!("worker", async move { - let result = fut.await; - if result.is_ok() { - // Worker completed successfully (graceful shutdown). - // Exit the process with code 0. - info!(worker_name = %worker_name, "Worker completed successfully, exiting process"); - std::process::exit(0); - } - result - }, ?name) + spawn!( + "worker", + async move { + let result = fut.await; + if result.is_ok() { + // Worker completed successfully (graceful shutdown). + // Exit the process with code 0. + info!(worker_name = %worker_name, "Worker completed successfully, exiting process"); + std::process::exit(0); + } + result + }, + ?name + ) } }; root_futures.push(Box::pin(spawn_fut.map_ok_or_else(|e| Err(e.into()), |v| v)));