diff --git a/deps/toptalk/intervals.c b/deps/toptalk/intervals.c index 559c0f40..6baf9966 100644 --- a/deps/toptalk/intervals.c +++ b/deps/toptalk/intervals.c @@ -368,28 +368,27 @@ static void tt_get_top5(struct tt_top_flows *t5, struct timeval deadline) { struct flow_hash *rfti; /* reference flow table iter */ - /* sort the flow reference table */ + /* sort the flow reference table by byte count */ HASH_SRT(r_hh, flow_ref_table, bytes_cmp); /* - * expire old packets in the output path + * Expire old packets in the output path. * NB: must be called in packet receive path as well. */ expire_old_packets(deadline); - /* check if the interval is complete and then rotate tables */ + /* Check if the interval is complete and then rotate tables */ expire_old_interval_tables(deadline); - /* for each of the top 5 flow in the reference table, - * fill the counts from the short-interval flow tables */ + /* For each of the top N flows in the reference table, + * fill the counts from the short-interval flow tables. */ rfti = flow_ref_table; - for (int i = 0; i < MAX_FLOW_COUNT && rfti; i++) { fill_short_int_flows(t5->flow[i], rfti); rfti = rfti->r_hh.next; } - t5->flow_count = HASH_CNT(r_hh, flow_ref_table); + t5->flow_count = HASH_CNT(r_hh, flow_ref_table); t5->total_bytes = rate_calc(ref_window_size, totals.bytes); t5->total_packets = rate_calc(ref_window_size, totals.packets); t5->timestamp = deadline; diff --git a/docs/top-talkers.md b/docs/top-talkers.md new file mode 100644 index 00000000..68e75556 --- /dev/null +++ b/docs/top-talkers.md @@ -0,0 +1,61 @@ +# Jittertrap Top Talkers Feature Overview + +This document describes the architecture and data flow for the "Top Talkers" feature, which performs real-time network traffic analysis and visualization. + +## Architecture + +The system is a two-part application: + + A C-based backend responsible for high-performance packet capture, flow tracking, and data aggregation. + + A Javascript-based frontend responsible for data processing, final summarization, and interactive visualization. + +This decoupled design allows the backend to focus on efficient, real-time processing while giving the frontend the flexibility to handle data presentation. + +## Backend (C Language) + +The backend runs as a set of high-priority threads to ensure timely packet capture and processing. + +### 1. Packet Capture and Flow Tracking (intervals.c) + + Capture: The pcap library is used to capture raw packets from a selected network interface in real-time. The capture process is managed within a dedicated thread (tt_intervals_run) to minimize packet loss. + + Decoding: Each captured packet is passed through a series of decoders (decode.c) to parse Ethernet, IP (v4/v6), and Transport layer (TCP, UDP, etc.) headers. The relevant data (IPs, ports, protocol, byte count) is extracted into a flow_record struct. + + Flow Tracking: The system uses hash tables (uthash.h) to track network flows. + + Interval Tables: Multiple hash tables (incomplete_flow_tables, complete_flow_tables) track flow statistics over different short-term intervals (e.g., 5ms, 100ms, 1s) defined in intervals_user.c. + + Reference Table: A primary hash table (flow_ref_table) tracks flows over a longer, sliding time window. This table is used to identify the overall top flows across all intervals. + +### 2. Data Forwarding (tt_thread.c) + + Data Preparation: A separate thread (intervals_run) periodically wakes up to process the tracked flow data. It identifies the top flows from the reference table. + + Candidate Pool: The m2m function prepares a message for the frontend. Crucially, it creates a "candidate pool" of top flows. It sends up to MAX_FLOWS (defined as 20 in jt_msg_toptalk.h) of the highest-volume flows. This provides the frontend with more context than just the top 10. + + Message Queue: The prepared message is sent to the frontend via a message queue system (mq_tt_produce). + +## Frontend (Javascript) + +The frontend receives the candidate pool of flows and performs the final processing and visualization. + +### 1. Data Ingestion & Processing (jittertrap-core.js) + + Websocket: The frontend receives messages from the backend via a websocket connection (jittertrap-websocket.js). + + Core Processing: The processTopTalkMsg function in jittertrap-core.js is the main entry point for new data. It takes the incoming message containing up to 20 flows and updates its internal time-series data structures (flowsTS, flowRank, flowsTotals). flowRank is a sorted list of all flows received within the current time window, ranked by total bytes. + +### 2. Visualization and Aggregation (jittertrap-chart-toptalk.js) + + Final Summarization: This is the key to the new architecture. The chart's redraw function calls a helper, processAndAggregateChartData. + + This function takes the full list of flows prepared by jittertrap-core.js. + + It defines a LEGEND_DISPLAY_LIMIT (set to 10). + + It splits the incoming data: the first 10 flows are kept as individual series, and the rest (flows 11-20) are aggregated. + + It sums the byte counts of the remaining flows for each timestamp into a new, special data series with the key 'other'. + + Rendering: The final dataset, consisting of the top 10 individual flows plus the single "Other" aggregate flow, is passed to the D3.js charting library. The chart's existing logic recognizes the 'other' key and assigns it a unique color and a simple "Other Flows" label in the legend, displaying it as part of the stacked area chart. diff --git a/html5-client/src/js/jittertrap-chart-toptalk.js b/html5-client/src/js/jittertrap-chart-toptalk.js index 3a26b374..bee82fec 100644 --- a/html5-client/src/js/jittertrap-chart-toptalk.js +++ b/html5-client/src/js/jittertrap-chart-toptalk.js @@ -19,6 +19,42 @@ return chartData; }; + const processAndAggregateChartData = function(incomingData) { + const LEGEND_DISPLAY_LIMIT = 10; + + if (incomingData.length <= LEGEND_DISPLAY_LIMIT) { + return incomingData; + } + + const topNFlows = incomingData.slice(0, LEGEND_DISPLAY_LIMIT); + const remainingFlows = incomingData.slice(LEGEND_DISPLAY_LIMIT); + + const otherFlow = { + fkey: 'other', + tbytes: 0, + values: [] + }; + + const otherValuesMap = new Map(); + + remainingFlows.forEach(flow => { + otherFlow.tbytes += flow.tbytes; + flow.values.forEach(dataPoint => { + const currentBytes = otherValuesMap.get(dataPoint.ts) || 0; + otherValuesMap.set(dataPoint.ts, currentBytes + dataPoint.bytes); + }); + }); + + otherValuesMap.forEach((bytes, ts) => { + otherFlow.values.push({ ts: ts, bytes: bytes }); + }); + + // Ensure the values are sorted by timestamp, as d3 expects + otherFlow.values.sort((a, b) => a.ts - b.ts); + + return topNFlows.concat(otherFlow); + }; + my.charts.toptalk.toptalkChart = (function (m) { const margin = { top: 20, @@ -220,20 +256,30 @@ return { formattedData, maxSlice }; } + const getFlowColor = (key) => { + if (key === 'other') { + return '#cccccc'; // a neutral grey + } + return colorScale(key); + }; + /* Update the chart (try to avoid memory allocations here!) */ m.redraw = function() { + // Process the raw chartData to aggregate "other" flows before drawing + const processedChartData = processAndAggregateChartData(chartData); + const width = size.width - margin.left - margin.right; const height = size.height - margin.top - margin.bottom; xScale = d3.scaleLinear().range([0, width]); /* compute the domain of x as the [min,max] extent of timestamps * of the first (largest) flow */ - if (chartData && chartData[0]) - xScale.domain(d3.extent(chartData[0].values, d => d.ts)); + if (processedChartData && processedChartData[0]) + xScale.domain(d3.extent(processedChartData[0].values, d => d.ts)); - const { formattedData, maxSlice } = formatDataAndGetMaxSlice(chartData); + const { formattedData, maxSlice } = formatDataAndGetMaxSlice(processedChartData); const yPow = d3.select('input[name="y-axis-is-log"]:checked').node().value; @@ -257,7 +303,7 @@ svg.select(".xGrid").call(xGrid); svg.select(".yGrid").call(yGrid); - const fkeys = chartData.map(f => f.fkey); + const fkeys = processedChartData.map(f => f.fkey); colorScale.domain(fkeys); stack.keys(fkeys); @@ -277,15 +323,15 @@ stackedChartData.forEach(layer => { context.beginPath(); area(layer); - context.fillStyle = colorScale(layer.key); + context.fillStyle = getFlowColor(layer.key); context.fill(); }); // distribution bar - const tbytes = chartData.reduce((sum, f) => sum + f.tbytes, 0); + const tbytes = processedChartData.reduce((sum, f) => sum + f.tbytes, 0); let rangeStop = 0; - const barData = chartData.map(f => { + const barData = processedChartData.map(f => { const new_d = { k: f.fkey, x0: rangeStop, @@ -314,7 +360,7 @@ .attr("y", 9) .attr("x", d => x(d.x0)) .attr("width", d => x(d.x1) - x(d.x0)) - .style("fill", d => colorScale(d.k)); + .style("fill", d => getFlowColor(d.k)); barsbox.attr("transform", "translate(" + margin.left + "," + 350 + ")"); @@ -347,28 +393,32 @@ // Add the complex structure only ONCE when elements are created legendTextEnter.each(function(d) { - const parts = d.split('/'); - const sourceIP = parts[1]; - const sourcePort = parts[2]; - const destIP = parts[3]; - const destPort = parts[4]; - const proto = parts[5]; - const tclass = parts[6]; - const textNode = d3.select(this); - textNode.append("tspan").attr("x", "25em").attr("text-anchor", "end").text(sourceIP); - textNode.append("tspan").attr("x", "25.5em").text(":" + sourcePort.padEnd(6)); - textNode.append("tspan").attr("x", "30.5em").text("->"); - textNode.append("tspan").attr("x", "32.5em").text(destIP); - textNode.append("tspan").attr("x", "58em").text(":" + destPort); - textNode.append("tspan").attr("x", "63.5em").text("| " + proto); - textNode.append("tspan").attr("x", "70em").text("| " + tclass); + if (d === 'other') { + textNode.append("tspan").attr("x", 25).text("Other Flows"); + } else { + const parts = d.split('/'); + const sourceIP = parts[1]; + const sourcePort = parts[2]; + const destIP = parts[3]; + const destPort = parts[4]; + const proto = parts[5]; + const tclass = parts[6]; + + textNode.append("tspan").attr("x", "25em").attr("text-anchor", "end").text(sourceIP); + textNode.append("tspan").attr("x", "25.5em").text(":" + sourcePort.padEnd(6)); + textNode.append("tspan").attr("x", "30.5em").text("->"); + textNode.append("tspan").attr("x", "32.5em").text(destIP); + textNode.append("tspan").attr("x", "58em").text(":" + destPort); + textNode.append("tspan").attr("x", "63.5em").text("| " + proto); + textNode.append("tspan").attr("x", "70em").text("| " + tclass); + } }); // UPDATE + ENTER - update positions and colors for all visible items const legendUpdate = legend.merge(legendEnter); legendUpdate.attr("transform", (d, i) => "translate(0, " + ((i + 1) * 25) + ")"); - legendUpdate.select("rect").style("fill", colorScale); + legendUpdate.select("rect").style("fill", getFlowColor); }; diff --git a/html5-client/src/js/jittertrap-core.js b/html5-client/src/js/jittertrap-core.js index 8aeaea99..58df9269 100644 --- a/html5-client/src/js/jittertrap-core.js +++ b/html5-client/src/js/jittertrap-core.js @@ -212,8 +212,7 @@ const updateTopFlowChartData = function(interval) { const chartPeriod = my.charts.getChartPeriod(); const chartSeries = JT.charts.getTopFlowsRef(); - const fcount = (flowRank[interval].length < 10) ? - flowRank[interval].length : 10; + const fcount = flowRank[interval].length; updateSampleCounts(interval); diff --git a/messages/include/jt_msg_toptalk.h b/messages/include/jt_msg_toptalk.h index 34ae8733..3fee005e 100644 --- a/messages/include/jt_msg_toptalk.h +++ b/messages/include/jt_msg_toptalk.h @@ -7,9 +7,10 @@ int jt_toptalk_printer(void *data, char *out, int len); int jt_toptalk_free(void *data); const char *jt_toptalk_test_msg_get(void); +/* MAX_FLOWS should be 2x the number of displayed top N flows. */ #define MAX_FLOWS 20 #define ADDR_LEN 50 -#define PROTO_LEN 5 +#define PROTO_LEN 6 #define TCLASS_LEN 5 struct jt_msg_toptalk diff --git a/server/tt_thread.c b/server/tt_thread.c index 63cbf742..0fe368e1 100644 --- a/server/tt_thread.c +++ b/server/tt_thread.c @@ -152,7 +152,10 @@ static int m2m(struct tt_top_flows *ttf, struct mq_tt_msg *msg, int interval) struct jt_msg_toptalk *m = &msg->m; char s_addr_str[INET6_ADDRSTRLEN] = { 0 }; char d_addr_str[INET6_ADDRSTRLEN] = { 0 }; - int flow_count; + + // Determine the number of flows to send, capped by the message capacity. + // This relies on the compile-time MAX_FLOW_COUNT being >= MAX_FLOWS. + int flow_count = MIN(ttf->flow_count, MAX_FLOWS); m->timestamp.tv_sec = ttf->timestamp.tv_sec; m->timestamp.tv_nsec = ttf->timestamp.tv_usec * 1000; @@ -160,7 +163,6 @@ static int m2m(struct tt_top_flows *ttf, struct mq_tt_msg *msg, int interval) m->interval_ns = tt_intervals[interval].tv_sec * 1E9 + tt_intervals[interval].tv_usec * 1E3; - flow_count = MIN(ttf->flow_count, MAX_FLOWS); m->tflows = flow_count; m->tbytes = ttf->total_bytes; m->tpackets = ttf->total_packets; @@ -355,4 +357,3 @@ int intervals_thread_init(void) return 0; } -