Skip to content

Commit 0095d3d

Browse files
committed
feat: support reverse l7 metrics
1 parent 28ec458 commit 0095d3d

27 files changed

+219
-75
lines changed

agent/src/collector/collector.rs

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,7 @@ impl Stash {
520520
0,
521521
0,
522522
L7Protocol::Unknown,
523+
false,
523524
self.context.agent_mode,
524525
);
525526
self.fill_edge_l4_stats(tagger, acc_flow.flow_meter);
@@ -554,6 +555,7 @@ impl Stash {
554555
0,
555556
0,
556557
L7Protocol::Unknown,
558+
false,
557559
self.context.agent_mode,
558560
);
559561
self.fill_single_l4_stats(tagger, flow_meter);
@@ -569,6 +571,7 @@ impl Stash {
569571
0,
570572
0,
571573
L7Protocol::Unknown,
574+
false,
572575
self.context.agent_mode,
573576
);
574577
// edge_stats: If the direction of a certain end is known, the statistical data
@@ -712,6 +715,7 @@ impl Stash {
712715
meter.biz_type,
713716
meter.time_span,
714717
meter.l7_protocol,
718+
meter.is_reversed,
715719
self.context.agent_mode,
716720
);
717721
tagger.code |= Code::L7_PROTOCOL;
@@ -742,6 +746,7 @@ impl Stash {
742746
meter.biz_type,
743747
meter.time_span,
744748
meter.l7_protocol,
749+
meter.is_reversed,
745750
self.context.agent_mode,
746751
);
747752
tagger.code |= Code::L7_PROTOCOL;
@@ -758,6 +763,7 @@ impl Stash {
758763
meter.biz_type,
759764
meter.time_span,
760765
meter.l7_protocol,
766+
meter.is_reversed,
761767
self.context.agent_mode,
762768
);
763769
tagger.code |= Code::L7_PROTOCOL;
@@ -879,13 +885,14 @@ fn get_single_tagger(
879885
global_thread_id: u8,
880886
flow: &MiniFlow,
881887
ep: usize,
882-
direction: Direction,
888+
mut direction: Direction,
883889
is_active_host: bool,
884890
config: &CollectorConfig,
885891
endpoint: Option<String>,
886892
biz_type: u8,
887893
time_span: u32,
888894
l7_protocol: L7Protocol,
895+
is_reversed: bool,
889896
agent_mode: RunningMode,
890897
) -> Tagger {
891898
let flow_key = &flow.flow_key;
@@ -937,17 +944,35 @@ fn get_single_tagger(
937944
l3_epc_id: get_l3_epc_id(side.l3_epc_id, flow.signal_source),
938945
gpid: side.gpid,
939946
protocol: flow_key.proto,
940-
direction,
941-
tap_side: TapSide::from(direction),
947+
direction: if is_reversed {
948+
direction.reverse()
949+
} else {
950+
direction
951+
},
952+
tap_side: if is_reversed {
953+
TapSide::from(direction).reverse()
954+
} else {
955+
TapSide::from(direction)
956+
},
942957
tap_port: flow_key.tap_port,
943958
tap_type: flow_key.tap_type,
944959
// If the resource is located on the client, the service port is ignored
945-
server_port: if ep == FLOW_METRICS_PEER_SRC
946-
|| ignore_server_port(flow, config.inactive_server_port_aggregation)
947-
{
960+
server_port: if ignore_server_port(flow, config.inactive_server_port_aggregation) {
948961
0
949962
} else {
950-
flow.peers[1].nat_real_port
963+
if ep == FLOW_METRICS_PEER_SRC {
964+
if is_reversed {
965+
flow.peers[0].nat_real_port
966+
} else {
967+
0
968+
}
969+
} else {
970+
if is_reversed {
971+
0
972+
} else {
973+
flow.peers[1].nat_real_port
974+
}
975+
}
951976
},
952977
is_ipv6,
953978
code: {
@@ -988,6 +1013,7 @@ fn get_edge_tagger(
9881013
biz_type: u8,
9891014
time_span: u32,
9901015
l7_protocol: L7Protocol,
1016+
is_reversed: bool,
9911017
agent_mode: RunningMode,
9921018
) -> Tagger {
9931019
let flow_key = &flow.flow_key;
@@ -1040,7 +1066,7 @@ fn get_edge_tagger(
10401066
(src_mac, dst_mac)
10411067
};
10421068

1043-
Tagger {
1069+
let mut tagger = Tagger {
10441070
global_thread_id,
10451071
agent_id: config.agent_id,
10461072
mac: src_mac,
@@ -1087,7 +1113,18 @@ fn get_edge_tagger(
10871113
biz_type,
10881114
time_span,
10891115
..Default::default()
1116+
};
1117+
1118+
if is_reversed {
1119+
let server_port = if ignore_server_port(flow, config.inactive_server_port_aggregation) {
1120+
0
1121+
} else {
1122+
src_ep.nat_real_port
1123+
};
1124+
tagger.reverse(server_port);
10901125
}
1126+
1127+
tagger
10911128
}
10921129

10931130
fn get_l3_epc_id(l3_epc_id: i32, signal_source: SignalSource) -> i16 {

agent/src/collector/l7_quadruple_generator.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ struct AppMeterWithL7Protocol {
7070
l7_protocol: L7Protocol,
7171
biz_type: u8,
7272
time_in_second: Duration,
73+
is_reversed: bool,
7374
}
7475

7576
struct QuadrupleStash {
@@ -278,6 +279,7 @@ impl SubQuadGen {
278279
m.endpoint == l7_stats.endpoint
279280
&& m.biz_type == l7_stats.biz_type
280281
&& m.time_span == time_span
282+
&& m.is_reversed == l7_stats.is_reversed
281283
}) {
282284
// flow L7Protocol of different client ports on the same server port may be inconsistent.
283285
// unknown l7_protocol needs to be judged by the close_type and duration of the flow,
@@ -298,6 +300,7 @@ impl SubQuadGen {
298300
biz_type: l7_stats.biz_type,
299301
time_span,
300302
time_in_second,
303+
is_reversed: l7_stats.is_reversed,
301304
};
302305
meters.push(meter);
303306
}
@@ -323,6 +326,7 @@ impl SubQuadGen {
323326
is_active_host1,
324327
time_in_second: meter.time_in_second.into(),
325328
biz_type: meter.biz_type,
329+
is_reversed: meter.is_reversed,
326330
time_span,
327331
});
328332

@@ -359,6 +363,7 @@ impl SubQuadGen {
359363
time_in_second: l7_stats.time_in_second.into(),
360364
biz_type: l7_stats.biz_type,
361365
time_span,
366+
is_reversed: l7_stats.is_reversed,
362367
});
363368
if close_type != CloseType::Unknown && close_type != CloseType::ForcedReport {
364369
Self::push_closed_app_meter(
@@ -378,6 +383,7 @@ impl SubQuadGen {
378383
biz_type: l7_stats.biz_type,
379384
time_span,
380385
time_in_second,
386+
is_reversed: l7_stats.is_reversed,
381387
};
382388
let _ = stash.l7_stats.insert(l7_stats.flow_id, vec![meter]);
383389
}

agent/src/collector/types.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ pub struct AppMeterWithFlow {
162162
pub time_in_second: Timestamp,
163163
// request-reponse time span
164164
pub time_span: u32,
165+
pub is_reversed: bool,
165166
}
166167

167168
#[derive(Clone)]

agent/src/common/flow.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,7 @@ pub struct L7Stats {
551551
// request-reponse time span
552552
pub time_span: u32,
553553
pub biz_type: u8,
554+
pub is_reversed: bool,
554555
}
555556

556557
#[derive(Serialize, Debug, Default, Clone, PartialEq, Eq)]

agent/src/common/l7_protocol_info.rs

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -195,8 +195,12 @@ where
195195

196196
// load endpoint from cache for *responses*
197197
// call this before calling `perf_stats` because the latter may remove cached entry
198-
fn load_endpoint_from_cache<'a>(&mut self, param: &'a ParseParam) -> Option<String> {
199-
let key = LogCacheKey::new(param, self.session_id());
198+
fn load_endpoint_from_cache<'a>(
199+
&mut self,
200+
param: &'a ParseParam,
201+
is_reversed: bool,
202+
) -> Option<String> {
203+
let key = LogCacheKey::new(param, self.session_id(), is_reversed);
200204
match param.l7_perf_cache.borrow_mut().rrt_cache.get(&key) {
201205
Some(cached) if cached.endpoint.is_some() => {
202206
let log = LogCache {
@@ -258,9 +262,10 @@ where
258262
(&mut perf_cache.rrt_cache, &mut perf_cache.timeout_cache)
259263
},
260264
);
261-
let key = LogCacheKey::new(param, self.session_id());
265+
let key = LogCacheKey::new(param, self.session_id(), self.is_reversed());
262266
let prev_info = rtt_cache.get_mut(&key);
263267
let timeout_counter = timeout_cache.get_or_insert_mut(param.flow_id);
268+
let index = if self.is_reversed() { 1 } else { 0 };
264269

265270
let Some(prev_info) = prev_info else {
266271
// If the first log is a request and on blacklist, we still need to put it in cache to handle the response,
@@ -269,7 +274,7 @@ where
269274
// If the first log is a response, it's perf stats will not be counted here.
270275
// We need to know whether its corresponding request is on blacklist before accounting.
271276
let ret = if cur_info.msg_type == LogMessageType::Request && !cur_info.on_blacklist {
272-
timeout_counter.in_cache += 1;
277+
timeout_counter.in_cache[index] += 1;
273278
Some(L7PerfStats::from(&cur_info))
274279
} else {
275280
None
@@ -287,7 +292,8 @@ where
287292
if prev_info.msg_type != cur_info.msg_type && !merge_info.merged {
288293
merge_info.merged = true;
289294
if !(prev_info.on_blacklist || cur_info.on_blacklist) {
290-
timeout_counter.in_cache = timeout_counter.in_cache.saturating_sub(1);
295+
timeout_counter.in_cache[index] =
296+
timeout_counter.in_cache[index].saturating_sub(1);
291297
}
292298
}
293299

@@ -297,7 +303,7 @@ where
297303
keep_prev = !(merge_info.req_end && merge_info.resp_end);
298304
} else {
299305
if prev_info.msg_type == LogMessageType::Request && !prev_info.on_blacklist {
300-
timeout_counter.in_cache = timeout_counter.in_cache.saturating_sub(1);
306+
timeout_counter.in_cache[index] = timeout_counter.in_cache[index].saturating_sub(1);
301307
}
302308
}
303309

@@ -311,7 +317,7 @@ where
311317
if rrt > param.rrt_timeout as u64 {
312318
match prev_info.multi_merge_info.as_ref() {
313319
Some(info) if info.merged => (),
314-
_ => timeout_counter.timeout += 1,
320+
_ => timeout_counter.timeout[index] += 1,
315321
}
316322
} else {
317323
perf_stats.update_rrt(rrt);
@@ -339,7 +345,7 @@ where
339345
warn!("l7 log info disorder with long time rrt {}", rrt);
340346
match prev_info.multi_merge_info.as_ref() {
341347
Some(info) if info.merged => (),
342-
_ => timeout_counter.timeout += 1,
348+
_ => timeout_counter.timeout[index] += 1,
343349
}
344350
}
345351

@@ -365,10 +371,10 @@ where
365371

366372
if prev_info.time > cur_info.time {
367373
if !cur_info.on_blacklist && cur_info.msg_type == LogMessageType::Request {
368-
timeout_counter.timeout += 1;
374+
timeout_counter.timeout[index] += 1;
369375
}
370376
if !prev_info.on_blacklist && prev_info.msg_type == LogMessageType::Request {
371-
timeout_counter.in_cache += 1;
377+
timeout_counter.in_cache[index] += 1;
372378
}
373379
if !cur_info.on_blacklist {
374380
Some(L7PerfStats::from(&cur_info))
@@ -377,10 +383,10 @@ where
377383
}
378384
} else {
379385
if !prev_info.on_blacklist && prev_info.msg_type == LogMessageType::Request {
380-
timeout_counter.timeout += 1;
386+
timeout_counter.timeout[index] += 1;
381387
}
382388
if !cur_info.on_blacklist && cur_info.msg_type == LogMessageType::Request {
383-
timeout_counter.in_cache += 1;
389+
timeout_counter.in_cache[index] += 1;
384390
}
385391
let prev_info = rtt_cache.put(key, cur_info).unwrap();
386392
if !prev_info.on_blacklist {
@@ -394,7 +400,7 @@ where
394400
&& prev_info.msg_type != cur_info.msg_type
395401
&& !prev_info.multi_merge_info.as_ref().unwrap().merged
396402
{
397-
timeout_counter.timeout += 1;
403+
timeout_counter.timeout[index] += 1;
398404
}
399405
if !keep_prev {
400406
rtt_cache.pop(&key);

0 commit comments

Comments
 (0)