diff --git a/applications/tests/test_dag/src/lib.rs b/applications/tests/test_dag/src/lib.rs index 7e799f694..a7330457f 100644 --- a/applications/tests/test_dag/src/lib.rs +++ b/applications/tests/test_dag/src/lib.rs @@ -7,7 +7,7 @@ use awkernel_async_lib::scheduler::SchedulerType; use awkernel_lib::delay::wait_microsec; use core::time::Duration; -const LOG_ENABLE: bool = false; +const LOG_ENABLE: bool = true; pub async fn run() { wait_microsec(1000000); @@ -30,7 +30,7 @@ pub async fn run() { (number,) }, vec![Cow::from("topic0")], - SchedulerType::PrioritizedFIFO(30), + SchedulerType::GEDF(5), period, ) .await; @@ -47,7 +47,7 @@ pub async fn run() { }, vec![Cow::from("topic0")], vec![Cow::from("topic1"), Cow::from("topic2")], - SchedulerType::PrioritizedFIFO(0), + SchedulerType::GEDF(10), ) .await; @@ -62,7 +62,7 @@ pub async fn run() { }, vec![Cow::from("topic1")], vec![Cow::from("topic3")], - SchedulerType::PrioritizedFIFO(0), + SchedulerType::GEDF(10), ) .await; @@ -77,7 +77,7 @@ pub async fn run() { }, vec![Cow::from("topic2")], vec![Cow::from("topic4")], - SchedulerType::PrioritizedFIFO(0), + SchedulerType::GEDF(10), ) .await; @@ -90,7 +90,7 @@ pub async fn run() { } }, vec![Cow::from("topic3"), Cow::from("topic4")], - SchedulerType::PrioritizedFIFO(0), + SchedulerType::GEDF(10), Duration::from_secs(1), ) .await; diff --git a/awkernel_async_lib/src/dag.rs b/awkernel_async_lib/src/dag.rs index 2cf389d02..fa150a48f 100644 --- a/awkernel_async_lib/src/dag.rs +++ b/awkernel_async_lib/src/dag.rs @@ -63,6 +63,7 @@ use crate::{ visit::{EdgeRef, IntoNodeReferences, NodeRef}, }, scheduler::SchedulerType, + task::DagInfo, time_interval::interval, Attribute, MultipleReceiver, MultipleSender, VectorToPublishers, VectorToSubscribers, }; @@ -168,6 +169,7 @@ impl core::fmt::Display for DagError { pub struct Dag { id: u32, graph: Mutex>, + absolute_deadline: Mutex>, #[cfg(feature = "perf")] response_info: Mutex, @@ -190,6 +192,25 @@ impl Dag { 
graph.externals(Direction::Outgoing).collect() } + pub fn is_source_node(&self, node_index: NodeIndex) -> bool { + let source_nodes = self.get_source_nodes(); + source_nodes.contains(&node_index) + } + + // Returns the relative deadline of the first sink node, if it exists. + pub fn get_sink_relative_deadline(&self) -> Option { + let sink_nodes: Vec = self.get_sink_nodes(); + + if let Some(sink_node_index) = sink_nodes.first() { + self.graph + .lock(&mut MCSNode::new()) + .node_weight(*sink_node_index)? + .relative_deadline + } else { + None + } + } + fn set_relative_deadline(&self, node_idx: NodeIndex, deadline: Duration) { let mut node = MCSNode::new(); let mut graph = self.graph.lock(&mut node); @@ -284,6 +305,10 @@ impl Dag { self.check_subscribe_mismatch::(&subscribe_topic_names, node_idx); self.check_publish_mismatch::(&publish_topic_names, node_idx); + // To prevent errors caused by ownership moves + let dag_id = self.id; + let node_id = node_idx.index() as u32; + let mut node = MCSNode::new(); let mut pending_tasks = PENDING_TASKS.lock(&mut node); pending_tasks @@ -297,6 +322,7 @@ impl Dag { subscribe_topic_names, publish_topic_names, sched_type, + DagInfo { dag_id, node_id }, ) .await }) @@ -330,6 +356,10 @@ impl Dag { } }; + // To prevent errors caused by ownership moves + let dag_id = self.id; + let node_id = node_idx.index() as u32; + let mut node = MCSNode::new(); let mut source_pending_tasks = SOURCE_PENDING_TASKS.lock(&mut node); source_pending_tasks.insert( @@ -343,6 +373,7 @@ impl Dag { sched_type, period, measure_f, + DagInfo { dag_id, node_id }, ) .await }) @@ -384,6 +415,10 @@ impl Dag { } }; + // To prevent errors caused by ownership moves + let dag_id = self.id; + let node_id = node_idx.index() as u32; + let mut node = MCSNode::new(); let mut pending_tasks = PENDING_TASKS.lock(&mut node); pending_tasks @@ -396,6 +431,7 @@ impl Dag { measure_f, subscribe_topic_names, sched_type, + DagInfo { dag_id, node_id }, ) .await }) @@ -437,6 +473,20 @@ 
impl Dag { .push(node_idx.index()); } } + + #[inline(always)] + pub fn set_absolute_deadline(&self, deadline: u64) { + let mut node = MCSNode::new(); + let mut absolute_deadline = self.absolute_deadline.lock(&mut node); + *absolute_deadline = Some(deadline); + } + + #[inline(always)] + pub fn get_absolute_deadline(&self) -> Option { + let mut node = MCSNode::new(); + let absolute_deadline = self.absolute_deadline.lock(&mut node); + *absolute_deadline + } } struct PendingTask { @@ -463,6 +513,10 @@ struct NodeInfo { relative_deadline: Option, } +pub fn to_node_index(index: u32) -> NodeIndex { + NodeIndex::new(index as usize) +} + struct EdgeInfo { topic_name: Cow<'static, str>, } @@ -489,6 +543,7 @@ impl Dags { let dag = Arc::new(Dag { id, graph: Mutex::new(graph::Graph::new()), + absolute_deadline: Mutex::new(None), #[cfg(feature = "perf")] response_info: Mutex::new(ResponseInfo::new()), @@ -525,6 +580,21 @@ pub fn get_dag(id: u32) -> Option> { dags.id_to_dag.get(&id).cloned() } +#[inline(always)] +pub fn get_dag_absolute_deadline(dag_id: u32) -> Option { + get_dag(dag_id)?.get_absolute_deadline() +} + +#[inline(always)] +pub fn set_dag_absolute_deadline(dag_id: u32, deadline: u64) -> bool { + if let Some(dag) = get_dag(dag_id) { + dag.set_absolute_deadline(deadline); + true + } else { + false + } +} + pub async fn finish_create_dags(dags: &[Arc]) -> Result<(), Vec> { match validate_all_rules(dags) { Ok(()) => { @@ -833,6 +903,7 @@ async fn spawn_reactor( subscribe_topic_names: Vec>, publish_topic_names: Vec>, sched_type: SchedulerType, + dag_info: DagInfo, ) -> u32 where F: Fn( @@ -862,7 +933,7 @@ where } }; - crate::task::spawn(reactor_name, future, sched_type) + crate::task::spawn_with_dag_info(reactor_name, future, sched_type, dag_info) } async fn spawn_periodic_reactor( @@ -872,6 +943,7 @@ async fn spawn_periodic_reactor( sched_type: SchedulerType, period: Duration, _release_measure: Option, + dag_info: DagInfo, ) -> u32 where F: Fn() -> ::Item + Send + 
'static, @@ -909,7 +981,7 @@ where } }; - let task_id = crate::task::spawn(reactor_name, future, sched_type); + let task_id = crate::task::spawn_with_dag_info(reactor_name, future, sched_type, dag_info); #[cfg(feature = "perf")] release_measure(); @@ -922,6 +994,7 @@ async fn spawn_sink_reactor( f: F, subscribe_topic_names: Vec>, sched_type: SchedulerType, + dag_info: DagInfo, ) -> u32 where F: Fn(::Item) + Send + 'static, @@ -933,11 +1006,10 @@ where Args::create_subscribers(subscribe_topic_names, Attribute::default()); loop { - let args: <::Subscribers as MultipleReceiver>::Item = - subscribers.recv_all().await; + let args: ::Item = subscribers.recv_all().await; f(args); } }; - crate::task::spawn(reactor_name, future, sched_type) + crate::task::spawn_with_dag_info(reactor_name, future, sched_type, dag_info) } diff --git a/awkernel_async_lib/src/scheduler/gedf.rs b/awkernel_async_lib/src/scheduler/gedf.rs index 01e5d7d82..82bacdde2 100644 --- a/awkernel_async_lib/src/scheduler/gedf.rs +++ b/awkernel_async_lib/src/scheduler/gedf.rs @@ -4,9 +4,10 @@ use core::cmp::max; use super::{Scheduler, SchedulerType, Task}; use crate::{ + dag::{get_dag, get_dag_absolute_deadline, set_dag_absolute_deadline, to_node_index}, scheduler::{get_priority, peek_preemption_pending, push_preemption_pending}, task::{ - get_task, get_tasks_running, set_current_task, set_need_preemption, State, + get_task, get_tasks_running, set_current_task, set_need_preemption, DagInfo, State, MAX_TASK_PRIORITY, }, }; @@ -71,10 +72,18 @@ impl Scheduler for GEDFScheduler { let (wake_time, absolute_deadline) = { let mut node_inner = MCSNode::new(); let mut info = task.info.lock(&mut node_inner); + let dag_info = info.get_dag_info(); match info.scheduler_type { SchedulerType::GEDF(relative_deadline) => { let wake_time = awkernel_lib::delay::uptime(); - let absolute_deadline = wake_time + relative_deadline; + let absolute_deadline = if let Some(ref dag_info) = dag_info { + 
calculate_and_update_dag_deadline(dag_info, wake_time) + } else { + // If dag_info is not present, the task is treated as a regular task, and + // the absolute_deadline is calculated using the scheduler's relative_deadline. + wake_time + relative_deadline + }; + task.priority .update_priority_info(self.priority, MAX_TASK_PRIORITY - absolute_deadline); info.update_absolute_deadline(absolute_deadline); @@ -184,3 +193,35 @@ impl GEDFScheduler { false } } + +fn get_dag_sink_relative_deadline_ms(dag_id: u32) -> u64 { + let dag = get_dag(dag_id).unwrap_or_else(|| panic!("GEDF scheduler: DAG {dag_id} not found")); + dag.get_sink_relative_deadline() + .map(|deadline| deadline.as_millis() as u64) + .unwrap_or_else(|| panic!("GEDF scheduler: DAG {dag_id} has no sink relative deadline set")) +} + +fn calculate_and_set_dag_deadline(dag_id: u32, wake_time: u64) -> u64 { + let relative_deadline_ms = get_dag_sink_relative_deadline_ms(dag_id); + let dag_absolute_deadline = wake_time + relative_deadline_ms; + set_dag_absolute_deadline(dag_id, dag_absolute_deadline); + dag_absolute_deadline +} + +pub fn calculate_and_update_dag_deadline(dag_info: &DagInfo, wake_time: u64) -> u64 { + let dag_id = dag_info.dag_id; + let node_id = dag_info.node_id; + + if let Some(absolute_deadline) = get_dag_absolute_deadline(dag_id) { + let dag = + get_dag(dag_id).unwrap_or_else(|| panic!("GEDF scheduler: DAG {dag_id} not found")); + let current_node_index = to_node_index(node_id); + if !dag.is_source_node(current_node_index) { + return absolute_deadline; + } + + return calculate_and_set_dag_deadline(dag_id, wake_time); + } + + calculate_and_set_dag_deadline(dag_id, wake_time) +} diff --git a/awkernel_async_lib/src/task.rs b/awkernel_async_lib/src/task.rs index e25fb110e..6e4ee4636 100644 --- a/awkernel_async_lib/src/task.rs +++ b/awkernel_async_lib/src/task.rs @@ -157,6 +157,7 @@ pub struct TaskInfo { need_sched: bool, pub(crate) need_preemption: bool, panicked: bool, + pub(crate) dag_info: Option, 
/// Identifies the DAG node a task was spawned for, so DAG-aware schedulers
/// (e.g. GEDF) can look up per-DAG state such as the shared absolute
/// deadline.
///
/// Two `u32` fields, so `Copy` is free; deriving `Debug`/`PartialEq`/`Eq`/
/// `Hash` costs nothing and lets the type be logged, compared, and used as
/// a map key without forcing `.clone()` at every call site.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct DagInfo {
    /// Id of the DAG registered in `awkernel_async_lib::dag`.
    pub dag_id: u32,
    /// Index of the node within that DAG's graph (see `to_node_index`).
    pub node_id: u32,
}
+/// +/// # Example +/// +/// ```ignore +/// use awkernel_async_lib::{scheduler::SchedulerType, task, dag::{create_dag, add_node_with_topic_edges_public, set_relative_deadline_public}}; +/// use core::time::Duration; +/// let dag = create_dag(); +/// let sink_node_idx = add_node_with_topic_edges_public(&dag, &[], &[]); +/// let deadline = Duration::from_millis(100); +/// set_relative_deadline_public(&dag, sink_node_idx, deadline); +/// let task_id = task::spawn_with_dag_info( +/// "dag task".into(), +/// async { Ok(()) }, +/// SchedulerType::GEDF(0), +/// DagInfo { dag_id: 1, node_id: 0 } +/// ); +/// ``` +pub fn spawn_with_dag_info( + name: Cow<'static, str>, + future: impl Future + 'static + Send, + sched_type: SchedulerType, + dag_info: DagInfo, +) -> u32 { + inner_spawn(name, future, sched_type, Some(dag_info)) +} + +pub fn inner_spawn( + name: Cow<'static, str>, + future: impl Future + 'static + Send, + sched_type: SchedulerType, + dag_info: Option, ) -> u32 { if let SchedulerType::PrioritizedFIFO(p) | SchedulerType::PrioritizedRR(p) = sched_type { if p > HIGHEST_PRIORITY { @@ -352,7 +404,7 @@ pub fn spawn( let mut node = MCSNode::new(); let mut tasks = TASKS.lock(&mut node); - let id = tasks.spawn(name, future.fuse(), scheduler, sched_type); + let id = tasks.spawn(name, future.fuse(), scheduler, sched_type, dag_info); let task = tasks.id_to_task.get(&id).cloned(); drop(tasks);