diff --git a/applications/tests/test_sched_preempt/src/lib.rs b/applications/tests/test_sched_preempt/src/lib.rs index 5b0c89cb5..5ef5f010e 100644 --- a/applications/tests/test_sched_preempt/src/lib.rs +++ b/applications/tests/test_sched_preempt/src/lib.rs @@ -9,6 +9,7 @@ use awkernel_lib::{ #[allow(dead_code)] enum TestType { GetlowestTask, + SchedPreempt, } /// Tests related to preemption between schedulers @@ -21,9 +22,10 @@ pub async fn run() { cpu_id() ); - let test_type = TestType::GetlowestTask; + let test_type = TestType::SchedPreempt; match test_type { TestType::GetlowestTask => check_lowest_task().await, + TestType::SchedPreempt => check_sched_preempt().await, } log::info!( @@ -69,3 +71,50 @@ async fn check_lowest_task() { log::debug!("Task ID: {}, cpu_id: {}", task_id, cpu_id,); } } + +async fn check_sched_preempt() { + log::info!("[{}] GEDF_H1 spawn", uptime()); + spawn( + "GEDF_H1".into(), + async move { + log::info!("[{}] GEDF_H1 is start at cpu_id: {}", uptime(), cpu_id()); + wait_microsec(10000000); + log::info!("[{}] GEDF_H1 is end at cpu_id: {}", uptime(), cpu_id()); + }, + SchedulerType::GEDF(99000000), + ) + .await; + log::info!("[{}] FIFO_M1 spawn", uptime()); + spawn( + "FIFO_M1".into(), + async move { + log::info!("[{}] FIFO_M1 is start at cpu_id: {}", uptime(), cpu_id()); + wait_microsec(10000000); + log::info!("[{}] FIFO_M1 is end at cpu_id: {}", uptime(), cpu_id()); + }, + SchedulerType::FIFO, + ) + .await; + log::info!("[{}] RR_L1 spawn", uptime()); + spawn( + "RR_L1".into(), + async move { + log::info!("[{}] RR_L1 is start at cpu_id: {}", uptime(), cpu_id()); + wait_microsec(10000000); + log::info!("[{}] RR_L1 is end at cpu_id: {}", uptime(), cpu_id()); + }, + SchedulerType::RR, + ) + .await; + log::info!("[{}] GEDF_H2 spawn", uptime()); + spawn( + "GEDF_H2".into(), + async move { + log::info!("[{}] GEDF_H2 is start at cpu_id: {}", uptime(), cpu_id()); + wait_microsec(10000000); + log::info!("[{}] GEDF_H2 is end at cpu_id: {}", uptime(), 
cpu_id()); + }, + SchedulerType::GEDF(98000000), + ) + .await; +} diff --git a/awkernel_async_lib/src/scheduler.rs b/awkernel_async_lib/src/scheduler.rs index 4e384a6db..ea2472927 100644 --- a/awkernel_async_lib/src/scheduler.rs +++ b/awkernel_async_lib/src/scheduler.rs @@ -89,7 +89,7 @@ pub(crate) trait Scheduler { /// Get the scheduler name. fn scheduler_name(&self) -> SchedulerType; - #[allow(dead_code)] // TODO: to be removed + /// Get the priority of the scheduler. fn priority(&self) -> u8; } diff --git a/awkernel_async_lib/src/scheduler/gedf.rs b/awkernel_async_lib/src/scheduler/gedf.rs index 2881f86a5..a7be9a0a3 100644 --- a/awkernel_async_lib/src/scheduler/gedf.rs +++ b/awkernel_async_lib/src/scheduler/gedf.rs @@ -1,15 +1,9 @@ //! A GEDF scheduler. use super::{Scheduler, SchedulerType, Task}; -use crate::{ - scheduler::get_priority, - task::{get_absolute_deadline_by_task_id, get_tasks_running, set_need_preemption, State}, -}; +use crate::{scheduler::get_priority, task::State}; use alloc::{collections::BinaryHeap, sync::Arc}; -use awkernel_lib::{ - cpu::num_cpu, - sync::mutex::{MCSNode, Mutex}, -}; +use awkernel_lib::sync::mutex::{MCSNode, Mutex}; pub struct GEDFScheduler { data: Mutex>, // Run queue. @@ -82,8 +76,6 @@ impl Scheduler for GEDFScheduler { absolute_deadline, wake_time, }); - - self.invoke_preemption(absolute_deadline); } fn get_next(&self) -> Option> { @@ -129,32 +121,3 @@ pub static SCHEDULER: GEDFScheduler = GEDFScheduler { data: Mutex::new(None), priority: get_priority(SchedulerType::GEDF(0)), }; - -impl GEDFScheduler { - fn invoke_preemption(&self, absolute_deadline: u64) { - // Get running tasks and filter out tasks with task_id == 0. - let mut tasks = get_tasks_running(); - tasks.retain(|task| task.task_id != 0); - - // If the number of running tasks is less than the number of non-primary CPUs, preempt is not required. 
- let num_non_primary_cpus = num_cpu() - 1; - if tasks.len() < num_non_primary_cpus { - return; - } - - let task_with_max_deadline = tasks - .iter() - .filter_map(|task| { - get_absolute_deadline_by_task_id(task.task_id).map(|deadline| (task, deadline)) - }) - .max_by_key(|&(_, deadline)| deadline); - - if let Some((task, max_absolute_deadline)) = task_with_max_deadline { - if max_absolute_deadline > absolute_deadline { - let preempt_irq = awkernel_lib::interrupt::get_preempt_irq(); - set_need_preemption(task.task_id); - awkernel_lib::interrupt::send_ipi(preempt_irq, task.cpu_id as u32); - } - } - } -} diff --git a/awkernel_async_lib/src/task.rs b/awkernel_async_lib/src/task.rs index 7696e24be..9a3225a2e 100644 --- a/awkernel_async_lib/src/task.rs +++ b/awkernel_async_lib/src/task.rs @@ -25,7 +25,7 @@ use awkernel_lib::{ unwind::catch_unwind, }; use core::{ - sync::atomic::{AtomicU32, AtomicU64, Ordering}, + sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}, task::{Context, Poll}, }; use futures::{ @@ -48,6 +48,9 @@ pub type TaskResult = Result<(), Cow<'static, str>>; static TASKS: Mutex = Mutex::new(Tasks::new()); // Set of tasks. static RUNNING: [AtomicU32; NUM_MAX_CPU] = array![_ => AtomicU32::new(0); NUM_MAX_CPU]; // IDs of running tasks. +pub static IN_TRANSITION: [AtomicBool; NUM_MAX_CPU] = + array![_ => AtomicBool::new(true); NUM_MAX_CPU]; // Whether or not RUNNING can be loaded. +static IS_SEND_IPI: AtomicBool = AtomicBool::new(false); // Whether or not send IPI to other CPUs. static MAX_TASK_PRIORITY: u64 = (1 << 56) - 1; // Maximum task priority. /// Task has ID, future, information, and a reference to a scheduler. 
@@ -304,6 +307,14 @@ pub fn spawn( let scheduler = get_scheduler(sched_type); + #[cfg(all( + any(target_arch = "aarch64", target_arch = "x86_64"), + not(feature = "std") + ))] + { + awkernel_lib::interrupt::disable(); + } + let mut node = MCSNode::new(); let mut tasks = TASKS.lock(&mut node); let id = tasks.spawn(name, future.fuse(), scheduler, sched_type); @@ -311,7 +322,17 @@ pub fn spawn( drop(tasks); if let Some(task) = task { + let priority = task.priority.clone(); task.wake(); + invoke_preemption(priority); + } + + #[cfg(all( + any(target_arch = "aarch64", target_arch = "x86_64"), + not(feature = "std") + ))] + { + awkernel_lib::interrupt::enable(); } id @@ -676,6 +697,7 @@ pub fn run_main() { perf::add_kernel_time_start(awkernel_lib::cpu::cpu_id(), cpu_counter()); if let Some(task) = get_next_task() { + IN_TRANSITION[awkernel_lib::cpu::cpu_id()].store(true, Ordering::Relaxed); #[cfg(not(feature = "no_preempt"))] { // If the next task is a preempted task, then the current task will yield to the thread holding the next task. @@ -750,6 +772,7 @@ pub fn run_main() { }; RUNNING[cpu_id].store(task.id, Ordering::Relaxed); + IN_TRANSITION[cpu_id].store(false, Ordering::Relaxed); // Invoke a task. 
catch_unwind(|| { @@ -797,6 +820,7 @@ pub fn run_main() { awkernel_lib::heap::TALLOC.use_primary_then_backup_cpu_id(cpu_id) }; + IN_TRANSITION[cpu_id].store(true, Ordering::Relaxed); let running_id = RUNNING[cpu_id].swap(0, Ordering::Relaxed); assert_eq!(running_id, task.id); @@ -855,6 +879,7 @@ pub fn run_main() { } } } else { + IN_TRANSITION[awkernel_lib::cpu::cpu_id()].store(false, Ordering::Relaxed); #[cfg(feature = "perf")] perf::add_idle_time_start(awkernel_lib::cpu::cpu_id(), cpu_counter()); @@ -1079,35 +1104,89 @@ impl Ord for PriorityInfo { } pub fn get_lowest_priority_task_info() -> Option<(u32, usize, PriorityInfo)> { + let non_primary_cpus = awkernel_lib::cpu::num_cpu().saturating_sub(1); let mut lowest_task: Option<(u32, usize, PriorityInfo)> = None; // (task_id, cpu_id, priority_info) - let running_tasks: Vec = get_tasks_running() - .into_iter() - .filter(|task| task.task_id != 0) - .collect(); + loop { + // Wait until all task statuses are ready to load. + loop { + let false_count = IN_TRANSITION + .iter() + .filter(|flag| !flag.load(Ordering::Relaxed)) + .count(); - let priority_infos: Vec<(u32, usize, PriorityInfo)> = { - let mut node = MCSNode::new(); - let tasks = TASKS.lock(&mut node); - running_tasks + if false_count == non_primary_cpus { + break; + } + } + + let running_tasks: Vec = get_tasks_running() .into_iter() - .filter_map(|task| { - tasks - .id_to_task - .get(&task.task_id) - .map(|task_data| (task.task_id, task.cpu_id, task_data.priority.clone())) - }) - .collect() - }; + .filter(|task| task.task_id != 0) + .collect(); - for (task_id, cpu_id, priority_info) in priority_infos { - if lowest_task - .as_ref() - .is_none_or(|(_, _, lowest_priority_info)| priority_info > *lowest_priority_info) - { - lowest_task = Some((task_id, cpu_id, priority_info)); + // 'is_empty()' is required to pass 'make test' + if running_tasks.is_empty() || running_tasks.len() < non_primary_cpus { + return None; + } + + let priority_infos: Vec<(u32, usize, 
PriorityInfo)> = { + let mut node = MCSNode::new(); + let tasks = TASKS.lock(&mut node); + running_tasks + .into_iter() + .filter_map(|task| { + tasks + .id_to_task + .get(&task.task_id) + .map(|task_data| (task.task_id, task.cpu_id, task_data.priority.clone())) + }) + .collect() + }; + + for (task_id, cpu_id, priority_info) in priority_infos { + if lowest_task + .as_ref() + .is_none_or(|(_, _, lowest_priority_info)| priority_info > *lowest_priority_info) + { + lowest_task = Some((task_id, cpu_id, priority_info)); + } + } + + // Check to confirm that the information has not changed while getting priority_info. + if let Some((task_id, cpu_id, _)) = lowest_task { + if !IS_SEND_IPI.load(Ordering::Relaxed) + && !IN_TRANSITION[cpu_id].load(Ordering::Relaxed) + && RUNNING[cpu_id].load(Ordering::Relaxed) == task_id + { + break; + } + } else { + lowest_task = None; } } lowest_task } + +/// Invoke preemption. +fn invoke_preemption(wake_task_priority: PriorityInfo) { + while let Some((task_id, cpu_id, lowest_priority_info)) = get_lowest_priority_task_info() { + if wake_task_priority < lowest_priority_info { + if IS_SEND_IPI + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + .is_err() + { + continue; + } + if IN_TRANSITION[cpu_id].load(Ordering::Relaxed) { + continue; + } + let preempt_irq = awkernel_lib::interrupt::get_preempt_irq(); + set_need_preemption(task_id); + awkernel_lib::interrupt::send_ipi(preempt_irq, cpu_id as u32); + IS_SEND_IPI.store(false, Ordering::Relaxed); + } + break; + } +} diff --git a/awkernel_async_lib/src/task/preempt.rs b/awkernel_async_lib/src/task/preempt.rs index 04b0deaa8..3623905ec 100644 --- a/awkernel_async_lib/src/task/preempt.rs +++ b/awkernel_async_lib/src/task/preempt.rs @@ -1,4 +1,4 @@ -use crate::task::{get_current_task, Task}; +use crate::task::{get_current_task, Task, IN_TRANSITION}; use alloc::{collections::VecDeque, sync::Arc}; use array_macro::array; use awkernel_lib::{ @@ -176,6 +176,7 @@ impl Drop for 
RunningTaskGuard { } super::RUNNING[cpu_id].store(self.0, Ordering::Relaxed); + IN_TRANSITION[cpu_id].store(false, Ordering::Relaxed); } } diff --git a/kernel/src/arch/aarch64/exception.rs b/kernel/src/arch/aarch64/exception.rs index f21a23c14..8b1ec1453 100644 --- a/kernel/src/arch/aarch64/exception.rs +++ b/kernel/src/arch/aarch64/exception.rs @@ -1,3 +1,4 @@ +use awkernel_async_lib::task::IN_TRANSITION; #[cfg(feature = "perf")] use awkernel_async_lib::{ cpu_counter, @@ -11,6 +12,7 @@ use awkernel_lib::{ interrupt, }; use core::str::from_utf8_unchecked; +use core::sync::atomic::Ordering; const _ESR_EL1_EC_MASK: u64 = 0b111111 << 26; const _ESR_EL1_EC_UNKNOWN: u64 = 0b000000 << 26; @@ -224,6 +226,7 @@ ESR = 0x{:x} #[no_mangle] pub extern "C" fn curr_el_spx_irq_el1(_ctx: *mut Context, _sp: usize, _esr: usize) { + IN_TRANSITION[awkernel_lib::cpu::cpu_id()].store(true, Ordering::Relaxed); #[cfg(feature = "perf")] { add_task_end(awkernel_lib::cpu::cpu_id(), cpu_counter()); diff --git a/kernel/src/arch/x86_64/interrupt_handler.rs b/kernel/src/arch/x86_64/interrupt_handler.rs index be1bd73f8..c4d762c19 100644 --- a/kernel/src/arch/x86_64/interrupt_handler.rs +++ b/kernel/src/arch/x86_64/interrupt_handler.rs @@ -7,7 +7,9 @@ use awkernel_async_lib::{ }, }; +use awkernel_async_lib::task::IN_TRANSITION; use awkernel_lib::delay::wait_forever; +use core::sync::atomic::Ordering; use x86_64::structures::idt::{InterruptDescriptorTable, InterruptStackFrame, PageFaultErrorCode}; static mut IDT: InterruptDescriptorTable = InterruptDescriptorTable::new(); @@ -518,6 +520,7 @@ irq_handler!(irq253, 253); irq_handler!(irq254, 254); extern "x86-interrupt" fn preemption(_stack_frame: InterruptStackFrame) { + IN_TRANSITION[awkernel_lib::cpu::cpu_id()].store(true, Ordering::Relaxed); #[cfg(feature = "perf")] { add_task_end(awkernel_lib::cpu::cpu_id(), cpu_counter()); diff --git a/kernel/src/main.rs b/kernel/src/main.rs index ff80bf2ec..8328d7e4a 100644 --- a/kernel/src/main.rs +++ 
b/kernel/src/main.rs @@ -14,6 +14,7 @@ extern crate alloc; use awkernel_async_lib::{ scheduler::{wake_task, SchedulerType}, task, + task::IN_TRANSITION, }; use core::{ fmt::Debug, @@ -42,11 +43,13 @@ static NUM_READY_WORKER: AtomicU16 = AtomicU16::new(0); /// `Info` of `KernelInfo` represents architecture specific information. fn main(kernel_info: KernelInfo) { log::info!("CPU#{} is starting.", kernel_info.cpu_id); + IN_TRANSITION[kernel_info.cpu_id].store(false, Ordering::Relaxed); unsafe { awkernel_lib::cpu::increment_num_cpu() }; if kernel_info.cpu_id == 0 { // Primary CPU. + IN_TRANSITION[0].store(true, Ordering::Relaxed); let _ = draw_splash();