From 70ca905e2dc005f62110352ab1202c9aac3c3db9 Mon Sep 17 00:00:00 2001 From: Andrew Cooks Date: Thu, 24 Jul 2025 18:19:52 +1000 Subject: [PATCH 1/2] feat(ui): Add frontend support for "Other Flows" category This prepares the "Top Talkers" chart to visualize an aggregated "Other Flows" category, which will eventually be provided by the backend. This makes the chart more comprehensive by accounting for all measured traffic. The D3.js chart rendering logic in `jittertrap-chart-toptalk.js` has been updated to: - Recognize a reserved flow key of "other". - Conditionally render the legend text as "Other Flows" for this key, instead of attempting to parse it for IP/port details. - Apply a distinct, neutral grey color to the "other" flow in the stacked area chart, the distribution bar, and the legend to visually distinguish it from the top 10 flows. The frontend is now ready to correctly display this data once the backend implementation is complete. --- .../src/js/jittertrap-chart-toptalk.js | 47 ++++++++++++------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/html5-client/src/js/jittertrap-chart-toptalk.js b/html5-client/src/js/jittertrap-chart-toptalk.js index 3a26b37..f2f1b71 100644 --- a/html5-client/src/js/jittertrap-chart-toptalk.js +++ b/html5-client/src/js/jittertrap-chart-toptalk.js @@ -220,6 +220,13 @@ return { formattedData, maxSlice }; } + const getFlowColor = (key) => { + if (key === 'other') { + return '#cccccc'; // a neutral grey + } + return colorScale(key); + }; + /* Update the chart (try to avoid memory allocations here!) */ m.redraw = function() { @@ -277,7 +284,7 @@ stackedChartData.forEach(layer => { context.beginPath(); area(layer); - context.fillStyle = colorScale(layer.key); + context.fillStyle = getFlowColor(layer.key); context.fill(); }); @@ -314,7 +321,7 @@ .attr("y", 9) .attr("x", d => x(d.x0)) .attr("width", d => x(d.x1) - x(d.x0)) - .style("fill", d => colorScale(d.k)); + .style("fill", d => getFlowColor(d.k)); barsbox.attr("transform", "translate(" + margin.left + "," + 350 + ")"); @@ -347,28 +354,32 @@ // Add the complex structure only ONCE when elements are created legendTextEnter.each(function(d) { - const parts = d.split('/'); - const sourceIP = parts[1]; - const sourcePort = parts[2]; - const destIP = parts[3]; - const destPort = parts[4]; - const proto = parts[5]; - const tclass = parts[6]; - const textNode = d3.select(this); - textNode.append("tspan").attr("x", "25em").attr("text-anchor", "end").text(sourceIP); - textNode.append("tspan").attr("x", "25.5em").text(":" + sourcePort.padEnd(6)); - textNode.append("tspan").attr("x", "30.5em").text("->"); - textNode.append("tspan").attr("x", "32.5em").text(destIP); - textNode.append("tspan").attr("x", "58em").text(":" + destPort); - textNode.append("tspan").attr("x", "63.5em").text("| " + proto); - textNode.append("tspan").attr("x", "70em").text("| " + tclass); + if (d === 'other') { + textNode.append("tspan").attr("x", 25).text("Other Flows"); + } else { + const parts = d.split('/'); + const sourceIP = parts[1]; + const sourcePort = parts[2]; + const destIP = parts[3]; + const destPort = parts[4]; + const proto = parts[5]; + const tclass = parts[6]; + + textNode.append("tspan").attr("x", "25em").attr("text-anchor", "end").text(sourceIP); + textNode.append("tspan").attr("x", "25.5em").text(":" + sourcePort.padEnd(6)); + textNode.append("tspan").attr("x", "30.5em").text("->"); + textNode.append("tspan").attr("x", "32.5em").text(destIP); + textNode.append("tspan").attr("x", "58em").text(":" + destPort); + textNode.append("tspan").attr("x", "63.5em").text("| " + proto); + textNode.append("tspan").attr("x", "70em").text("| " + tclass); + } }); // UPDATE + ENTER - update positions and colors for all visible items const legendUpdate = legend.merge(legendEnter); legendUpdate.attr("transform", (d, i) => "translate(0, " + ((i + 1) * 25) + ")"); - legendUpdate.select("rect").style("fill", colorScale); + legendUpdate.select("rect").style("fill", getFlowColor); }; From d9279cce2f5bdb1e437a953041cbc5b4d2267d0b Mon Sep 17 00:00:00 2001 From: Andrew Cooks Date: Sun, 27 Jul 2025 15:56:44 +1000 Subject: [PATCH 2/2] feature: add aggregated 'other' flow This commit introduces a new feature to the Top Talkers chart that aggregates all flows not in the top 10 into a single 'Other' data series for visualization. The backend identifies a candidate pool of the top N flows (defined by `MAX_FLOWS`, e.g., 20) and sends them to the frontend without any pre-aggregation of "other" traffic. This number of flows in the pool and messages must be bounded, and the chosen value must be sufficiently large to deal with Rank Volatiliy, where flows enter or leave the top-k displayed flows. The aggregation logic is implemented in the Javascript frontend. - It receives the larger candidate pool of flows from the backend. - It separates the top 10 flows for individual display in the chart legend. - It then aggregates the remaining flows (flows 11-20) into a single 'Other' data series, which is rendered on the chart with a distinct color and label. - The `jittertrap-core.js` module is updated to ensure the full candidate pool is passed to the charting component instead of being truncated to 10. --- deps/toptalk/intervals.c | 13 ++-- docs/top-talkers.md | 61 +++++++++++++++++++ .../src/js/jittertrap-chart-toptalk.js | 51 ++++++++++++++-- html5-client/src/js/jittertrap-core.js | 3 +- messages/include/jt_msg_toptalk.h | 3 +- server/tt_thread.c | 7 ++- 6 files changed, 119 insertions(+), 19 deletions(-) create mode 100644 docs/top-talkers.md diff --git a/deps/toptalk/intervals.c b/deps/toptalk/intervals.c index 559c0f4..6baf996 100644 --- a/deps/toptalk/intervals.c +++ b/deps/toptalk/intervals.c @@ -368,28 +368,27 @@ static void tt_get_top5(struct tt_top_flows *t5, struct timeval deadline) { struct flow_hash *rfti; /* reference flow table iter */ - /* sort the flow reference table */ + /* sort the flow reference table by byte count */ HASH_SRT(r_hh, flow_ref_table, bytes_cmp); /* - * expire old packets in the output path + * Expire old packets in the output path. * NB: must be called in packet receive path as well. */ expire_old_packets(deadline); - /* check if the interval is complete and then rotate tables */ + /* Check if the interval is complete and then rotate tables */ expire_old_interval_tables(deadline); - /* for each of the top 5 flow in the reference table, - * fill the counts from the short-interval flow tables */ + /* For each of the top N flows in the reference table, + * fill the counts from the short-interval flow tables. */ rfti = flow_ref_table; - for (int i = 0; i < MAX_FLOW_COUNT && rfti; i++) { fill_short_int_flows(t5->flow[i], rfti); rfti = rfti->r_hh.next; } - t5->flow_count = HASH_CNT(r_hh, flow_ref_table); + t5->flow_count = HASH_CNT(r_hh, flow_ref_table); t5->total_bytes = rate_calc(ref_window_size, totals.bytes); t5->total_packets = rate_calc(ref_window_size, totals.packets); t5->timestamp = deadline; diff --git a/docs/top-talkers.md b/docs/top-talkers.md new file mode 100644 index 0000000..68e7555 --- /dev/null +++ b/docs/top-talkers.md @@ -0,0 +1,61 @@ +# Jittertrap Top Talkers Feature Overview + +This document describes the architecture and data flow for the "Top Talkers" feature, which performs real-time network traffic analysis and visualization. + +## Architecture + +The system is a two-part application: + + A C-based backend responsible for high-performance packet capture, flow tracking, and data aggregation. + + A Javascript-based frontend responsible for data processing, final summarization, and interactive visualization. + +This decoupled design allows the backend to focus on efficient, real-time processing while giving the frontend the flexibility to handle data presentation. + +## Backend (C Language) + +The backend runs as a set of high-priority threads to ensure timely packet capture and processing. + +### 1. Packet Capture and Flow Tracking (intervals.c) + + Capture: The pcap library is used to capture raw packets from a selected network interface in real-time. The capture process is managed within a dedicated thread (tt_intervals_run) to minimize packet loss. + + Decoding: Each captured packet is passed through a series of decoders (decode.c) to parse Ethernet, IP (v4/v6), and Transport layer (TCP, UDP, etc.) headers. The relevant data (IPs, ports, protocol, byte count) is extracted into a flow_record struct. + + Flow Tracking: The system uses hash tables (uthash.h) to track network flows. + + Interval Tables: Multiple hash tables (incomplete_flow_tables, complete_flow_tables) track flow statistics over different short-term intervals (e.g., 5ms, 100ms, 1s) defined in intervals_user.c. + + Reference Table: A primary hash table (flow_ref_table) tracks flows over a longer, sliding time window. This table is used to identify the overall top flows across all intervals. + +### 2. Data Forwarding (tt_thread.c) + + Data Preparation: A separate thread (intervals_run) periodically wakes up to process the tracked flow data. It identifies the top flows from the reference table. + + Candidate Pool: The m2m function prepares a message for the frontend. Crucially, it creates a "candidate pool" of top flows. It sends up to MAX_FLOWS (defined as 20 in jt_msg_toptalk.h) of the highest-volume flows. This provides the frontend with more context than just the top 10. + + Message Queue: The prepared message is sent to the frontend via a message queue system (mq_tt_produce). + +## Frontend (Javascript) + +The frontend receives the candidate pool of flows and performs the final processing and visualization. + +### 1. Data Ingestion & Processing (jittertrap-core.js) + + Websocket: The frontend receives messages from the backend via a websocket connection (jittertrap-websocket.js). + + Core Processing: The processTopTalkMsg function in jittertrap-core.js is the main entry point for new data. It takes the incoming message containing up to 20 flows and updates its internal time-series data structures (flowsTS, flowRank, flowsTotals). flowRank is a sorted list of all flows received within the current time window, ranked by total bytes. + +### 2. Visualization and Aggregation (jittertrap-chart-toptalk.js) + + Final Summarization: This is the key to the new architecture. The chart's redraw function calls a helper, processAndAggregateChartData. + + This function takes the full list of flows prepared by jittertrap-core.js. + + It defines a LEGEND_DISPLAY_LIMIT (set to 10). + + It splits the incoming data: the first 10 flows are kept as individual series, and the rest (flows 11-20) are aggregated. + + It sums the byte counts of the remaining flows for each timestamp into a new, special data series with the key 'other'. + + Rendering: The final dataset, consisting of the top 10 individual flows plus the single "Other" aggregate flow, is passed to the D3.js charting library. The chart's existing logic recognizes the 'other' key and assigns it a unique color and a simple "Other Flows" label in the legend, displaying it as part of the stacked area chart. diff --git a/html5-client/src/js/jittertrap-chart-toptalk.js b/html5-client/src/js/jittertrap-chart-toptalk.js index f2f1b71..bee82fe 100644 --- a/html5-client/src/js/jittertrap-chart-toptalk.js +++ b/html5-client/src/js/jittertrap-chart-toptalk.js @@ -19,6 +19,42 @@ return chartData; }; + const processAndAggregateChartData = function(incomingData) { + const LEGEND_DISPLAY_LIMIT = 10; + + if (incomingData.length <= LEGEND_DISPLAY_LIMIT) { + return incomingData; + } + + const topNFlows = incomingData.slice(0, LEGEND_DISPLAY_LIMIT); + const remainingFlows = incomingData.slice(LEGEND_DISPLAY_LIMIT); + + const otherFlow = { + fkey: 'other', + tbytes: 0, + values: [] + }; + + const otherValuesMap = new Map(); + + remainingFlows.forEach(flow => { + otherFlow.tbytes += flow.tbytes; + flow.values.forEach(dataPoint => { + const currentBytes = otherValuesMap.get(dataPoint.ts) || 0; + otherValuesMap.set(dataPoint.ts, currentBytes + dataPoint.bytes); + }); + }); + + otherValuesMap.forEach((bytes, ts) => { + otherFlow.values.push({ ts: ts, bytes: bytes }); + }); + + // Ensure the values are sorted by timestamp, as d3 expects + otherFlow.values.sort((a, b) => a.ts - b.ts); + + return topNFlows.concat(otherFlow); + }; + my.charts.toptalk.toptalkChart = (function (m) { const margin = { top: 20, @@ -231,16 +267,19 @@ /* Update the chart (try to avoid memory allocations here!) */ m.redraw = function() { + // Process the raw chartData to aggregate "other" flows before drawing + const processedChartData = processAndAggregateChartData(chartData); + const width = size.width - margin.left - margin.right; const height = size.height - margin.top - margin.bottom; xScale = d3.scaleLinear().range([0, width]); /* compute the domain of x as the [min,max] extent of timestamps * of the first (largest) flow */ - if (chartData && chartData[0]) - xScale.domain(d3.extent(chartData[0].values, d => d.ts)); + if (processedChartData && processedChartData[0]) + xScale.domain(d3.extent(processedChartData[0].values, d => d.ts)); - const { formattedData, maxSlice } = formatDataAndGetMaxSlice(chartData); + const { formattedData, maxSlice } = formatDataAndGetMaxSlice(processedChartData); const yPow = d3.select('input[name="y-axis-is-log"]:checked').node().value; @@ -264,7 +303,7 @@ svg.select(".xGrid").call(xGrid); svg.select(".yGrid").call(yGrid); - const fkeys = chartData.map(f => f.fkey); + const fkeys = processedChartData.map(f => f.fkey); colorScale.domain(fkeys); stack.keys(fkeys); @@ -289,10 +328,10 @@ }); // distribution bar - const tbytes = chartData.reduce((sum, f) => sum + f.tbytes, 0); + const tbytes = processedChartData.reduce((sum, f) => sum + f.tbytes, 0); let rangeStop = 0; - const barData = chartData.map(f => { + const barData = processedChartData.map(f => { const new_d = { k: f.fkey, x0: rangeStop, diff --git a/html5-client/src/js/jittertrap-core.js b/html5-client/src/js/jittertrap-core.js index 8aeaea9..58df926 100644 --- a/html5-client/src/js/jittertrap-core.js +++ b/html5-client/src/js/jittertrap-core.js @@ -212,8 +212,7 @@ const updateTopFlowChartData = function(interval) { const chartPeriod = my.charts.getChartPeriod(); const chartSeries = JT.charts.getTopFlowsRef(); - const fcount = (flowRank[interval].length < 10) ? - flowRank[interval].length : 10; + const fcount = flowRank[interval].length; updateSampleCounts(interval); diff --git a/messages/include/jt_msg_toptalk.h b/messages/include/jt_msg_toptalk.h index 34ae873..3fee005 100644 --- a/messages/include/jt_msg_toptalk.h +++ b/messages/include/jt_msg_toptalk.h @@ -7,9 +7,10 @@ int jt_toptalk_printer(void *data, char *out, int len); int jt_toptalk_free(void *data); const char *jt_toptalk_test_msg_get(void); +/* MAX_FLOWS should be 2x the number of displayed top N flows. */ #define MAX_FLOWS 20 #define ADDR_LEN 50 -#define PROTO_LEN 5 +#define PROTO_LEN 6 #define TCLASS_LEN 5 struct jt_msg_toptalk diff --git a/server/tt_thread.c b/server/tt_thread.c index 63cbf74..0fe368e 100644 --- a/server/tt_thread.c +++ b/server/tt_thread.c @@ -152,7 +152,10 @@ static int m2m(struct tt_top_flows *ttf, struct mq_tt_msg *msg, int interval) struct jt_msg_toptalk *m = &msg->m; char s_addr_str[INET6_ADDRSTRLEN] = { 0 }; char d_addr_str[INET6_ADDRSTRLEN] = { 0 }; - int flow_count; + + // Determine the number of flows to send, capped by the message capacity. + // This relies on the compile-time MAX_FLOW_COUNT being >= MAX_FLOWS. + int flow_count = MIN(ttf->flow_count, MAX_FLOWS); m->timestamp.tv_sec = ttf->timestamp.tv_sec; m->timestamp.tv_nsec = ttf->timestamp.tv_usec * 1000; @@ -160,7 +163,6 @@ static int m2m(struct tt_top_flows *ttf, struct mq_tt_msg *msg, int interval) m->interval_ns = tt_intervals[interval].tv_sec * 1E9 + tt_intervals[interval].tv_usec * 1E3; - flow_count = MIN(ttf->flow_count, MAX_FLOWS); m->tflows = flow_count; m->tbytes = ttf->total_bytes; m->tpackets = ttf->total_packets; @@ -355,4 +357,3 @@ int intervals_thread_init(void) return 0; } -