Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
6dffb4b
test: for develop #1
nokosaaan Aug 18, 2025
00518ae
test: for develop #2
nokosaaan Aug 18, 2025
3f81a8f
Merge branch 'main' of github.com:tier4/awkernel into dag_gedf
nokosaaan Aug 18, 2025
dc772c5
test: whether source_nodes or others & modify DAG struct #3
nokosaaan Aug 18, 2025
5b645fd
test: add else process for initial period #4
nokosaaan Aug 18, 2025
4be6763
test: absolute deadline setting #5
nokosaaan Aug 21, 2025
555ead5
fix: absolute deadline setting #6
nokosaaan Aug 21, 2025
ec2e69e
fix: Resolve merge conflicts in task.rs and gedf.rs
nokosaaan Aug 21, 2025
b6cd056
fix absolute deadline #7
nokosaaan Aug 21, 2025
85d79f8
Merge branch 'main' of github.com:tier4/awkernel into dag_gedf
nokosaaan Aug 21, 2025
785c02f
test: change gedf.rs #8
nokosaaan Aug 22, 2025
68ea963
test: change gedf.rs #9
nokosaaan Aug 22, 2025
85ba2f8
fix: delete unnecessary code
nokosaaan Aug 27, 2025
d416b90
Merge branch 'main' of github.com:tier4/awkernel into dag_gedf
nokosaaan Aug 27, 2025
4a57f81
fix: modify get_node_info and etc
nokosaaan Aug 27, 2025
f0fef01
fix: change function name
nokosaaan Aug 27, 2025
4f27a2e
fix: organizing comments & modify else handling
nokosaaan Aug 29, 2025
5376f88
Merge branch 'main' of github.com:tier4/awkernel into dag_gedf
nokosaaan Aug 29, 2025
339f669
fix: remove unused methods
nokosaaan Aug 29, 2025
5e47291
fix: change PRIORITY_LIST & change dag.rs
nokosaaan Aug 29, 2025
e7f2120
fix: change gedf.rs & task.rs
nokosaaan Aug 29, 2025
7c65150
fix: avoid deadlock for GEDFNoArg
nokosaaan Aug 30, 2025
fdb6f86
Merge branch 'main' of github.com:tier4/awkernel into dag_gedf
nokosaaan Sep 3, 2025
a840b4c
fix: change spawn logic
nokosaaan Sep 4, 2025
7d2b6f5
fix: delete GEDFNoArg
nokosaaan Sep 6, 2025
6ef5a8e
fix: check cargo fmt
nokosaaan Sep 6, 2025
fef8c11
fix: adjusting the number of arguments
nokosaaan Sep 6, 2025
f4f50aa
fix: check cargo fmt 2
nokosaaan Sep 6, 2025
506a019
fix: improper variable usage in panic! macros
nokosaaan Sep 6, 2025
4282cbe
fix: check doctest
nokosaaan Sep 6, 2025
f1b3082
fix: check doctest 2
nokosaaan Sep 6, 2025
0c2a69d
fix: check doctest 3
nokosaaan Sep 6, 2025
a8b4d72
fix: check doctest 4
nokosaaan Sep 6, 2025
ef3f003
fix: check doctest 5
nokosaaan Sep 6, 2025
d8dc0ed
fix: check doctest 6
nokosaaan Sep 6, 2025
3d45ffa
fix: check doctest 7
nokosaaan Sep 6, 2025
038c4f9
fix: check doctest 8
nokosaaan Sep 6, 2025
3bbb845
fix: check cargo fmt 3
nokosaaan Sep 6, 2025
2016929
fix: check doctest 9
nokosaaan Sep 6, 2025
daf56e9
fix: check doctest 10
nokosaaan Sep 6, 2025
7dd440d
fix: check doctest 11
nokosaaan Sep 6, 2025
7ed6053
fix: fix unnecessary imports
nokosaaan Sep 6, 2025
7f347d0
fix: reset the LOG_ENABLE flag to its original state
nokosaaan Sep 6, 2025
d164129
fix: delete unnecessary functions
nokosaaan Sep 6, 2025
c45c465
fix: delete unnecessary functions 2
nokosaaan Sep 6, 2025
1839e8f
fix: delete unnecessary functions 3
nokosaaan Sep 6, 2025
a513772
fix: implement gedf
nokosaaan Sep 17, 2025
d12a0cf
Merge branch 'main' of github.com:tier4/awkernel into dag_gedf
nokosaaan Sep 17, 2025
25a8f8e
fix: check cargo fmt 4
nokosaaan Sep 17, 2025
a5a61a9
fix: refactored the nested sections using helper functions
nokosaaan Sep 18, 2025
46b7c0a
fix: reduced nested structure using early return
nokosaaan Sep 18, 2025
af5b3bc
Merge branch 'main' of github.com:tier4/awkernel into dag_gedf
nokosaaan Sep 25, 2025
d8d48f7
fix: adding a function and code optimization
nokosaaan Sep 25, 2025
8cc8091
fix: check cargo fmt 5
nokosaaan Sep 25, 2025
02efdf2
fix: add a comment in dag.rs
nokosaaan Sep 25, 2025
dc1771e
fix: format! error
nokosaaan Sep 25, 2025
7d654bb
fix: To resolve the build error, I used unwrap_or_else instead of exp…
nokosaaan Sep 25, 2025
5334ba7
fix: delete unnecessary import
nokosaaan Sep 25, 2025
9ef5a1b
fix: delete semicolon
nokosaaan Sep 25, 2025
4c1d629
fix: panic! 2
nokosaaan Sep 25, 2025
3e20931
fix: unify the name index to id
nokosaaan Sep 25, 2025
1d650ea
fix: consolidated the duplicate processing and add a comment
nokosaaan Sep 25, 2025
76038e7
fix: solve let binding
nokosaaan Sep 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions applications/tests/test_dag/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use awkernel_async_lib::scheduler::SchedulerType;
use awkernel_lib::delay::wait_microsec;
use core::time::Duration;

const LOG_ENABLE: bool = false;
const LOG_ENABLE: bool = true;

pub async fn run() {
wait_microsec(1000000);
Expand All @@ -30,7 +30,7 @@ pub async fn run() {
(number,)
},
vec![Cow::from("topic0")],
SchedulerType::PrioritizedFIFO(30),
SchedulerType::GEDF(5),
period,
)
.await;
Expand All @@ -47,7 +47,7 @@ pub async fn run() {
},
vec![Cow::from("topic0")],
vec![Cow::from("topic1"), Cow::from("topic2")],
SchedulerType::PrioritizedFIFO(0),
SchedulerType::GEDF(10),
)
.await;

Expand All @@ -62,7 +62,7 @@ pub async fn run() {
},
vec![Cow::from("topic1")],
vec![Cow::from("topic3")],
SchedulerType::PrioritizedFIFO(0),
SchedulerType::GEDF(10),
)
.await;

Expand All @@ -77,7 +77,7 @@ pub async fn run() {
},
vec![Cow::from("topic2")],
vec![Cow::from("topic4")],
SchedulerType::PrioritizedFIFO(0),
SchedulerType::GEDF(10),
)
.await;

Expand All @@ -90,7 +90,7 @@ pub async fn run() {
}
},
vec![Cow::from("topic3"), Cow::from("topic4")],
SchedulerType::PrioritizedFIFO(0),
SchedulerType::GEDF(10),
Duration::from_secs(1),
)
.await;
Expand Down
82 changes: 77 additions & 5 deletions awkernel_async_lib/src/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ use crate::{
visit::{EdgeRef, IntoNodeReferences, NodeRef},
},
scheduler::SchedulerType,
task::DagInfo,
time_interval::interval,
Attribute, MultipleReceiver, MultipleSender, VectorToPublishers, VectorToSubscribers,
};
Expand Down Expand Up @@ -168,6 +169,7 @@ impl core::fmt::Display for DagError {
pub struct Dag {
id: u32,
graph: Mutex<graph::Graph<NodeInfo, EdgeInfo>>,
absolute_deadline: Mutex<Option<u64>>,

#[cfg(feature = "perf")]
response_info: Mutex<ResponseInfo>,
Expand All @@ -190,6 +192,25 @@ impl Dag {
graph.externals(Direction::Outgoing).collect()
}

pub fn is_source_node(&self, node_index: NodeIndex) -> bool {
let source_nodes = self.get_source_nodes();
source_nodes.contains(&node_index)
}

// Returns the relative deadline of the first sink node, if it exists.
pub fn get_sink_relative_deadline(&self) -> Option<Duration> {
let sink_nodes: Vec<NodeIndex> = self.get_sink_nodes();

if let Some(sink_node_index) = sink_nodes.first() {
self.graph
.lock(&mut MCSNode::new())
.node_weight(*sink_node_index)?
.relative_deadline
} else {
None
}
}

fn set_relative_deadline(&self, node_idx: NodeIndex, deadline: Duration) {
let mut node = MCSNode::new();
let mut graph = self.graph.lock(&mut node);
Expand Down Expand Up @@ -284,6 +305,10 @@ impl Dag {
self.check_subscribe_mismatch::<Args>(&subscribe_topic_names, node_idx);
self.check_publish_mismatch::<Ret>(&publish_topic_names, node_idx);

// To prevent errors caused by ownership moves
let dag_id = self.id;
let node_id = node_idx.index() as u32;

let mut node = MCSNode::new();
let mut pending_tasks = PENDING_TASKS.lock(&mut node);
pending_tasks
Expand All @@ -297,6 +322,7 @@ impl Dag {
subscribe_topic_names,
publish_topic_names,
sched_type,
DagInfo { dag_id, node_id },
)
.await
})
Expand Down Expand Up @@ -330,6 +356,10 @@ impl Dag {
}
};

// To prevent errors caused by ownership moves
let dag_id = self.id;
let node_id = node_idx.index() as u32;

let mut node = MCSNode::new();
let mut source_pending_tasks = SOURCE_PENDING_TASKS.lock(&mut node);
source_pending_tasks.insert(
Expand All @@ -343,6 +373,7 @@ impl Dag {
sched_type,
period,
measure_f,
DagInfo { dag_id, node_id },
)
.await
})
Expand Down Expand Up @@ -384,6 +415,10 @@ impl Dag {
}
};

// To prevent errors caused by ownership moves
let dag_id = self.id;
let node_id = node_idx.index() as u32;

let mut node = MCSNode::new();
let mut pending_tasks = PENDING_TASKS.lock(&mut node);
pending_tasks
Expand All @@ -396,6 +431,7 @@ impl Dag {
measure_f,
subscribe_topic_names,
sched_type,
DagInfo { dag_id, node_id },
)
.await
})
Expand Down Expand Up @@ -437,6 +473,20 @@ impl Dag {
.push(node_idx.index());
}
}

#[inline(always)]
pub fn set_absolute_deadline(&self, deadline: u64) {
let mut node = MCSNode::new();
let mut absolute_deadline = self.absolute_deadline.lock(&mut node);
*absolute_deadline = Some(deadline);
}

#[inline(always)]
pub fn get_absolute_deadline(&self) -> Option<u64> {
let mut node = MCSNode::new();
let absolute_deadline = self.absolute_deadline.lock(&mut node);
*absolute_deadline
}
}

struct PendingTask {
Expand All @@ -463,6 +513,10 @@ struct NodeInfo {
relative_deadline: Option<Duration>,
}

pub fn to_node_index(index: u32) -> NodeIndex {
NodeIndex::new(index as usize)
}

struct EdgeInfo {
topic_name: Cow<'static, str>,
}
Expand All @@ -489,6 +543,7 @@ impl Dags {
let dag = Arc::new(Dag {
id,
graph: Mutex::new(graph::Graph::new()),
absolute_deadline: Mutex::new(None),

#[cfg(feature = "perf")]
response_info: Mutex::new(ResponseInfo::new()),
Expand Down Expand Up @@ -525,6 +580,21 @@ pub fn get_dag(id: u32) -> Option<Arc<Dag>> {
dags.id_to_dag.get(&id).cloned()
}

#[inline(always)]
pub fn get_dag_absolute_deadline(dag_id: u32) -> Option<u64> {
get_dag(dag_id)?.get_absolute_deadline()
}

#[inline(always)]
pub fn set_dag_absolute_deadline(dag_id: u32, deadline: u64) -> bool {
if let Some(dag) = get_dag(dag_id) {
dag.set_absolute_deadline(deadline);
true
} else {
false
}
}

pub async fn finish_create_dags(dags: &[Arc<Dag>]) -> Result<(), Vec<DagError>> {
match validate_all_rules(dags) {
Ok(()) => {
Expand Down Expand Up @@ -833,6 +903,7 @@ async fn spawn_reactor<F, Args, Ret>(
subscribe_topic_names: Vec<Cow<'static, str>>,
publish_topic_names: Vec<Cow<'static, str>>,
sched_type: SchedulerType,
dag_info: DagInfo,
) -> u32
where
F: Fn(
Expand Down Expand Up @@ -862,7 +933,7 @@ where
}
};

crate::task::spawn(reactor_name, future, sched_type)
crate::task::spawn_with_dag_info(reactor_name, future, sched_type, dag_info)
}

async fn spawn_periodic_reactor<F, Ret>(
Expand All @@ -872,6 +943,7 @@ async fn spawn_periodic_reactor<F, Ret>(
sched_type: SchedulerType,
period: Duration,
_release_measure: Option<MeasureF>,
dag_info: DagInfo,
) -> u32
where
F: Fn() -> <Ret::Publishers as MultipleSender>::Item + Send + 'static,
Expand Down Expand Up @@ -909,7 +981,7 @@ where
}
};

let task_id = crate::task::spawn(reactor_name, future, sched_type);
let task_id = crate::task::spawn_with_dag_info(reactor_name, future, sched_type, dag_info);

#[cfg(feature = "perf")]
release_measure();
Expand All @@ -922,6 +994,7 @@ async fn spawn_sink_reactor<F, Args>(
f: F,
subscribe_topic_names: Vec<Cow<'static, str>>,
sched_type: SchedulerType,
dag_info: DagInfo,
) -> u32
where
F: Fn(<Args::Subscribers as MultipleReceiver>::Item) + Send + 'static,
Expand All @@ -933,11 +1006,10 @@ where
Args::create_subscribers(subscribe_topic_names, Attribute::default());

loop {
let args: <<Args as VectorToSubscribers>::Subscribers as MultipleReceiver>::Item =
subscribers.recv_all().await;
let args: <Args::Subscribers as MultipleReceiver>::Item = subscribers.recv_all().await;
f(args);
}
};

crate::task::spawn(reactor_name, future, sched_type)
crate::task::spawn_with_dag_info(reactor_name, future, sched_type, dag_info)
}
45 changes: 43 additions & 2 deletions awkernel_async_lib/src/scheduler/gedf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use core::cmp::max;

use super::{Scheduler, SchedulerType, Task};
use crate::{
dag::{get_dag, get_dag_absolute_deadline, set_dag_absolute_deadline, to_node_index},
scheduler::{get_priority, peek_preemption_pending, push_preemption_pending},
task::{
get_task, get_tasks_running, set_current_task, set_need_preemption, State,
get_task, get_tasks_running, set_current_task, set_need_preemption, DagInfo, State,
MAX_TASK_PRIORITY,
},
};
Expand Down Expand Up @@ -71,10 +72,18 @@ impl Scheduler for GEDFScheduler {
let (wake_time, absolute_deadline) = {
let mut node_inner = MCSNode::new();
let mut info = task.info.lock(&mut node_inner);
let dag_info = info.get_dag_info();
match info.scheduler_type {
SchedulerType::GEDF(relative_deadline) => {
let wake_time = awkernel_lib::delay::uptime();
let absolute_deadline = wake_time + relative_deadline;
let absolute_deadline = if let Some(ref dag_info) = dag_info {
calculate_and_update_dag_deadline(dag_info, wake_time)
} else {
// If dag_info is not present, the task is treated as a regular task, and
// the absolute_deadline is calculated using the scheduler's relative_deadline.
wake_time + relative_deadline
};

task.priority
.update_priority_info(self.priority, MAX_TASK_PRIORITY - absolute_deadline);
info.update_absolute_deadline(absolute_deadline);
Expand Down Expand Up @@ -184,3 +193,35 @@ impl GEDFScheduler {
false
}
}

fn get_dag_sink_relative_deadline_ms(dag_id: u32) -> u64 {
let dag = get_dag(dag_id).unwrap_or_else(|| panic!("GEDF scheduler: DAG {dag_id} not found"));
dag.get_sink_relative_deadline()
.map(|deadline| deadline.as_millis() as u64)
.unwrap_or_else(|| panic!("GEDF scheduler: DAG {dag_id} has no sink relative deadline set"))
}

fn calculate_and_set_dag_deadline(dag_id: u32, wake_time: u64) -> u64 {
let relative_deadline_ms = get_dag_sink_relative_deadline_ms(dag_id);
let dag_absolute_deadline = wake_time + relative_deadline_ms;
set_dag_absolute_deadline(dag_id, dag_absolute_deadline);
dag_absolute_deadline
}

pub fn calculate_and_update_dag_deadline(dag_info: &DagInfo, wake_time: u64) -> u64 {
let dag_id = dag_info.dag_id;
let node_id = dag_info.node_id;

if let Some(absolute_deadline) = get_dag_absolute_deadline(dag_id) {
let dag =
get_dag(dag_id).unwrap_or_else(|| panic!("GEDF scheduler: DAG {dag_id} not found"));
let current_node_index = to_node_index(node_id);
if !dag.is_source_node(current_node_index) {
return absolute_deadline;
}

return calculate_and_set_dag_deadline(dag_id, wake_time);
}

calculate_and_set_dag_deadline(dag_id, wake_time)
}
Loading