From 6819b08cdcba2bfc797350612a5ab124712a9382 Mon Sep 17 00:00:00 2001
From: Pavel Khotulev
Date: Fri, 20 Mar 2026 12:11:29 +0000
Subject: [PATCH] Introduce scheduler router
---
docs/property-router-scheduler-plan.md | 408 ++++++++++++++++++
nativelink-config/src/cas_server.rs | 8 +-
nativelink-config/src/schedulers.rs | 18 +
nativelink-config/src/serde_utils.rs | 6 +-
nativelink-scheduler/BUILD.bazel | 2 +
.../src/api_worker_scheduler.rs | 78 ++--
.../src/cache_lookup_scheduler.rs | 2 +-
.../src/default_scheduler_factory.rs | 33 ++
nativelink-scheduler/src/grpc_scheduler.rs | 2 +-
nativelink-scheduler/src/lib.rs | 1 +
.../src/memory_awaited_action_db.rs | 28 +-
nativelink-scheduler/src/mock_scheduler.rs | 2 +-
.../src/property_modifier_scheduler.rs | 2 +-
.../src/property_router_scheduler.rs | 129 ++++++
nativelink-scheduler/src/simple_scheduler.rs | 55 +--
.../src/simple_scheduler_state_manager.rs | 49 ++-
nativelink-scheduler/src/worker.rs | 6 +-
.../tests/property_router_scheduler_test.rs | 288 +++++++++++++
nativelink-service/src/execution_server.rs | 2 +-
nativelink-store/src/default_store_factory.rs | 5 +-
nativelink-store/src/fast_slow_store.rs | 2 +-
nativelink-store/src/lib.rs | 2 +-
nativelink-store/src/metrics_store.rs | 22 +-
nativelink-store/src/mongo_store.rs | 2 +-
nativelink-util/src/metrics.rs | 24 +-
.../src/operation_state_manager.rs | 2 +-
nativelink-util/src/telemetry.rs | 15 +-
nativelink-worker/src/local_worker.rs | 14 +-
.../src/running_actions_manager.rs | 5 +-
.../tests/utils/local_worker_test_utils.rs | 13 +-
src/bin/nativelink.rs | 35 +-
31 files changed, 1092 insertions(+), 168 deletions(-)
create mode 100644 docs/property-router-scheduler-plan.md
create mode 100644 nativelink-scheduler/src/property_router_scheduler.rs
create mode 100644 nativelink-scheduler/tests/property_router_scheduler_test.rs
diff --git a/docs/property-router-scheduler-plan.md b/docs/property-router-scheduler-plan.md
new file mode 100644
index 000000000..c99dae7bc
--- /dev/null
+++ b/docs/property-router-scheduler-plan.md
@@ -0,0 +1,408 @@
+# Plan: PropertyRouterScheduler
+
+Routes incoming actions to different backend schedulers based on a
+platform property value (e.g. `container-image`), so the client always
+talks to one endpoint and knows nothing about the internal topology.
+
+## Architecture
+
+```
+Bazel Client
+ │
+ │ ExecuteRequest
+ ▼
+Front NativeLink Process
+ ├── ExecutionServer
+ │ │
+ │ │ add_action(action_info)
+ │ ▼
+ │ PropertyRouterScheduler
+ │ │
+ │ │ reads action_info.platform_properties["container-image"]
+ │ │
+ │ ├── "compile" / "test-env" / "test-fat-env"
+ │ │ └── GrpcScheduler ──► Scheduler Process 1
+ │ │ │
+ │ │ Workers (compile, test)
+ │ │
+ │ └── anything else (default)
+ │ └── GrpcScheduler ──► Scheduler Process 2
+ │ │
+ │ Workers (default)
+ │
+ └── worker_api (not exposed on front process — managed by backend processes)
+```
+
+## Files Changed: 8 total (3 new, 5 modified)
+
+### New files
+
+| File | Description |
+|------|-------------|
+| `nativelink-scheduler/src/property_router_scheduler.rs` | Core implementation |
+| `nativelink-scheduler/tests/property_router_scheduler_test.rs` | Unit tests |
+| `docs/property-router-scheduler-plan.md` | This file |
+
+### Modified files
+
+| File | Change |
+|------|--------|
+| `nativelink-config/src/schedulers.rs` | Add `PropertyRouterSpec` struct and `SchedulerSpec::PropertyRouter` variant |
+| `nativelink-scheduler/src/lib.rs` | Register `property_router_scheduler` module |
+| `nativelink-scheduler/src/default_scheduler_factory.rs` | Add match arm for `PropertyRouter` |
+
+---
+
+## Step 1 — Config
+
+**File:** `nativelink-config/src/schedulers.rs`
+
+Add after `PropertyModifierSpec`:
+
+```rust
+/// Routes actions to different schedulers based on a platform property value.
+/// Actions whose property value matches a key in `routes` go to that scheduler.
+/// All other actions (missing property or unmatched value) go to `default_scheduler`.
+#[derive(Deserialize, Serialize, Debug)]
+#[serde(deny_unknown_fields)]
+#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
+pub struct PropertyRouterSpec {
+ /// The platform property key to match on (e.g. "container-image").
+ #[serde(deserialize_with = "convert_string_with_shellexpand")]
+ pub property_name: String,
+
+ /// Map of property value -> nested scheduler spec.
+ pub routes: HashMap,
+
+ /// Scheduler to use when the property is absent or its value does not match any route.
+ pub default_scheduler: Box,
+}
+```
+
+Add variant to `SchedulerSpec`:
+
+```rust
+pub enum SchedulerSpec {
+ Simple(SimpleSpec),
+ Grpc(GrpcSpec),
+ CacheLookup(CacheLookupSpec),
+ PropertyModifier(PropertyModifierSpec),
+ PropertyRouter(PropertyRouterSpec), // <-- new
+}
+```
+
+---
+
+## Step 2 — Core Implementation
+
+**File:** `nativelink-scheduler/src/property_router_scheduler.rs`
+
+Follows the exact same pattern as `property_modifier_scheduler.rs`.
+
+### Struct
+
+```rust
+#[derive(MetricsComponent)]
+pub struct PropertyRouterScheduler {
+ property_name: String,
+ #[metric(group = "routes")]
+ routes: HashMap>,
+ #[metric(group = "default_scheduler")]
+ default_scheduler: Arc,
+}
+
+impl PropertyRouterScheduler {
+ pub fn new(
+ property_name: &str,
+ routes: HashMap>,
+ default_scheduler: Arc,
+ ) -> Self {
+ Self {
+ property_name: property_name.to_string(),
+ routes,
+ default_scheduler,
+ }
+ }
+}
+```
+
+### `add_action` — the core routing logic
+
+Reads the property value from `action_info.platform_properties`
+(`HashMap`), looks it up in `routes`, falls back to
+`default_scheduler`:
+
+```rust
+async fn inner_add_action(
+ &self,
+ client_operation_id: OperationId,
+ action_info: Arc,
+) -> Result, Error> {
+ let scheduler = action_info
+ .platform_properties
+ .get(&self.property_name)
+ .and_then(|value| self.routes.get(value))
+ .unwrap_or(&self.default_scheduler);
+
+ scheduler.add_action(client_operation_id, action_info).await
+}
+```
+
+### `filter_operations` — fan-out to all schedulers
+
+The caller (e.g. `WaitExecution`) does not know which backend scheduler
+holds the operation, so the router must query all of them and merge:
+
+```rust
+async fn inner_filter_operations(
+ &self,
+ filter: OperationFilter,
+) -> Result, Error> {
+ let mut streams = Vec::with_capacity(self.routes.len() + 1);
+ for scheduler in self.routes.values() {
+ streams.push(scheduler.filter_operations(filter.clone()).await?);
+ }
+ streams.push(self.default_scheduler.filter_operations(filter).await?);
+ Ok(Box::pin(futures::stream::select_all(streams)))
+}
+```
+
+`OperationFilter` is already `Clone` (derives it at line 67 of
+`nativelink-util/src/operation_state_manager.rs`).
+
+### `KnownPlatformPropertyProvider` — union of all nested schedulers
+
+```rust
+async fn inner_get_known_properties(
+ &self,
+ instance_name: &str,
+) -> Result, Error> {
+ let mut all_props = HashSet::new();
+ for scheduler in self.routes.values() {
+ if let Some(p) = scheduler.as_known_platform_property_provider() {
+ for prop in p.get_known_properties(instance_name).await? {
+ all_props.insert(prop);
+ }
+ }
+ }
+ if let Some(p) = self.default_scheduler.as_known_platform_property_provider() {
+ for prop in p.get_known_properties(instance_name).await? {
+ all_props.insert(prop);
+ }
+ }
+ Ok(all_props.into_iter().collect())
+}
+```
+
+### Trait impls
+
+Implements `ClientStateManager`, `KnownPlatformPropertyProvider`,
+`RootMetricsComponent`. Does **not** implement `WorkerScheduler` — the
+router never manages workers directly.
+
+---
+
+## Step 3 — Register Module
+
+**File:** `nativelink-scheduler/src/lib.rs`
+
+Add:
+
+```rust
+pub mod property_router_scheduler;
+```
+
+---
+
+## Step 4 — Factory
+
+**File:** `nativelink-scheduler/src/default_scheduler_factory.rs`
+
+Add import at the top:
+
+```rust
+use crate::property_router_scheduler::PropertyRouterScheduler;
+```
+
+Add match arm in `inner_scheduler_factory` after `PropertyModifier`:
+
+```rust
+SchedulerSpec::PropertyRouter(spec) => {
+ let mut routes = HashMap::with_capacity(spec.routes.len());
+ for (value, nested_spec) in &spec.routes {
+ let (action_scheduler, _) = Box::pin(inner_scheduler_factory(
+ nested_spec,
+ store_manager,
+ maybe_origin_event_tx,
+ ))
+ .await
+ .err_tip(|| format!("In nested PropertyRouterScheduler route '{value}'"))?;
+ routes.insert(
+ value.clone(),
+ action_scheduler.err_tip(|| {
+ format!("Nested route '{value}' is not an action scheduler")
+ })?,
+ );
+ }
+ let (default_action_scheduler, _) = Box::pin(inner_scheduler_factory(
+ &spec.default_scheduler,
+ store_manager,
+ maybe_origin_event_tx,
+ ))
+ .await
+ .err_tip(|| "In PropertyRouterScheduler default_scheduler")?;
+ let router = Arc::new(PropertyRouterScheduler::new(
+ &spec.property_name,
+ routes,
+ default_action_scheduler
+ .err_tip(|| "Default scheduler is not an action scheduler")?,
+ ));
+ (Some(router), None)
+}
+```
+
+---
+
+## Step 5 — Tests
+
+**File:** `nativelink-scheduler/tests/property_router_scheduler_test.rs`
+
+Uses `MockActionScheduler` — same pattern as `property_modifier_scheduler_test.rs`.
+
+### Test fixture
+
+```rust
+struct TestContext {
+ compile_scheduler: Arc,
+ default_scheduler: Arc,
+ router: PropertyRouterScheduler,
+}
+
+fn make_router() -> TestContext {
+ let compile_scheduler = Arc::new(MockActionScheduler::new());
+ let default_scheduler = Arc::new(MockActionScheduler::new());
+ let mut routes = HashMap::new();
+ routes.insert(
+ "compile".to_string(),
+ compile_scheduler.clone() as Arc,
+ );
+ let router = PropertyRouterScheduler::new(
+ "container-image",
+ routes,
+ default_scheduler.clone() as Arc,
+ );
+ TestContext { compile_scheduler, default_scheduler, router }
+}
+```
+
+### Tests
+
+| # | Name | Scenario | Expected |
+|---|------|----------|----------|
+| 1 | `routes_to_matching_scheduler` | `container-image=compile` | `compile_scheduler.expect_add_action` fires, `default_scheduler` idle |
+| 2 | `routes_to_default_when_no_match` | `container-image=other` | `default_scheduler.expect_add_action` fires, `compile_scheduler` idle |
+| 3 | `routes_to_default_when_property_missing` | No `container-image` key | `default_scheduler.expect_add_action` fires |
+| 4 | `routes_multiple_values` | Two actions: `compile` then `other` | Each routed to correct scheduler |
+| 5 | `filter_operations_fans_out_to_all` | `filter_operations` called | Both `compile_scheduler` and `default_scheduler` receive the same filter |
+| 6 | `known_properties_unions_all_schedulers` | `get_known_properties` called | Returns union of props from both schedulers |
+| 7 | `error_from_nested_scheduler_propagates` | `compile_scheduler` returns `Err` | Router propagates the error |
+
+### Example test (test #1)
+
+```rust
+#[nativelink_test]
+async fn routes_to_matching_scheduler() -> Result<(), Error> {
+ let ctx = make_router();
+ let mut action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest())
+ .as_ref()
+ .clone();
+ action_info
+ .platform_properties
+ .insert("container-image".to_string(), "compile".to_string());
+ let action_info = Arc::new(action_info);
+
+ let (_tx, rx) = watch::channel(Arc::new(ActionState {
+ client_operation_id: OperationId::default(),
+ stage: ActionStage::Queued,
+ action_digest: action_info.unique_qualifier.digest(),
+ last_transition_timestamp: SystemTime::now(),
+ }));
+ let client_operation_id = OperationId::default();
+
+ let (_, (received_op_id, received_action)) = join!(
+ ctx.router.add_action(client_operation_id.clone(), action_info.clone()),
+ ctx.compile_scheduler.expect_add_action(Ok(Box::new(
+ TokioWatchActionStateResult::new(client_operation_id.clone(), action_info, rx)
+ ))),
+ );
+ assert_eq!(client_operation_id, received_op_id);
+ assert_eq!(
+ Some(&"compile".to_string()),
+ received_action.platform_properties.get("container-image")
+ );
+ Ok(())
+}
+```
+
+---
+
+## Example Production Config
+
+```json5
+// scheduler.json5 (front process — one endpoint for all clients)
+{
+ stores: [
+ {
+ name: "CAS_STORE",
+ grpc: {
+ instance_name: "main",
+ endpoints: [{ address: "grpc://cas-node:50051" }],
+ store_type: "cas",
+ },
+ },
+ ],
+ schedulers: [
+ {
+ name: "MAIN_SCHEDULER",
+ property_router: {
+ property_name: "container-image",
+ routes: {
+ "compile": { grpc: { endpoint: { address: "grpc://sched-compile:50052" } } },
+ "test-env": { grpc: { endpoint: { address: "grpc://sched-compile:50052" } } },
+ "test-fat-env": { grpc: { endpoint: { address: "grpc://sched-compile:50052" } } },
+ },
+ default_scheduler: { grpc: { endpoint: { address: "grpc://sched-default:50052" } } },
+ },
+ },
+ ],
+ servers: [
+ {
+ listener: { http: { socket_address: "0.0.0.0:50052" } },
+ services: {
+ execution: [
+ { instance_name: "", cas_store: "CAS_STORE", scheduler: "MAIN_SCHEDULER" },
+ { instance_name: "main", cas_store: "CAS_STORE", scheduler: "MAIN_SCHEDULER" },
+ ],
+ capabilities: [
+ { instance_name: "", remote_execution: { scheduler: "MAIN_SCHEDULER" } },
+ { instance_name: "main", remote_execution: { scheduler: "MAIN_SCHEDULER" } },
+ ],
+ health: {},
+ },
+ },
+ ],
+}
+```
+
+---
+
+## Notes
+
+- `WorkerScheduler` is **not** implemented by the router — worker management
+ stays entirely in the backend scheduler processes.
+- The router does not cache the routing decision. This is intentional:
+ `add_action` reads a `HashMap` lookup — O(1), zero cost.
+- `filter_operations` fan-out is necessary because `WaitExecution` uses it
+ and does not know which backend scheduler owns the operation.
+ With N backend schedulers this is N parallel gRPC calls — acceptable since
+ it's used for status polling, not hot-path action dispatch.
diff --git a/nativelink-config/src/cas_server.rs b/nativelink-config/src/cas_server.rs
index c7f9f4882..f7d2e5555 100644
--- a/nativelink-config/src/cas_server.rs
+++ b/nativelink-config/src/cas_server.rs
@@ -22,9 +22,9 @@ use serde::{Deserialize, Serialize};
use crate::schedulers::SchedulerSpec;
use crate::serde_utils::{
convert_data_size_with_shellexpand, convert_duration_with_shellexpand,
- convert_numeric_with_shellexpand, convert_optional_numeric_with_shellexpand,
- convert_optional_string_with_shellexpand, convert_string_with_shellexpand,
- convert_vec_string_with_shellexpand, convert_enum_with_shellexpand,
+ convert_enum_with_shellexpand, convert_numeric_with_shellexpand,
+ convert_optional_numeric_with_shellexpand, convert_optional_string_with_shellexpand,
+ convert_string_with_shellexpand, convert_vec_string_with_shellexpand,
};
use crate::stores::{ClientTlsConfig, ConfigDigestHashFunction, StoreRefName, StoreSpec};
@@ -854,7 +854,7 @@ pub struct LocalWorkerConfig {
/// Default: None (directory cache disabled)
pub directory_cache: Option,
- #[serde(deserialize_with = "convert_enum_with_shellexpand")]
+ #[serde(default, deserialize_with = "convert_enum_with_shellexpand")]
pub execution_completion_behaviour: ExecutionCompletionBehaviour,
}
diff --git a/nativelink-config/src/schedulers.rs b/nativelink-config/src/schedulers.rs
index 1dc4723bd..c2001817e 100644
--- a/nativelink-config/src/schedulers.rs
+++ b/nativelink-config/src/schedulers.rs
@@ -32,6 +32,7 @@ pub enum SchedulerSpec {
Grpc(GrpcSpec),
CacheLookup(CacheLookupSpec),
PropertyModifier(PropertyModifierSpec),
+ PropertyRouter(PropertyRouterSpec),
}
/// When the scheduler matches tasks to workers that are capable of running
@@ -323,3 +324,20 @@ pub struct PropertyModifierSpec {
/// The nested scheduler to use after modifying the properties.
pub scheduler: Box,
}
+
+/// Routes actions to different schedulers based on a platform property value.
+/// Actions whose property value matches a key in `routes` go to that scheduler.
+/// All other actions (missing property or unmatched value) go to `default_scheduler`.
+#[derive(Deserialize, Serialize, Debug)]
+#[serde(deny_unknown_fields)]
+#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
+pub struct PropertyRouterSpec {
+ /// The platform property key to match on (e.g. "container-image").
+ pub property_name: String,
+
+ /// Map of property value -> nested scheduler spec.
+ pub routes: HashMap,
+
+ /// Scheduler to use when the property is absent or its value does not match any route.
+ pub default_scheduler: Box,
+}
diff --git a/nativelink-config/src/serde_utils.rs b/nativelink-config/src/serde_utils.rs
index 5ebdf0e11..375b3272e 100644
--- a/nativelink-config/src/serde_utils.rs
+++ b/nativelink-config/src/serde_utils.rs
@@ -486,10 +486,8 @@ where
T: DeserializeOwned,
{
let s = String::deserialize(deserializer)?;
- let expanded = shellexpand::env(&s)
- .map_err(de::Error::custom)?;
+ let expanded = shellexpand::env(&s).map_err(de::Error::custom)?;
let quoted = format!("\"{}\"", expanded);
- serde_json5::from_str("ed)
- .map_err(de::Error::custom)
+ serde_json5::from_str("ed).map_err(de::Error::custom)
}
diff --git a/nativelink-scheduler/BUILD.bazel b/nativelink-scheduler/BUILD.bazel
index 74133d42b..a0653adb3 100644
--- a/nativelink-scheduler/BUILD.bazel
+++ b/nativelink-scheduler/BUILD.bazel
@@ -21,6 +21,7 @@ rust_library(
"src/mock_scheduler.rs",
"src/platform_property_manager.rs",
"src/property_modifier_scheduler.rs",
+ "src/property_router_scheduler.rs",
"src/simple_scheduler.rs",
"src/simple_scheduler_state_manager.rs",
"src/store_awaited_action_db.rs",
@@ -66,6 +67,7 @@ rust_test_suite(
"tests/action_messages_test.rs",
"tests/cache_lookup_scheduler_test.rs",
"tests/property_modifier_scheduler_test.rs",
+ "tests/property_router_scheduler_test.rs",
"tests/redis_store_awaited_action_db_test.rs",
"tests/simple_scheduler_state_manager_test.rs",
"tests/simple_scheduler_test.rs",
diff --git a/nativelink-scheduler/src/api_worker_scheduler.rs b/nativelink-scheduler/src/api_worker_scheduler.rs
index 665467f4e..22647a0a9 100644
--- a/nativelink-scheduler/src/api_worker_scheduler.rs
+++ b/nativelink-scheduler/src/api_worker_scheduler.rs
@@ -22,13 +22,13 @@ use std::time::{Instant, UNIX_EPOCH};
use async_lock::RwLock;
use lru::LruCache;
use nativelink_config::schedulers::WorkerAllocationStrategy;
-use nativelink_error::{error_if, make_err, make_input_err, Code, Error, ResultExt};
+use nativelink_error::{Code, Error, ResultExt, error_if, make_err, make_input_err};
use nativelink_metric::{
- group, MetricFieldData, MetricKind, MetricPublishKnownKindData,
- MetricsComponent, RootMetricsComponent,
+ MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
+ RootMetricsComponent, group,
};
use nativelink_util::action_messages::{OperationId, WorkerId};
-use nativelink_util::metrics::{WORKER_POOL_METRICS, WORKER_POOL_INSTANCE, WorkerPoolMetricAttrs};
+use nativelink_util::metrics::{WORKER_POOL_INSTANCE, WORKER_POOL_METRICS, WorkerPoolMetricAttrs};
use nativelink_util::operation_state_manager::{UpdateOperationType, WorkerStateManager};
use nativelink_util::platform_properties::PlatformProperties;
use nativelink_util::shutdown_guard::ShutdownGuard;
@@ -63,7 +63,10 @@ pub struct SchedulerMetrics {
}
use crate::platform_property_manager::PlatformPropertyManager;
-use crate::worker::{reduce_platform_properties, Worker, ActionInfoWithProps, WorkerState, WorkerTimestamp, WorkerUpdate};
+use crate::worker::{
+ ActionInfoWithProps, Worker, WorkerState, WorkerTimestamp, WorkerUpdate,
+ reduce_platform_properties,
+};
use crate::worker_capability_index::WorkerCapabilityIndex;
use crate::worker_registry::SharedWorkerRegistry;
use crate::worker_scheduler::WorkerScheduler;
@@ -96,11 +99,15 @@ impl WorkerSchedulerMetrics {
}
pub fn record_worker_removed(&self) {
- WORKER_POOL_METRICS.worker_events.add(1, self.attrs.removed());
+ WORKER_POOL_METRICS
+ .worker_events
+ .add(1, self.attrs.removed());
}
pub fn record_worker_timeout(&self) {
- WORKER_POOL_METRICS.worker_events.add(1, self.attrs.timeout());
+ WORKER_POOL_METRICS
+ .worker_events
+ .add(1, self.attrs.timeout());
}
pub fn record_worker_connection_failed(&self) {
@@ -385,7 +392,7 @@ impl ApiWorkerSchedulerImpl {
/// Batch finds workers for multiple actions in a single pass.
/// This reduces lock contention by acquiring the lock once for all actions.
- /// Returns a map of (action_index, worker_id) pairs for successful matches.
+ /// Returns a map of (`action_index`, `worker_id`) pairs for successful matches.
fn inner_batch_find_workers_for_actions(
&self,
actions: &[&PlatformProperties],
@@ -409,16 +416,23 @@ impl ApiWorkerSchedulerImpl {
}
if !workers_platform_properties.contains_key(&worker_id) {
- workers_platform_properties.insert(worker_id.clone(), worker.platform_properties.clone());
+ workers_platform_properties
+ .insert(worker_id.clone(), worker.platform_properties.clone());
}
- if !platform_properties.is_satisfied_by(&workers_platform_properties[&worker_id], full_worker_logging) {
+ if !platform_properties.is_satisfied_by(
+ &workers_platform_properties[&worker_id],
+ full_worker_logging,
+ ) {
continue;
}
- reduce_platform_properties(workers_platform_properties.get_mut(&worker_id).unwrap(), platform_properties);
+ reduce_platform_properties(
+ workers_platform_properties.get_mut(&worker_id).unwrap(),
+ platform_properties,
+ );
- results.insert(idx, worker_id.clone());
+ results.insert(idx, worker_id.clone());
break;
}
}
@@ -660,7 +674,10 @@ impl ApiWorkerSchedulerImpl {
}
fn count_running_actions(&self) -> usize {
- self.workers.iter().map(|(_, w)| w.running_action_infos.len()).sum()
+ self.workers
+ .iter()
+ .map(|(_, w)| w.running_action_infos.len())
+ .sum()
}
}
@@ -720,7 +737,7 @@ impl ApiWorkerScheduler {
/// Returns a reference to the worker scheduler metrics for recording OTEL metrics.
#[must_use]
- pub fn workerMetrics(&self) -> &WorkerSchedulerMetrics {
+ pub const fn workerMetrics(&self) -> &WorkerSchedulerMetrics {
&self.worker_scheduler_metrics
}
@@ -744,7 +761,8 @@ impl ApiWorkerScheduler {
} else {
self.worker_scheduler_metrics.record_dispatch_failure();
}
- self.worker_scheduler_metrics.record_running_actions_count(inner.count_running_actions());
+ self.worker_scheduler_metrics
+ .record_running_actions_count(inner.count_running_actions());
result
}
@@ -764,7 +782,9 @@ impl ApiWorkerScheduler {
.fetch_add(count as u64, Ordering::Relaxed);
let mut inner = self.inner.write().await;
- let results = inner.inner_batch_worker_notify_run_action(assignments).await;
+ let results = inner
+ .inner_batch_worker_notify_run_action(assignments)
+ .await;
// Record metrics
let successes = results.iter().filter(|r| r.is_ok()).count();
@@ -832,7 +852,7 @@ impl ApiWorkerScheduler {
/// This reduces lock contention compared to calling `find_worker_for_action`
/// for each action individually.
///
- /// Returns a vector of (action_index, worker_id) pairs for successful matches.
+ /// Returns a vector of (`action_index`, `worker_id`) pairs for successful matches.
/// Actions that couldn't be matched to a worker are not included in the result.
pub async fn batch_find_workers_for_actions(
&self,
@@ -846,8 +866,7 @@ impl ApiWorkerScheduler {
let inner = self.inner.read().await;
let worker_count = inner.workers.len() as u64;
- let results =
- inner.inner_batch_find_workers_for_actions(actions, full_worker_logging);
+ let results = inner.inner_batch_find_workers_for_actions(actions, full_worker_logging);
// Track metrics
self.metrics
@@ -917,7 +936,8 @@ impl WorkerScheduler for ApiWorkerScheduler {
.add_worker(worker)
.err_tip(|| "Error while adding worker, removing from pool");
if let Err(err) = &result {
- self.worker_scheduler_metrics.record_worker_connection_failed();
+ self.worker_scheduler_metrics
+ .record_worker_connection_failed();
return Result::<(), _>::Err(err.clone()).merge(
inner
.immediate_evict_worker(&worker_id, err.clone(), false)
@@ -930,7 +950,8 @@ impl WorkerScheduler for ApiWorkerScheduler {
self.metrics.workers_added.fetch_add(1, Ordering::Relaxed);
self.worker_scheduler_metrics.record_worker_added();
- self.worker_scheduler_metrics.record_worker_count(inner.workers.len());
+ self.worker_scheduler_metrics
+ .record_worker_count(inner.workers.len());
Ok(())
}
@@ -955,7 +976,8 @@ impl WorkerScheduler for ApiWorkerScheduler {
if result.is_ok() && is_completion {
self.worker_scheduler_metrics.record_action_completed();
}
- self.worker_scheduler_metrics.record_running_actions_count(inner.count_running_actions());
+ self.worker_scheduler_metrics
+ .record_running_actions_count(inner.count_running_actions());
result
}
@@ -992,7 +1014,8 @@ impl WorkerScheduler for ApiWorkerScheduler {
// Record worker removal
self.worker_scheduler_metrics.record_worker_removed();
- self.worker_scheduler_metrics.record_worker_count(inner.workers.len());
+ self.worker_scheduler_metrics
+ .record_worker_count(inner.workers.len());
result
}
@@ -1086,8 +1109,10 @@ impl WorkerScheduler for ApiWorkerScheduler {
self.worker_scheduler_metrics.record_worker_timeout();
}
- self.worker_scheduler_metrics.record_running_actions_count(inner.count_running_actions());
- self.worker_scheduler_metrics.record_worker_count(inner.workers.len());
+ self.worker_scheduler_metrics
+ .record_running_actions_count(inner.count_running_actions());
+ self.worker_scheduler_metrics
+ .record_worker_count(inner.workers.len());
result
}
@@ -1095,7 +1120,8 @@ impl WorkerScheduler for ApiWorkerScheduler {
async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
let mut inner = self.inner.write().await;
inner.set_drain_worker(worker_id, is_draining).await?;
- self.worker_scheduler_metrics.record_worker_count(inner.workers.len());
+ self.worker_scheduler_metrics
+ .record_worker_count(inner.workers.len());
Ok(())
}
}
diff --git a/nativelink-scheduler/src/cache_lookup_scheduler.rs b/nativelink-scheduler/src/cache_lookup_scheduler.rs
index c11321771..7f8816634 100644
--- a/nativelink-scheduler/src/cache_lookup_scheduler.rs
+++ b/nativelink-scheduler/src/cache_lookup_scheduler.rs
@@ -382,7 +382,7 @@ impl ClientStateManager for CacheLookupScheduler {
self.action_scheduler.as_known_platform_property_provider()
}
- fn as_any(&self) -> &dyn std::any::Any {
+ fn as_any(&self) -> &dyn core::any::Any {
self
}
}
diff --git a/nativelink-scheduler/src/default_scheduler_factory.rs b/nativelink-scheduler/src/default_scheduler_factory.rs
index 711e34f67..2228bf9ee 100644
--- a/nativelink-scheduler/src/default_scheduler_factory.rs
+++ b/nativelink-scheduler/src/default_scheduler_factory.rs
@@ -32,6 +32,7 @@ use crate::cache_lookup_scheduler::CacheLookupScheduler;
use crate::grpc_scheduler::GrpcScheduler;
use crate::memory_awaited_action_db::MemoryAwaitedActionDb;
use crate::property_modifier_scheduler::PropertyModifierScheduler;
+use crate::property_router_scheduler::PropertyRouterScheduler;
use crate::simple_scheduler::SimpleScheduler;
use crate::store_awaited_action_db::StoreAwaitedActionDb;
use crate::worker_scheduler::WorkerScheduler;
@@ -95,6 +96,38 @@ async fn inner_scheduler_factory(
));
(Some(property_modifier_scheduler), worker_scheduler)
}
+ SchedulerSpec::PropertyRouter(spec) => {
+ use std::collections::HashMap;
+ let mut routes = HashMap::with_capacity(spec.routes.len());
+ for (value, nested_spec) in &spec.routes {
+ let (action_scheduler, _) = Box::pin(inner_scheduler_factory(
+ nested_spec,
+ store_manager,
+ maybe_origin_event_tx,
+ ))
+ .await
+ .err_tip(|| format!("In nested PropertyRouterScheduler route '{value}'"))?;
+ routes.insert(
+ value.clone(),
+ action_scheduler
+ .err_tip(|| format!("Nested route '{value}' is not an action scheduler"))?,
+ );
+ }
+ let (default_action_scheduler, _) = Box::pin(inner_scheduler_factory(
+ &spec.default_scheduler,
+ store_manager,
+ maybe_origin_event_tx,
+ ))
+ .await
+ .err_tip(|| "In PropertyRouterScheduler default_scheduler")?;
+ let router = Arc::new(PropertyRouterScheduler::new(
+ &spec.property_name,
+ routes,
+ default_action_scheduler
+ .err_tip(|| "Default scheduler is not an action scheduler")?,
+ ));
+ (Some(router), None)
+ }
};
Ok(scheduler)
diff --git a/nativelink-scheduler/src/grpc_scheduler.rs b/nativelink-scheduler/src/grpc_scheduler.rs
index f4de0b0d3..65de39dce 100644
--- a/nativelink-scheduler/src/grpc_scheduler.rs
+++ b/nativelink-scheduler/src/grpc_scheduler.rs
@@ -355,7 +355,7 @@ impl ClientStateManager for GrpcScheduler {
Some(self)
}
- fn as_any(&self) -> &dyn std::any::Any {
+ fn as_any(&self) -> &dyn core::any::Any {
self
}
}
diff --git a/nativelink-scheduler/src/lib.rs b/nativelink-scheduler/src/lib.rs
index b5d38cb13..cc11ffe27 100644
--- a/nativelink-scheduler/src/lib.rs
+++ b/nativelink-scheduler/src/lib.rs
@@ -21,6 +21,7 @@ pub mod memory_awaited_action_db;
pub mod mock_scheduler;
pub mod platform_property_manager;
pub mod property_modifier_scheduler;
+pub mod property_router_scheduler;
pub mod simple_scheduler;
pub mod simple_scheduler_state_manager;
pub mod store_awaited_action_db;
diff --git a/nativelink-scheduler/src/memory_awaited_action_db.rs b/nativelink-scheduler/src/memory_awaited_action_db.rs
index faae5f8e5..024eb1ca2 100644
--- a/nativelink-scheduler/src/memory_awaited_action_db.rs
+++ b/nativelink-scheduler/src/memory_awaited_action_db.rs
@@ -680,24 +680,20 @@ impl I + Clone + Send + Sync> AwaitedActionDbI
// Record completion metrics with action digest for failure tracking
let action_digest = old_awaited_action.action_info().digest().to_string();
if let ActionStage::Completed(action_result) = new_stage {
- let result_attrs = vec![
- opentelemetry::KeyValue::new(
- nativelink_util::metrics::EXECUTION_RESULT,
- if action_result.exit_code == 0 {
- ExecutionResult::Success
- } else {
- ExecutionResult::Failure
- },
- ),
- ];
+ let result_attrs = vec![opentelemetry::KeyValue::new(
+ nativelink_util::metrics::EXECUTION_RESULT,
+ if action_result.exit_code == 0 {
+ ExecutionResult::Success
+ } else {
+ ExecutionResult::Failure
+ },
+ )];
metrics.execution_completed_count.add(1, &result_attrs);
} else if let ActionStage::CompletedFromCache(_) = new_stage {
- let result_attrs = vec![
- opentelemetry::KeyValue::new(
- nativelink_util::metrics::EXECUTION_RESULT,
- ExecutionResult::CacheHit,
- ),
- ];
+ let result_attrs = vec![opentelemetry::KeyValue::new(
+ nativelink_util::metrics::EXECUTION_RESULT,
+ ExecutionResult::CacheHit,
+ )];
metrics.execution_completed_count.add(1, &result_attrs);
}
diff --git a/nativelink-scheduler/src/mock_scheduler.rs b/nativelink-scheduler/src/mock_scheduler.rs
index ff9ab9f6d..f301a4553 100644
--- a/nativelink-scheduler/src/mock_scheduler.rs
+++ b/nativelink-scheduler/src/mock_scheduler.rs
@@ -193,7 +193,7 @@ impl ClientStateManager for MockActionScheduler {
Some(self)
}
- fn as_any(&self) -> &dyn std::any::Any {
+ fn as_any(&self) -> &dyn core::any::Any {
self
}
}
diff --git a/nativelink-scheduler/src/property_modifier_scheduler.rs b/nativelink-scheduler/src/property_modifier_scheduler.rs
index 5343ecb0a..b273e4836 100644
--- a/nativelink-scheduler/src/property_modifier_scheduler.rs
+++ b/nativelink-scheduler/src/property_modifier_scheduler.rs
@@ -169,7 +169,7 @@ impl ClientStateManager for PropertyModifierScheduler {
Some(self)
}
- fn as_any(&self) -> &dyn std::any::Any {
+ fn as_any(&self) -> &dyn core::any::Any {
self
}
}
diff --git a/nativelink-scheduler/src/property_router_scheduler.rs b/nativelink-scheduler/src/property_router_scheduler.rs
new file mode 100644
index 000000000..ba56f1142
--- /dev/null
+++ b/nativelink-scheduler/src/property_router_scheduler.rs
@@ -0,0 +1,129 @@
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use nativelink_error::{Error, ResultExt};
+use nativelink_metric::{MetricsComponent, RootMetricsComponent};
+use nativelink_util::action_messages::{ActionInfo, OperationId};
+use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
+use nativelink_util::operation_state_manager::{
+ ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
+};
+
+#[derive(MetricsComponent)]
+pub struct PropertyRouterScheduler {
+ property_name: String,
+ #[metric(group = "routes")]
+ routes: HashMap>,
+ #[metric(group = "default_scheduler")]
+ default_scheduler: Arc,
+}
+
+impl core::fmt::Debug for PropertyRouterScheduler {
+ fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+ f.debug_struct("PropertyRouterScheduler")
+ .field("property_name", &self.property_name)
+ .finish_non_exhaustive()
+ }
+}
+
+impl PropertyRouterScheduler {
+ pub fn new(
+ property_name: &str,
+ routes: HashMap>,
+ default_scheduler: Arc,
+ ) -> Self {
+ Self {
+ property_name: property_name.to_string(),
+ routes,
+ default_scheduler,
+ }
+ }
+
+ async fn inner_add_action(
+ &self,
+ client_operation_id: OperationId,
+ action_info: Arc,
+ ) -> Result, Error> {
+ let scheduler = action_info
+ .platform_properties
+ .get(&self.property_name)
+ .and_then(|value| self.routes.get(value))
+ .unwrap_or(&self.default_scheduler);
+
+ scheduler.add_action(client_operation_id, action_info).await
+ }
+
+ async fn inner_filter_operations(
+ &self,
+ filter: OperationFilter,
+ ) -> Result, Error> {
+ let mut streams = Vec::with_capacity(self.routes.len() + 1);
+ for scheduler in self.routes.values() {
+ streams.push(scheduler.filter_operations(filter.clone()).await?);
+ }
+ streams.push(self.default_scheduler.filter_operations(filter).await?);
+ Ok(Box::pin(futures::stream::select_all(streams)))
+ }
+
+ async fn inner_get_known_properties(&self, instance_name: &str) -> Result, Error> {
+ let mut all_props = HashSet::new();
+ for scheduler in self.routes.values() {
+ if let Some(p) = scheduler.as_known_platform_property_provider() {
+ for prop in p
+ .get_known_properties(instance_name)
+ .await
+ .err_tip(|| "In PropertyRouterScheduler::get_known_properties for route")?
+ {
+ all_props.insert(prop);
+ }
+ }
+ }
+ if let Some(p) = self.default_scheduler.as_known_platform_property_provider() {
+ for prop in p
+ .get_known_properties(instance_name)
+ .await
+ .err_tip(|| "In PropertyRouterScheduler::get_known_properties for default")?
+ {
+ all_props.insert(prop);
+ }
+ }
+ Ok(all_props.into_iter().collect())
+ }
+}
+
+#[async_trait]
+impl KnownPlatformPropertyProvider for PropertyRouterScheduler {
+ async fn get_known_properties(&self, instance_name: &str) -> Result, Error> {
+ self.inner_get_known_properties(instance_name).await
+ }
+}
+
+#[async_trait]
+impl ClientStateManager for PropertyRouterScheduler {
+ async fn add_action(
+ &self,
+ client_operation_id: OperationId,
+ action_info: Arc,
+ ) -> Result, Error> {
+ self.inner_add_action(client_operation_id, action_info)
+ .await
+ }
+
+ async fn filter_operations<'a>(
+ &'a self,
+ filter: OperationFilter,
+ ) -> Result, Error> {
+ self.inner_filter_operations(filter).await
+ }
+
+ fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
+ Some(self)
+ }
+
+ fn as_any(&self) -> &dyn core::any::Any {
+ self
+ }
+}
+
+impl RootMetricsComponent for PropertyRouterScheduler {}
diff --git a/nativelink-scheduler/src/simple_scheduler.rs b/nativelink-scheduler/src/simple_scheduler.rs
index 4d0e91b36..79a264d5a 100644
--- a/nativelink-scheduler/src/simple_scheduler.rs
+++ b/nativelink-scheduler/src/simple_scheduler.rs
@@ -17,7 +17,7 @@ use std::sync::Arc;
use std::time::{Instant, SystemTime};
use async_trait::async_trait;
-use futures::{future, Future, StreamExt};
+use futures::{Future, StreamExt, future};
use nativelink_config::schedulers::SimpleSpec;
use nativelink_error::{Code, Error, ResultExt};
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
@@ -25,6 +25,7 @@ use nativelink_proto::com::github::trace_machina::nativelink::events::OriginEven
use nativelink_util::action_messages::{ActionInfo, ActionState, OperationId, WorkerId};
use nativelink_util::instant_wrapper::InstantWrapper;
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
+use nativelink_util::metrics::EXECUTION_METRICS;
use nativelink_util::operation_state_manager::{
ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType,
@@ -34,11 +35,12 @@ use nativelink_util::platform_properties::PlatformProperties;
use nativelink_util::shutdown_guard::ShutdownGuard;
use nativelink_util::spawn;
use nativelink_util::task::JoinHandleDropGuard;
+use opentelemetry::KeyValue;
use opentelemetry::baggage::BaggageExt;
use opentelemetry::context::{Context, FutureExt as OtelFutureExt};
-use opentelemetry::KeyValue;
use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
-use tokio::sync::{mpsc, Notify};
+use serde::Serialize;
+use tokio::sync::{Notify, mpsc};
use tokio::time::Duration;
use tracing::{debug, error, info, info_span, warn};
@@ -49,8 +51,6 @@ use crate::simple_scheduler_state_manager::{SchedulerStateManager, SimpleSchedul
use crate::worker::{ActionInfoWithProps, ActionsState, Worker, WorkerState, WorkerTimestamp};
use crate::worker_registry::WorkerRegistry;
use crate::worker_scheduler::WorkerScheduler;
-use serde::Serialize;
-use nativelink_util::metrics::EXECUTION_METRICS;
/// Default timeout for workers in seconds.
/// If this changes, remember to change the documentation in the config.
@@ -360,7 +360,9 @@ impl SimpleScheduler {
}
let total_elapsed = start.elapsed();
- EXECUTION_METRICS.do_try_match_duration.record(total_elapsed.as_secs_f64(), &[]);
+ EXECUTION_METRICS
+ .do_try_match_duration
+ .record(total_elapsed.as_secs_f64(), &[]);
if total_elapsed > Duration::from_secs(5) {
warn!(
total_ms = total_elapsed.as_millis(),
@@ -406,20 +408,20 @@ impl SimpleScheduler {
origin_metadata: OriginMetadata,
}
- let mut prepared_actions: Vec = Vec::with_capacity(action_state_results.len());
- let mut platform_properties_refs: Vec<&PlatformProperties> = Vec::with_capacity(action_state_results.len());
+ let mut prepared_actions: Vec =
+ Vec::with_capacity(action_state_results.len());
+ let mut platform_properties_refs: Vec<&PlatformProperties> =
+ Vec::with_capacity(action_state_results.len());
for action_state_result in action_state_results {
- let (action_info, maybe_origin_metadata) = match action_state_result
- .as_action_info()
- .await
- {
- Ok(result) => result,
- Err(err) => {
- warn!(?err, "Failed to get action_info in batch mode, skipping");
- continue;
- }
- };
+ let (action_info, maybe_origin_metadata) =
+ match action_state_result.as_action_info().await {
+ Ok(result) => result,
+ Err(err) => {
+ warn!(?err, "Failed to get action_info in batch mode, skipping");
+ continue;
+ }
+ };
// TODO(palfrey) We should not compute this every time and instead store
// it with the ActionInfo when we receive it.
@@ -429,7 +431,10 @@ impl SimpleScheduler {
{
Ok(props) => props,
Err(err) => {
- warn!(?err, "Failed to make platform properties in batch mode, skipping");
+ warn!(
+ ?err,
+ "Failed to make platform properties in batch mode, skipping"
+ );
continue;
}
};
@@ -517,10 +522,9 @@ impl SimpleScheduler {
// Merge notification results
for notify_result in notify_results {
- result = result.merge(
- notify_result
- .err_tip(|| "Failed to run batch_worker_notify_run_action in do_try_match_batch"),
- );
+ result = result.merge(notify_result.err_tip(
+ || "Failed to run batch_worker_notify_run_action in do_try_match_batch",
+ ));
}
}
@@ -719,7 +723,8 @@ impl SimpleScheduler {
}
};
- let res = scheduler.do_try_match_internal(full_worker_logging).await;
+ let res =
+ scheduler.do_try_match_internal(full_worker_logging).await;
if full_worker_logging {
let operations_stream = scheduler
.matching_engine_state_manager
@@ -854,7 +859,7 @@ impl ClientStateManager for SimpleScheduler {
Some(self)
}
- fn as_any(&self) -> &dyn std::any::Any {
+ fn as_any(&self) -> &dyn core::any::Any {
self
}
}
diff --git a/nativelink-scheduler/src/simple_scheduler_state_manager.rs b/nativelink-scheduler/src/simple_scheduler_state_manager.rs
index 2502ed9f4..41b188f61 100644
--- a/nativelink-scheduler/src/simple_scheduler_state_manager.rs
+++ b/nativelink-scheduler/src/simple_scheduler_state_manager.rs
@@ -12,16 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use super::awaited_action_db::{
- AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, CountableActionStage,
- SortedAwaitedActionState,
-};
-use async_lock::Mutex;
-use async_trait::async_trait;
use core::ops::Bound;
use core::time::Duration;
-use futures::{stream, StreamExt, TryStreamExt};
-use nativelink_error::{make_err, Code, Error, ResultExt};
+use std::collections::{BTreeMap, HashMap};
+use std::string::ToString;
+use std::sync::{Arc, Weak};
+use std::{env, vec};
+
+use async_lock::Mutex;
+use async_trait::async_trait;
+use futures::{StreamExt, TryStreamExt, stream};
+use nativelink_error::{Code, Error, ResultExt, make_err};
use nativelink_metric::MetricsComponent;
use nativelink_util::action_messages::{
ActionInfo, ActionResult, ActionStage, ActionState, ActionUniqueQualifier, ExecutionMetadata,
@@ -30,8 +31,8 @@ use nativelink_util::action_messages::{
use nativelink_util::instant_wrapper::InstantWrapper;
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
use nativelink_util::metrics::{
- register_queued_actions_callback, ExecutionMetricAttrs, ExecutionResult, EXECUTION_INSTANCE,
- EXECUTION_METRICS, EXECUTION_RESULT, EXECUTION_STAGE, ExecutionStage,
+ EXECUTION_INSTANCE, EXECUTION_METRICS, EXECUTION_RESULT, EXECUTION_STAGE, ExecutionMetricAttrs,
+ ExecutionResult, ExecutionStage, register_queued_actions_callback,
};
use nativelink_util::operation_state_manager::{
ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
@@ -41,11 +42,11 @@ use nativelink_util::origin_event::OriginMetadata;
use opentelemetry::KeyValue;
use tracing::{debug, info, trace, warn};
+use super::awaited_action_db::{
+ AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, CountableActionStage,
+ SortedAwaitedActionState,
+};
use crate::worker_registry::SharedWorkerRegistry;
-use std::collections::{BTreeMap, HashMap};
-use std::string::ToString;
-use std::sync::{Arc, Weak};
-use std::{env, vec};
/// Maximum number of times an update to the database
/// can fail before giving up.
@@ -164,7 +165,7 @@ impl SchedulerMetrics {
}
#[must_use]
- pub fn result_from_stage(stage: &ActionStage) -> Option {
+ pub const fn result_from_stage(stage: &ActionStage) -> Option {
match stage {
ActionStage::Completed(result) => {
if result.error.is_some() {
@@ -572,7 +573,7 @@ where
/// Returns a reference to the scheduler metrics for recording OTEL metrics.
#[must_use]
- pub fn metrics(&self) -> &SchedulerMetrics {
+ pub const fn metrics(&self) -> &SchedulerMetrics {
&self.scheduler_metrics
}
@@ -587,7 +588,7 @@ where
action_insert_timestamp: std::time::SystemTime,
) {
// Only record if the stage actually changed
- if std::mem::discriminant(previous_stage) != std::mem::discriminant(new_stage) {
+ if core::mem::discriminant(previous_stage) != core::mem::discriminant(new_stage) {
self.record_actions_count().await;
// Record the stage transition
self.scheduler_metrics
@@ -655,7 +656,7 @@ where
EXECUTION_METRICS
.execution_total_duration
- .record(total_execution_duration.as_secs_f64(), &[])
+ .record(total_execution_duration.as_secs_f64(), &[]);
}
}
}
@@ -1191,10 +1192,8 @@ where
let priority = Some(awaited_action.action_info().priority);
// Build base attributes for metrics
- let mut attrs = nativelink_util::metrics::make_execution_attributes(
- instance_name,
- priority,
- );
+ let mut attrs =
+ nativelink_util::metrics::make_execution_attributes(instance_name, priority);
// Add stage attribute
let execution_stage: ExecutionStage = (&action_state.stage).into();
@@ -1235,7 +1234,7 @@ where
is_retry,
action_insert_timestamp,
)
- .await;
+ .await;
return Ok(());
}
@@ -1274,7 +1273,7 @@ where
if result.is_ok() {
self.scheduler_metrics
.record_stage_transition(None, ActionStage::Queued);
- self.record_actions_count().await
+ self.record_actions_count().await;
}
result
@@ -1508,7 +1507,7 @@ where
None
}
- fn as_any(&self) -> &dyn std::any::Any {
+ fn as_any(&self) -> &dyn core::any::Any {
self
}
}
diff --git a/nativelink-scheduler/src/worker.rs b/nativelink-scheduler/src/worker.rs
index 55f25a538..b20ac8819 100644
--- a/nativelink-scheduler/src/worker.rs
+++ b/nativelink-scheduler/src/worker.rs
@@ -306,7 +306,11 @@ impl Worker {
WorkerState {
id: self.id.clone(),
platform_properties: self.platform_properties.clone(),
- running_action_infos: self.running_action_infos.iter().map(|(k, v)| (k.to_string(), v.clone())).collect(),
+ running_action_infos: self
+ .running_action_infos
+ .iter()
+ .map(|(k, v)| (k.to_string(), v.clone()))
+ .collect(),
last_update_timestamp: self.last_update_timestamp,
is_paused: self.is_paused,
is_draining: self.is_draining,
diff --git a/nativelink-scheduler/tests/property_router_scheduler_test.rs b/nativelink-scheduler/tests/property_router_scheduler_test.rs
new file mode 100644
index 000000000..1bb1eb504
--- /dev/null
+++ b/nativelink-scheduler/tests/property_router_scheduler_test.rs
@@ -0,0 +1,288 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{SystemTime, UNIX_EPOCH};
+
+mod utils {
+ pub(crate) mod scheduler_utils;
+}
+
+use futures::{StreamExt, join};
+use nativelink_error::{Error, make_input_err};
+use nativelink_macro::nativelink_test;
+use nativelink_scheduler::mock_scheduler::MockActionScheduler;
+use nativelink_scheduler::property_router_scheduler::PropertyRouterScheduler;
+use nativelink_util::action_messages::{ActionStage, ActionState, OperationId};
+use nativelink_util::common::DigestInfo;
+use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
+use nativelink_util::operation_state_manager::{ClientStateManager, OperationFilter};
+use pretty_assertions::assert_eq;
+use tokio::sync::watch;
+use utils::scheduler_utils::{TokioWatchActionStateResult, make_base_action_info};
+
+struct TestContext {
+ compile_scheduler: Arc,
+ default_scheduler: Arc,
+ router: PropertyRouterScheduler,
+}
+
+fn make_router() -> TestContext {
+ let compile_scheduler = Arc::new(MockActionScheduler::new());
+ let default_scheduler = Arc::new(MockActionScheduler::new());
+ let mut routes = HashMap::new();
+ routes.insert(
+ "compile".to_string(),
+ compile_scheduler.clone()
+ as Arc,
+ );
+ let router = PropertyRouterScheduler::new(
+ "container-image",
+ routes,
+ default_scheduler.clone() as Arc,
+ );
+ TestContext {
+ compile_scheduler,
+ default_scheduler,
+ router,
+ }
+}
+
+#[nativelink_test]
+async fn routes_to_matching_scheduler() -> Result<(), Error> {
+ let ctx = make_router();
+ let mut action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest())
+ .as_ref()
+ .clone();
+ action_info
+ .platform_properties
+ .insert("container-image".to_string(), "compile".to_string());
+ let action_info = Arc::new(action_info);
+
+ let (_tx, rx) = watch::channel(Arc::new(ActionState {
+ client_operation_id: OperationId::default(),
+ stage: ActionStage::Queued,
+ action_digest: action_info.unique_qualifier.digest(),
+ last_transition_timestamp: SystemTime::now(),
+ }));
+ let client_operation_id = OperationId::default();
+
+ let (_, (received_op_id, received_action)) =
+ join!(
+ ctx.router
+ .add_action(client_operation_id.clone(), action_info.clone()),
+ ctx.compile_scheduler.expect_add_action(Ok(Box::new(
+ TokioWatchActionStateResult::new(client_operation_id.clone(), action_info, rx)
+ ))),
+ );
+ assert_eq!(client_operation_id, received_op_id);
+ assert_eq!(
+ Some(&"compile".to_string()),
+ received_action.platform_properties.get("container-image")
+ );
+ Ok(())
+}
+
+#[nativelink_test]
+async fn routes_to_default_when_no_match() -> Result<(), Error> {
+ let ctx = make_router();
+ let mut action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest())
+ .as_ref()
+ .clone();
+ action_info.platform_properties.insert(
+ "container-image".to_string(),
+ "some-other-image".to_string(),
+ );
+ let action_info = Arc::new(action_info);
+
+ let (_tx, rx) = watch::channel(Arc::new(ActionState {
+ client_operation_id: OperationId::default(),
+ stage: ActionStage::Queued,
+ action_digest: action_info.unique_qualifier.digest(),
+ last_transition_timestamp: SystemTime::now(),
+ }));
+ let client_operation_id = OperationId::default();
+
+ let (_, (received_op_id, received_action)) =
+ join!(
+ ctx.router
+ .add_action(client_operation_id.clone(), action_info.clone()),
+ ctx.default_scheduler.expect_add_action(Ok(Box::new(
+ TokioWatchActionStateResult::new(client_operation_id.clone(), action_info, rx)
+ ))),
+ );
+ assert_eq!(client_operation_id, received_op_id);
+ assert_eq!(
+ Some(&"some-other-image".to_string()),
+ received_action.platform_properties.get("container-image")
+ );
+ Ok(())
+}
+
+#[nativelink_test]
+async fn routes_to_default_when_property_missing() -> Result<(), Error> {
+ let ctx = make_router();
+ let action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest());
+
+ let (_tx, rx) = watch::channel(Arc::new(ActionState {
+ client_operation_id: OperationId::default(),
+ stage: ActionStage::Queued,
+ action_digest: action_info.unique_qualifier.digest(),
+ last_transition_timestamp: SystemTime::now(),
+ }));
+ let client_operation_id = OperationId::default();
+
+ let (_, (received_op_id, received_action)) =
+ join!(
+ ctx.router
+ .add_action(client_operation_id.clone(), action_info.clone()),
+ ctx.default_scheduler.expect_add_action(Ok(Box::new(
+ TokioWatchActionStateResult::new(client_operation_id.clone(), action_info, rx)
+ ))),
+ );
+ assert_eq!(client_operation_id, received_op_id);
+ assert!(
+ !received_action
+ .platform_properties
+ .contains_key("container-image"),
+ "Expected no container-image property"
+ );
+ Ok(())
+}
+
+#[nativelink_test]
+async fn routes_multiple_values() -> Result<(), Error> {
+ let ctx = make_router();
+
+ // First action: routes to compile_scheduler
+ {
+ let mut action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest())
+ .as_ref()
+ .clone();
+ action_info
+ .platform_properties
+ .insert("container-image".to_string(), "compile".to_string());
+ let action_info = Arc::new(action_info);
+
+ let (_tx, rx) = watch::channel(Arc::new(ActionState {
+ client_operation_id: OperationId::default(),
+ stage: ActionStage::Queued,
+ action_digest: action_info.unique_qualifier.digest(),
+ last_transition_timestamp: SystemTime::now(),
+ }));
+ let client_operation_id = OperationId::default();
+
+ let (_, (received_op_id, received_action)) = join!(
+ ctx.router
+ .add_action(client_operation_id.clone(), action_info.clone()),
+ ctx.compile_scheduler.expect_add_action(Ok(Box::new(
+ TokioWatchActionStateResult::new(client_operation_id.clone(), action_info, rx)
+ ))),
+ );
+ assert_eq!(client_operation_id, received_op_id);
+ assert_eq!(
+ Some(&"compile".to_string()),
+ received_action.platform_properties.get("container-image")
+ );
+ }
+
+ // Second action: routes to default_scheduler
+ {
+ let mut action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest())
+ .as_ref()
+ .clone();
+ action_info
+ .platform_properties
+ .insert("container-image".to_string(), "default-image".to_string());
+ let action_info = Arc::new(action_info);
+
+ let (_tx, rx) = watch::channel(Arc::new(ActionState {
+ client_operation_id: OperationId::default(),
+ stage: ActionStage::Queued,
+ action_digest: action_info.unique_qualifier.digest(),
+ last_transition_timestamp: SystemTime::now(),
+ }));
+ let client_operation_id = OperationId::default();
+
+ let (_, (received_op_id, received_action)) = join!(
+ ctx.router
+ .add_action(client_operation_id.clone(), action_info.clone()),
+ ctx.default_scheduler.expect_add_action(Ok(Box::new(
+ TokioWatchActionStateResult::new(client_operation_id.clone(), action_info, rx)
+ ))),
+ );
+ assert_eq!(client_operation_id, received_op_id);
+ assert_eq!(
+ Some(&"default-image".to_string()),
+ received_action.platform_properties.get("container-image")
+ );
+ }
+
+ Ok(())
+}
+
+#[nativelink_test]
+async fn filter_operations_fans_out_to_all() -> Result<(), Error> {
+ let ctx = make_router();
+ let filter = OperationFilter {
+ client_operation_id: Some(OperationId::default()),
+ ..Default::default()
+ };
+
+ // The router calls filter_operations sequentially on routes then default.
+ // Since HashMap order is arbitrary, we join both expects concurrently.
+ let (router_result, compile_filter, default_filter) = join!(
+ ctx.router.filter_operations(filter.clone()),
+ ctx.compile_scheduler
+ .expect_filter_operations(Ok(Box::pin(futures::stream::empty()))),
+ ctx.default_scheduler
+ .expect_filter_operations(Ok(Box::pin(futures::stream::empty()))),
+ );
+
+ assert!(router_result.unwrap().next().await.is_none());
+ assert_eq!(filter, compile_filter);
+ assert_eq!(filter, default_filter);
+ Ok(())
+}
+
+#[nativelink_test]
+async fn known_properties_unions_all_schedulers() -> Result<(), Error> {
+ let ctx = make_router();
+
+ let (known_props, _compile_instance, _default_instance) = join!(
+ ctx.router.get_known_properties("my-instance"),
+ ctx.compile_scheduler
+ .expect_get_known_properties(Ok(vec!["cpu_arch".to_string()])),
+ ctx.default_scheduler
+ .expect_get_known_properties(Ok(vec!["os".to_string(), "cpu_arch".to_string()])),
+ );
+
+ let mut props = known_props.unwrap();
+ props.sort();
+ assert_eq!(vec!["cpu_arch".to_string(), "os".to_string()], props);
+ Ok(())
+}
+
+#[nativelink_test]
+async fn error_from_nested_scheduler_propagates() -> Result<(), Error> {
+ let ctx = make_router();
+ let mut action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest())
+ .as_ref()
+ .clone();
+ action_info
+ .platform_properties
+ .insert("container-image".to_string(), "compile".to_string());
+ let action_info = Arc::new(action_info);
+
+ let client_operation_id = OperationId::default();
+ let (result, _) = join!(
+ ctx.router
+ .add_action(client_operation_id.clone(), action_info.clone()),
+ ctx.compile_scheduler
+ .expect_add_action(Err(make_input_err!("Simulated scheduler error"))),
+ );
+
+ assert!(
+ result.is_err(),
+ "Expected error to propagate from nested scheduler"
+ );
+ Ok(())
+}
diff --git a/nativelink-service/src/execution_server.rs b/nativelink-service/src/execution_server.rs
index 706206c74..68979333a 100644
--- a/nativelink-service/src/execution_server.rs
+++ b/nativelink-service/src/execution_server.rs
@@ -378,7 +378,7 @@ impl Execution for ExecutionServer {
#[cfg(test)]
#[test]
fn test_nl_op_id_from_name() -> Result<(), Box> {
- let examples = [("foo/bar", "foo"), ("a/b/c/d", "a/b/c")];
+ let examples = [("foo/bar", "foo"), ("a/b/c/d", "a")];
for (input, expected) in examples {
let id = NativelinkOperationId::from_name(input)?;
diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs
index 63e161891..ca0cc6bab 100644
--- a/nativelink-store/src/default_store_factory.rs
+++ b/nativelink-store/src/default_store_factory.rs
@@ -24,6 +24,7 @@ use nativelink_error::Error;
use nativelink_util::health_utils::HealthRegistryBuilder;
use nativelink_util::metrics::StoreType;
use nativelink_util::store_trait::{Store, StoreDriver};
+
use crate::completeness_checking_store::CompletenessCheckingStore;
use crate::compression_store::CompressionStore;
use crate::dedup_store::DedupStore;
@@ -140,13 +141,13 @@ pub fn store_factory<'a>(
)))
} else {
Ok(store)
- }
+ };
})
}
fn should_wrap_in_metrics_store(spec: &StoreSpec) -> bool {
if env::var("NL_STORE_METRICS").is_err() {
- return false
+ return false;
}
matches!(
diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs
index b29346ac9..d5df21055 100644
--- a/nativelink-store/src/fast_slow_store.rs
+++ b/nativelink-store/src/fast_slow_store.rs
@@ -30,6 +30,7 @@ use nativelink_util::buf_channel::{
};
use nativelink_util::fs;
use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator};
+use nativelink_util::metrics::FAST_SLOW_STORE_METRICS;
use nativelink_util::store_trait::{
RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, StoreOptimizations,
UploadSizeInfo, slow_update_store_with_file,
@@ -37,7 +38,6 @@ use nativelink_util::store_trait::{
use parking_lot::Mutex;
use tokio::sync::OnceCell;
use tracing::{debug, trace, warn};
-use nativelink_util::metrics::FAST_SLOW_STORE_METRICS;
// TODO(palfrey) This store needs to be evaluated for more efficient memory usage,
// there are many copies happening internally.
diff --git a/nativelink-store/src/lib.rs b/nativelink-store/src/lib.rs
index 4a367ee33..fce478231 100644
--- a/nativelink-store/src/lib.rs
+++ b/nativelink-store/src/lib.rs
@@ -27,6 +27,7 @@ pub mod gcs_client;
pub mod gcs_store;
pub mod grpc_store;
pub mod memory_store;
+pub mod metrics_store;
pub mod mongo_store;
pub mod noop_store;
pub mod ontap_s3_existence_cache_store;
@@ -39,4 +40,3 @@ pub mod shard_store;
pub mod size_partitioning_store;
pub mod store_manager;
pub mod verify_store;
-pub mod metrics_store;
diff --git a/nativelink-store/src/metrics_store.rs b/nativelink-store/src/metrics_store.rs
index fe3a363f1..8abaa67a5 100644
--- a/nativelink-store/src/metrics_store.rs
+++ b/nativelink-store/src/metrics_store.rs
@@ -1,17 +1,19 @@
-use crate::filesystem_store::FilesystemStore;
+use core::pin::Pin;
+use std::borrow::Cow;
+use std::sync::Arc;
+use std::time::Instant;
+
use async_trait::async_trait;
use nativelink_error::Error;
use nativelink_metric::MetricsComponent;
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator};
-use nativelink_util::metrics::{StoreMetricAttrs, StoreType, STORE_METRICS};
+use nativelink_util::metrics::{STORE_METRICS, StoreMetricAttrs, StoreType};
use nativelink_util::store_trait::{
RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo,
};
-use std::borrow::Cow;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::time::Instant;
+
+use crate::filesystem_store::FilesystemStore;
#[derive(MetricsComponent, Debug)]
pub struct MetricsStore {
@@ -42,7 +44,9 @@ impl MetricsStore {
tracing::error!("Failed to register remove callback: {:?}", e);
}
- STORE_METRICS.store_size.record(fs_store.get_len(), &attrs.store_size());
+ STORE_METRICS
+ .store_size
+ .record(fs_store.get_len(), &attrs.store_size());
}
Arc::new(Self {
@@ -109,7 +113,9 @@ impl StoreDriver for MetricsStore {
}
if let Some(fs_store) = self.inner.downcast_ref::(None) {
- STORE_METRICS.store_size.record(fs_store.get_len(), &self.attrs.store_size());
+ STORE_METRICS
+ .store_size
+ .record(fs_store.get_len(), &self.attrs.store_size());
}
result
diff --git a/nativelink-store/src/mongo_store.rs b/nativelink-store/src/mongo_store.rs
index 2110a20b7..d68500a17 100644
--- a/nativelink-store/src/mongo_store.rs
+++ b/nativelink-store/src/mongo_store.rs
@@ -1101,7 +1101,7 @@ impl SchedulerStore for ExperimentalMongoStore {
async fn count_by_index(&self, index: Vec) -> Result, Error>
where
- K: SchedulerIndexProvider + Send
+ K: SchedulerIndexProvider + Send,
{
Ok(vec![0; index.len()])
}
diff --git a/nativelink-util/src/metrics.rs b/nativelink-util/src/metrics.rs
index 82537194e..faf9d6b0d 100644
--- a/nativelink-util/src/metrics.rs
+++ b/nativelink-util/src/metrics.rs
@@ -14,11 +14,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use std::fmt::{Display, Formatter};
+use core::fmt::{Display, Formatter};
use std::sync::{LazyLock, OnceLock};
+use opentelemetry::{InstrumentationScope, KeyValue, Value, global, metrics};
+
use crate::action_messages::ActionStage;
-use opentelemetry::{global, metrics, InstrumentationScope, KeyValue, Value};
/// Callback type for observable gauges that report queued action counts.
/// The callback receives an `Observer` that should be used to record values with attributes.
@@ -700,16 +701,13 @@ pub struct ExecutionMetrics {
pub execution_actions_count: metrics::Gauge,
// Gauge of queued actions by platform properties
pub execution_queued_actions_count: metrics::ObservableGauge,
- /// Duration of do_try_match in ms
+ /// Duration of `do_try_match` in ms
pub do_try_match_duration: metrics::Histogram,
}
/// Helper function to create attributes for execution metrics
#[must_use]
-pub fn make_execution_attributes(
- instance_name: &str,
- priority: Option,
-) -> Vec {
+pub fn make_execution_attributes(instance_name: &str, priority: Option) -> Vec {
let mut attrs = vec![KeyValue::new(EXECUTION_INSTANCE, instance_name.to_string())];
if let Some(priority) = priority {
@@ -1601,7 +1599,7 @@ pub enum StoreType {
}
impl Display for StoreType {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
match self {
StoreType::Filesystem => write!(f, "filesystem"),
StoreType::S3 => write!(f, "s3"),
@@ -1644,7 +1642,7 @@ pub static STORE_METRICS: LazyLock = LazyLock::new(|| {
// memory, a filesystem, or network storage. The current values were
// determined empirically and might need adjustment.
.with_boundaries(vec![
- 0.1, // 100μs
+ 0.1, // 100μs
// Sub-millisecond range
0.5, // 500μs
1.0, // 1ms
@@ -1654,9 +1652,9 @@ pub static STORE_METRICS: LazyLock = LazyLock::new(|| {
50.0, // 50ms
100.0, // 100ms
// Higher latency range
- 500.0, // 500ms
- 1000.0, // 1 second
- 5000.0, // 5 seconds
+ 500.0, // 500ms
+ 1000.0, // 1 second
+ 5000.0, // 5 seconds
10000.0, // 10 seconds
])
.build(),
@@ -1696,7 +1694,6 @@ pub struct StoreMetricAttrs {
write_error: Vec,
eviction: Vec,
store_size: Vec,
-
}
impl StoreMetricAttrs {
@@ -1727,7 +1724,6 @@ impl StoreMetricAttrs {
write_error: make_attrs(CacheOperationName::Write, CacheOperationResult::Error),
eviction: make_attrs(CacheOperationName::Evict, CacheOperationResult::Success),
store_size: base_attrs.clone(),
-
}
}
diff --git a/nativelink-util/src/operation_state_manager.rs b/nativelink-util/src/operation_state_manager.rs
index 3a4b8806e..38ef051b8 100644
--- a/nativelink-util/src/operation_state_manager.rs
+++ b/nativelink-util/src/operation_state_manager.rs
@@ -122,7 +122,7 @@ pub trait ClientStateManager: Sync + Send + Unpin + MetricsComponent + 'static {
fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider>;
/// Returns the implementation as `Any` so that it can be downcast to a concrete type.
- fn as_any(&self) -> &dyn std::any::Any;
+ fn as_any(&self) -> &dyn core::any::Any;
}
/// The type of update to perform on an operation.
diff --git a/nativelink-util/src/telemetry.rs b/nativelink-util/src/telemetry.rs
index 18606ce1e..b7c968b7b 100644
--- a/nativelink-util/src/telemetry.rs
+++ b/nativelink-util/src/telemetry.rs
@@ -12,9 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use core::default::Default;
+use std::env;
+use std::sync::OnceLock;
+
use base64::Engine;
use base64::prelude::BASE64_STANDARD_NO_PAD;
-use core::default::Default;
use ginepro::LoadBalancedChannel;
use hyper::http::Response;
use nativelink_error::{Code, ResultExt, make_err};
@@ -34,8 +37,6 @@ use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
use opentelemetry_sdk::trace::SdkTracerProvider;
use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
use prost::Message;
-use std::env;
-use std::sync::OnceLock;
use tracing::debug;
use tracing::metadata::LevelFilter;
use tracing_opentelemetry::{MetricsLayer, layer};
@@ -209,9 +210,11 @@ const NL_OTEL_ENDPOINT: &str = "NL_OTEL_ENDPOINT";
async fn maybe_load_balanced_channel() -> Option {
match env::var(NL_OTEL_ENDPOINT) {
Ok(endpoint) => {
- let url = Url::parse(endpoint.as_str()).map_err(|e| {
- make_err!(Code::Internal, "Unable to parse endpoint {endpoint}: {e:?}")
- }).unwrap();
+ let url = Url::parse(endpoint.as_str())
+ .map_err(|e| {
+ make_err!(Code::Internal, "Unable to parse endpoint {endpoint}: {e:?}")
+ })
+ .unwrap();
let host = url
.host()
diff --git a/nativelink-worker/src/local_worker.rs b/nativelink-worker/src/local_worker.rs
index 1f7768830..c01ac9f49 100644
--- a/nativelink-worker/src/local_worker.rs
+++ b/nativelink-worker/src/local_worker.rs
@@ -23,10 +23,13 @@ use std::env;
use std::process::Stdio;
use std::sync::{Arc, Weak};
use std::time::Instant;
+
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{Future, FutureExt, StreamExt, TryFutureExt, select};
-use nativelink_config::cas_server::{EnvironmentSource, ExecutionCompletionBehaviour, LocalWorkerConfig};
+use nativelink_config::cas_server::{
+ EnvironmentSource, ExecutionCompletionBehaviour, LocalWorkerConfig,
+};
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::update_for_worker::Update;
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::worker_api_client::WorkerApiClient;
@@ -38,6 +41,7 @@ use nativelink_store::fast_slow_store::FastSlowStore;
use nativelink_util::action_messages::{ActionResult, ActionStage, OperationId};
use nativelink_util::common::fs;
use nativelink_util::digest_hasher::DigestHasherFunc;
+use nativelink_util::metrics::{LOCAL_WORKER_METRICS, WorkerMetricAttrs};
use nativelink_util::shutdown_guard::ShutdownGuard;
use nativelink_util::store_trait::Store;
use nativelink_util::{spawn, tls_utils};
@@ -49,7 +53,7 @@ use tokio::time::sleep;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::Streaming;
use tracing::{Level, debug, error, event, info, info_span, instrument, trace, warn};
-use nativelink_util::metrics::{WorkerMetricAttrs, LOCAL_WORKER_METRICS};
+
use crate::running_actions_manager::{
ExecutionConfiguration, Metrics as RunningActionManagerMetrics, RunningAction,
RunningActionsManager, RunningActionsManagerArgs, RunningActionsManagerImpl,
@@ -866,21 +870,21 @@ impl Metrics {
}
}
- /// Increment the start_actions_received counter
+ /// Increment the `start_actions_received` counter
pub fn inc_start_actions_received(&self) {
LOCAL_WORKER_METRICS
.start_actions_received
.add(1, self.attrs.base());
}
- /// Increment the disconnects_received counter
+ /// Increment the `disconnects_received` counter
pub fn inc_disconnects_received(&self) {
LOCAL_WORKER_METRICS
.disconnects_received
.add(1, self.attrs.base());
}
- /// Increment the keep_alives_received counter
+ /// Increment the `keep_alives_received` counter
pub fn inc_keep_alives_received(&self) {
LOCAL_WORKER_METRICS
.keep_alives_received
diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs
index 033e9e113..aedfb4d3c 100644
--- a/nativelink-worker/src/running_actions_manager.rs
+++ b/nativelink-worker/src/running_actions_manager.rs
@@ -1529,8 +1529,7 @@ impl RunningAction for RunningActionImpl {
"upload_results: starting with timeout",
);
let metrics = self.metrics().clone();
- let upload_fut = metrics
- .wrap_upload_results(Self::inner_upload_results(self));
+ let upload_fut = metrics.wrap_upload_results(Self::inner_upload_results(self));
let stall_warn_fut = async {
let mut elapsed_secs = 0u64;
@@ -2388,7 +2387,7 @@ impl MetricsTimer {
impl Metrics {
/// Create a new Metrics instance with optional attributes.
- pub fn new() -> Self {
+ pub const fn new() -> Self {
Self { attrs: Vec::new() }
}
diff --git a/nativelink-worker/tests/utils/local_worker_test_utils.rs b/nativelink-worker/tests/utils/local_worker_test_utils.rs
index adbef171e..71ea38052 100644
--- a/nativelink-worker/tests/utils/local_worker_test_utils.rs
+++ b/nativelink-worker/tests/utils/local_worker_test_utils.rs
@@ -144,10 +144,7 @@ impl MockWorkerApiClient {
req
}
- pub(crate) async fn expect_going_away(
- &self,
- result: Result<(), Error>,
- ) -> GoingAwayRequest {
+ pub(crate) async fn expect_going_away(&self, result: Result<(), Error>) -> GoingAwayRequest {
let mut rx_call_lock = self.rx_call.lock().await;
let req = match rx_call_lock
.recv()
@@ -186,10 +183,10 @@ impl WorkerApiClientTrait for MockWorkerApiClient {
WorkerClientApiReturns::ConnectWorker(result) => result,
resp @ WorkerClientApiReturns::ExecutionResponse(_) => {
panic!("connect_worker expected ConnectWorker response, received {resp:?}")
- },
+ }
resp @ WorkerClientApiReturns::GoingAway(_) => {
panic!("connect_worker expected ConnectWorker response, received {resp:?}")
- },
+ }
}
}
@@ -271,7 +268,9 @@ pub(crate) async fn setup_local_worker_with_config(
let (shutdown_tx_test, _) = broadcast::channel::(BROADCAST_CAPACITY);
let drop_guard = spawn!("local_worker_spawn", async move {
- worker.run(shutdown_tx_test.clone(), shutdown_tx_test.subscribe()).await
+ worker
+ .run(shutdown_tx_test.clone(), shutdown_tx_test.subscribe())
+ .await
});
let (tx_stream, streaming_response) = setup_grpc_stream();
diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs
index c9578626c..49ebf8cfd 100644
--- a/src/bin/nativelink.rs
+++ b/src/bin/nativelink.rs
@@ -196,9 +196,14 @@ async fn inner_main(
let health_component_name = format!("stores/{name}");
let mut health_register_store =
health_registry_lock.sub_builder(&health_component_name);
- let store = store_factory(&name, &spec, &store_manager, Some(&mut health_register_store))
- .await
- .err_tip(|| format!("Failed to create store '{name}'"))?;
+ let store = store_factory(
+ &name,
+ &spec,
+ &store_manager,
+ Some(&mut health_register_store),
+ )
+ .await
+ .err_tip(|| format!("Failed to create store '{name}'"))?;
store_manager.add_store(&name, store);
}
}
@@ -681,16 +686,20 @@ async fn inner_main(
let worker_name = name.clone();
let fut = trace_span!("worker_ctx", worker_name = %name)
.in_scope(|| local_worker.run(shutdown_tx.clone(), shutdown_rx));
- spawn!("worker", async move {
- let result = fut.await;
- if result.is_ok() {
- // Worker completed successfully (graceful shutdown).
- // Exit the process with code 0.
- info!(worker_name = %worker_name, "Worker completed successfully, exiting process");
- std::process::exit(0);
- }
- result
- }, ?name)
+ spawn!(
+ "worker",
+ async move {
+ let result = fut.await;
+ if result.is_ok() {
+ // Worker completed successfully (graceful shutdown).
+ // Exit the process with code 0.
+ info!(worker_name = %worker_name, "Worker completed successfully, exiting process");
+ std::process::exit(0);
+ }
+ result
+ },
+ ?name
+ )
}
};
root_futures.push(Box::pin(spawn_fut.map_ok_or_else(|e| Err(e.into()), |v| v)));