From 4fca63bb295830027507d8e27b08a0f5a40abfa4 Mon Sep 17 00:00:00 2001 From: atsushi421 Date: Wed, 24 Sep 2025 12:49:30 +0900 Subject: [PATCH 1/6] fix(scheduler): solve inter-scheduler priority inversion Signed-off-by: atsushi421 --- .../Cargo.toml | 15 ++++++ .../src/lib.rs | 51 +++++++++++++++++++ awkernel_async_lib/src/scheduler.rs | 5 ++ awkernel_async_lib/src/scheduler/gedf.rs | 21 ++++---- .../src/scheduler/prioritized_fifo.rs | 18 ++++--- .../src/scheduler/prioritized_rr.rs | 21 +++++--- .../task/preemptive_spin/data_structure.pml | 1 + .../task/preemptive_spin/for_verification.pml | 2 +- .../src/task/preemptive_spin/preemptive.pml | 12 +++-- userland/Cargo.toml | 4 ++ userland/src/lib.rs | 3 ++ 11 files changed, 123 insertions(+), 30 deletions(-) create mode 100644 applications/tests/test_multi_prioritized_scheduler/Cargo.toml create mode 100644 applications/tests/test_multi_prioritized_scheduler/src/lib.rs diff --git a/applications/tests/test_multi_prioritized_scheduler/Cargo.toml b/applications/tests/test_multi_prioritized_scheduler/Cargo.toml new file mode 100644 index 000000000..07fbc1f3c --- /dev/null +++ b/applications/tests/test_multi_prioritized_scheduler/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "test_multi_prioritized_scheduler" +version = "0.1.0" +edition = "2021" + +[dependencies] +log = "0.4" + +[dependencies.awkernel_async_lib] +path = "../../../awkernel_async_lib" +default-features = false + +[dependencies.awkernel_lib] +path = "../../../awkernel_lib" +default-features = false diff --git a/applications/tests/test_multi_prioritized_scheduler/src/lib.rs b/applications/tests/test_multi_prioritized_scheduler/src/lib.rs new file mode 100644 index 000000000..3cea83890 --- /dev/null +++ b/applications/tests/test_multi_prioritized_scheduler/src/lib.rs @@ -0,0 +1,51 @@ +#![no_std] + +extern crate alloc; + +use awkernel_async_lib::{scheduler::SchedulerType, spawn}; +use awkernel_lib::{cpu::num_cpu, delay::wait_millisec}; + +pub async fn run() { + wait_millisec(1000); + for i in 1..num_cpu() { + let sched_type = if i % 3 == 0 { + SchedulerType::PrioritizedFIFO(0) + } else if i % 3 == 1 { + SchedulerType::PrioritizedRR(0) + } else { + SchedulerType::GEDF(1000) + }; + + spawn( + "low_priority".into(), + async move { + log::debug!("low priority task {i} started. sched_type = {sched_type:?}"); + wait_millisec(500); + log::debug!("low priority task {i} finished. sched_type = {sched_type:?}"); + }, + sched_type, + ) + .await; + } + + for i in 1..num_cpu() { + let sched_type = if i % 3 == 0 { + SchedulerType::PrioritizedFIFO(1) + } else if i % 3 == 1 { + SchedulerType::PrioritizedRR(1) + } else { + SchedulerType::GEDF(500) + }; + + spawn( + "high_priority".into(), + async move { + log::debug!("high priority task {i} started. sched_type = {sched_type:?}"); + wait_millisec(100); + log::debug!("high priority task {i} finished. sched_type = {sched_type:?}"); + }, + sched_type, + ) + .await; + } +} diff --git a/awkernel_async_lib/src/scheduler.rs b/awkernel_async_lib/src/scheduler.rs index b85f8b50a..61a784678 100644 --- a/awkernel_async_lib/src/scheduler.rs +++ b/awkernel_async_lib/src/scheduler.rs @@ -116,6 +116,11 @@ static PRIORITY_LIST: [SchedulerType; 4] = [ SchedulerType::Panicked, ]; +/// For exclusion execution of `wake_task` and `get_next` across all schedulers. +/// In order to resolve priority inversion in multiple priority-based schedulers, +/// the decision to preempt, dequeuing, enqueuing, and updating of RUNNING must be executed exclusively. +static GLOBAL_WAKE_GET_MUTEX: Mutex<()> = Mutex::new(()); + pub(crate) trait Scheduler { /// Enqueue an executable task. /// The enqueued task will be taken by `get_next()`. diff --git a/awkernel_async_lib/src/scheduler/gedf.rs b/awkernel_async_lib/src/scheduler/gedf.rs index 01e5d7d82..f6db80679 100644 --- a/awkernel_async_lib/src/scheduler/gedf.rs +++ b/awkernel_async_lib/src/scheduler/gedf.rs @@ -4,7 +4,9 @@ use core::cmp::max; use super::{Scheduler, SchedulerType, Task}; use crate::{ - scheduler::{get_priority, peek_preemption_pending, push_preemption_pending}, + scheduler::{ + get_priority, peek_preemption_pending, push_preemption_pending, GLOBAL_WAKE_GET_MUTEX, + }, task::{ get_task, get_tasks_running, set_current_task, set_need_preemption, State, MAX_TASK_PRIORITY, @@ -61,13 +63,6 @@ impl GEDFData { impl Scheduler for GEDFScheduler { fn wake_task(&self, task: Arc) { - let mut node = MCSNode::new(); - // The reason for acquiring this lock before invoke_preemption() is to prevent priority inversion from occurring - // when invoke_preemption() is executed between the time the next task is determined and the RUNNING is updated - // within the scheduler's get_next(). - let mut data = self.data.lock(&mut node); - let internal_data = data.get_or_insert_with(GEDFData::new); - let (wake_time, absolute_deadline) = { let mut node_inner = MCSNode::new(); let mut info = task.info.lock(&mut node_inner); @@ -85,7 +80,12 @@ impl Scheduler for GEDFScheduler { } }; + let mut node = MCSNode::new(); + GLOBAL_WAKE_GET_MUTEX.lock(&mut node); if !self.invoke_preemption(task.clone()) { + let mut node_inner = MCSNode::new(); + let mut data = self.data.lock(&mut node_inner); + let internal_data = data.get_or_insert_with(GEDFData::new); internal_data.queue.push(GEDFTask { task: task.clone(), absolute_deadline, @@ -96,7 +96,10 @@ impl Scheduler for GEDFScheduler { fn get_next(&self, execution_ensured: bool) -> Option> { let mut node = MCSNode::new(); - let mut data = self.data.lock(&mut node); + GLOBAL_WAKE_GET_MUTEX.lock(&mut node); + + let mut node_inner = MCSNode::new(); + let mut data = self.data.lock(&mut node_inner); #[allow(clippy::question_mark)] let data = match data.as_mut() { diff --git a/awkernel_async_lib/src/scheduler/prioritized_fifo.rs b/awkernel_async_lib/src/scheduler/prioritized_fifo.rs index 2f387249f..df6d9dfc2 100644 --- a/awkernel_async_lib/src/scheduler/prioritized_fifo.rs +++ b/awkernel_async_lib/src/scheduler/prioritized_fifo.rs @@ -3,7 +3,7 @@ use core::cmp::max; use super::{Scheduler, SchedulerType, Task}; -use crate::scheduler::{peek_preemption_pending, push_preemption_pending}; +use crate::scheduler::{peek_preemption_pending, push_preemption_pending, GLOBAL_WAKE_GET_MUTEX}; use crate::task::{get_task, get_tasks_running, set_current_task, set_need_preemption}; use crate::{scheduler::get_priority, task::State}; use alloc::sync::Arc; @@ -35,12 +35,6 @@ impl PrioritizedFIFOData { impl Scheduler for PrioritizedFIFOScheduler { fn wake_task(&self, task: Arc) { - let mut node = MCSNode::new(); - // The reason for acquiring this lock before invoke_preemption() is to prevent priority inversion from occurring - // when invoke_preemption() is executed between the time the next task is determined and the RUNNING is updated - // within the scheduler's get_next(). - let mut data = self.data.lock(&mut node); - let internal_data = data.get_or_insert_with(PrioritizedFIFOData::new); let priority = { let mut node_inner = MCSNode::new(); let info = task.info.lock(&mut node_inner); @@ -50,7 +44,12 @@ impl Scheduler for PrioritizedFIFOScheduler { } }; + let mut node = MCSNode::new(); + GLOBAL_WAKE_GET_MUTEX.lock(&mut node); if !self.invoke_preemption(task.clone()) { + let mut node_inner = MCSNode::new(); + let mut data = self.data.lock(&mut node_inner); + let internal_data = data.get_or_insert_with(PrioritizedFIFOData::new); internal_data.queue.push( priority, PrioritizedFIFOTask { @@ -63,7 +62,10 @@ impl Scheduler for PrioritizedFIFOScheduler { fn get_next(&self, execution_ensured: bool) -> Option> { let mut node = MCSNode::new(); - let mut data = self.data.lock(&mut node); + GLOBAL_WAKE_GET_MUTEX.lock(&mut node); + + let mut node_inner = MCSNode::new(); + let mut data = self.data.lock(&mut node_inner); #[allow(clippy::question_mark)] let data = match data.as_mut() { diff --git a/awkernel_async_lib/src/scheduler/prioritized_rr.rs b/awkernel_async_lib/src/scheduler/prioritized_rr.rs index c109cea26..6b38758ef 100644 --- a/awkernel_async_lib/src/scheduler/prioritized_rr.rs +++ b/awkernel_async_lib/src/scheduler/prioritized_rr.rs @@ -4,7 +4,10 @@ use core::cmp::max; use super::{Scheduler, SchedulerType, Task}; use crate::{ - scheduler::{get_next_task, get_priority, peek_preemption_pending, push_preemption_pending}, + scheduler::{ + get_next_task, get_priority, peek_preemption_pending, push_preemption_pending, + GLOBAL_WAKE_GET_MUTEX, + }, task::{ get_last_executed_by_task_id, get_task, get_tasks_running, set_current_task, set_need_preemption, State, @@ -40,12 +43,6 @@ impl PrioritizedRRData { impl Scheduler for PrioritizedRRScheduler { fn wake_task(&self, task: Arc) { - let mut node = MCSNode::new(); - // The reason for acquiring this lock before invoke_preemption() is to prevent priority inversion from occurring - // when invoke_preemption() is executed between the time the next task is determined and the RUNNING is updated - // within the scheduler's get_next(). - let mut data = self.data.lock(&mut node); - let internal_data = data.get_or_insert_with(PrioritizedRRData::new); let priority = { let mut node_inner = MCSNode::new(); let info = task.info.lock(&mut node_inner); @@ -55,7 +52,12 @@ impl Scheduler for PrioritizedRRScheduler { } }; + let mut node = MCSNode::new(); + GLOBAL_WAKE_GET_MUTEX.lock(&mut node); if !self.invoke_preemption_wake(task.clone()) { + let mut node_inner = MCSNode::new(); + let mut data = self.data.lock(&mut node_inner); + let internal_data = data.get_or_insert_with(PrioritizedRRData::new); internal_data.queue.push( priority, PrioritizedRRTask { @@ -68,7 +70,10 @@ impl Scheduler for PrioritizedRRScheduler { fn get_next(&self, execution_ensured: bool) -> Option> { let mut node = MCSNode::new(); - let mut guard = self.data.lock(&mut node); + GLOBAL_WAKE_GET_MUTEX.lock(&mut node); + + let mut node_inner = MCSNode::new(); + let mut guard = self.data.lock(&mut node_inner); #[allow(clippy::question_mark)] let data = match guard.as_mut() { diff --git a/specification/awkernel_async_lib/src/task/preemptive_spin/data_structure.pml b/specification/awkernel_async_lib/src/task/preemptive_spin/data_structure.pml index 122caedea..397e5ea72 100644 --- a/specification/awkernel_async_lib/src/task/preemptive_spin/data_structure.pml +++ b/specification/awkernel_async_lib/src/task/preemptive_spin/data_structure.pml @@ -35,5 +35,6 @@ chan queue[SCHEDULER_TYPE_NUM] = [TASK_NUM] of { byte }// task_ids in ascending #include "mutex.pml" Mutex lock_info[TASK_NUM] Mutex lock_queue[SCHEDULER_TYPE_NUM] +Mutex lock_global_wake_get_mutex #define BYTE_MAX 255 diff --git a/specification/awkernel_async_lib/src/task/preemptive_spin/for_verification.pml b/specification/awkernel_async_lib/src/task/preemptive_spin/for_verification.pml index 110de2ca1..dce3d2be4 100644 --- a/specification/awkernel_async_lib/src/task/preemptive_spin/for_verification.pml +++ b/specification/awkernel_async_lib/src/task/preemptive_spin/for_verification.pml @@ -34,7 +34,7 @@ inline update_running_lowest_priority() { } } -#define MAX_CONSECUTIVE_RUN_MAIN_LOOP 5 +#define MAX_CONSECUTIVE_RUN_MAIN_LOOP 2 byte consecutive_run_main_loop[WORKER_NUM] = 0 bool wait_for_weak_fairness[WORKER_NUM] = false chan resume_requests = [WORKER_NUM] of { byte }// tid that requested to resume execution. diff --git a/specification/awkernel_async_lib/src/task/preemptive_spin/preemptive.pml b/specification/awkernel_async_lib/src/task/preemptive_spin/preemptive.pml index 25a5344f5..273c46138 100644 --- a/specification/awkernel_async_lib/src/task/preemptive_spin/preemptive.pml +++ b/specification/awkernel_async_lib/src/task/preemptive_spin/preemptive.pml @@ -82,19 +82,21 @@ inline invoke_preemption(tid,task,ret) { /* awkernel_async_lib::scheduler::fifo::PrioritizedFIFOScheduler::wake_task()*/ inline wake_task(tid,task) { bool preemption_invoked; - lock(tid,lock_queue[tasks[task].scheduler_type]); + lock(tid,lock_global_wake_get_mutex); invoke_preemption(tid,task,preemption_invoked); if :: !preemption_invoked -> + lock(tid,lock_queue[tasks[task].scheduler_type]); d_step{ printf("wake_task(): push to queue: tid = %d,task = %d\n",tid,task); queue[tasks[task].scheduler_type]!!task } + unlock(tid,lock_queue[tasks[task].scheduler_type]); :: else fi - unlock(tid,lock_queue[tasks[task].scheduler_type]); + unlock(tid,lock_global_wake_get_mutex); d_step { assert(waking[task] > 0); waking[task]-- @@ -136,6 +138,7 @@ inline wake(tid,task) { /* awkernel_async_lib::scheduler::fifo::PrioritizedFIFOScheduler::get_next()*/ inline get_next_each_scheduler(tid,ret,sched_type) { + lock(tid,lock_global_wake_get_mutex); lock(tid,lock_queue[sched_type]); byte head; @@ -167,12 +170,13 @@ inline get_next_each_scheduler(tid,ret,sched_type) { } unlock(tid,lock_info[head]); - unlock(tid,lock_queue[sched_type]); ret = head :: else -> - unlock(tid,lock_queue[sched_type]); ret = - 1 fi + + unlock(tid,lock_queue[sched_type]); + unlock(tid,lock_global_wake_get_mutex); } /* awkernel_async_lib::task::scheduler::get_next_task() */ diff --git a/userland/Cargo.toml b/userland/Cargo.toml index 96e582732..664ec35f3 100644 --- a/userland/Cargo.toml +++ b/userland/Cargo.toml @@ -38,6 +38,10 @@ optional = true path = "../applications/tests/test_prioritized_fifo" optional = true +[dependencies.test_multi_prioritized_scheduler] +path = "../applications/tests/test_multi_prioritized_scheduler" +optional = true + [dependencies.test_prioritized_rr] path = "../applications/tests/test_prioritized_rr" optional = true diff --git a/userland/src/lib.rs b/userland/src/lib.rs index 60123208c..01a66fe18 100644 --- a/userland/src/lib.rs +++ b/userland/src/lib.rs @@ -25,6 +25,9 @@ pub async fn main() -> Result<(), Cow<'static, str>> { #[cfg(feature = "test_prioritized_fifo")] test_prioritized_fifo::run().await; // test for prioritized_fifo + #[cfg(feature = "test_multi_prioritized_scheduler")] + test_multi_prioritized_scheduler::run().await; // test for multi prioritized scheduler + #[cfg(feature = "test_prioritized_rr")] test_prioritized_rr::run().await; // test for prioritized_rr From 45da902f6d3f1311d4a1832685d38fc8b10fffbf Mon Sep 17 00:00:00 2001 From: atsushi421 Date: Wed, 24 Sep 2025 14:00:17 +0900 Subject: [PATCH 2/6] fix: clippy --- awkernel_async_lib/src/scheduler/gedf.rs | 4 ++-- awkernel_async_lib/src/scheduler/prioritized_fifo.rs | 4 ++-- awkernel_async_lib/src/scheduler/prioritized_rr.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/awkernel_async_lib/src/scheduler/gedf.rs b/awkernel_async_lib/src/scheduler/gedf.rs index f6db80679..b8c85c792 100644 --- a/awkernel_async_lib/src/scheduler/gedf.rs +++ b/awkernel_async_lib/src/scheduler/gedf.rs @@ -81,7 +81,7 @@ impl Scheduler for GEDFScheduler { }; let mut node = MCSNode::new(); - GLOBAL_WAKE_GET_MUTEX.lock(&mut node); + let _ = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); if !self.invoke_preemption(task.clone()) { let mut node_inner = MCSNode::new(); let mut data = self.data.lock(&mut node_inner); @@ -96,7 +96,7 @@ impl Scheduler for GEDFScheduler { fn get_next(&self, execution_ensured: bool) -> Option> { let mut node = MCSNode::new(); - GLOBAL_WAKE_GET_MUTEX.lock(&mut node); + let _ = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); let mut node_inner = MCSNode::new(); let mut data = self.data.lock(&mut node_inner); diff --git a/awkernel_async_lib/src/scheduler/prioritized_fifo.rs b/awkernel_async_lib/src/scheduler/prioritized_fifo.rs index df6d9dfc2..08ac3e126 100644 --- a/awkernel_async_lib/src/scheduler/prioritized_fifo.rs +++ b/awkernel_async_lib/src/scheduler/prioritized_fifo.rs @@ -45,7 +45,7 @@ impl Scheduler for PrioritizedFIFOScheduler { }; let mut node = MCSNode::new(); - GLOBAL_WAKE_GET_MUTEX.lock(&mut node); + let _ = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); if !self.invoke_preemption(task.clone()) { let mut node_inner = MCSNode::new(); let mut data = self.data.lock(&mut node_inner); @@ -62,7 +62,7 @@ impl Scheduler for PrioritizedFIFOScheduler { fn get_next(&self, execution_ensured: bool) -> Option> { let mut node = MCSNode::new(); - GLOBAL_WAKE_GET_MUTEX.lock(&mut node); + let _ = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); let mut node_inner = MCSNode::new(); let mut data = self.data.lock(&mut node_inner); diff --git a/awkernel_async_lib/src/scheduler/prioritized_rr.rs b/awkernel_async_lib/src/scheduler/prioritized_rr.rs index 6b38758ef..706e38e37 100644 --- a/awkernel_async_lib/src/scheduler/prioritized_rr.rs +++ b/awkernel_async_lib/src/scheduler/prioritized_rr.rs @@ -53,7 +53,7 @@ impl Scheduler for PrioritizedRRScheduler { }; let mut node = MCSNode::new(); - GLOBAL_WAKE_GET_MUTEX.lock(&mut node); + let _ = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); if !self.invoke_preemption_wake(task.clone()) { let mut node_inner = MCSNode::new(); let mut data = self.data.lock(&mut node_inner); @@ -70,7 +70,7 @@ impl Scheduler for PrioritizedRRScheduler { fn get_next(&self, execution_ensured: bool) -> Option> { let mut node = MCSNode::new(); - GLOBAL_WAKE_GET_MUTEX.lock(&mut node); + let _ = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); let mut node_inner = MCSNode::new(); let mut guard = self.data.lock(&mut node_inner); From 5b53abdaff7f3996ddff7fe8cbaf0ce0d4eb5839 Mon Sep 17 00:00:00 2001 From: atsushi421 Date: Wed, 24 Sep 2025 14:09:55 +0900 Subject: [PATCH 3/6] fix: clippy --- awkernel_async_lib/src/scheduler/gedf.rs | 4 ++-- awkernel_async_lib/src/scheduler/prioritized_fifo.rs | 4 ++-- awkernel_async_lib/src/scheduler/prioritized_rr.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/awkernel_async_lib/src/scheduler/gedf.rs b/awkernel_async_lib/src/scheduler/gedf.rs index b8c85c792..4aa6536b7 100644 --- a/awkernel_async_lib/src/scheduler/gedf.rs +++ b/awkernel_async_lib/src/scheduler/gedf.rs @@ -81,7 +81,7 @@ impl Scheduler for GEDFScheduler { }; let mut node = MCSNode::new(); - let _ = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); + let _guard = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); if !self.invoke_preemption(task.clone()) { let mut node_inner = MCSNode::new(); let mut data = self.data.lock(&mut node_inner); @@ -96,7 +96,7 @@ impl Scheduler for GEDFScheduler { fn get_next(&self, execution_ensured: bool) -> Option> { let mut node = MCSNode::new(); - let _ = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); + let _guard = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); let mut node_inner = MCSNode::new(); let mut data = self.data.lock(&mut node_inner); diff --git a/awkernel_async_lib/src/scheduler/prioritized_fifo.rs b/awkernel_async_lib/src/scheduler/prioritized_fifo.rs index 08ac3e126..b8d9c92ac 100644 --- a/awkernel_async_lib/src/scheduler/prioritized_fifo.rs +++ b/awkernel_async_lib/src/scheduler/prioritized_fifo.rs @@ -45,7 +45,7 @@ impl Scheduler for PrioritizedFIFOScheduler { }; let mut node = MCSNode::new(); - let _ = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); + let _guard = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); if !self.invoke_preemption(task.clone()) { let mut node_inner = MCSNode::new(); let mut data = self.data.lock(&mut node_inner); @@ -62,7 +62,7 @@ impl Scheduler for PrioritizedFIFOScheduler { fn get_next(&self, execution_ensured: bool) -> Option> { let mut node = MCSNode::new(); - let _ = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); + let _guard = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); let mut node_inner = MCSNode::new(); let mut data = self.data.lock(&mut node_inner); diff --git a/awkernel_async_lib/src/scheduler/prioritized_rr.rs b/awkernel_async_lib/src/scheduler/prioritized_rr.rs index 706e38e37..9da1e040f 100644 --- a/awkernel_async_lib/src/scheduler/prioritized_rr.rs +++ b/awkernel_async_lib/src/scheduler/prioritized_rr.rs @@ -53,7 +53,7 @@ impl Scheduler for PrioritizedRRScheduler { }; let mut node = MCSNode::new(); - let _ = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); + let _guard = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); if !self.invoke_preemption_wake(task.clone()) { let mut node_inner = MCSNode::new(); let mut data = self.data.lock(&mut node_inner); @@ -70,7 +70,7 @@ impl Scheduler for PrioritizedRRScheduler { fn get_next(&self, execution_ensured: bool) -> Option> { let mut node = MCSNode::new(); - let _ = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); + let _guard = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); let mut node_inner = MCSNode::new(); let mut guard = self.data.lock(&mut node_inner); From a536ecfa06e0225a3b688c9cf13a356249664eae Mon Sep 17 00:00:00 2001 From: atsushi421 Date: Thu, 25 Sep 2025 04:53:34 +0900 Subject: [PATCH 4/6] fix: change mutex range --- .../tests/test_multi_prioritized_scheduler/src/lib.rs | 2 +- awkernel_async_lib/src/scheduler.rs | 3 +++ awkernel_async_lib/src/scheduler/gedf.rs | 5 +---- awkernel_async_lib/src/scheduler/prioritized_fifo.rs | 5 +---- awkernel_async_lib/src/scheduler/prioritized_rr.rs | 5 +---- .../src/task/preemptive_spin/preemptive.pml | 4 ++-- 6 files changed, 9 insertions(+), 15 deletions(-) diff --git a/applications/tests/test_multi_prioritized_scheduler/src/lib.rs b/applications/tests/test_multi_prioritized_scheduler/src/lib.rs index 3cea83890..349788884 100644 --- a/applications/tests/test_multi_prioritized_scheduler/src/lib.rs +++ b/applications/tests/test_multi_prioritized_scheduler/src/lib.rs @@ -20,7 +20,7 @@ pub async fn run() { "low_priority".into(), async move { log::debug!("low priority task {i} started. sched_type = {sched_type:?}"); - wait_millisec(500); + wait_millisec(1000); log::debug!("low priority task {i} finished. sched_type = {sched_type:?}"); }, sched_type, diff --git a/awkernel_async_lib/src/scheduler.rs b/awkernel_async_lib/src/scheduler.rs index 61a784678..9371dd7ea 100644 --- a/awkernel_async_lib/src/scheduler.rs +++ b/awkernel_async_lib/src/scheduler.rs @@ -139,6 +139,9 @@ pub(crate) trait Scheduler { /// Get the next executable task. #[inline] pub(crate) fn get_next_task(execution_ensured: bool) -> Option> { + let mut node = MCSNode::new(); + let _guard = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); + let task = PRIORITY_LIST .iter() .find_map(|&scheduler_type| get_scheduler(scheduler_type).get_next(execution_ensured)); diff --git a/awkernel_async_lib/src/scheduler/gedf.rs b/awkernel_async_lib/src/scheduler/gedf.rs index 4aa6536b7..ce7b11739 100644 --- a/awkernel_async_lib/src/scheduler/gedf.rs +++ b/awkernel_async_lib/src/scheduler/gedf.rs @@ -96,10 +96,7 @@ impl Scheduler for GEDFScheduler { fn get_next(&self, execution_ensured: bool) -> Option> { let mut node = MCSNode::new(); - let _guard = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); - - let mut node_inner = MCSNode::new(); - let mut data = self.data.lock(&mut node_inner); + let mut data = self.data.lock(&mut node); #[allow(clippy::question_mark)] let data = match data.as_mut() { diff --git a/awkernel_async_lib/src/scheduler/prioritized_fifo.rs b/awkernel_async_lib/src/scheduler/prioritized_fifo.rs index b8d9c92ac..da125cf3b 100644 --- a/awkernel_async_lib/src/scheduler/prioritized_fifo.rs +++ b/awkernel_async_lib/src/scheduler/prioritized_fifo.rs @@ -62,10 +62,7 @@ impl Scheduler for PrioritizedFIFOScheduler { fn get_next(&self, execution_ensured: bool) -> Option> { let mut node = MCSNode::new(); - let _guard = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); - - let mut node_inner = MCSNode::new(); - let mut data = self.data.lock(&mut node_inner); + let mut data = self.data.lock(&mut node); #[allow(clippy::question_mark)] let data = match data.as_mut() { diff --git a/awkernel_async_lib/src/scheduler/prioritized_rr.rs b/awkernel_async_lib/src/scheduler/prioritized_rr.rs index 9da1e040f..d06b3fe63 100644 --- a/awkernel_async_lib/src/scheduler/prioritized_rr.rs +++ b/awkernel_async_lib/src/scheduler/prioritized_rr.rs @@ -70,10 +70,7 @@ impl Scheduler for PrioritizedRRScheduler { fn get_next(&self, execution_ensured: bool) -> Option> { let mut node = MCSNode::new(); - let _guard = GLOBAL_WAKE_GET_MUTEX.lock(&mut node); - - let mut node_inner = MCSNode::new(); - let mut guard = self.data.lock(&mut node_inner); + let mut guard = self.data.lock(&mut node); #[allow(clippy::question_mark)] let data = match guard.as_mut() { diff --git a/specification/awkernel_async_lib/src/task/preemptive_spin/preemptive.pml b/specification/awkernel_async_lib/src/task/preemptive_spin/preemptive.pml index 273c46138..af53bd98f 100644 --- a/specification/awkernel_async_lib/src/task/preemptive_spin/preemptive.pml +++ b/specification/awkernel_async_lib/src/task/preemptive_spin/preemptive.pml @@ -138,7 +138,6 @@ inline wake(tid,task) { /* awkernel_async_lib::scheduler::fifo::PrioritizedFIFOScheduler::get_next()*/ inline get_next_each_scheduler(tid,ret,sched_type) { - lock(tid,lock_global_wake_get_mutex); lock(tid,lock_queue[sched_type]); byte head; @@ -176,11 +175,11 @@ inline get_next_each_scheduler(tid,ret,sched_type) { fi unlock(tid,lock_queue[sched_type]); - unlock(tid,lock_global_wake_get_mutex); } /* awkernel_async_lib::task::scheduler::get_next_task() */ inline scheduler_get_next(tid,ret) { + lock(tid,lock_global_wake_get_mutex); byte sched_i; for (sched_i : 0 .. SCHEDULER_TYPE_NUM - 1) { get_next_each_scheduler(tid,ret,sched_i); @@ -189,6 +188,7 @@ inline scheduler_get_next(tid,ret) { :: else fi } + unlock(tid,lock_global_wake_get_mutex); } /* awkernel_async_lib::task::preempt::get_next_task()*/ From 80468f8ee28d6b7ad57df6ce4a278e50127e4f41 Mon Sep 17 00:00:00 2001 From: atsushi421 Date: Thu, 25 Sep 2025 05:09:42 +0900 Subject: [PATCH 5/6] docs: update README --- .../src/task/preemptive_spin/README.md | 585 +++++++++++++++++- 1 file changed, 584 insertions(+), 1 deletion(-) diff --git a/specification/awkernel_async_lib/src/task/preemptive_spin/README.md b/specification/awkernel_async_lib/src/task/preemptive_spin/README.md index d4d2171de..6a96dc29f 100644 --- a/specification/awkernel_async_lib/src/task/preemptive_spin/README.md +++ b/specification/awkernel_async_lib/src/task/preemptive_spin/README.md @@ -185,7 +185,7 @@ To keep the model less complex, please note the following abstractions compared - For priority comparison, we use the simple priority value (`TaskInfo.id`) as before, rather than `combine_priority`. - This is ensured by guaranteeing that tasks with larger ids are never assigned to scheduler types with higher priority. -### Results +### First Results (The errors are found) [Used Version](https://github.com/tier4/awkernel/commit/8ee41ca90485ed3360e4e4c0345d9882833be7cb) @@ -306,3 +306,586 @@ spin: _spin_nvr.tmp:21, Error: assertion violated spin: text of failed assertion: assert(!(!((!(((((((((((((((waking[0]==0)&&(waking[1]==0))&&(waking[2]==0))&&(waking[3]==0))&&(len(ipi_requests[0])==0))&&(RUNNING[0]!=-(1)))&&(RUNNING[0]!=runnable_preempted_highest_priority))&&(len(ipi_requests[1])==0))&&(RUNNING[1]!=-(1)))&&(RUNNING[1]!=runnable_preempted_highest_priority))&&!(handling_interrupt[0]))&&!(handling_interrupt[1]))&&!(handling_interrupt[2]))&&!(handling_interrupt[3])))||(running_lowest_priority ((num_terminated==4)) +ltl eventually_prerequisites: [] (<> (((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((len(ipi_requests[1])==0))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) +ltl ensure_priority: [] ((! (((((((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((RUNNING[0]!=-(1)))) && ((RUNNING[0]!=runnable_preempted_highest_priority))) && ((len(ipi_requests[1])==0))) && ((RUNNING[1]!=-(1)))) && ((RUNNING[1]!=runnable_preempted_highest_priority))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) || ((running_lowest_priority ((num_terminated==4)) +ltl eventually_prerequisites: [] (<> (((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((len(ipi_requests[1])==0))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) +ltl ensure_priority: [] ((! (((((((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((RUNNING[0]!=-(1)))) && ((RUNNING[0]!=runnable_preempted_highest_priority))) && ((len(ipi_requests[1])==0))) && ((RUNNING[1]!=-(1)))) && ((RUNNING[1]!=runnable_preempted_highest_priority))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) || ((running_lowest_priority ((num_terminated==4)) +ltl eventually_prerequisites: [] (<> (((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((len(ipi_requests[1])==0))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) +ltl ensure_priority: [] ((! (((((((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((RUNNING[0]!=-(1)))) && ((RUNNING[0]!=runnable_preempted_highest_priority))) && ((len(ipi_requests[1])==0))) && ((RUNNING[1]!=-(1)))) && ((RUNNING[1]!=runnable_preempted_highest_priority))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) || ((running_lowest_priority ((num_terminated==4)) +ltl eventually_prerequisites: [] (<> (((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((len(ipi_requests[1])==0))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) +ltl ensure_priority: [] ((! (((((((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((RUNNING[0]!=-(1)))) && ((RUNNING[0]!=runnable_preempted_highest_priority))) && ((len(ipi_requests[1])==0))) && ((RUNNING[1]!=-(1)))) && ((RUNNING[1]!=runnable_preempted_highest_priority))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) || ((running_lowest_priority ((num_terminated==4)) +ltl eventually_prerequisites: [] (<> (((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((len(ipi_requests[1])==0))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) +ltl ensure_priority: [] ((! (((((((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((RUNNING[0]!=-(1)))) && ((RUNNING[0]!=runnable_preempted_highest_priority))) && ((len(ipi_requests[1])==0))) && ((RUNNING[1]!=-(1)))) && ((RUNNING[1]!=runnable_preempted_highest_priority))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) || ((running_lowest_priority ((num_terminated==4)) +ltl eventually_prerequisites: [] (<> (((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((len(ipi_requests[1])==0))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) +ltl ensure_priority: [] ((! (((((((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((RUNNING[0]!=-(1)))) && ((RUNNING[0]!=runnable_preempted_highest_priority))) && ((len(ipi_requests[1])==0))) && ((RUNNING[1]!=-(1)))) && ((RUNNING[1]!=runnable_preempted_highest_priority))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) || ((running_lowest_priority ((num_terminated==4)) +ltl eventually_prerequisites: [] (<> (((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((len(ipi_requests[1])==0))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) +ltl ensure_priority: [] ((! (((((((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((RUNNING[0]!=-(1)))) && ((RUNNING[0]!=runnable_preempted_highest_priority))) && ((len(ipi_requests[1])==0))) && ((RUNNING[1]!=-(1)))) && ((RUNNING[1]!=runnable_preempted_highest_priority))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) || ((running_lowest_priority ((num_terminated==4)) +ltl eventually_prerequisites: [] (<> (((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((len(ipi_requests[1])==0))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) +ltl ensure_priority: [] ((! (((((((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((RUNNING[0]!=-(1)))) && ((RUNNING[0]!=runnable_preempted_highest_priority))) && ((len(ipi_requests[1])==0))) && ((RUNNING[1]!=-(1)))) && ((RUNNING[1]!=runnable_preempted_highest_priority))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) || ((running_lowest_priority ((num_terminated==4)) +ltl eventually_prerequisites: [] (<> (((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((len(ipi_requests[1])==0))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) +ltl ensure_priority: [] ((! (((((((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((RUNNING[0]!=-(1)))) && ((RUNNING[0]!=runnable_preempted_highest_priority))) && ((len(ipi_requests[1])==0))) && ((RUNNING[1]!=-(1)))) && ((RUNNING[1]!=runnable_preempted_highest_priority))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) || ((running_lowest_priority ((num_terminated==4)) +ltl eventually_prerequisites: [] (<> (((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((len(ipi_requests[1])==0))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) +ltl ensure_priority: [] ((! (((((((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((RUNNING[0]!=-(1)))) && ((RUNNING[0]!=runnable_preempted_highest_priority))) && ((len(ipi_requests[1])==0))) && ((RUNNING[1]!=-(1)))) && ((RUNNING[1]!=runnable_preempted_highest_priority))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) || ((running_lowest_priority ((num_terminated==4)) +ltl eventually_prerequisites: [] (<> (((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((len(ipi_requests[1])==0))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) +ltl ensure_priority: [] ((! (((((((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((RUNNING[0]!=-(1)))) && ((RUNNING[0]!=runnable_preempted_highest_priority))) && ((len(ipi_requests[1])==0))) && ((RUNNING[1]!=-(1)))) && ((RUNNING[1]!=runnable_preempted_highest_priority))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) || ((running_lowest_priority ((num_terminated==4)) +ltl eventually_prerequisites: [] (<> (((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((len(ipi_requests[1])==0))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) +ltl ensure_priority: [] ((! (((((((((((((((waking[0]==0)) && ((waking[1]==0))) && ((waking[2]==0))) && ((waking[3]==0))) && ((len(ipi_requests[0])==0))) && ((RUNNING[0]!=-(1)))) && ((RUNNING[0]!=runnable_preempted_highest_priority))) && ((len(ipi_requests[1])==0))) && ((RUNNING[1]!=-(1)))) && ((RUNNING[1]!=runnable_preempted_highest_priority))) && (! (handling_interrupt[0]))) && (! (handling_interrupt[1]))) && (! (handling_interrupt[2]))) && (! (handling_interrupt[3])))) || ((running_lowest_priority Date: Thu, 25 Sep 2025 20:29:05 +0900 Subject: [PATCH 6/6] feat(dag): implement GEDF (#653) * test: for develop #1 Signed-off-by: nokosaaan * test: for develop #2 Signed-off-by: nokosaaan * test: whether source_nodes or others & modify DAG struct #3 Signed-off-by: nokosaaan * test: add else process for initial period #4 Signed-off-by: nokosaaan * test: absolute deadline setting #5 Signed-off-by: nokosaaan * fix: absolute deadline setting #6 Signed-off-by: nokosaaan * fix absolute deadline #7 Signed-off-by: nokosaaan * test: change gedf.rs #8 Signed-off-by: nokosaaan * test: change gedf.rs #9 Signed-off-by: nokosaaan * fix: delete unnecessary code Signed-off-by: nokosaaan * fix: modify get_node_info and etc Signed-off-by: nokosaaan * fix: change function name Signed-off-by: nokosaaan * fix: organizing comments & modify else handling Signed-off-by: nokosaaan * fix: remove unused methods Signed-off-by: nokosaaan * fix: change PRIORITY_LIST & change dag.rs Signed-off-by: nokosaaan * fix: change gedf.rs & task.rs Signed-off-by: nokosaaan * fix: avoid deadlock for GEDFNoArg Signed-off-by: nokosaaan * fix: change spawn logic Signed-off-by: nokosaaan * fix: delete GEDFNoArg Signed-off-by: nokosaaan * fix: check cargo fmt Signed-off-by: nokosaaan * fix: adjusting the number of arguments Signed-off-by: nokosaaan * fix: check cargo fmt 2 Signed-off-by: nokosaaan * fix: improper variable usage in panic! macros Signed-off-by: nokosaaan * fix: check doctest Signed-off-by: nokosaaan * fix: check doctest 2 Signed-off-by: nokosaaan * fix: check doctest 3 Signed-off-by: nokosaaan * fix: check doctest 4 Signed-off-by: nokosaaan * fix: check doctest 5 Signed-off-by: nokosaaan * fix: check doctest 6 Signed-off-by: nokosaaan * fix: check doctest 7 Signed-off-by: nokosaaan * fix: check doctest 8 Signed-off-by: nokosaaan * fix: check cargo fmt 3 Signed-off-by: nokosaaan * fix: check doctest 9 Signed-off-by: nokosaaan * fix: check doctest 10 Signed-off-by: nokosaaan * fix: check doctest 11 Signed-off-by: nokosaaan * fix: fix unnecessary imports Signed-off-by: nokosaaan * fix: reset the LOG_ENABLE flag to its original state Signed-off-by: nokosaaan * fix: delete unnecessary functions Signed-off-by: nokosaaan * fix: delete unnecessary functions 2 Signed-off-by: nokosaaan * fix: delete unnecessary functions 3 Signed-off-by: nokosaaan * fix: implement gedf Signed-off-by: nokosaaan * fix: check cargo fmt 4 Signed-off-by: nokosaaan * fix: refactored the nested sections using helper functions Signed-off-by: nokosaaan * fix: reduced nested structure using early return Signed-off-by: nokosaaan * fix: adding a function and code optimization Signed-off-by: nokosaaan * fix: check cargo fmt 5 Signed-off-by: nokosaaan * fix: add a comment in dag.rs Signed-off-by: nokosaaan * fix: format! error Signed-off-by: nokosaaan * fix: To resolve the build error, I used unwrap_or_else instead of expect. Signed-off-by: nokosaaan * fix: delete unnecessary import Signed-off-by: nokosaaan * fix: delete semicolon Signed-off-by: nokosaaan * fix: panic! 2 Signed-off-by: nokosaaan * fix: unify the name index to id Signed-off-by: nokosaaan * fix: consolidated the duplicate processing and add a comment Signed-off-by: nokosaaan * fix: solve let binding Signed-off-by: nokosaaan --------- Signed-off-by: nokosaaan --- applications/tests/test_dag/src/lib.rs | 12 ++-- awkernel_async_lib/src/dag.rs | 82 ++++++++++++++++++++++-- awkernel_async_lib/src/scheduler/gedf.rs | 50 +++++++++++++-- awkernel_async_lib/src/task.rs | 54 +++++++++++++++- 4 files changed, 181 insertions(+), 17 deletions(-) diff --git a/applications/tests/test_dag/src/lib.rs b/applications/tests/test_dag/src/lib.rs index 7e799f694..a7330457f 100644 --- a/applications/tests/test_dag/src/lib.rs +++ b/applications/tests/test_dag/src/lib.rs @@ -7,7 +7,7 @@ use awkernel_async_lib::scheduler::SchedulerType; use awkernel_lib::delay::wait_microsec; use core::time::Duration; -const LOG_ENABLE: bool = false; +const LOG_ENABLE: bool = true; pub async fn run() { wait_microsec(1000000); @@ -30,7 +30,7 @@ pub async fn run() { (number,) }, vec![Cow::from("topic0")], - SchedulerType::PrioritizedFIFO(30), + SchedulerType::GEDF(5), period, ) .await; @@ -47,7 +47,7 @@ pub async fn run() { }, vec![Cow::from("topic0")], vec![Cow::from("topic1"), Cow::from("topic2")], - SchedulerType::PrioritizedFIFO(0), + SchedulerType::GEDF(10), ) .await; @@ -62,7 +62,7 @@ pub async fn run() { }, vec![Cow::from("topic1")], vec![Cow::from("topic3")], - SchedulerType::PrioritizedFIFO(0), + SchedulerType::GEDF(10), ) .await; @@ -77,7 +77,7 @@ pub async fn run() { }, vec![Cow::from("topic2")], vec![Cow::from("topic4")], - SchedulerType::PrioritizedFIFO(0), + SchedulerType::GEDF(10), ) .await; @@ -90,7 +90,7 @@ pub async fn run() { } }, vec![Cow::from("topic3"), Cow::from("topic4")], - SchedulerType::PrioritizedFIFO(0), + SchedulerType::GEDF(10), Duration::from_secs(1), ) .await; diff --git a/awkernel_async_lib/src/dag.rs b/awkernel_async_lib/src/dag.rs index 2cf389d02..fa150a48f 100644 --- a/awkernel_async_lib/src/dag.rs +++ b/awkernel_async_lib/src/dag.rs @@ -63,6 +63,7 @@ use crate::{ visit::{EdgeRef, IntoNodeReferences, NodeRef}, }, scheduler::SchedulerType, + task::DagInfo, time_interval::interval, Attribute, MultipleReceiver, MultipleSender, VectorToPublishers, VectorToSubscribers, }; @@ -168,6 +169,7 @@ impl core::fmt::Display for DagError { pub struct Dag { id: u32, graph: Mutex>, + absolute_deadline: Mutex>, #[cfg(feature = "perf")] response_info: Mutex, @@ -190,6 +192,25 @@ impl Dag { graph.externals(Direction::Outgoing).collect() } + pub fn is_source_node(&self, node_index: NodeIndex) -> bool { + let source_nodes = self.get_source_nodes(); + source_nodes.contains(&node_index) + } + + // Returns the relative deadline of the first sink node, if it exists. + pub fn get_sink_relative_deadline(&self) -> Option { + let sink_nodes: Vec = self.get_sink_nodes(); + + if let Some(sink_node_index) = sink_nodes.first() { + self.graph + .lock(&mut MCSNode::new()) + .node_weight(*sink_node_index)? + .relative_deadline + } else { + None + } + } + fn set_relative_deadline(&self, node_idx: NodeIndex, deadline: Duration) { let mut node = MCSNode::new(); let mut graph = self.graph.lock(&mut node); @@ -284,6 +305,10 @@ impl Dag { self.check_subscribe_mismatch::(&subscribe_topic_names, node_idx); self.check_publish_mismatch::(&publish_topic_names, node_idx); + // To prevent errors caused by ownership moves + let dag_id = self.id; + let node_id = node_idx.index() as u32; + let mut node = MCSNode::new(); let mut pending_tasks = PENDING_TASKS.lock(&mut node); pending_tasks @@ -297,6 +322,7 @@ impl Dag { subscribe_topic_names, publish_topic_names, sched_type, + DagInfo { dag_id, node_id }, ) .await }) @@ -330,6 +356,10 @@ impl Dag { } }; + // To prevent errors caused by ownership moves + let dag_id = self.id; + let node_id = node_idx.index() as u32; + let mut node = MCSNode::new(); let mut source_pending_tasks = SOURCE_PENDING_TASKS.lock(&mut node); source_pending_tasks.insert( @@ -343,6 +373,7 @@ impl Dag { sched_type, period, measure_f, + DagInfo { dag_id, node_id }, ) .await }) @@ -384,6 +415,10 @@ impl Dag { } }; + // To prevent errors caused by ownership moves + let dag_id = self.id; + let node_id = node_idx.index() as u32; + let mut node = MCSNode::new(); let mut pending_tasks = PENDING_TASKS.lock(&mut node); pending_tasks @@ -396,6 +431,7 @@ impl Dag { measure_f, subscribe_topic_names, sched_type, + DagInfo { dag_id, node_id }, ) .await }) @@ -437,6 +473,20 @@ impl Dag { .push(node_idx.index()); } } + + #[inline(always)] + pub fn set_absolute_deadline(&self, deadline: u64) { + let mut node = MCSNode::new(); + let mut absolute_deadline = self.absolute_deadline.lock(&mut node); + *absolute_deadline = Some(deadline); + } + + #[inline(always)] + pub fn get_absolute_deadline(&self) -> Option { + let mut node = MCSNode::new(); + let absolute_deadline = self.absolute_deadline.lock(&mut node); + *absolute_deadline + } } struct PendingTask { @@ -463,6 +513,10 @@ struct NodeInfo { relative_deadline: Option, } +pub fn to_node_index(index: u32) -> NodeIndex { + NodeIndex::new(index as usize) +} + struct EdgeInfo { topic_name: Cow<'static, str>, } @@ -489,6 +543,7 @@ impl Dags { let dag = Arc::new(Dag { id, graph: Mutex::new(graph::Graph::new()), + absolute_deadline: Mutex::new(None), #[cfg(feature = "perf")] response_info: Mutex::new(ResponseInfo::new()), @@ -525,6 +580,21 @@ pub fn get_dag(id: u32) -> Option> { dags.id_to_dag.get(&id).cloned() } +#[inline(always)] +pub fn get_dag_absolute_deadline(dag_id: u32) -> Option { + get_dag(dag_id)?.get_absolute_deadline() +} + +#[inline(always)] +pub fn set_dag_absolute_deadline(dag_id: u32, deadline: u64) -> bool { + if let Some(dag) = get_dag(dag_id) { + dag.set_absolute_deadline(deadline); + true + } else { + false + } +} + pub async fn finish_create_dags(dags: &[Arc]) -> Result<(), Vec> { match validate_all_rules(dags) { Ok(()) => { @@ -833,6 +903,7 @@ async fn spawn_reactor( subscribe_topic_names: Vec>, publish_topic_names: Vec>, sched_type: SchedulerType, + dag_info: DagInfo, ) -> u32 where F: Fn( @@ -862,7 +933,7 @@ where } }; - crate::task::spawn(reactor_name, future, sched_type) + crate::task::spawn_with_dag_info(reactor_name, future, sched_type, dag_info) } async fn spawn_periodic_reactor( @@ -872,6 +943,7 @@ async fn spawn_periodic_reactor( sched_type: SchedulerType, period: Duration, _release_measure: Option, + dag_info: DagInfo, ) -> u32 where F: Fn() -> ::Item + Send + 'static, @@ -909,7 +981,7 @@ where } }; - let task_id = crate::task::spawn(reactor_name, future, sched_type); + let task_id = crate::task::spawn_with_dag_info(reactor_name, future, sched_type, dag_info); #[cfg(feature = "perf")] release_measure(); @@ -922,6 +994,7 @@ async fn spawn_sink_reactor( f: F, subscribe_topic_names: Vec>, sched_type: SchedulerType, + dag_info: DagInfo, ) -> u32 where F: Fn(::Item) + Send + 'static, @@ -933,11 +1006,10 @@ where Args::create_subscribers(subscribe_topic_names, Attribute::default()); loop { - let args: <::Subscribers as MultipleReceiver>::Item = - subscribers.recv_all().await; + let args: ::Item = subscribers.recv_all().await; f(args); } }; - crate::task::spawn(reactor_name, future, sched_type) + crate::task::spawn_with_dag_info(reactor_name, future, sched_type, dag_info) } diff --git a/awkernel_async_lib/src/scheduler/gedf.rs b/awkernel_async_lib/src/scheduler/gedf.rs index ce7b11739..09ec0d4f3 100644 --- a/awkernel_async_lib/src/scheduler/gedf.rs +++ b/awkernel_async_lib/src/scheduler/gedf.rs @@ -4,11 +4,11 @@ use core::cmp::max; use super::{Scheduler, SchedulerType, Task}; use crate::{ - scheduler::{ - get_priority, peek_preemption_pending, push_preemption_pending, GLOBAL_WAKE_GET_MUTEX, - }, + dag::{get_dag, get_dag_absolute_deadline, set_dag_absolute_deadline, to_node_index}, + scheduler::GLOBAL_WAKE_GET_MUTEX, + scheduler::{get_priority, peek_preemption_pending, push_preemption_pending}, task::{ - get_task, get_tasks_running, set_current_task, set_need_preemption, State, + get_task, get_tasks_running, set_current_task, set_need_preemption, DagInfo, State, MAX_TASK_PRIORITY, }, }; @@ -66,10 +66,18 @@ impl Scheduler for GEDFScheduler { let (wake_time, absolute_deadline) = { let mut node_inner = MCSNode::new(); let mut info = task.info.lock(&mut node_inner); + let dag_info = info.get_dag_info(); match info.scheduler_type { SchedulerType::GEDF(relative_deadline) => { let wake_time = awkernel_lib::delay::uptime(); - let absolute_deadline = wake_time + relative_deadline; + let absolute_deadline = if let Some(ref dag_info) = dag_info { + calculate_and_update_dag_deadline(dag_info, wake_time) + } else { + // If dag_info is not present, the task is treated as a regular task, and + // the absolute_deadline is calculated using the scheduler's relative_deadline. + wake_time + relative_deadline + }; + task.priority .update_priority_info(self.priority, MAX_TASK_PRIORITY - absolute_deadline); info.update_absolute_deadline(absolute_deadline); @@ -184,3 +192,35 @@ impl GEDFScheduler { false } } + +fn get_dag_sink_relative_deadline_ms(dag_id: u32) -> u64 { + let dag = get_dag(dag_id).unwrap_or_else(|| panic!("GEDF scheduler: DAG {dag_id} not found")); + dag.get_sink_relative_deadline() + .map(|deadline| deadline.as_millis() as u64) + .unwrap_or_else(|| panic!("GEDF scheduler: DAG {dag_id} has no sink relative deadline set")) +} + +fn calculate_and_set_dag_deadline(dag_id: u32, wake_time: u64) -> u64 { + let relative_deadline_ms = get_dag_sink_relative_deadline_ms(dag_id); + let dag_absolute_deadline = wake_time + relative_deadline_ms; + set_dag_absolute_deadline(dag_id, dag_absolute_deadline); + dag_absolute_deadline +} + +pub fn calculate_and_update_dag_deadline(dag_info: &DagInfo, wake_time: u64) -> u64 { + let dag_id = dag_info.dag_id; + let node_id = dag_info.node_id; + + if let Some(absolute_deadline) = get_dag_absolute_deadline(dag_id) { + let dag = + get_dag(dag_id).unwrap_or_else(|| panic!("GEDF scheduler: DAG {dag_id} not found")); + let current_node_index = to_node_index(node_id); + if !dag.is_source_node(current_node_index) { + return absolute_deadline; + } + + return calculate_and_set_dag_deadline(dag_id, wake_time); + } + + calculate_and_set_dag_deadline(dag_id, wake_time) +} diff --git a/awkernel_async_lib/src/task.rs b/awkernel_async_lib/src/task.rs index e25fb110e..6e4ee4636 100644 --- a/awkernel_async_lib/src/task.rs +++ b/awkernel_async_lib/src/task.rs @@ -157,6 +157,7 @@ pub struct TaskInfo { need_sched: bool, pub(crate) need_preemption: bool, panicked: bool, + pub(crate) dag_info: Option, #[cfg(not(feature = "no_preempt"))] thread: Option, @@ -219,6 +220,11 @@ impl TaskInfo { pub fn panicked(&self) -> bool { self.panicked } + + #[inline(always)] + pub fn get_dag_info(&self) -> Option { + self.dag_info.clone() + } } /// State of task. @@ -240,6 +246,12 @@ struct Tasks { id_to_task: BTreeMap>, } +#[derive(Clone)] +pub struct DagInfo { + pub dag_id: u32, + pub node_id: u32, +} + impl Tasks { const fn new() -> Self { Self { @@ -254,6 +266,7 @@ impl Tasks { future: Fuse>, scheduler: &'static dyn Scheduler, scheduler_type: SchedulerType, + dag_info: Option, ) -> u32 { let mut id = self.candidate_id; loop { @@ -272,6 +285,7 @@ impl Tasks { need_sched: false, need_preemption: false, panicked: false, + dag_info, #[cfg(not(feature = "no_preempt"))] thread: None, @@ -337,6 +351,44 @@ pub fn spawn( name: Cow<'static, str>, future: impl Future + 'static + Send, sched_type: SchedulerType, +) -> u32 { + inner_spawn(name, future, sched_type, None) +} + +/// Spawn a detached task with DAG information. +/// This function is similar to `spawn` but automatically sets DAG information +/// for the task, which is useful for DAG-based schedulers like GEDF. +/// +/// # Example +/// +/// ```ignore +/// use awkernel_async_lib::{scheduler::SchedulerType, task, dag::{create_dag, add_node_with_topic_edges_public, set_relative_deadline_public}}; +/// use core::time::Duration; +/// let dag = create_dag(); +/// let sink_node_idx = add_node_with_topic_edges_public(&dag, &[], &[]); +/// let deadline = Duration::from_millis(100); +/// set_relative_deadline_public(&dag, sink_node_idx, deadline); +/// let task_id = task::spawn_with_dag_info( +/// "dag task".into(), +/// async { Ok(()) }, +/// SchedulerType::GEDF(0), +/// DagInfo { dag_id: 1, node_id: 0 } +/// ); +/// ``` +pub fn spawn_with_dag_info( + name: Cow<'static, str>, + future: impl Future + 'static + Send, + sched_type: SchedulerType, + dag_info: DagInfo, +) -> u32 { + inner_spawn(name, future, sched_type, Some(dag_info)) +} + +pub fn inner_spawn( + name: Cow<'static, str>, + future: impl Future + 'static + Send, + sched_type: SchedulerType, + dag_info: Option, ) -> u32 { if let SchedulerType::PrioritizedFIFO(p) | SchedulerType::PrioritizedRR(p) = sched_type { if p > HIGHEST_PRIORITY { @@ -352,7 +404,7 @@ pub fn spawn( let mut node = MCSNode::new(); let mut tasks = TASKS.lock(&mut node); - let id = tasks.spawn(name, future.fuse(), scheduler, sched_type); + let id = tasks.spawn(name, future.fuse(), scheduler, sched_type, dag_info); let task = tasks.id_to_task.get(&id).cloned(); drop(tasks);