Skip to content

Commit 1a9f096

Browse files
committed
feat: support multi 8583
1 parent 0095d3d commit 1a9f096

File tree

2 files changed

+95
-85
lines changed
  • agent

2 files changed

+95
-85
lines changed

agent/crates/enterprise-utils/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ pub mod l7 {
381381
unimplemented!()
382382
}
383383

384-
pub fn parse_payload(&mut self, _: &[u8], _: bool, _: &Iso8583ParseConfig) -> bool {
384+
pub fn parse_payload_multiple(&mut self, _: &[u8], _: &Iso8583ParseConfig) -> Vec<Vec<FieldValue>> {
385385
unimplemented!()
386386
}
387387
}

agent/src/flow_generator/protocol_logs/rpc/iso8583.rs

Lines changed: 94 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use public::l7_protocol::L7Protocol;
2222
use crate::config::handler::LogParserConfig;
2323
use crate::{
2424
common::{
25-
flow::{L7PerfStats, PacketDirection},
25+
flow::L7PerfStats,
2626
l7_protocol_info::{L7ProtocolInfo, L7ProtocolInfoInterface},
2727
l7_protocol_log::{L7ParseResult, L7ProtocolParserInterface, LogCache, ParseParam},
2828
meta_packet::ApplicationFlags,
@@ -206,114 +206,124 @@ impl L7ProtocolParserInterface for Iso8583Log {
206206
}
207207

208208
fn parse_payload(&mut self, payload: &[u8], param: &ParseParam) -> Result<L7ParseResult> {
209-
if !self.parser.parse_payload(
209+
// 支持 payload 中包含多个 ISO8583 消息
210+
let msgs = self.parser.parse_payload_multiple(
210211
payload,
211-
param.direction == PacketDirection::ClientToServer,
212212
&Iso8583ParseConfig {
213213
extract_fields: param.iso8583_parse_conf.extract_fields.clone(),
214214
translation_enabled: param.iso8583_parse_conf.translation_enabled,
215215
pan_obfuscate: param.iso8583_parse_conf.pan_obfuscate,
216216
},
217-
) {
217+
);
218+
if msgs.is_empty() {
218219
return Err(Error::L7ProtocolUnknown);
219-
};
220+
}
220221

221222
if self.perf_stats.is_none() && param.parse_perf {
222223
self.perf_stats = Some(L7PerfStats::default());
223224
};
224225

225-
let mut info = Iso8583Info::default();
226-
for field in self.parser.fields.drain(..) {
227-
if info.mti.is_empty() && field.id == 0 {
228-
info.mti = field.translated.clone().unwrap_or(field.value.clone());
229-
// Determine if it's a response based on MTI
230-
if let Some(&b) = info.mti.as_bytes().get(2) {
231-
if b % 2 == 1 {
232-
info.msg_type = LogMessageType::Response;
233-
} else {
234-
info.msg_type = LogMessageType::Request;
226+
// 对每条解析到的消息构造 Iso8583Info,并返回 Multiple 或 Single
227+
let mut results: Vec<L7ProtocolInfo> = Vec::with_capacity(msgs.len());
228+
for fields in msgs.into_iter() {
229+
let mut info = Iso8583Info::default();
230+
for field in fields.into_iter() {
231+
if info.mti.is_empty() && field.id == 0 {
232+
info.mti = field.translated.clone().unwrap_or(field.value.clone());
233+
// Determine if it's a response based on MTI
234+
if let Some(&b) = info.mti.as_bytes().get(2) {
235+
if b % 2 == 1 {
236+
info.msg_type = LogMessageType::Response;
237+
} else {
238+
info.msg_type = LogMessageType::Request;
239+
info.response_status = L7ResponseStatus::Ok;
240+
}
241+
}
242+
} else if field.id == 7 {
243+
info.f7 = field.value.clone();
244+
} else if field.id == 11 {
245+
info.f11 = field.value.clone();
246+
} else if field.id == 32 {
247+
info.f32 = field.value.clone();
248+
} else if field.id == 33 {
249+
info.f33 = field.value.clone();
250+
} else if field.id == 39 {
251+
info.msg_type = LogMessageType::Response;
252+
if field.value == "00"
253+
|| field.value == "10"
254+
|| field.value == "11"
255+
|| field.value == "16"
256+
|| field.value == "A2"
257+
|| field.value == "A4"
258+
|| field.value == "A5"
259+
|| field.value == "A6"
260+
|| field.value == "Y1"
261+
|| field.value == "Y3"
262+
{
235263
info.response_status = L7ResponseStatus::Ok;
264+
} else {
265+
info.response_status = L7ResponseStatus::ClientError;
266+
info.response_exception =
267+
field.translated.clone().unwrap_or(field.value.clone());
268+
self.perf_stats.as_mut().map(|p| p.inc_req_err());
236269
}
237-
}
238-
} else if field.id == 7 {
239-
info.f7 = field.value.clone();
240-
} else if field.id == 11 {
241-
info.f11 = field.value.clone();
242-
} else if field.id == 32 {
243-
info.f32 = field.value.clone();
244-
} else if field.id == 33 {
245-
info.f33 = field.value.clone();
246-
} else if field.id == 39 {
247-
info.msg_type = LogMessageType::Response;
248-
if field.value == "00"
249-
|| field.value == "10"
250-
|| field.value == "11"
251-
|| field.value == "16"
252-
|| field.value == "A2"
253-
|| field.value == "A4"
254-
|| field.value == "A5"
255-
|| field.value == "A6"
256-
|| field.value == "Y1"
257-
|| field.value == "Y3"
270+
};
271+
set_captured_byte!(info, param);
272+
273+
if !param
274+
.iso8583_parse_conf
275+
.extract_fields
276+
.get(field.id as usize)
277+
.unwrap_or(false)
258278
{
259-
info.response_status = L7ResponseStatus::Ok;
260-
} else {
261-
info.response_status = L7ResponseStatus::ClientError;
262-
info.response_exception =
263-
field.translated.clone().unwrap_or(field.value.clone());
264-
self.perf_stats.as_mut().map(|p| p.inc_req_err());
279+
continue;
265280
}
266-
};
267-
set_captured_byte!(info, param);
268-
269-
if !param
270-
.iso8583_parse_conf
271-
.extract_fields
272-
.get(field.id as usize)
273-
.unwrap_or(false)
274-
{
275-
continue;
276-
}
277281

278-
if field.id == 2 && param.iso8583_parse_conf.pan_obfuscate {
282+
if field.id == 2 && param.iso8583_parse_conf.pan_obfuscate {
283+
info.attributes.push(KeyVal {
284+
key: field.description,
285+
val: mask_card_number(&field.value),
286+
});
287+
continue;
288+
}
279289
info.attributes.push(KeyVal {
280290
key: field.description,
281-
val: mask_card_number(&field.value),
291+
val: field.translated.unwrap_or(field.value),
282292
});
283-
continue;
284293
}
285-
info.attributes.push(KeyVal {
286-
key: field.description,
287-
val: field.translated.unwrap_or(field.value),
288-
});
289-
}
290294

291-
if !info.f7.is_empty()
292-
&& !info.f11.is_empty()
293-
&& !info.f32.is_empty()
294-
&& !info.f33.is_empty()
295-
{
296-
info.trace_ids.merge_field(
297-
BASE_FIELD_PRIORITY,
298-
format!("{}-{}-{}-{}", info.f7, info.f11, info.f32, info.f33),
299-
);
300-
}
295+
if !info.f7.is_empty()
296+
&& !info.f11.is_empty()
297+
&& !info.f32.is_empty()
298+
&& !info.f33.is_empty()
299+
{
300+
info.trace_ids.merge_field(
301+
BASE_FIELD_PRIORITY,
302+
format!("{}-{}-{}-{}", info.f7, info.f11, info.f32, info.f33),
303+
);
304+
}
301305

302-
if let Some(config) = param.parse_config {
303-
info.set_is_on_blacklist(config);
304-
}
305-
info.is_async = true;
306-
307-
if let Some(perf_stats) = self.perf_stats.as_mut() {
308-
if let Some(stats) = info.perf_stats(param) {
309-
perf_stats.sequential_merge(&stats);
310-
perf_stats.rrt_max = 0;
311-
perf_stats.rrt_sum = 0;
312-
perf_stats.rrt_count = 0;
306+
if let Some(config) = param.parse_config {
307+
info.set_is_on_blacklist(config);
308+
}
309+
info.is_async = true;
310+
311+
if let Some(perf_stats) = self.perf_stats.as_mut() {
312+
if let Some(stats) = info.perf_stats(param) {
313+
perf_stats.sequential_merge(&stats);
314+
perf_stats.rrt_max = 0;
315+
perf_stats.rrt_sum = 0;
316+
perf_stats.rrt_count = 0;
317+
}
313318
}
314-
}
315319

316-
Ok(L7ParseResult::Single(L7ProtocolInfo::Iso8583Info(info)))
320+
results.push(L7ProtocolInfo::Iso8583Info(info));
321+
}
322+
if results.len() == 1 {
323+
Ok(L7ParseResult::Single(results.into_iter().next().unwrap()))
324+
} else {
325+
Ok(L7ParseResult::Multi(results))
326+
}
317327
}
318328

319329
fn protocol(&self) -> L7Protocol {

0 commit comments

Comments
 (0)