Skip to content
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 42 additions & 18 deletions rs/execution_environment/src/scheduler/round_schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ pub struct IterationSchedule {
}

impl IterationSchedule {
/// Partitions the executable canisters to the available cores for execution.
/// Splits the scheduled canisters off into per-core vectors, leaving any
/// non-scheduled canisters (the "inactive" set) in `canisters`. The
/// scheduled canisters are to be reinserted by the caller after execution.
#[allow(clippy::type_complexity)]
pub fn partition_canisters_to_cores(
&self,
Expand Down Expand Up @@ -277,6 +279,41 @@ impl RoundSchedule {
// Sum of all long execution canisters' compute allocations.
let mut long_executions_compute_allocation = ZERO;

if is_first_iteration {
// (Only) drop from the schedule idle canisters with 0-100 AP.
//
// Idle canisters with negative AP are kept in the schedule until they reach 0
// AP, to prevent them from jumping from the back of the schedule to the middle
// just by being idle for a round. Similarly, idle canisters with positive AP
// burn it down as if they had executed full rounds until they (almost) reach 0.
let idle_canisters_to_drop = subnet_schedule
.iter()
.filter_map(|(canister_id, canister_priority)| {
if canister_priority.accumulated_priority < ZERO
|| canister_priority.accumulated_priority > ONE_HUNDRED_PERCENT
{
// Not in the 0-100 AP range: keep.
return None;
}
let Some(canister) = canister_states.get(canister_id) else {
// Canister was deleted: drop.
return Some(*canister_id);
};
if !canister.must_be_in_schedule()
&& canister.next_execution() == NextExecution::None
{
// Canister is idle: drop.
Some(*canister_id)
} else {
None
}
})
.collect::<Vec<_>>();
for canister_id in idle_canisters_to_drop {
subnet_schedule.remove(&canister_id);
}
Comment thread
eichhorl marked this conversation as resolved.
Outdated
}

// Collect all active canisters and their next executions.
//
// Unfortunately, not all active canisters are in the subnet schedule, so we
Expand Down Expand Up @@ -307,35 +344,22 @@ impl RoundSchedule {
if self.long_execution_canisters.contains(canister_id) {
return None;
}
CanisterRoundState::new(canister, subnet_schedule.get_mut(*canister_id))
CanisterRoundState::new(canister, subnet_schedule.get(canister_id))
Comment thread
alin-at-dfinity marked this conversation as resolved.
}

NextExecution::ContinueLong => {
if is_first_iteration {
self.long_execution_canisters.insert(*canister_id);
}
let priority = subnet_schedule.get_mut(*canister_id);
let rs = CanisterRoundState::new(canister, priority);
let rs =
CanisterRoundState::new(canister, subnet_schedule.get(canister_id));
long_executions_count += 1;
long_executions_compute_allocation += rs.compute_allocation;
rs
}

NextExecution::None => {
// (Only) drop from the schedule idle canisters with 0-100 AP.
//
// Idle canisters with negative AP are kept in the schedule until they reach 0
// AP, to prevent them from jumping from the back of the schedule to the middle
// just by being idle for a round. Similarly, idle canisters with positive AP
// burn it down as if they had executed full rounds until they (almost) reach 0.
if is_first_iteration && !canister.must_be_in_schedule() {
let canister_priority = subnet_schedule.get(canister_id);
if canister_priority.accumulated_priority >= ZERO
&& canister_priority.accumulated_priority <= ONE_HUNDRED_PERCENT
{
subnet_schedule.remove(canister_id);
}
}
// Already dropped idle canisters with 0-100 AP above. Nothing to do here.
return None;
}

Expand Down
Loading