Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2021"
wg_2024 = { git = "https://github.com/WGL-2024/WGL_repo_2024.git", features = ["serialize", "debug"] }
messages = { git = "https://github.com/Rustastic/Messages.git"}
assembler = { git = "https://github.com/Rustastic/Assembler.git"}
source_routing = { git = "https://github.com/Rustastic/source_routing.git"}
source_routing = { git = "https://github.com/Rustastic/source_routing.git", branch = "network_caching"}
crossbeam-channel = "0.5.13"
toml = "0.8.19"
rand = "0.8.0"
Expand Down
28 changes: 0 additions & 28 deletions src/chat_client/handle_command/flooding.rs

This file was deleted.

16 changes: 14 additions & 2 deletions src/chat_client/handle_command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use messages::{
client_commands::{ChatClientCommand, ChatClientEvent},
high_level_messages::{ClientMessage, MessageContent},
};
use std::{thread, time::Duration};

use super::ChatClient;

mod flooding;
mod send_message;

impl ChatClient {
Expand All @@ -25,6 +25,7 @@ impl ChatClient {
self.id,
);
e.insert(sender);
self.router.add_neighbour(node_id);
} else {
warn!(
"{} [ ChatClient {} ] is already connected to [ Drone {} ]",
Expand All @@ -43,6 +44,7 @@ impl ChatClient {
self.id
);
self.packet_send.remove(&node_id);
self.router.remove_neighbour(node_id);
} else {
warn!(
"{} [ ChatClient {} ] is already disconnected from [ Drone {} ]",
Expand All @@ -58,7 +60,17 @@ impl ChatClient {
"ℹ".blue(),
self.id
);
self.start_flooding();
let requests = self.router.get_flood_requests(self.packet_send.len());
for (sender, request) in self.packet_send.values().zip(requests) {
if sender.send(request).is_err() {
error!(
"{} [ ChatClient {} ]: Failed to send floodrequest",
"✓".green(),
self.id
);
}
}
thread::sleep(Duration::from_secs(2));
}
ChatClientCommand::StartChatClient => {
self.running = true;
Expand Down
44 changes: 24 additions & 20 deletions src/chat_client/handle_packet/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::{thread, time::Duration};

use super::ChatClient;
use colored::Colorize;
use log::{error, info, warn};
Expand Down Expand Up @@ -391,14 +393,13 @@ impl ChatClient {

self.router.dropped_fragment(unreachable_node);

if let Some(incorrect_packet) =
self
.msgfactory
.take_packet(packet.session_id, nack.fragment_index)
if let Some(incorrect_packet) = self
.msgfactory
.take_packet(packet.session_id, nack.fragment_index)
{
let dest = incorrect_packet.routing_header.destination().unwrap();

let _ = self.router.drone_crashed(unreachable_node);
self.router.drone_crashed(unreachable_node);

if let Ok(new_routing_header) = self.router.get_source_routing_header(dest) {
let new_packet = Packet {
Expand All @@ -424,7 +425,6 @@ impl ChatClient {
self.id,
dest
);
self.reinit_network();
}
}
}
Expand All @@ -438,32 +438,37 @@ impl ChatClient {
);
}
NackType::Dropped => {


self.router.dropped_fragment(nack_src);

if let Some((dropped_packet, requests)) = self
.msgfactory
.get_packet(packet.session_id, nack.fragment_index)
{
if requests > 100 {
info!(
"{} [ ChatClient {} ]: Reinitializing network due to excessive dropped requests",
"ℹ".blue(),
self.id
);
let requests = self.router.get_flood_requests(self.packet_send.len());
for (sender, request) in self.packet_send.values().zip(requests) {
if sender.send(request).is_err() {
error!(
"{} [ ChatClient {} ]: Failed to send floodrequest",
"✓".green(),
self.id
);
}
}
thread::sleep(Duration::from_secs(2));
}
error!(
"{} [ ChatClient {} ]: Packet with session_id: {} and fragment_index: {} has been dropped",
"✗".red(),
self.id,
dropped_packet.session_id,
nack.fragment_index
);

if requests > 100 {
error!(
"{} [ ChatClient {} ]: Packet with session_id: {} and fragment_index: {} has been dropped more than 100 times. Dropping permanently.",
"✗".red(),
self.id,
dropped_packet.session_id,
nack.fragment_index
);
return;
}

let destination = dropped_packet.routing_header.destination().unwrap();

Expand Down Expand Up @@ -494,7 +499,6 @@ impl ChatClient {
self.id,
destination
);
self.reinit_network();
}
}
}
Expand Down
15 changes: 1 addition & 14 deletions src/chat_client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use assembler::HighLevelMessageFactory;
use colored::Colorize;
use crossbeam_channel::{select_biased, Receiver, Sender};
use log::info;
use messages::{
client_commands::{ChatClientCommand, ChatClientEvent},
high_level_messages::Message,
};
use source_routing::Router;
use std::thread;
use std::{collections::HashMap, time::Duration};
use std::collections::HashMap;
use wg_2024::{
network::NodeId,
packet::{NodeType, Packet},
Expand Down Expand Up @@ -107,14 +104,4 @@ impl ChatClient {
}
}
}

fn reinit_network(&mut self) {
info!(
"{} [ ChatClient {} ]: reinitializing the network...",
"✓".green(),
self.id
);
self.start_flooding();
thread::sleep(Duration::from_secs(2));
}
}
Loading