Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions deps/toptalk/intervals.c
Original file line number Diff line number Diff line change
Expand Up @@ -368,28 +368,27 @@ static void tt_get_top5(struct tt_top_flows *t5, struct timeval deadline)
{
struct flow_hash *rfti; /* reference flow table iter */

/* sort the flow reference table */
/* sort the flow reference table by byte count */
HASH_SRT(r_hh, flow_ref_table, bytes_cmp);

/*
* expire old packets in the output path
* Expire old packets in the output path.
* NB: must be called in packet receive path as well.
*/
expire_old_packets(deadline);

/* check if the interval is complete and then rotate tables */
/* Check if the interval is complete and then rotate tables */
expire_old_interval_tables(deadline);

/* for each of the top 5 flow in the reference table,
* fill the counts from the short-interval flow tables */
/* For each of the top N flows in the reference table,
* fill the counts from the short-interval flow tables. */
rfti = flow_ref_table;

for (int i = 0; i < MAX_FLOW_COUNT && rfti; i++) {
fill_short_int_flows(t5->flow[i], rfti);
rfti = rfti->r_hh.next;
}
t5->flow_count = HASH_CNT(r_hh, flow_ref_table);

t5->flow_count = HASH_CNT(r_hh, flow_ref_table);
t5->total_bytes = rate_calc(ref_window_size, totals.bytes);
t5->total_packets = rate_calc(ref_window_size, totals.packets);
t5->timestamp = deadline;
Expand Down
61 changes: 61 additions & 0 deletions docs/top-talkers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Jittertrap Top Talkers Feature Overview

This document describes the architecture and data flow for the "Top Talkers" feature, which performs real-time network traffic analysis and visualization.

## Architecture

The system is a two-part application:

A C-based backend responsible for high-performance packet capture, flow tracking, and data aggregation.

A Javascript-based frontend responsible for data processing, final summarization, and interactive visualization.

This decoupled design allows the backend to focus on efficient, real-time processing while giving the frontend the flexibility to handle data presentation.

## Backend (C Language)

The backend runs as a set of high-priority threads to ensure timely packet capture and processing.

### 1. Packet Capture and Flow Tracking (intervals.c)

Capture: The pcap library is used to capture raw packets from a selected network interface in real-time. The capture process is managed within a dedicated thread (tt_intervals_run) to minimize packet loss.

Decoding: Each captured packet is passed through a series of decoders (decode.c) to parse Ethernet, IP (v4/v6), and Transport layer (TCP, UDP, etc.) headers. The relevant data (IPs, ports, protocol, byte count) is extracted into a flow_record struct.

Flow Tracking: The system uses hash tables (uthash.h) to track network flows.

Interval Tables: Multiple hash tables (incomplete_flow_tables, complete_flow_tables) track flow statistics over different short-term intervals (e.g., 5ms, 100ms, 1s) defined in intervals_user.c.

Reference Table: A primary hash table (flow_ref_table) tracks flows over a longer, sliding time window. This table is used to identify the overall top flows across all intervals.

### 2. Data Forwarding (tt_thread.c)

Data Preparation: A separate thread (intervals_run) periodically wakes up to process the tracked flow data. It identifies the top flows from the reference table.

Candidate Pool: The m2m function prepares a message for the frontend. Crucially, it creates a "candidate pool" of top flows. It sends up to MAX_FLOWS (defined as 20 in jt_msg_toptalk.h) of the highest-volume flows. This provides the frontend with more context than just the top 10.

Message Queue: The prepared message is sent to the frontend via a message queue system (mq_tt_produce).

## Frontend (Javascript)

The frontend receives the candidate pool of flows and performs the final processing and visualization.

### 1. Data Ingestion & Processing (jittertrap-core.js)

Websocket: The frontend receives messages from the backend via a websocket connection (jittertrap-websocket.js).

Core Processing: The processTopTalkMsg function in jittertrap-core.js is the main entry point for new data. It takes the incoming message containing up to 20 flows and updates its internal time-series data structures (flowsTS, flowRank, flowsTotals). flowRank is a sorted list of all flows received within the current time window, ranked by total bytes.

### 2. Visualization and Aggregation (jittertrap-chart-toptalk.js)

Final Summarization: This is the key to the new architecture. The chart's redraw function calls a helper, processAndAggregateChartData.

This function takes the full list of flows prepared by jittertrap-core.js.

It defines a LEGEND_DISPLAY_LIMIT (set to 10).

It splits the incoming data: the first 10 flows are kept as individual series, and the rest (flows 11-20) are aggregated.

It sums the byte counts of the remaining flows for each timestamp into a new, special data series with the key 'other'.

Rendering: The final dataset, consisting of the top 10 individual flows plus the single "Other" aggregate flow, is passed to the D3.js charting library. The chart's existing logic recognizes the 'other' key and assigns it a unique color and a simple "Other Flows" label in the legend, displaying it as part of the stacked area chart.
98 changes: 74 additions & 24 deletions html5-client/src/js/jittertrap-chart-toptalk.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,42 @@
return chartData;
};

const processAndAggregateChartData = function(incomingData) {
const LEGEND_DISPLAY_LIMIT = 10;
Copy link

Copilot AI Jul 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The magic number 10 should be defined as a named constant at the module level to make it configurable and avoid duplication with the backend's expectations.

Suggested change
const LEGEND_DISPLAY_LIMIT = 10;

Copilot uses AI. Check for mistakes.

if (incomingData.length <= LEGEND_DISPLAY_LIMIT) {
return incomingData;
}

const topNFlows = incomingData.slice(0, LEGEND_DISPLAY_LIMIT);
const remainingFlows = incomingData.slice(LEGEND_DISPLAY_LIMIT);

const otherFlow = {
fkey: 'other',
Copy link

Copilot AI Jul 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The string 'other' is used as a magic value in multiple places (lines 34, 260, 397). Consider defining it as a named constant to prevent inconsistencies.

Suggested change
fkey: 'other',
fkey: OTHER_FLOW_KEY,

Copilot uses AI. Check for mistakes.
tbytes: 0,
values: []
};

const otherValuesMap = new Map();

remainingFlows.forEach(flow => {
otherFlow.tbytes += flow.tbytes;
flow.values.forEach(dataPoint => {
const currentBytes = otherValuesMap.get(dataPoint.ts) || 0;
otherValuesMap.set(dataPoint.ts, currentBytes + dataPoint.bytes);
});
});

otherValuesMap.forEach((bytes, ts) => {
otherFlow.values.push({ ts: ts, bytes: bytes });
});

// Ensure the values are sorted by timestamp, as d3 expects
otherFlow.values.sort((a, b) => a.ts - b.ts);

return topNFlows.concat(otherFlow);
};

my.charts.toptalk.toptalkChart = (function (m) {
const margin = {
top: 20,
Expand Down Expand Up @@ -220,20 +256,30 @@
return { formattedData, maxSlice };
}

const getFlowColor = (key) => {
if (key === 'other') {
return '#cccccc'; // a neutral grey
}
return colorScale(key);
};


/* Update the chart (try to avoid memory allocations here!) */
m.redraw = function() {

// Process the raw chartData to aggregate "other" flows before drawing
const processedChartData = processAndAggregateChartData(chartData);

const width = size.width - margin.left - margin.right;
const height = size.height - margin.top - margin.bottom;

xScale = d3.scaleLinear().range([0, width]);
/* compute the domain of x as the [min,max] extent of timestamps
* of the first (largest) flow */
if (chartData && chartData[0])
xScale.domain(d3.extent(chartData[0].values, d => d.ts));
if (processedChartData && processedChartData[0])
xScale.domain(d3.extent(processedChartData[0].values, d => d.ts));

const { formattedData, maxSlice } = formatDataAndGetMaxSlice(chartData);
const { formattedData, maxSlice } = formatDataAndGetMaxSlice(processedChartData);

const yPow = d3.select('input[name="y-axis-is-log"]:checked').node().value;

Expand All @@ -257,7 +303,7 @@
svg.select(".xGrid").call(xGrid);
svg.select(".yGrid").call(yGrid);

const fkeys = chartData.map(f => f.fkey);
const fkeys = processedChartData.map(f => f.fkey);
colorScale.domain(fkeys);

stack.keys(fkeys);
Expand All @@ -277,15 +323,15 @@
stackedChartData.forEach(layer => {
context.beginPath();
area(layer);
context.fillStyle = colorScale(layer.key);
context.fillStyle = getFlowColor(layer.key);
context.fill();
});

// distribution bar
const tbytes = chartData.reduce((sum, f) => sum + f.tbytes, 0);
const tbytes = processedChartData.reduce((sum, f) => sum + f.tbytes, 0);

let rangeStop = 0;
const barData = chartData.map(f => {
const barData = processedChartData.map(f => {
const new_d = {
k: f.fkey,
x0: rangeStop,
Expand Down Expand Up @@ -314,7 +360,7 @@
.attr("y", 9)
.attr("x", d => x(d.x0))
.attr("width", d => x(d.x1) - x(d.x0))
.style("fill", d => colorScale(d.k));
.style("fill", d => getFlowColor(d.k));

barsbox.attr("transform",
"translate(" + margin.left + "," + 350 + ")");
Expand Down Expand Up @@ -347,28 +393,32 @@

// Add the complex <tspan> structure only ONCE when elements are created
legendTextEnter.each(function(d) {
const parts = d.split('/');
const sourceIP = parts[1];
const sourcePort = parts[2];
const destIP = parts[3];
const destPort = parts[4];
const proto = parts[5];
const tclass = parts[6];

const textNode = d3.select(this);
textNode.append("tspan").attr("x", "25em").attr("text-anchor", "end").text(sourceIP);
textNode.append("tspan").attr("x", "25.5em").text(":" + sourcePort.padEnd(6));
textNode.append("tspan").attr("x", "30.5em").text("->");
textNode.append("tspan").attr("x", "32.5em").text(destIP);
textNode.append("tspan").attr("x", "58em").text(":" + destPort);
textNode.append("tspan").attr("x", "63.5em").text("| " + proto);
textNode.append("tspan").attr("x", "70em").text("| " + tclass);
if (d === 'other') {
textNode.append("tspan").attr("x", 25).text("Other Flows");
} else {
const parts = d.split('/');
const sourceIP = parts[1];
const sourcePort = parts[2];
const destIP = parts[3];
const destPort = parts[4];
const proto = parts[5];
const tclass = parts[6];

textNode.append("tspan").attr("x", "25em").attr("text-anchor", "end").text(sourceIP);
textNode.append("tspan").attr("x", "25.5em").text(":" + sourcePort.padEnd(6));
textNode.append("tspan").attr("x", "30.5em").text("->");
textNode.append("tspan").attr("x", "32.5em").text(destIP);
textNode.append("tspan").attr("x", "58em").text(":" + destPort);
textNode.append("tspan").attr("x", "63.5em").text("| " + proto);
textNode.append("tspan").attr("x", "70em").text("| " + tclass);
}
});

// UPDATE + ENTER - update positions and colors for all visible items
const legendUpdate = legend.merge(legendEnter);
legendUpdate.attr("transform", (d, i) => "translate(0, " + ((i + 1) * 25) + ")");
legendUpdate.select("rect").style("fill", colorScale);
legendUpdate.select("rect").style("fill", getFlowColor);
};


Expand Down
3 changes: 1 addition & 2 deletions html5-client/src/js/jittertrap-core.js
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,7 @@
const updateTopFlowChartData = function(interval) {
const chartPeriod = my.charts.getChartPeriod();
const chartSeries = JT.charts.getTopFlowsRef();
const fcount = (flowRank[interval].length < 10) ?
flowRank[interval].length : 10;
const fcount = flowRank[interval].length;

updateSampleCounts(interval);

Expand Down
3 changes: 2 additions & 1 deletion messages/include/jt_msg_toptalk.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ int jt_toptalk_printer(void *data, char *out, int len);
int jt_toptalk_free(void *data);
const char *jt_toptalk_test_msg_get(void);

/* MAX_FLOWS should be 2x the number of displayed top N flows. */
#define MAX_FLOWS 20
#define ADDR_LEN 50
#define PROTO_LEN 5
#define PROTO_LEN 6
#define TCLASS_LEN 5

struct jt_msg_toptalk
Expand Down
7 changes: 4 additions & 3 deletions server/tt_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,17 @@ static int m2m(struct tt_top_flows *ttf, struct mq_tt_msg *msg, int interval)
struct jt_msg_toptalk *m = &msg->m;
char s_addr_str[INET6_ADDRSTRLEN] = { 0 };
char d_addr_str[INET6_ADDRSTRLEN] = { 0 };
int flow_count;

// Determine the number of flows to send, capped by the message capacity.
// This relies on the compile-time MAX_FLOW_COUNT being >= MAX_FLOWS.
int flow_count = MIN(ttf->flow_count, MAX_FLOWS);

m->timestamp.tv_sec = ttf->timestamp.tv_sec;
m->timestamp.tv_nsec = ttf->timestamp.tv_usec * 1000;

m->interval_ns = tt_intervals[interval].tv_sec * 1E9 +
tt_intervals[interval].tv_usec * 1E3;

flow_count = MIN(ttf->flow_count, MAX_FLOWS);
m->tflows = flow_count;
m->tbytes = ttf->total_bytes;
m->tpackets = ttf->total_packets;
Expand Down Expand Up @@ -355,4 +357,3 @@ int intervals_thread_init(void)

return 0;
}