Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
408 changes: 408 additions & 0 deletions docs/property-router-scheduler-plan.md

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions nativelink-config/src/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use serde::{Deserialize, Serialize};
use crate::schedulers::SchedulerSpec;
use crate::serde_utils::{
convert_data_size_with_shellexpand, convert_duration_with_shellexpand,
convert_numeric_with_shellexpand, convert_optional_numeric_with_shellexpand,
convert_optional_string_with_shellexpand, convert_string_with_shellexpand,
convert_vec_string_with_shellexpand, convert_enum_with_shellexpand,
convert_enum_with_shellexpand, convert_numeric_with_shellexpand,
convert_optional_numeric_with_shellexpand, convert_optional_string_with_shellexpand,
convert_string_with_shellexpand, convert_vec_string_with_shellexpand,
};
use crate::stores::{ClientTlsConfig, ConfigDigestHashFunction, StoreRefName, StoreSpec};

Expand Down Expand Up @@ -854,7 +854,7 @@ pub struct LocalWorkerConfig {
/// Default: None (directory cache disabled)
pub directory_cache: Option<DirectoryCacheConfig>,

#[serde(deserialize_with = "convert_enum_with_shellexpand")]
#[serde(default, deserialize_with = "convert_enum_with_shellexpand")]
pub execution_completion_behaviour: ExecutionCompletionBehaviour,
}

Expand Down
18 changes: 18 additions & 0 deletions nativelink-config/src/schedulers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub enum SchedulerSpec {
Grpc(GrpcSpec),
CacheLookup(CacheLookupSpec),
PropertyModifier(PropertyModifierSpec),
PropertyRouter(PropertyRouterSpec),
}

/// When the scheduler matches tasks to workers that are capable of running
Expand Down Expand Up @@ -323,3 +324,20 @@ pub struct PropertyModifierSpec {
/// The nested scheduler to use after modifying the properties.
pub scheduler: Box<SchedulerSpec>,
}

/// Routes actions to different schedulers based on a platform property value.
///
/// Actions whose value for `property_name` matches a key in `routes` go to
/// that route's scheduler. All other actions (property missing, or value not
/// present in `routes`) go to `default_scheduler`.
///
/// Unknown fields in the configuration are rejected during deserialization
/// (`deny_unknown_fields`).
///
/// When the scheduler factory builds this spec, every nested spec (each route
/// and the default) must produce an action scheduler; otherwise construction
/// fails with an error naming the offending route or the default scheduler.
///
/// NOTE(review): the factory discards any worker scheduler returned by the
/// nested specs and the router itself exposes none. Confirm how workers are
/// expected to register with the schedulers behind a router.
#[derive(Deserialize, Serialize, Debug)]
#[serde(deny_unknown_fields)]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub struct PropertyRouterSpec {
/// The platform property key to match on (e.g. "container-image").
pub property_name: String,

/// Map of property value -> nested scheduler spec.
///
/// NOTE(review): keys are presumably compared to the action's property
/// value by exact string equality (this is a `HashMap<String, _>`); confirm
/// against the `PropertyRouterScheduler` implementation.
pub routes: HashMap<String, SchedulerSpec>,

/// Scheduler to use when the property is absent or its value does not match any route.
///
/// This field has no serde default, so it must always be present in the
/// configuration.
pub default_scheduler: Box<SchedulerSpec>,
}
6 changes: 2 additions & 4 deletions nativelink-config/src/serde_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,10 +486,8 @@ where
T: DeserializeOwned,
{
let s = String::deserialize(deserializer)?;
let expanded = shellexpand::env(&s)
.map_err(de::Error::custom)?;
let expanded = shellexpand::env(&s).map_err(de::Error::custom)?;

let quoted = format!("\"{}\"", expanded);
serde_json5::from_str(&quoted)
.map_err(de::Error::custom)
serde_json5::from_str(&quoted).map_err(de::Error::custom)
}
2 changes: 2 additions & 0 deletions nativelink-scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ rust_library(
"src/mock_scheduler.rs",
"src/platform_property_manager.rs",
"src/property_modifier_scheduler.rs",
"src/property_router_scheduler.rs",
"src/simple_scheduler.rs",
"src/simple_scheduler_state_manager.rs",
"src/store_awaited_action_db.rs",
Expand Down Expand Up @@ -66,6 +67,7 @@ rust_test_suite(
"tests/action_messages_test.rs",
"tests/cache_lookup_scheduler_test.rs",
"tests/property_modifier_scheduler_test.rs",
"tests/property_router_scheduler_test.rs",
"tests/redis_store_awaited_action_db_test.rs",
"tests/simple_scheduler_state_manager_test.rs",
"tests/simple_scheduler_test.rs",
Expand Down
78 changes: 52 additions & 26 deletions nativelink-scheduler/src/api_worker_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ use std::time::{Instant, UNIX_EPOCH};
use async_lock::RwLock;
use lru::LruCache;
use nativelink_config::schedulers::WorkerAllocationStrategy;
use nativelink_error::{error_if, make_err, make_input_err, Code, Error, ResultExt};
use nativelink_error::{Code, Error, ResultExt, error_if, make_err, make_input_err};
use nativelink_metric::{
group, MetricFieldData, MetricKind, MetricPublishKnownKindData,
MetricsComponent, RootMetricsComponent,
MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
RootMetricsComponent, group,
};
use nativelink_util::action_messages::{OperationId, WorkerId};
use nativelink_util::metrics::{WORKER_POOL_METRICS, WORKER_POOL_INSTANCE, WorkerPoolMetricAttrs};
use nativelink_util::metrics::{WORKER_POOL_INSTANCE, WORKER_POOL_METRICS, WorkerPoolMetricAttrs};
use nativelink_util::operation_state_manager::{UpdateOperationType, WorkerStateManager};
use nativelink_util::platform_properties::PlatformProperties;
use nativelink_util::shutdown_guard::ShutdownGuard;
Expand Down Expand Up @@ -63,7 +63,10 @@ pub struct SchedulerMetrics {
}

use crate::platform_property_manager::PlatformPropertyManager;
use crate::worker::{reduce_platform_properties, Worker, ActionInfoWithProps, WorkerState, WorkerTimestamp, WorkerUpdate};
use crate::worker::{
ActionInfoWithProps, Worker, WorkerState, WorkerTimestamp, WorkerUpdate,
reduce_platform_properties,
};
use crate::worker_capability_index::WorkerCapabilityIndex;
use crate::worker_registry::SharedWorkerRegistry;
use crate::worker_scheduler::WorkerScheduler;
Expand Down Expand Up @@ -96,11 +99,15 @@ impl WorkerSchedulerMetrics {
}

pub fn record_worker_removed(&self) {
WORKER_POOL_METRICS.worker_events.add(1, self.attrs.removed());
WORKER_POOL_METRICS
.worker_events
.add(1, self.attrs.removed());
}

pub fn record_worker_timeout(&self) {
WORKER_POOL_METRICS.worker_events.add(1, self.attrs.timeout());
WORKER_POOL_METRICS
.worker_events
.add(1, self.attrs.timeout());
}

pub fn record_worker_connection_failed(&self) {
Expand Down Expand Up @@ -385,7 +392,7 @@ impl ApiWorkerSchedulerImpl {

/// Batch finds workers for multiple actions in a single pass.
/// This reduces lock contention by acquiring the lock once for all actions.
/// Returns a map of (action_index, worker_id) pairs for successful matches.
/// Returns a map of (`action_index`, `worker_id`) pairs for successful matches.
fn inner_batch_find_workers_for_actions(
&self,
actions: &[&PlatformProperties],
Expand All @@ -409,16 +416,23 @@ impl ApiWorkerSchedulerImpl {
}

if !workers_platform_properties.contains_key(&worker_id) {
workers_platform_properties.insert(worker_id.clone(), worker.platform_properties.clone());
workers_platform_properties
.insert(worker_id.clone(), worker.platform_properties.clone());
}

if !platform_properties.is_satisfied_by(&workers_platform_properties[&worker_id], full_worker_logging) {
if !platform_properties.is_satisfied_by(
&workers_platform_properties[&worker_id],
full_worker_logging,
) {
continue;
}

reduce_platform_properties(workers_platform_properties.get_mut(&worker_id).unwrap(), platform_properties);
reduce_platform_properties(
workers_platform_properties.get_mut(&worker_id).unwrap(),
platform_properties,
);

results.insert(idx, worker_id.clone());
results.insert(idx, worker_id.clone());
break;
}
}
Expand Down Expand Up @@ -660,7 +674,10 @@ impl ApiWorkerSchedulerImpl {
}

fn count_running_actions(&self) -> usize {
self.workers.iter().map(|(_, w)| w.running_action_infos.len()).sum()
self.workers
.iter()
.map(|(_, w)| w.running_action_infos.len())
.sum()
}
}

Expand Down Expand Up @@ -720,7 +737,7 @@ impl ApiWorkerScheduler {

/// Returns a reference to the worker scheduler metrics for recording OTEL metrics.
#[must_use]
pub fn workerMetrics(&self) -> &WorkerSchedulerMetrics {
pub const fn workerMetrics(&self) -> &WorkerSchedulerMetrics {
&self.worker_scheduler_metrics
}

Expand All @@ -744,7 +761,8 @@ impl ApiWorkerScheduler {
} else {
self.worker_scheduler_metrics.record_dispatch_failure();
}
self.worker_scheduler_metrics.record_running_actions_count(inner.count_running_actions());
self.worker_scheduler_metrics
.record_running_actions_count(inner.count_running_actions());

result
}
Expand All @@ -764,7 +782,9 @@ impl ApiWorkerScheduler {
.fetch_add(count as u64, Ordering::Relaxed);

let mut inner = self.inner.write().await;
let results = inner.inner_batch_worker_notify_run_action(assignments).await;
let results = inner
.inner_batch_worker_notify_run_action(assignments)
.await;

// Record metrics
let successes = results.iter().filter(|r| r.is_ok()).count();
Expand Down Expand Up @@ -832,7 +852,7 @@ impl ApiWorkerScheduler {
/// This reduces lock contention compared to calling `find_worker_for_action`
/// for each action individually.
///
/// Returns a vector of (action_index, worker_id) pairs for successful matches.
/// Returns a vector of (`action_index`, `worker_id`) pairs for successful matches.
/// Actions that couldn't be matched to a worker are not included in the result.
pub async fn batch_find_workers_for_actions(
&self,
Expand All @@ -846,8 +866,7 @@ impl ApiWorkerScheduler {

let inner = self.inner.read().await;
let worker_count = inner.workers.len() as u64;
let results =
inner.inner_batch_find_workers_for_actions(actions, full_worker_logging);
let results = inner.inner_batch_find_workers_for_actions(actions, full_worker_logging);

// Track metrics
self.metrics
Expand Down Expand Up @@ -917,7 +936,8 @@ impl WorkerScheduler for ApiWorkerScheduler {
.add_worker(worker)
.err_tip(|| "Error while adding worker, removing from pool");
if let Err(err) = &result {
self.worker_scheduler_metrics.record_worker_connection_failed();
self.worker_scheduler_metrics
.record_worker_connection_failed();
return Result::<(), _>::Err(err.clone()).merge(
inner
.immediate_evict_worker(&worker_id, err.clone(), false)
Expand All @@ -930,7 +950,8 @@ impl WorkerScheduler for ApiWorkerScheduler {

self.metrics.workers_added.fetch_add(1, Ordering::Relaxed);
self.worker_scheduler_metrics.record_worker_added();
self.worker_scheduler_metrics.record_worker_count(inner.workers.len());
self.worker_scheduler_metrics
.record_worker_count(inner.workers.len());
Ok(())
}

Expand All @@ -955,7 +976,8 @@ impl WorkerScheduler for ApiWorkerScheduler {
if result.is_ok() && is_completion {
self.worker_scheduler_metrics.record_action_completed();
}
self.worker_scheduler_metrics.record_running_actions_count(inner.count_running_actions());
self.worker_scheduler_metrics
.record_running_actions_count(inner.count_running_actions());

result
}
Expand Down Expand Up @@ -992,7 +1014,8 @@ impl WorkerScheduler for ApiWorkerScheduler {

// Record worker removal
self.worker_scheduler_metrics.record_worker_removed();
self.worker_scheduler_metrics.record_worker_count(inner.workers.len());
self.worker_scheduler_metrics
.record_worker_count(inner.workers.len());
result
}

Expand Down Expand Up @@ -1086,16 +1109,19 @@ impl WorkerScheduler for ApiWorkerScheduler {
self.worker_scheduler_metrics.record_worker_timeout();
}

self.worker_scheduler_metrics.record_running_actions_count(inner.count_running_actions());
self.worker_scheduler_metrics.record_worker_count(inner.workers.len());
self.worker_scheduler_metrics
.record_running_actions_count(inner.count_running_actions());
self.worker_scheduler_metrics
.record_worker_count(inner.workers.len());

result
}

async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
let mut inner = self.inner.write().await;
inner.set_drain_worker(worker_id, is_draining).await?;
self.worker_scheduler_metrics.record_worker_count(inner.workers.len());
self.worker_scheduler_metrics
.record_worker_count(inner.workers.len());
Ok(())
}
}
Expand Down
2 changes: 1 addition & 1 deletion nativelink-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ impl ClientStateManager for CacheLookupScheduler {
self.action_scheduler.as_known_platform_property_provider()
}

fn as_any(&self) -> &dyn std::any::Any {
fn as_any(&self) -> &dyn core::any::Any {
self
}
}
Expand Down
33 changes: 33 additions & 0 deletions nativelink-scheduler/src/default_scheduler_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::cache_lookup_scheduler::CacheLookupScheduler;
use crate::grpc_scheduler::GrpcScheduler;
use crate::memory_awaited_action_db::MemoryAwaitedActionDb;
use crate::property_modifier_scheduler::PropertyModifierScheduler;
use crate::property_router_scheduler::PropertyRouterScheduler;
use crate::simple_scheduler::SimpleScheduler;
use crate::store_awaited_action_db::StoreAwaitedActionDb;
use crate::worker_scheduler::WorkerScheduler;
Expand Down Expand Up @@ -95,6 +96,38 @@ async fn inner_scheduler_factory(
));
(Some(property_modifier_scheduler), worker_scheduler)
}
SchedulerSpec::PropertyRouter(spec) => {
use std::collections::HashMap;
let mut routes = HashMap::with_capacity(spec.routes.len());
for (value, nested_spec) in &spec.routes {
let (action_scheduler, _) = Box::pin(inner_scheduler_factory(
nested_spec,
store_manager,
maybe_origin_event_tx,
))
.await
.err_tip(|| format!("In nested PropertyRouterScheduler route '{value}'"))?;
routes.insert(
value.clone(),
action_scheduler
.err_tip(|| format!("Nested route '{value}' is not an action scheduler"))?,
);
}
let (default_action_scheduler, _) = Box::pin(inner_scheduler_factory(
&spec.default_scheduler,
store_manager,
maybe_origin_event_tx,
))
.await
.err_tip(|| "In PropertyRouterScheduler default_scheduler")?;
let router = Arc::new(PropertyRouterScheduler::new(
&spec.property_name,
routes,
default_action_scheduler
.err_tip(|| "Default scheduler is not an action scheduler")?,
));
(Some(router), None)
}
};

Ok(scheduler)
Expand Down
2 changes: 1 addition & 1 deletion nativelink-scheduler/src/grpc_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ impl ClientStateManager for GrpcScheduler {
Some(self)
}

fn as_any(&self) -> &dyn std::any::Any {
fn as_any(&self) -> &dyn core::any::Any {
self
}
}
Expand Down
1 change: 1 addition & 0 deletions nativelink-scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod memory_awaited_action_db;
pub mod mock_scheduler;
pub mod platform_property_manager;
pub mod property_modifier_scheduler;
pub mod property_router_scheduler;
pub mod simple_scheduler;
pub mod simple_scheduler_state_manager;
pub mod store_awaited_action_db;
Expand Down
28 changes: 12 additions & 16 deletions nativelink-scheduler/src/memory_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,24 +680,20 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
// Record completion metrics with action digest for failure tracking
let action_digest = old_awaited_action.action_info().digest().to_string();
if let ActionStage::Completed(action_result) = new_stage {
let result_attrs = vec![
opentelemetry::KeyValue::new(
nativelink_util::metrics::EXECUTION_RESULT,
if action_result.exit_code == 0 {
ExecutionResult::Success
} else {
ExecutionResult::Failure
},
),
];
let result_attrs = vec![opentelemetry::KeyValue::new(
nativelink_util::metrics::EXECUTION_RESULT,
if action_result.exit_code == 0 {
ExecutionResult::Success
} else {
ExecutionResult::Failure
},
)];
metrics.execution_completed_count.add(1, &result_attrs);
} else if let ActionStage::CompletedFromCache(_) = new_stage {
let result_attrs = vec![
opentelemetry::KeyValue::new(
nativelink_util::metrics::EXECUTION_RESULT,
ExecutionResult::CacheHit,
),
];
let result_attrs = vec![opentelemetry::KeyValue::new(
nativelink_util::metrics::EXECUTION_RESULT,
ExecutionResult::CacheHit,
)];
metrics.execution_completed_count.add(1, &result_attrs);
}

Expand Down
Loading
Loading