Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 87 additions & 91 deletions src/drone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,20 @@ impl Drone for RustasticDrone {
}
}

/// Runs the main loop of the drone, continuously processing commands and packets.
///
/// This function enters an infinite loop, constantly monitoring two channels:
/// - `self.controller_recv`: Receives commands from the drone controller.
/// - `self.packet_recv`: Receives raw data packets.
///
/// The loop uses `select_biased!` to efficiently handle incoming data from both channels.
///
/// Command from simulation controller are prioritized over data packets
///
/// **Behavior:**
/// - If a `DroneCommand::Crash` is received, the loop terminates and a warning message is logged.
/// - Other commands are passed to the `handle_command` function for further processing.
/// - Received packets are passed to the `handle_packet` function for handling.
/// Runs the main loop of the drone, continuously processing commands and packets.
///
/// This function enters an infinite loop, constantly monitoring two channels:
/// - `self.controller_recv`: Receives commands from the drone controller.
/// - `self.packet_recv`: Receives raw data packets.
///
/// The loop uses `select_biased!` to efficiently handle incoming data from both channels.
///
/// Command from simulation controller are prioritized over data packets
///
/// # Behavior:
/// - If a `DroneCommand::Crash` is received, the loop terminates and a warning message is logged.
/// - Other commands are passed to the `handle_command` function for further processing.
/// - Received packets are passed to the `handle_packet` function for handling.
fn run(&mut self) {
loop {
select_biased! {
Expand Down Expand Up @@ -136,7 +136,7 @@ impl RustasticDrone {
/// can be forwarded to the correct next hop. If any errors are found, appropriate `Nack` packets are sent.
///
/// # Packet Handling Logic
/// - **`FloodRequest`**: If the packet is a flood request, it handles the request by calling `handle_flood_request`,
/// - **`FloodRequest`**: If the packet is a flood request, it handles the request by calling `handle_flood_request`,
/// and then adds the flood ID to the `flood_id_received` set to prevent duplicate processing of the same flood.
/// - **Correct Packet ID**: If the packet has the correct ID and is routable, it continues with routing and hop management.
/// - **Destination Check**: If the destination of the packet is not a valid destination (e.g., a drone instead of a client/server),
Expand Down Expand Up @@ -172,7 +172,7 @@ impl RustasticDrone {
if let PacketType::FloodRequest(flood_request) = packet.clone().pack_type {
let flood_id = flood_request.flood_id;
let flood_initiator = flood_request.initiator_id;
self.handle_flood_request(flood_request, packet);
self.handle_flood_request(flood_request, &packet);
self.flood_id_received.insert((flood_id, flood_initiator));
} else if self.check_packet_correct_id(packet.clone()) {
// Increase hop_index
Expand All @@ -185,12 +185,12 @@ impl RustasticDrone {
"✗".red(),
self.id
);
self.send_nack(packet.clone(), None, NackType::DestinationIsDrone);
self.send_nack(packet, None, NackType::DestinationIsDrone);
return;
}

// Check if the next hop is a valid neighbor
if !self.check_neighbor(packet.clone()) {
if !self.check_neighbor(&packet) {
//Step4
let neighbor = packet.routing_header.hops[packet.routing_header.hop_index];
error!(
Expand All @@ -199,7 +199,7 @@ impl RustasticDrone {
self.id,
neighbor
);
self.send_nack(packet.clone(), None, NackType::ErrorInRouting(self.id));
self.send_nack(packet, None, NackType::ErrorInRouting(self.id));
return;
}

Expand All @@ -217,7 +217,7 @@ impl RustasticDrone {
PacketType::MsgFragment(fragment) => self.handle_fragment(packet, fragment),
PacketType::FloodRequest(_) => unreachable!(),
PacketType::FloodResponse(flood_response) => {
self.handle_flood_response(flood_response, packet);
self.handle_flood_response(&flood_response, &packet);
}
}
}
Expand Down Expand Up @@ -262,7 +262,7 @@ impl RustasticDrone {
// Try sending to the destination drone
if let Some(sender) = self.packet_send.get(&destination) {
match sender.send(packet.clone()) {
Ok(_) => {
Ok(()) => {
info!(
"{} [ Drone {} ]: was sent a {} packet to [ Drone {} ]",
"✓".green(),
Expand Down Expand Up @@ -367,7 +367,7 @@ impl RustasticDrone {
/// }
/// ```
fn check_packet_correct_id(&self, packet: Packet) -> bool {
if self.id == packet.clone().routing_header.hops[packet.clone().routing_header.hop_index] {
if self.id == packet.routing_header.hops[packet.routing_header.hop_index] {
true
} else {
self.send_nack(packet, None, NackType::UnexpectedRecipient(self.id));
Expand Down Expand Up @@ -410,65 +410,62 @@ impl RustasticDrone {
fn handle_ack_nack(&mut self, mut packet: Packet) {
if packet.routing_header.hop_index >= packet.routing_header.hops.len() {
// It can't happen
panic!("{} Source is not a client!", "PANIC".purple());
unreachable!("{} Source is not a client!", "PANIC".purple());
}
match packet.clone().pack_type {
PacketType::Nack(nack) => {
warn!(
"{} [ Drone {} ]: received a {}",
"!!!".yellow(),
self.id,
packet.pack_type,
);
if let PacketType::Nack(nack) = packet.clone().pack_type {
warn!(
"{} [ Drone {} ]: received a {}",
"!!!".yellow(),
self.id,
packet.pack_type,
);

warn!(
"\n├─>{} Checking [ Drone {} ] buffer...",
"!!!".yellow(),
self.id
);
warn!(
"\n├─>{} Checking [ Drone {} ] buffer...",
"!!!".yellow(),
self.id
);

// Check if the fragment is in the buffer
if let Some(fragment) = self
.buffer
.get_fragment(packet.clone().session_id, nack.fragment_index)
{
info!(
"├─>{} Fragment [ fragment_index: {} ] of the Packet [ session_id: {} ] was found in the buffer",
"✓".green(),
fragment.fragment_index,
packet.session_id
);
// Check if the fragment is in the buffer
if let Some(fragment) = self
.buffer
.get_fragment(packet.clone().session_id, nack.fragment_index)
{
info!(
"├─>{} Fragment [ fragment_index: {} ] of the Packet [ session_id: {} ] was found in the buffer",
"✓".green(),
fragment.fragment_index,
packet.session_id
);

// Resend the fragment, reverse the path
packet.routing_header.reverse();
let new_packet = Packet {
pack_type: PacketType::MsgFragment(fragment.clone()),
routing_header: packet.routing_header.clone(),
session_id: packet.session_id,
};
self.send_message(new_packet);
// Resend the fragment, reverse the path
packet.routing_header.reverse();
let new_packet = Packet {
pack_type: PacketType::MsgFragment(fragment.clone()),
routing_header: packet.routing_header.clone(),
session_id: packet.session_id,
};
self.send_message(new_packet);

info!("└─>{} The Packet was sent", "✓".green());
} else {
// Send a nack to the previous node
packet.routing_header.hop_index += 1; // Move to the previous hop
self.send_nack(packet, None, NackType::Dropped);
}
info!("└─>{} The Packet was sent", "✓".green());
} else {
// Send a nack to the previous node
packet.routing_header.hop_index += 1; // Move to the previous hop
self.send_nack(packet, None, NackType::Dropped);
}
_ => {
if packet.routing_header.hop_index < packet.routing_header.hops.len() - 1 {
packet.routing_header.hop_index += 1; // Move to the next hop
} else {
error!(
"{} Invalid hop index increment detected in [ Drone: {} ] for header of Packet [ session_id: {} ]",
"✗".red(),
self.id,
packet.session_id
);
return;
}
self.send_message(packet);
} else {
if packet.routing_header.hop_index < packet.routing_header.hops.len() - 1 {
packet.routing_header.hop_index += 1; // Move to the next hop
} else {
error!(
"{} Invalid hop index increment detected in [ Drone: {} ] for header of Packet [ session_id: {} ]",
"✗".red(),
self.id,
packet.session_id
);
return;
}
self.send_message(packet);
}
}

Expand Down Expand Up @@ -580,7 +577,7 @@ impl RustasticDrone {

// Send the NACK to the previous hop
match sender.send(packet.clone()) {
Ok(_) => {
Ok(()) => {
warn!(
"{} Nack was sent from [ Drone {} ] to [ Drone {} ]",
"!!!".yellow(),
Expand Down Expand Up @@ -674,12 +671,11 @@ impl RustasticDrone {
/// println!("The destination is not a neighbor.");
/// }
/// ```
fn check_neighbor(&self, packet: Packet) -> bool {
fn check_neighbor(&self, packet: &Packet) -> bool {
let destination = packet.routing_header.hops[packet.routing_header.hop_index];
self.packet_send.contains_key(&destination)
}

#[allow(clippy::cast_possible_truncation)]
// Determines if a packet fragment should be dropped based on the Packet Drop Rate (PDR).
///
/// This function simulates the packet drop behavior based on the current drone's Packet Drop Rate (PDR).
Expand All @@ -701,8 +697,8 @@ impl RustasticDrone {
/// ```
fn check_drop_fragment(&self) -> bool {
let mut rng = rand::thread_rng();
let val = rng.gen_range(1..=100);
val <= (self.pdr * 100f32) as i32
let val = rng.gen_range(1f32..=100f32);
val <= self.pdr * 100f32
}

/// Handles an incoming `FloodRequest` packet and processes it accordingly.
Expand Down Expand Up @@ -731,13 +727,13 @@ impl RustasticDrone {
/// ```rust
/// drone.handle_flood_request(flood_request, packet);
/// ```
fn handle_flood_request(&self, mut flood_request: FloodRequest, packet: Packet) {
fn handle_flood_request(&self, mut flood_request: FloodRequest, packet: &Packet) {
// Determine the previous node that sent the packet
let prev_node = if let Some(node) = flood_request.path_trace.last() {
node.0
} else {
error!("A drone can't be the first node in the path-trace.");
todo!("how to tell controller we received a wrong path-trace")
return;
};

// Add the current drone to the path-trace
Expand Down Expand Up @@ -856,7 +852,7 @@ impl RustasticDrone {
/// ```rust
/// drone.handle_flood_response(flood_response, packet);
/// ```
fn handle_flood_response(&self, flood_response: FloodResponse, packet: Packet) {
fn handle_flood_response(&self, flood_response: &FloodResponse, packet: &Packet) {
let new_routing_header = packet.routing_header.clone();

// Prepare a new packet to send the flood response back
Expand Down Expand Up @@ -1105,7 +1101,7 @@ impl RustasticDrone {
///
/// # Panic:
/// - Panic if a command of type `DroneCommand::Crash` is passed
///
///
/// # Example:
/// ```rust
/// drone.handle_command(DroneCommand::AddSender(node_id, sender));
Expand Down Expand Up @@ -1151,21 +1147,21 @@ impl RustasticDrone {
}
}
DroneCommand::RemoveSender(node_id) => {
if !self.packet_send.contains_key(&node_id) {
warn!(
"{} [ Drone {} ] is already disconnected from [ Drone {} ]",
"!!!".yellow(),
self.id,
node_id
);
} else {
if self.packet_send.contains_key(&node_id) {
info!(
"{} Removing sender: {} from [ Drone {} ]",
"✓".green(),
node_id,
self.id
);
self.packet_send.remove(&node_id);
} else {
warn!(
"{} [ Drone {} ] is already disconnected from [ Drone {} ]",
"!!!".yellow(),
self.id,
node_id
);
}
}
DroneCommand::Crash => unreachable!(),
Expand Down
Loading