Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions applications/tests/test_sched_preempt/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "test_sched_preempt"
version = "0.1.0"
edition = "2021"

[dependencies]
log = "0.4"

[dependencies.awkernel_async_lib]
path = "../../../awkernel_async_lib"
default-features = false

[dependencies.awkernel_lib]
path = "../../../awkernel_lib"
default-features = false
71 changes: 71 additions & 0 deletions applications/tests/test_sched_preempt/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#![no_std]

use awkernel_async_lib::{scheduler::SchedulerType, spawn, task::get_lowest_priority_task_info};
use awkernel_lib::{
cpu::cpu_id,
delay::{uptime, wait_microsec},
};

#[allow(dead_code)]
enum TestType {
GetlowestTask,
}

/// Tests related to preemption between schedulers
pub async fn run() {
// TASK ID:1
wait_microsec(100000);
log::info!(
"[{}] Start test_sched_preempt at cpu_id: {}",
uptime(),
cpu_id()
);

let test_type = TestType::GetlowestTask;
match test_type {
TestType::GetlowestTask => check_lowest_task().await,
}

log::info!(
"[{}] End test_sched_preempt at cpu_id: {}",
uptime(),
cpu_id()
);
}

async fn check_lowest_task() {
spawn(
// TASK ID:8
"GEDF".into(),
async move {
loop {
log::info!("[{}] GEDF is start at cpu_id: {}", uptime(), cpu_id());
wait_microsec(10000000);
log::info!("[{}] GEDF is end at cpu_id: {}", uptime(), cpu_id());
}
},
SchedulerType::GEDF(10000),
)
.await;
spawn(
// TASK ID:9
"FIFO".into(),
async move {
loop {
log::info!("[{}] FIFO is start at cpu_id: {}", uptime(), cpu_id());
wait_microsec(10000000);
log::info!("[{}] FIFO is end at cpu_id: {}", uptime(), cpu_id());
}
},
SchedulerType::FIFO,
)
.await;

wait_microsec(1000000);

// FIFO TASK ID:1 < FIFO TASK ID:9 < GEDF TASK ID:8
// In FIFO tasks, the task with the smaller CPU ID has the lowest priority.
if let Some((task_id, cpu_id, _)) = get_lowest_priority_task_info() {
log::debug!("Task ID: {}, cpu_id: {}", task_id, cpu_id,);
}
}
2 changes: 2 additions & 0 deletions awkernel_async_lib/src/scheduler/gedf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ impl Scheduler for GEDFScheduler {
let wake_time = awkernel_lib::delay::uptime();
let absolute_deadline = wake_time + relative_deadline;

task.priority
.update_priority_info(self.priority, absolute_deadline);
info.update_absolute_deadline(absolute_deadline);

data.queue.push(GEDFTask {
Expand Down
102 changes: 101 additions & 1 deletion awkernel_async_lib/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use awkernel_lib::{
unwind::catch_unwind,
};
use core::{
sync::atomic::{AtomicU32, Ordering},
sync::atomic::{AtomicU32, AtomicU64, Ordering},
task::{Context, Poll},
};
use futures::{
Expand All @@ -48,6 +48,7 @@ pub type TaskResult = Result<(), Cow<'static, str>>;

static TASKS: Mutex<Tasks> = Mutex::new(Tasks::new()); // Set of tasks.
static RUNNING: [AtomicU32; NUM_MAX_CPU] = array![_ => AtomicU32::new(0); NUM_MAX_CPU]; // IDs of running tasks.
static MAX_TASK_PRIORITY: u64 = (1 << 56) - 1; // Maximum task priority.

/// Task has ID, future, information, and a reference to a scheduler.
pub struct Task {
Expand All @@ -56,6 +57,7 @@ pub struct Task {
future: Mutex<Fuse<BoxFuture<'static, TaskResult>>>,
pub info: Mutex<TaskInfo>,
scheduler: &'static dyn Scheduler,
pub priority: PriorityInfo,
}

impl Task {
Expand Down Expand Up @@ -237,12 +239,21 @@ impl Tasks {
thread: None,
});

// Set the task priority.
// If the scheduler implements dynamic priority scheduling, the task priority will be updated later.
let task_priority = match scheduler_type {
SchedulerType::PrioritizedFIFO(priority)
| SchedulerType::PriorityBasedRR(priority) => priority as u64,
_ => MAX_TASK_PRIORITY,
};

let task = Task {
name,
future: Mutex::new(future),
scheduler,
id,
info,
priority: PriorityInfo::new(scheduler.priority(), task_priority),
};

e.insert(Arc::new(task));
Expand Down Expand Up @@ -1011,3 +1022,92 @@ pub fn panicking() {
preempt::preemption();
}
}

pub struct PriorityInfo {
pub priority: AtomicU64,
}

impl PriorityInfo {
fn new(scheduler_priority: u8, task_priority: u64) -> Self {
PriorityInfo {
priority: AtomicU64::new(Self::combine_priority(scheduler_priority, task_priority)),
}
}

pub fn update_priority_info(&self, scheduler_priority: u8, task_priority: u64) {
self.priority.store(
Self::combine_priority(scheduler_priority, task_priority),
Ordering::Relaxed,
);
}

fn combine_priority(scheduler_priority: u8, task_priority: u64) -> u64 {
assert!(task_priority < (1 << 56), "Task priority exceeds 56 bits");
((scheduler_priority as u64) << 56) | (task_priority & ((1 << 56) - 1))
}
}

impl Clone for PriorityInfo {
fn clone(&self) -> Self {
let value = self.priority.load(Ordering::Relaxed);
PriorityInfo {
priority: AtomicU64::new(value),
}
}
}

impl PartialEq for PriorityInfo {
fn eq(&self, other: &Self) -> bool {
self.priority.load(Ordering::Relaxed) == other.priority.load(Ordering::Relaxed)
}
}

impl Eq for PriorityInfo {}

impl PartialOrd for PriorityInfo {
fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for PriorityInfo {
fn cmp(&self, other: &Self) -> core::cmp::Ordering {
self.priority
.load(Ordering::Relaxed)
.cmp(&other.priority.load(Ordering::Relaxed))
}
}

pub fn get_lowest_priority_task_info() -> Option<(u32, usize, PriorityInfo)> {
let mut lowest_task: Option<(u32, usize, PriorityInfo)> = None; // (task_id, cpu_id, priority_info)

let running_tasks: Vec<RunningTask> = get_tasks_running()
.into_iter()
.filter(|task| task.task_id != 0)
.collect();

let priority_infos: Vec<(u32, usize, PriorityInfo)> = {
let mut node = MCSNode::new();
let tasks = TASKS.lock(&mut node);
running_tasks
.into_iter()
.filter_map(|task| {
tasks
.id_to_task
.get(&task.task_id)
.map(|task_data| (task.task_id, task.cpu_id, task_data.priority.clone()))
})
.collect()
};

for (task_id, cpu_id, priority_info) in priority_infos {
if lowest_task
.as_ref()
.is_none_or(|(_, _, lowest_priority_info)| priority_info > *lowest_priority_info)
{
lowest_task = Some((task_id, cpu_id, priority_info));
}
}

lowest_task
}
7 changes: 6 additions & 1 deletion userland/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,12 @@ optional = true
path = "../applications/tests/test_load_udp"
optional = true

[dependencies.test_sched_preempt]
path = "../applications/tests/test_sched_preempt"
optional = true

[features]
default = ["test_graphics"]
default = ["test_sched_preempt"]
perf = ["awkernel_services/perf"]

# Test applications
Expand All @@ -79,3 +83,4 @@ test_gedf = ["dep:test_gedf"]
test_load_udp = ["dep:test_load_udp"]
test_measure_channel = ["dep:test_measure_channel"]
test_measure_channel_heavy = ["dep:test_measure_channel_heavy"]
test_sched_preempt = ["dep:test_sched_preempt"]
3 changes: 3 additions & 0 deletions userland/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,8 @@ pub async fn main() -> Result<(), Cow<'static, str>> {
#[cfg(feature = "test_load_udp")]
load_test_udp::run().await; // load test udp

#[cfg(feature = "test_sched_preempt")]
test_sched_preempt::run().await; // tests related to preemption between schedulers

Ok(())
}
Loading