Skip to content

Commit 617ec32

Browse files
authored
Fix network preemption (#45)
* Ensures connections are only established if there's space to store them. Also downgrades send_interval to u64 (u128 is not supported by serde) * cast elapsed to u64
1 parent d5dc0b3 commit 617ec32

File tree

4 files changed

+30
-26
lines changed

4 files changed

+30
-26
lines changed

scripts/network/workloads/network-client.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@ departure_rate = 0.1
1212
connections_static = 100
1313
connections_dyn_max = 1000
1414
preempt = true
15+
conns_per_addr = 1

scripts/network/workloads/network-server.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,6 @@ server = true
88
address = "223.42.0.1"
99
target_port = 1337
1010
connections_static = 100
11-
connections_dyn_max = 1000
12-
preempt = true
11+
connections_dyn_max = 100
12+
preempt = false
13+
conns_per_addr = 1

src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ pub enum Workload {
131131
/// This parameter allows to control the overhead of sending data,
132132
/// so that it will not impact connections monitoring.
133133
#[serde(default = "default_network_send_interval")]
134-
send_interval: u128,
134+
send_interval: u64,
135135

136136
/// Whether or not to wait for a connection to be removed before adding
137137
/// a new one, when the dynamic connection limit is reached.
@@ -166,7 +166,7 @@ fn default_conns_per_addr() -> u16 {
166166
100
167167
}
168168

169-
fn default_network_send_interval() -> u128 {
169+
fn default_network_send_interval() -> u64 {
170170
100
171171
}
172172

src/worker/network.rs

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -191,31 +191,32 @@ impl NetworkWorker {
191191
let (local_addr, local_port) =
192192
get_local_addr_port(addr, conns_per_addr, index);
193193

194-
socket
195-
.connect(
196-
iface.context(),
197-
(addr, target_port),
198-
(local_addr, local_port),
199-
)
200-
.unwrap();
201-
202-
let handle = sockets.add(socket);
203194
let lifetime: f64 =
204195
thread_rng().sample(Exp::new(departure_rate).unwrap());
205196

206197
// If we've reached the connections limit
207-
if dynamic_sockets.len() == connections_dyn_max as usize {
208-
if preempt {
209-
let idx = thread_rng()
210-
.gen_range(0..connections_dyn_max as usize);
211-
let (key, _) = sockets.iter().nth(idx).unwrap();
212-
dynamic_sockets.remove(&key);
213-
close_sockets.push(key);
214-
215-
dynamic_sockets
216-
.insert(handle, (SystemTime::now(), lifetime));
217-
}
218-
} else {
198+
if dynamic_sockets.len() == connections_dyn_max as usize
199+
&& preempt
200+
{
201+
let idx =
202+
thread_rng().gen_range(0..connections_dyn_max as usize);
203+
let (key, _) = sockets.iter().nth(idx).unwrap();
204+
dynamic_sockets.remove(&key);
205+
close_sockets.push(key);
206+
}
207+
208+
// either we've just removed a socket and want to preempt
209+
// or, we've have space and we're processing normally
210+
if dynamic_sockets.len() < connections_dyn_max as usize {
211+
socket
212+
.connect(
213+
iface.context(),
214+
(addr, target_port),
215+
(local_addr, local_port),
216+
)
217+
.unwrap();
218+
219+
let handle = sockets.add(socket);
219220
dynamic_sockets
220221
.insert(handle, (SystemTime::now(), lifetime));
221222
}
@@ -273,7 +274,8 @@ impl NetworkWorker {
273274
}
274275

275276
if socket.may_send() {
276-
let elapsed = send_timer.elapsed().unwrap().as_millis();
277+
let elapsed =
278+
send_timer.elapsed().unwrap().as_millis() as u64;
277279

278280
// Throttle sending data via connection, since the main
279281
// purpose is to excercise connection monitoring.

0 commit comments

Comments
 (0)