Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion applications/tests/test_sched_preempt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use awkernel_lib::{
#[allow(dead_code)]
enum TestType {
GetlowestTask,
SchedPreempt,
}

/// Tests related to preemption between schedulers
Expand All @@ -21,9 +22,10 @@ pub async fn run() {
cpu_id()
);

let test_type = TestType::GetlowestTask;
let test_type = TestType::SchedPreempt;
match test_type {
TestType::GetlowestTask => check_lowest_task().await,
TestType::SchedPreempt => check_sched_preempt().await,
}

log::info!(
Expand Down Expand Up @@ -69,3 +71,50 @@ async fn check_lowest_task() {
log::debug!("Task ID: {}, cpu_id: {}", task_id, cpu_id,);
}
}

async fn check_sched_preempt() {
log::info!("[{}] GEDF_H1 spawn", uptime());
spawn(
"GEDF_H1".into(),
async move {
log::info!("[{}] GEDF_H1 is start at cpu_id: {}", uptime(), cpu_id());
wait_microsec(10000000);
log::info!("[{}] GEDF_H1 is end at cpu_id: {}", uptime(), cpu_id());
},
SchedulerType::GEDF(99000000),
)
.await;
log::info!("[{}] FIFO_L1 spawn", uptime());
spawn(
"FIFO_M1".into(),
async move {
log::info!("[{}] FIFO_M1 is start at cpu_id: {}", uptime(), cpu_id());
wait_microsec(10000000);
log::info!("[{}] FIFO_M1 is end at cpu_id: {}", uptime(), cpu_id());
},
SchedulerType::FIFO,
)
.await;
log::info!("[{}] RR_L1 spawn", uptime());
spawn(
"RR_L1".into(),
async move {
log::info!("[{}] RR_L1 is start at cpu_id: {}", uptime(), cpu_id());
wait_microsec(10000000);
log::info!("[{}] RR_L1 is end at cpu_id: {}", uptime(), cpu_id());
},
SchedulerType::RR,
)
.await;
log::info!("[{}] GEDF_H2 spawn", uptime());
spawn(
"GEDF_H2".into(),
async move {
log::info!("[{}] GEDF_H2 is start at cpu_id: {}", uptime(), cpu_id());
wait_microsec(10000000);
log::info!("[{}] GEDF_H2 is end at cpu_id: {}", uptime(), cpu_id());
},
SchedulerType::GEDF(98000000),
)
.await;
}
2 changes: 1 addition & 1 deletion awkernel_async_lib/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub(crate) trait Scheduler {
/// Get the scheduler name.
fn scheduler_name(&self) -> SchedulerType;

#[allow(dead_code)] // TODO: to be removed
/// Get the priority of the scheduler.
fn priority(&self) -> u8;
}

Expand Down
41 changes: 2 additions & 39 deletions awkernel_async_lib/src/scheduler/gedf.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
//! A GEDF scheduler.

use super::{Scheduler, SchedulerType, Task};
use crate::{
scheduler::get_priority,
task::{get_absolute_deadline_by_task_id, get_tasks_running, set_need_preemption, State},
};
use crate::{scheduler::get_priority, task::State};
use alloc::{collections::BinaryHeap, sync::Arc};
use awkernel_lib::{
cpu::num_cpu,
sync::mutex::{MCSNode, Mutex},
};
use awkernel_lib::sync::mutex::{MCSNode, Mutex};

pub struct GEDFScheduler {
data: Mutex<Option<GEDFData>>, // Run queue.
Expand Down Expand Up @@ -82,8 +76,6 @@ impl Scheduler for GEDFScheduler {
absolute_deadline,
wake_time,
});

self.invoke_preemption(absolute_deadline);
}

fn get_next(&self) -> Option<Arc<Task>> {
Expand Down Expand Up @@ -129,32 +121,3 @@ pub static SCHEDULER: GEDFScheduler = GEDFScheduler {
data: Mutex::new(None),
priority: get_priority(SchedulerType::GEDF(0)),
};

impl GEDFScheduler {
fn invoke_preemption(&self, absolute_deadline: u64) {
// Get running tasks and filter out tasks with task_id == 0.
let mut tasks = get_tasks_running();
tasks.retain(|task| task.task_id != 0);

// If the number of running tasks is less than the number of non-primary CPUs, preempt is not required.
let num_non_primary_cpus = num_cpu() - 1;
if tasks.len() < num_non_primary_cpus {
return;
}

let task_with_max_deadline = tasks
.iter()
.filter_map(|task| {
get_absolute_deadline_by_task_id(task.task_id).map(|deadline| (task, deadline))
})
.max_by_key(|&(_, deadline)| deadline);

if let Some((task, max_absolute_deadline)) = task_with_max_deadline {
if max_absolute_deadline > absolute_deadline {
let preempt_irq = awkernel_lib::interrupt::get_preempt_irq();
set_need_preemption(task.task_id);
awkernel_lib::interrupt::send_ipi(preempt_irq, task.cpu_id as u32);
}
}
}
}
125 changes: 102 additions & 23 deletions awkernel_async_lib/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use awkernel_lib::{
unwind::catch_unwind,
};
use core::{
sync::atomic::{AtomicU32, AtomicU64, Ordering},
sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
task::{Context, Poll},
};
use futures::{
Expand All @@ -48,6 +48,9 @@ pub type TaskResult = Result<(), Cow<'static, str>>;

static TASKS: Mutex<Tasks> = Mutex::new(Tasks::new()); // Set of tasks.
static RUNNING: [AtomicU32; NUM_MAX_CPU] = array![_ => AtomicU32::new(0); NUM_MAX_CPU]; // IDs of running tasks.
pub static IN_TRANSITION: [AtomicBool; NUM_MAX_CPU] =
array![_ => AtomicBool::new(true); NUM_MAX_CPU]; // Whether or not RUNNING can be loaded.
static IS_SEND_IPI: AtomicBool = AtomicBool::new(false); // Whether or not send IPI to other CPUs.
static MAX_TASK_PRIORITY: u64 = (1 << 56) - 1; // Maximum task priority.

/// Task has ID, future, information, and a reference to a scheduler.
Expand Down Expand Up @@ -304,14 +307,32 @@ pub fn spawn(

let scheduler = get_scheduler(sched_type);

#[cfg(all(
any(target_arch = "aarch64", target_arch = "x86_64"),
not(feature = "std")
))]
{
awkernel_lib::interrupt::disable();
}

let mut node = MCSNode::new();
let mut tasks = TASKS.lock(&mut node);
let id = tasks.spawn(name, future.fuse(), scheduler, sched_type);
let task = tasks.id_to_task.get(&id).cloned();
drop(tasks);

if let Some(task) = task {
let priority = task.priority.clone();
task.wake();
invoke_preemption(priority);
}

#[cfg(all(
any(target_arch = "aarch64", target_arch = "x86_64"),
not(feature = "std")
))]
{
awkernel_lib::interrupt::enable();
}

id
Expand Down Expand Up @@ -676,6 +697,7 @@ pub fn run_main() {
perf::add_kernel_time_start(awkernel_lib::cpu::cpu_id(), cpu_counter());

if let Some(task) = get_next_task() {
IN_TRANSITION[awkernel_lib::cpu::cpu_id()].store(true, Ordering::Relaxed);
#[cfg(not(feature = "no_preempt"))]
{
// If the next task is a preempted task, then the current task will yield to the thread holding the next task.
Expand Down Expand Up @@ -750,6 +772,7 @@ pub fn run_main() {
};

RUNNING[cpu_id].store(task.id, Ordering::Relaxed);
IN_TRANSITION[cpu_id].store(false, Ordering::Relaxed);

// Invoke a task.
catch_unwind(|| {
Expand Down Expand Up @@ -797,6 +820,7 @@ pub fn run_main() {
awkernel_lib::heap::TALLOC.use_primary_then_backup_cpu_id(cpu_id)
};

IN_TRANSITION[cpu_id].store(true, Ordering::Relaxed);
let running_id = RUNNING[cpu_id].swap(0, Ordering::Relaxed);
assert_eq!(running_id, task.id);

Expand Down Expand Up @@ -855,6 +879,7 @@ pub fn run_main() {
}
}
} else {
IN_TRANSITION[awkernel_lib::cpu::cpu_id()].store(false, Ordering::Relaxed);
#[cfg(feature = "perf")]
perf::add_idle_time_start(awkernel_lib::cpu::cpu_id(), cpu_counter());

Expand Down Expand Up @@ -1079,35 +1104,89 @@ impl Ord for PriorityInfo {
}

pub fn get_lowest_priority_task_info() -> Option<(u32, usize, PriorityInfo)> {
let non_primary_cpus = awkernel_lib::cpu::num_cpu().saturating_sub(1);
let mut lowest_task: Option<(u32, usize, PriorityInfo)> = None; // (task_id, cpu_id, priority_info)

let running_tasks: Vec<RunningTask> = get_tasks_running()
.into_iter()
.filter(|task| task.task_id != 0)
.collect();
loop {
// Wait until all task statuses are ready to load.
loop {
let false_count = IN_TRANSITION
.iter()
.filter(|flag| !flag.load(Ordering::Relaxed))
.count();

let priority_infos: Vec<(u32, usize, PriorityInfo)> = {
let mut node = MCSNode::new();
let tasks = TASKS.lock(&mut node);
running_tasks
if false_count == non_primary_cpus {
break;
}
}

let running_tasks: Vec<RunningTask> = get_tasks_running()
.into_iter()
.filter_map(|task| {
tasks
.id_to_task
.get(&task.task_id)
.map(|task_data| (task.task_id, task.cpu_id, task_data.priority.clone()))
})
.collect()
};
.filter(|task| task.task_id != 0)
.collect();

for (task_id, cpu_id, priority_info) in priority_infos {
if lowest_task
.as_ref()
.is_none_or(|(_, _, lowest_priority_info)| priority_info > *lowest_priority_info)
{
lowest_task = Some((task_id, cpu_id, priority_info));
// 'is_empty()' is required to pass 'make test'
if running_tasks.is_empty() || running_tasks.len() < non_primary_cpus {
return None;
}

let priority_infos: Vec<(u32, usize, PriorityInfo)> = {
let mut node = MCSNode::new();
let tasks = TASKS.lock(&mut node);
running_tasks
.into_iter()
.filter_map(|task| {
tasks
.id_to_task
.get(&task.task_id)
.map(|task_data| (task.task_id, task.cpu_id, task_data.priority.clone()))
})
.collect()
};

for (task_id, cpu_id, priority_info) in priority_infos {
if lowest_task
.as_ref()
.is_none_or(|(_, _, lowest_priority_info)| priority_info > *lowest_priority_info)
{
lowest_task = Some((task_id, cpu_id, priority_info));
}
}

// Check to confirm that the information has not changed while getting priority_info.
if let Some((task_id, cpu_id, _)) = lowest_task {
if !IS_SEND_IPI.load(Ordering::Relaxed)
&& !IN_TRANSITION[cpu_id].load(Ordering::Relaxed)
&& RUNNING[cpu_id].load(Ordering::Relaxed) == task_id
{
break;
}
} else {
lowest_task = None;
}
}

lowest_task
}

/// Invoke preemption.
fn invoke_preemption(wake_task_priority: PriorityInfo) {
while let Some((task_id, cpu_id, lowest_priority_info)) = get_lowest_priority_task_info() {
if wake_task_priority < lowest_priority_info {
if IS_SEND_IPI
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_err()
{
continue;
}
if IN_TRANSITION[cpu_id].load(Ordering::Relaxed) {
continue;
}
let preempt_irq = awkernel_lib::interrupt::get_preempt_irq();
set_need_preemption(task_id);
awkernel_lib::interrupt::send_ipi(preempt_irq, cpu_id as u32);
IS_SEND_IPI.store(false, Ordering::Relaxed);
}
break;
}
}
3 changes: 2 additions & 1 deletion awkernel_async_lib/src/task/preempt.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::task::{get_current_task, Task};
use crate::task::{get_current_task, Task, IN_TRANSITION};
use alloc::{collections::VecDeque, sync::Arc};
use array_macro::array;
use awkernel_lib::{
Expand Down Expand Up @@ -176,6 +176,7 @@ impl Drop for RunningTaskGuard {
}

super::RUNNING[cpu_id].store(self.0, Ordering::Relaxed);
IN_TRANSITION[cpu_id].store(false, Ordering::Relaxed);
}
}

Expand Down
3 changes: 3 additions & 0 deletions kernel/src/arch/aarch64/exception.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use awkernel_async_lib::task::IN_TRANSITION;
#[cfg(feature = "perf")]
use awkernel_async_lib::{
cpu_counter,
Expand All @@ -11,6 +12,7 @@ use awkernel_lib::{
interrupt,
};
use core::str::from_utf8_unchecked;
use core::sync::atomic::Ordering;

const _ESR_EL1_EC_MASK: u64 = 0b111111 << 26;
const _ESR_EL1_EC_UNKNOWN: u64 = 0b000000 << 26;
Expand Down Expand Up @@ -224,6 +226,7 @@ ESR = 0x{:x}

#[no_mangle]
pub extern "C" fn curr_el_spx_irq_el1(_ctx: *mut Context, _sp: usize, _esr: usize) {
IN_TRANSITION[awkernel_lib::cpu::cpu_id()].store(true, Ordering::Relaxed);
#[cfg(feature = "perf")]
{
add_task_end(awkernel_lib::cpu::cpu_id(), cpu_counter());
Expand Down
3 changes: 3 additions & 0 deletions kernel/src/arch/x86_64/interrupt_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use awkernel_async_lib::{
},
};

use awkernel_async_lib::task::IN_TRANSITION;
use awkernel_lib::delay::wait_forever;
use core::sync::atomic::Ordering;
use x86_64::structures::idt::{InterruptDescriptorTable, InterruptStackFrame, PageFaultErrorCode};

static mut IDT: InterruptDescriptorTable = InterruptDescriptorTable::new();
Expand Down Expand Up @@ -518,6 +520,7 @@ irq_handler!(irq253, 253);
irq_handler!(irq254, 254);

extern "x86-interrupt" fn preemption(_stack_frame: InterruptStackFrame) {
IN_TRANSITION[awkernel_lib::cpu::cpu_id()].store(true, Ordering::Relaxed);
#[cfg(feature = "perf")]
{
add_task_end(awkernel_lib::cpu::cpu_id(), cpu_counter());
Expand Down
Loading
Loading