From d67a7be2662074a09f58c099b2c804dfefa91282 Mon Sep 17 00:00:00 2001 From: Aritra Basu Date: Mon, 15 Dec 2025 20:45:02 -0500 Subject: [PATCH 1/2] Add BPF filtering support to calicovppctl trace/pcap/dispatch commands - Add CLI flags: -srcip, -dstip, -srcport, -dstport, -protocol - Implement BPF filter building and application using VPP CLI - Handle empty capture files gracefully - Support BPF filtering for trace, pcap, and dispatch commands Signed-off-by: Aritra Basu --- cmd/calicovppctl/main.go | 298 +++++++++++++++++++++++-- vpplink/generated/vpp_clone_current.sh | 4 + 2 files changed, 288 insertions(+), 14 deletions(-) diff --git a/cmd/calicovppctl/main.go b/cmd/calicovppctl/main.go index 372083a8..7a988626 100755 --- a/cmd/calicovppctl/main.go +++ b/cmd/calicovppctl/main.go @@ -21,9 +21,11 @@ import ( "compress/gzip" "context" "encoding/json" + "errors" "flag" "fmt" "io" + "net" "os" "os/exec" "os/signal" @@ -577,10 +579,13 @@ func printHelp() { fmt.Println("calicovppctl sh [-component vpp|agent] [-node NODENAME] - Get a shell in vpp (dataplane) or agent (controlplane) container") fmt.Println("calicovppctl trace [-node NODENAME] - Setup VPP packet tracing") fmt.Println(" Optional params: [-count N] [-timeout SEC] [-interface phy|af_xdp|af_packet|avf|vmxnet3|virtio|rdma|dpdk|memif|vcl]") + fmt.Println(" Filter params: [-srcip IP] [-dstip IP] [-srcport PORT] [-dstport PORT] [-protocol tcp|udp|icmp]") fmt.Println("calicovppctl pcap [-node NODENAME] - Setup VPP PCAP tracing") fmt.Println(" Optional params: [-count N] [-timeout SEC] [-interface INTERFACE_NAME|any(default)] [-output FILE.pcap]") + fmt.Println(" Filter params: [-srcip IP] [-dstip IP] [-srcport PORT] [-dstport PORT] [-protocol tcp|udp|icmp]") fmt.Println("calicovppctl dispatch [-node NODENAME] - Setup VPP dispatch tracing") fmt.Println(" Optional params: [-count N] [-timeout SEC] [-interface phy|af_xdp|af_packet|avf|vmxnet3|virtio|rdma|dpdk|memif|vcl] [-output FILE.pcap]") + fmt.Println(" Filter params: [-srcip IP] [-dstip IP] [-srcport PORT] [-dstport PORT] [-protocol tcp|udp|icmp]") fmt.Println() } @@ -595,6 +600,12 @@ func main() { timeout = flag.Int("timeout", 30, "Timeout in seconds for trace/pcap/dispatch commands (default: 30)") interfaceType = flag.String("interface", "", "interface types for trace/dispatch; interface names for pcap. See help for supported types") output = flag.String("output", "", "Output file for pcap/dispatch commands") + // BPF filter flags + srcIP = flag.String("srcip", "", "Source IP address filter (10.0.0.20, 2a04:baba::20, etc.)") + dstIP = flag.String("dstip", "", "Destination IP address filter (169.254.0.1, fe80:face::1, etc.)") + srcPort = flag.Int("srcport", 0, "Source port filter (68, 546, etc.)") + dstPort = flag.Int("dstport", 0, "Destination port filter (67, 547, 80, 443, etc.)") + protocol = flag.String("protocol", "", "Protocol filter (icmp, tcp, udp)") ) // Custom usage function @@ -659,6 +670,12 @@ func main() { interfacePtr := flagSet.String("interface", "", "Interface: types (memif,tuntap,vcl) for trace/dispatch; interface names for pcap") outputPtr := flagSet.String("output", "", "Output file for pcap/dispatch commands") helpPtr := flagSet.Bool("help", false, "Show help message") + // BPF filter flags + srcIPPtr := flagSet.String("srcip", "", "Source IP address filter") + dstIPPtr := flagSet.String("dstip", "", "Destination IP address filter") + srcPortPtr := flagSet.Int("srcport", 0, "Source port filter") + dstPortPtr := flagSet.Int("dstport", 0, "Destination port filter") + protocolPtr := flagSet.String("protocol", "", "Protocol filter (tcp, udp, icmp)") // Parse all remaining arguments for flags var finalCommandArgs []string @@ -705,6 +722,35 @@ func main() { *followPtr = true case "-help", "--help", "-h": *helpPtr = true + case "-srcip", "--srcip": + if i+1 < len(allArgs) { + *srcIPPtr = allArgs[i+1] + i++ + } + case "-dstip", "--dstip": + if i+1 < len(allArgs) { + *dstIPPtr = allArgs[i+1] + i++ + } + case "-srcport", "--srcport": + if i+1 < len(allArgs) { + if portVal, err := strconv.Atoi(allArgs[i+1]); err == nil { + *srcPortPtr = portVal + } + i++ + } + case "-dstport", "--dstport": + if i+1 < len(allArgs) { + if portVal, err := strconv.Atoi(allArgs[i+1]); err == nil { + *dstPortPtr = portVal + } + i++ + } + case "-protocol", "--protocol", "-proto": + if i+1 < len(allArgs) { + *protocolPtr = allArgs[i+1] + i++ + } } } else { // This is not a flag, add to final command args @@ -721,6 +767,11 @@ func main() { *interfaceType = *interfacePtr *output = *outputPtr *help = *helpPtr + *srcIP = *srcIPPtr + *dstIP = *dstIPPtr + *srcPort = *srcPortPtr + *dstPort = *dstPortPtr + *protocol = *protocolPtr // Show help if requested if *help { @@ -862,7 +913,7 @@ func main() { handleError(fmt.Errorf("node name is required for trace command. Use -node flag"), "") } - err := traceCommand(k, *nodeName, *count, *timeout, *interfaceType) + err := traceCommand(k, *nodeName, *count, *timeout, *interfaceType, *srcIP, *dstIP, *protocol, *srcPort, *dstPort) if err != nil { handleError(err, "Trace failed") } @@ -872,7 +923,7 @@ func main() { handleError(fmt.Errorf("node name is required for pcap command. Use -node flag"), "") } - err := pcapCommand(k, *nodeName, *count, *timeout, *interfaceType, *output) + err := pcapCommand(k, *nodeName, *count, *timeout, *interfaceType, *output, *srcIP, *dstIP, *protocol, *srcPort, *dstPort) if err != nil { handleError(err, "PCAP failed") } @@ -882,7 +933,7 @@ func main() { handleError(fmt.Errorf("node name is required for dispatch command. Use -node flag"), "") } - err := dispatchCommand(k, *nodeName, *count, *timeout, *interfaceType, *output) + err := dispatchCommand(k, *nodeName, *count, *timeout, *interfaceType, *output, *srcIP, *dstIP, *protocol, *srcPort, *dstPort) if err != nil { handleError(err, "Dispatch failed") } @@ -1189,6 +1240,23 @@ func compressAndSaveRemoteFile(k *KubeClient, nodeName, remoteFile, localFile st return fmt.Errorf("could not find calico-vpp-node pod on node '%s': %v", nodeName, err) } + // Check if remote file exists and has content; remove it if empty + checkCmd := fmt.Sprintf(`test -s %q || { rm -f %q; exit 3; }`, remoteFile, remoteFile) + _, err = k.execInPod(namespace, podName, container, "sh", "-c", checkCmd) + if err != nil { + var exitErr *exec.ExitError + if errors.As(err, &exitErr) && exitErr.ExitCode() == 3 { + fmt.Println() + printColored("red", "No packets were captured with the specified filter.") + printColored("grey", "This could mean:") + printColored("grey", " - No matching traffic occurred during the capture period") + printColored("grey", " - The filter expression may be too restrictive") + printColored("grey", " - Try running without filters to verify if there is traffic") + return nil + } + return fmt.Errorf("failed to check remote file: %v", err) + } + printColored("green", fmt.Sprintf("Compressing and downloading file from node '%s'", nodeName)) printColored("grey", fmt.Sprintf("Pod: %s", podName)) printColored("grey", fmt.Sprintf("Remote file: %s", remoteFile)) @@ -1317,7 +1385,7 @@ func mapInterfaceTypeToVppInputNode(k *KubeClient, interfaceType string) (string } } -func traceCommand(k *KubeClient, nodeName string, count int, timeout int, interfaceType string) error { +func traceCommand(k *KubeClient, nodeName string, count int, timeout int, interfaceType, srcIP, dstIP, protocol string, srcPort, dstPort int) error { validatedNode, err := validateNodeName(k, nodeName) if err != nil { return err @@ -1333,6 +1401,27 @@ func traceCommand(k *KubeClient, nodeName string, count int, timeout int, interf printColored("grey", fmt.Sprintf("Timeout: %d seconds", timeout)) printColored("grey", fmt.Sprintf("VPP Input Node: %s", vppInputNode)) printColored("grey", "Output file: ./trace.txt.gz") + + // Build and apply BPF filter if specified + bpfFilter := buildBPFFilter(srcIP, dstIP, protocol, srcPort, dstPort) + useBPF := false + if bpfFilter != "" { + printColored("grey", fmt.Sprintf("BPF Filter: %s", bpfFilter)) + err := applyBPFFilter(k, validatedNode, bpfFilter, false) + if err != nil { + printColored("red", fmt.Sprintf("Warning: Failed to apply BPF filter: %v", err)) + printColored("grey", "Continuing without filter...") + } else { + useBPF = true + defer func() { + printColored("blue", "Clearing BPF filter...") + err := clearBPFFilter(k, validatedNode, false) + if err != nil { + printColored("red", fmt.Sprintf("Warning: Failed to clear BPF filter: %v", err)) + } + }() + } + } fmt.Println() // Clear any existing traces first @@ -1343,8 +1432,12 @@ func traceCommand(k *KubeClient, nodeName string, count int, timeout int, interf // Add trace for specified interface type printColored("blue", "Starting packet trace...") - printColored("grey", fmt.Sprintf("Command: trace add %s %d", vppInputNode, count)) - _, err = k.vppctl(validatedNode, "trace", "add", vppInputNode, fmt.Sprintf("%d", count)) + traceCmd := []string{"trace", "add", vppInputNode, fmt.Sprintf("%d", count)} + if useBPF { + traceCmd = append(traceCmd, "filter") + } + printColored("grey", fmt.Sprintf("Command: %s", strings.Join(traceCmd, " "))) + _, err = k.vppctl(validatedNode, traceCmd...) if err != nil { return fmt.Errorf("failed to add trace: %v", err) } @@ -1432,7 +1525,7 @@ func traceCommand(k *KubeClient, nodeName string, count int, timeout int, interf return nil } -func pcapCommand(k *KubeClient, nodeName string, count int, timeout int, interfaceType, outputFile string) error { +func pcapCommand(k *KubeClient, nodeName string, count int, timeout int, interfaceType, outputFile, srcIP, dstIP, protocol string, srcPort, dstPort int) error { validatedNode, err := validateNodeName(k, nodeName) if err != nil { return err @@ -1479,7 +1572,12 @@ func pcapCommand(k *KubeClient, nodeName string, count int, timeout int, interfa printColored("grey", "No interface specified, using 'any' to capture on all interfaces") } - pcapCommand := fmt.Sprintf("pcap trace tx rx max %d intfc %s file trace.pcap", count, interfaceFilter) + pcapCommand := []string{ + "pcap", "trace", "tx", "rx", + "max", fmt.Sprintf("%d", count), + "intfc", interfaceFilter, + "file", "trace.pcap", + } printColored("green", fmt.Sprintf("Starting PCAP trace on node '%s'", validatedNode)) printColored("grey", fmt.Sprintf("Packet count: %d", count)) @@ -1488,11 +1586,35 @@ func pcapCommand(k *KubeClient, nodeName string, count int, timeout int, interfa if outputFile != "" { printColored("grey", fmt.Sprintf("Output file: ./%s.gz", outputFile)) } + + // Build and apply BPF filter if specified + bpfFilter := buildBPFFilter(srcIP, dstIP, protocol, srcPort, dstPort) + useBPF := false + if bpfFilter != "" { + printColored("grey", fmt.Sprintf("BPF Filter: %s", bpfFilter)) + err := applyBPFFilter(k, validatedNode, bpfFilter, true) + if err != nil { + printColored("red", fmt.Sprintf("Warning: Failed to apply BPF filter: %v", err)) + printColored("grey", "Continuing without filter...") + } else { + useBPF = true + defer func() { + printColored("blue", "Clearing BPF filter...") + err := clearBPFFilter(k, validatedNode, true) + if err != nil { + printColored("red", fmt.Sprintf("Warning: Failed to clear BPF filter: %v", err)) + } + }() + } + } fmt.Println() printColored("blue", "Starting PCAP trace...") - printColored("grey", fmt.Sprintf("Command: %s", pcapCommand)) - _, err = k.vppctl(validatedNode, strings.Split(pcapCommand, " ")...) + if useBPF { + pcapCommand = append(pcapCommand, "filter") + } + printColored("grey", fmt.Sprintf("Command: %s", strings.Join(pcapCommand, " "))) + _, err = k.vppctl(validatedNode, pcapCommand...) if err != nil { return fmt.Errorf("failed to start PCAP trace: %v", err) } @@ -1569,7 +1691,7 @@ func pcapCommand(k *KubeClient, nodeName string, count int, timeout int, interfa return nil } -func dispatchCommand(k *KubeClient, nodeName string, count int, timeout int, interfaceType, outputFile string) error { +func dispatchCommand(k *KubeClient, nodeName string, count int, timeout int, interfaceType, outputFile, srcIP, dstIP, protocol string, srcPort, dstPort int) error { validatedNode, err := validateNodeName(k, nodeName) if err != nil { return err @@ -1580,7 +1702,12 @@ func dispatchCommand(k *KubeClient, nodeName string, count int, timeout int, int return err } - dispatchCommand := fmt.Sprintf("pcap dispatch trace on max %d buffer-trace %s %d", count, vppInputNode, count) + dispatchCmd := []string{ + "pcap", "dispatch", "trace", "on", + "max", fmt.Sprintf("%d", count), + "buffer-trace", vppInputNode, fmt.Sprintf("%d", count), + "file", "dispatch.pcap", + } printColored("green", fmt.Sprintf("Starting dispatch trace on node '%s'", validatedNode)) printColored("grey", fmt.Sprintf("Packet count: %d", count)) @@ -1589,11 +1716,35 @@ func dispatchCommand(k *KubeClient, nodeName string, count int, timeout int, int if outputFile != "" { printColored("grey", fmt.Sprintf("Output file: ./%s.gz", outputFile)) } + + // Build and apply BPF filter if specified + bpfFilter := buildBPFFilter(srcIP, dstIP, protocol, srcPort, dstPort) + useBPF := false + if bpfFilter != "" { + printColored("grey", fmt.Sprintf("BPF Filter: %s", bpfFilter)) + err := applyBPFFilter(k, validatedNode, bpfFilter, true) + if err != nil { + printColored("red", fmt.Sprintf("Warning: Failed to apply BPF filter: %v", err)) + printColored("grey", "Continuing without filter...") + } else { + useBPF = true + defer func() { + printColored("blue", "Clearing BPF filter...") + err := clearBPFFilter(k, validatedNode, true) + if err != nil { + printColored("red", fmt.Sprintf("Warning: Failed to clear BPF filter: %v", err)) + } + }() + } + } fmt.Println() printColored("blue", "Starting dispatch trace...") - printColored("grey", fmt.Sprintf("Command: %s", dispatchCommand)) - _, err = k.vppctl(validatedNode, strings.Split(dispatchCommand, " ")...) + if useBPF { + dispatchCmd = append(dispatchCmd, "filter") + } + printColored("grey", fmt.Sprintf("Command: %s", strings.Join(dispatchCmd, " "))) + _, err = k.vppctl(validatedNode, dispatchCmd...) if err != nil { return fmt.Errorf("failed to start dispatch trace: %v", err) } @@ -1706,3 +1857,122 @@ func parseVppInterfaces(output string) []string { return upInterfaces } + +func validateIP(ip string) error { + if ip == "" { + return nil + } + if net.ParseIP(ip) == nil { + return fmt.Errorf("invalid IP address: %s", ip) + } + return nil +} + +// buildBPFFilter constructs a BPF filter expression from the provided parameters +func buildBPFFilter(srcIP, dstIP, protocol string, srcPort, dstPort int) string { + var filters []string + + // Add protocol filter + if protocol != "" { + switch strings.ToLower(protocol) { + case "tcp": + filters = append(filters, "tcp") + case "udp": + filters = append(filters, "udp") + case "icmp": + filters = append(filters, "icmp") + default: + printColored("red", fmt.Sprintf("Warning: Unknown protocol '%s', ignoring", protocol)) + } + } + + // Add IP filters + if srcIP != "" { + err := validateIP(srcIP) + if err != nil { + printColored("red", fmt.Sprintf("Warning: Invalid source IP '%s', ignoring", srcIP)) + } else { + filters = append(filters, fmt.Sprintf("src host %s", srcIP)) + } + } + if dstIP != "" { + err := validateIP(dstIP) + if err != nil { + printColored("red", fmt.Sprintf("Warning: Invalid destination IP '%s', ignoring", dstIP)) + } else { + filters = append(filters, fmt.Sprintf("dst host %s", dstIP)) + } + } + + // Add port filters + if srcPort != 0 { + if srcPort > 0 && srcPort < 65536 { + filters = append(filters, fmt.Sprintf("src port %d", srcPort)) + } else { + printColored("red", fmt.Sprintf("Warning: Invalid source port '%d', ignoring", srcPort)) + } + } + if dstPort != 0 { + if dstPort > 0 && dstPort < 65536 { + filters = append(filters, fmt.Sprintf("dst port %d", dstPort)) + } else { + printColored("red", fmt.Sprintf("Warning: Invalid destination port '%d', ignoring", dstPort)) + } + } + + if len(filters) == 0 { + return "" + } + + return strings.Join(filters, " and ") +} + +func applyBPFFilter(k *KubeClient, nodeName, bpfFilter string, isPcap bool) error { + printColored("blue", fmt.Sprintf("Applying BPF filter: %s", bpfFilter)) + + // Set the BPF filter expression ()set bpf trace filter {{}}) + filterArg := fmt.Sprintf("{{%s}}", bpfFilter) + out, err := k.vppctl(nodeName, "set", "bpf", "trace", "filter", filterArg) + if err != nil { + return fmt.Errorf("failed to set BPF filter: %v", err) + } + // Check if pcap_compile failed (VPP returns this in stdout, not as error) + if strings.Contains(out, "Failed pcap_compile") { + return fmt.Errorf("BPF filter compilation failed: %s", out) + } + + // Enable BPF filtering function (set trace/pcap filter function bpf_trace_filter) + if isPcap { + _, err = k.vppctl(nodeName, "set", "pcap", "filter", "function", "bpf_trace_filter") + } else { + _, err = k.vppctl(nodeName, "set", "trace", "filter", "function", "bpf_trace_filter") + } + if err != nil { + return fmt.Errorf("failed to enable BPF filter function: %v", err) + } + + printColored("green", "BPF filter applied successfully") + return nil +} + +// clearBPFFilter removes BPF filters from VPP +func clearBPFFilter(k *KubeClient, nodeName string, isPcap bool) error { + // Remove the BPF filter expression (set bpf trace filter del) + _, err := k.vppctl(nodeName, "set", "bpf", "trace", "filter", "del") + if err != nil { + return fmt.Errorf("failed to remove BPF filter: %v", err) + } + + // Reset to default filter function (set trace/pcap filter function vnet_is_packet_traced) + if isPcap { + _, err = k.vppctl(nodeName, "set", "pcap", "filter", "function", "vnet_is_packet_traced") + } else { + _, err = k.vppctl(nodeName, "set", "trace", "filter", "function", "vnet_is_packet_traced") + } + if err != nil { + return fmt.Errorf("failed to reset filter function: %v", err) + } + + printColored("green", "BPF filter cleared successfully") + return nil +} diff --git a/vpplink/generated/vpp_clone_current.sh b/vpplink/generated/vpp_clone_current.sh index ed74d3b1..23a4dcd3 100755 --- a/vpplink/generated/vpp_clone_current.sh +++ b/vpplink/generated/vpp_clone_current.sh @@ -120,6 +120,10 @@ git_cherry_pick refs/changes/89/41089/31 # https://gerrit.fd.io/r/c/vpp/+/41089 git_cherry_pick refs/changes/69/43369/16 # https://gerrit.fd.io/r/c/vpp/+/43369 cnat: converge new cnat implementation to support encaps (calico) +# bpf_trace_filter: add filter support for pcap dispatch trace and raw IP packet support +git_cherry_pick refs/changes/64/44464/9 # 44464: dispatch-trace: add filter support for pcap dispatch trace | https://gerrit.fd.io/r/c/vpp/+/44464 +git_cherry_pick refs/changes/67/44467/7 # 44467: bpf_trace_filter: add raw IP packet support | https://gerrit.fd.io/r/c/vpp/+/44467 + # --------------- private plugins --------------- # Generated with 'git format-patch --zero-commit -o ./patches/ HEAD^^^' git_apply_private 0001-pbl-Port-based-balancer.patch From 0ca6ae3fc9aa040768734397c90976c9a4291302 Mon Sep 17 00:00:00 2001 From: Aritra Basu Date: Wed, 7 Jan 2026 20:00:09 -0500 Subject: [PATCH 2/2] Add concurrency and cleanup enhancements to calicovppctl - Serialize capture operations (trace/pcap/dispatch) per VPP pod using an in-pod lock file (/tmp/calicovppctl.lock), preventing parallel captures from multiple clients - Provide clear error output when a capture is already running - Add forced cleanup option: `calicovppctl capture clear -node ` - clears trace - stops `pcap trace` and `pcap dispatch trace` - clears BPF filters and restores default filter functions - removes hanging in-pod lock file Signed-off-by: Aritra Basu --- cmd/calicovppctl/main.go | 319 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 308 insertions(+), 11 deletions(-) diff --git a/cmd/calicovppctl/main.go b/cmd/calicovppctl/main.go index 7a988626..d633b068 100755 --- a/cmd/calicovppctl/main.go +++ b/cmd/calicovppctl/main.go @@ -58,7 +58,6 @@ const ( bashCmd = "bash" vppctlPath = "/usr/bin/vppctl" vppSockPath = "/var/run/vpp/cli.sock" - sudoCmd = "sudo" dockerCmd = "docker" // Command templates @@ -68,8 +67,20 @@ const ( // Kubernetes client timeout kubeClientTimeout = 15 * time.Second + + // Capture lock file path (inside VPP pod) + captureLockFile = "/tmp/calicovppctl.lock" ) +// CaptureLockInfo represents the information stored in the lock file +type CaptureLockInfo struct { + NodeName string `json:"node_name"` + Operation string `json:"operation"` + StartedAt time.Time `json:"started_at"` + Hostname string `json:"hostname"` + BPFActive bool `json:"bpf_active"` +} + type KubeClient struct { clientset *kubernetes.Clientset timeout time.Duration @@ -573,6 +584,7 @@ func printHelp() { fmt.Println("calicovppctl vppctl [-node NODENAME] [VPP_COMMANDS...] - Get a vppctl shell or run VPP commands on a specific node") fmt.Println("calicovppctl log [-f] [-component vpp|agent|felix] [-node NODENAME] - Get the logs of vpp (dataplane) or agent (controlplane) or felix daemon") fmt.Println("calicovppctl clear - Clear vpp internal stats") + fmt.Println("calicovppctl capture clear [-node NODENAME] - Clear all active captures and BPF filters (forced cleanup)") fmt.Println("calicovppctl export - Create an archive with vpp & k8 system state for debugging") fmt.Println("calicovppctl exportnode [-node NODENAME] - Create an archive with vpp & k8 system state for a specific node") fmt.Println("calicovppctl gdb - Attach gdb to the running vpp on the current machine") @@ -628,7 +640,7 @@ func main() { // Check if this is a known command if !commandFound && !strings.HasPrefix(arg, "-") { switch arg { - case "vppctl", "log", "clear", "export", "exportnode", "gdb", "sh", "trace", "pcap", "dispatch": + case "vppctl", "log", "clear", "export", "exportnode", "gdb", "sh", "trace", "pcap", "dispatch", "capture": command = arg commandFound = true commandArgs = args[i+1:] @@ -938,6 +950,26 @@ func main() { handleError(err, "Dispatch failed") } + case "capture": + if len(commandArgs) == 0 { + handleError(fmt.Errorf("capture command requires a subcommand. Use 'capture clear'"), "") + } + + switch commandArgs[0] { + case "clear": + if *nodeName == "" { + handleError(fmt.Errorf("node name is required for capture clear command. Use -node flag"), "") + } + + err := captureCleanupCommand(k, *nodeName) + if err != nil { + handleError(err, "Capture cleanup failed") + } + + default: + handleError(fmt.Errorf("unknown capture subcommand: %s. Use 'capture clear'", commandArgs[0]), "") + } + default: fmt.Printf("Unknown command: %s\n\n", command) printHelp() @@ -1391,6 +1423,40 @@ func traceCommand(k *KubeClient, nodeName string, count int, timeout int, interf return err } + // Check for existing capture locks + lockInfo, err := checkCaptureLock(k, validatedNode) + if err != nil { + return fmt.Errorf("failed to check capture lock: %v", err) + } + if lockInfo != nil { + return fmt.Errorf("capture operation already running on node '%s'\n"+ + " Operation: %s\n"+ + " Started by: %s\n"+ + " Started at: %s\n"+ + " BPF filters active: %t\n\n"+ + "Use 'calicovppctl capture clear -node %s' to force cleanup if the previous operation failed", + validatedNode, lockInfo.Operation, lockInfo.Hostname, + lockInfo.StartedAt.Format("2006-01-02 15:04:05"), lockInfo.BPFActive, validatedNode) + } + + // Check if BPF filters will be used + bpfFilter := buildBPFFilter(srcIP, dstIP, protocol, srcPort, dstPort) + useBPF := bpfFilter != "" + + // Create capture lock + err = createCaptureLock(k, validatedNode, "trace", useBPF) + if err != nil { + return fmt.Errorf("failed to create capture lock: %v", err) + } + + // Ensure cleanup on exit + defer func() { + err := removeCaptureLock(k, validatedNode) + if err != nil { + printColored("red", fmt.Sprintf("Warning: Failed to remove capture lock: %v", err)) + } + }() + vppInputNode, _, err := mapInterfaceTypeToVppInputNode(k, interfaceType) if err != nil { return err @@ -1402,9 +1468,7 @@ func traceCommand(k *KubeClient, nodeName string, count int, timeout int, interf printColored("grey", fmt.Sprintf("VPP Input Node: %s", vppInputNode)) printColored("grey", "Output file: ./trace.txt.gz") - // Build and apply BPF filter if specified - bpfFilter := buildBPFFilter(srcIP, dstIP, protocol, srcPort, dstPort) - useBPF := false + // Apply BPF filter if specified if bpfFilter != "" { printColored("grey", fmt.Sprintf("BPF Filter: %s", bpfFilter)) err := applyBPFFilter(k, validatedNode, bpfFilter, false) @@ -1531,6 +1595,40 @@ func pcapCommand(k *KubeClient, nodeName string, count int, timeout int, interfa return err } + // Check for existing capture locks + lockInfo, err := checkCaptureLock(k, validatedNode) + if err != nil { + return fmt.Errorf("failed to check capture lock: %v", err) + } + if lockInfo != nil { + return fmt.Errorf("capture operation already running on node '%s'\n"+ + " Operation: %s\n"+ + " Started by: %s\n"+ + " Started at: %s\n"+ + " BPF filters active: %t\n\n"+ + "Use 'calicovppctl capture clear -node %s' to force cleanup if the previous operation failed", + validatedNode, lockInfo.Operation, lockInfo.Hostname, + lockInfo.StartedAt.Format("2006-01-02 15:04:05"), lockInfo.BPFActive, validatedNode) + } + + // Check if BPF filters will be used + bpfFilter := buildBPFFilter(srcIP, dstIP, protocol, srcPort, dstPort) + useBPF := bpfFilter != "" + + // Create capture lock + err = createCaptureLock(k, validatedNode, "pcap", useBPF) + if err != nil { + return fmt.Errorf("failed to create capture lock: %v", err) + } + + // Ensure cleanup on exit + defer func() { + err := removeCaptureLock(k, validatedNode) + if err != nil { + printColored("red", fmt.Sprintf("Warning: Failed to remove capture lock: %v", err)) + } + }() + // First, let's validate that we can access the VPP interfaces interfacesOutput, err := k.vppctl(validatedNode, "show", "interface") if err != nil { @@ -1587,9 +1685,7 @@ func pcapCommand(k *KubeClient, nodeName string, count int, timeout int, interfa printColored("grey", fmt.Sprintf("Output file: ./%s.gz", outputFile)) } - // Build and apply BPF filter if specified - bpfFilter := buildBPFFilter(srcIP, dstIP, protocol, srcPort, dstPort) - useBPF := false + // Apply BPF filter if specified if bpfFilter != "" { printColored("grey", fmt.Sprintf("BPF Filter: %s", bpfFilter)) err := applyBPFFilter(k, validatedNode, bpfFilter, true) @@ -1697,6 +1793,40 @@ func dispatchCommand(k *KubeClient, nodeName string, count int, timeout int, int return err } + // Check for existing capture locks + lockInfo, err := checkCaptureLock(k, validatedNode) + if err != nil { + return fmt.Errorf("failed to check capture lock: %v", err) + } + if lockInfo != nil { + return fmt.Errorf("capture operation already running on node '%s'\n"+ + " Operation: %s\n"+ + " Started by: %s\n"+ + " Started at: %s\n"+ + " BPF filters active: %t\n\n"+ + "Use 'calicovppctl capture clear -node %s' to force cleanup if the previous operation failed", + validatedNode, lockInfo.Operation, lockInfo.Hostname, + lockInfo.StartedAt.Format("2006-01-02 15:04:05"), lockInfo.BPFActive, validatedNode) + } + + // Check if BPF filters will be used + bpfFilter := buildBPFFilter(srcIP, dstIP, protocol, srcPort, dstPort) + useBPF := bpfFilter != "" + + // Create capture lock + err = createCaptureLock(k, validatedNode, "dispatch", useBPF) + if err != nil { + return fmt.Errorf("failed to create capture lock: %v", err) + } + + // Ensure cleanup on exit + defer func() { + err := removeCaptureLock(k, validatedNode) + if err != nil { + printColored("red", fmt.Sprintf("Warning: Failed to remove capture lock: %v", err)) + } + }() + vppInputNode, _, err := mapInterfaceTypeToVppInputNode(k, interfaceType) if err != nil { return err @@ -1717,9 +1847,7 @@ func dispatchCommand(k *KubeClient, nodeName string, count int, timeout int, int printColored("grey", fmt.Sprintf("Output file: ./%s.gz", outputFile)) } - // Build and apply BPF filter if specified - bpfFilter := buildBPFFilter(srcIP, dstIP, protocol, srcPort, dstPort) - useBPF := false + // Apply BPF filter if specified if bpfFilter != "" { printColored("grey", fmt.Sprintf("BPF Filter: %s", bpfFilter)) err := applyBPFFilter(k, validatedNode, bpfFilter, true) @@ -1976,3 +2104,172 @@ func clearBPFFilter(k *KubeClient, nodeName string, isPcap bool) error { printColored("green", "BPF filter cleared successfully") return nil } + +// createCaptureLock creates a lock file for the specified node and operation +func createCaptureLock(k *KubeClient, nodeName, operation string, bpfActive bool) error { + // Get hostname + hostname, err := os.Hostname() + if err != nil { + return fmt.Errorf("failed to get hostname: %v", err) + } + + lockInfo := CaptureLockInfo{ + NodeName: nodeName, + Operation: operation, + StartedAt: time.Now(), + Hostname: hostname, + BPFActive: bpfActive, + } + + lockData, err := json.MarshalIndent(lockInfo, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal lock info: %v", err) + } + + // Find the pod on the specified node + podName, err := k.findNodePod(nodeName, defaultPod, defaultNamespace) + if err != nil { + return fmt.Errorf("could not find calico-vpp-node pod on node '%s': %v", nodeName, err) + } + + // Create lock file inside VPP pod using kubectl exec + createCmd := fmt.Sprintf("cat > %s", captureLockFile) + _, err = k.execInPod(defaultNamespace, podName, defaultContainerVpp, "sh", "-c", createCmd) + if err != nil { + return fmt.Errorf("failed to create lock file in VPP pod: %v", err) + } + + // Write the lock data to the file via stdin + cmd := exec.Command(kubectlCmd, "exec", "-i", "-n", defaultNamespace, + "-c", defaultContainerVpp, podName, "--", "sh", "-c", createCmd) + cmd.Stdin = strings.NewReader(string(lockData)) + output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("failed to write lock file data: %v, output: %s", err, string(output)) + } + + return nil +} + +// checkCaptureLock checks if there's an active capture lock and returns conflict info +func checkCaptureLock(k *KubeClient, nodeName string) (*CaptureLockInfo, error) { + // Find the pod on the specified node + podName, err := k.findNodePod(nodeName, defaultPod, defaultNamespace) + if err != nil { + return nil, fmt.Errorf("could not find calico-vpp-node pod on node '%s': %v", nodeName, err) + } + + // Check if lock file exists in VPP pod + checkCmd := fmt.Sprintf("test -f %s", captureLockFile) + _, err = k.execInPod(defaultNamespace, podName, defaultContainerVpp, "sh", "-c", checkCmd) + if err != nil { + // Lock file doesn't exist + return nil, nil + } + + // Read lock file from VPP pod + readCmd := fmt.Sprintf("cat %s", captureLockFile) + lockData, err := k.execInPod(defaultNamespace, podName, defaultContainerVpp, "sh", "-c", readCmd) + if err != nil { + return nil, fmt.Errorf("failed to read lock file from VPP pod: %v", err) + } + + var lockInfo CaptureLockInfo + err = json.Unmarshal([]byte(lockData), &lockInfo) + if err != nil { + return nil, fmt.Errorf("failed to parse lock file: %v", err) + } + + // Check if the lock is for the same node + if lockInfo.NodeName == nodeName { + return &lockInfo, nil + } + + return nil, nil // Lock exists but for different node (should not happen) +} + +// removeCaptureLock removes the capture lock file +func removeCaptureLock(k *KubeClient, nodeName string) error { + // Find the pod on the specified node + podName, err := k.findNodePod(nodeName, defaultPod, defaultNamespace) + if err != nil { + return fmt.Errorf("could not find calico-vpp-node pod on node '%s': %v", nodeName, err) + } + + // Check if lock file exists in VPP pod + checkCmd := fmt.Sprintf("test -f %s", captureLockFile) + _, err = k.execInPod(defaultNamespace, podName, defaultContainerVpp, "sh", "-c", checkCmd) + if err != nil { + // Lock file doesn't exist, nothing to remove + return nil + } + + // Remove lock file from VPP pod + removeCmd := fmt.Sprintf("rm -f %s", captureLockFile) + _, err = k.execInPod(defaultNamespace, podName, defaultContainerVpp, "sh", "-c", removeCmd) + if err != nil { + return fmt.Errorf("failed to remove lock file from VPP pod: %v", err) + } + + return nil +} + +// captureCleanupCommand performs forced cleanup of all capture operations +func captureCleanupCommand(k *KubeClient, nodeName string) error { + validatedNode, err := validateNodeName(k, nodeName) + if err != nil { + return err + } + + printColored("blue", fmt.Sprintf("Starting cleanup on node '%s'", validatedNode)) + + // Stop all active traces + printColored("blue", "Clearing traces...") + _, err = k.vppctl(validatedNode, "clear", "trace") + if err != nil { + printColored("red", fmt.Sprintf("Warning: Failed to clear trace: %v", err)) + } else { + printColored("green", "trace cleared") + } + + // Stop PCAP captures + printColored("blue", "Stopping PCAP captures...") + _, err = k.vppctl(validatedNode, "pcap", "trace", "off") + if err != nil { + printColored("red", fmt.Sprintf("Warning: Failed to stop PCAP trace: %v", err)) + } else { + printColored("green", "PCAP trace stopped") + } + + // Stop dispatch captures + printColored("blue", "Stopping dispatch captures...") + _, err = k.vppctl(validatedNode, "pcap", "dispatch", "trace", "off") + if err != nil { + printColored("red", fmt.Sprintf("Warning: Failed to stop dispatch trace: %v", err)) + } else { + printColored("green", "dispatch trace stopped") + } + + // Clear BPF filters for both trace and pcap + printColored("blue", "Clearing BPF filters...") + err = clearBPFFilter(k, validatedNode, false) + if err != nil { + printColored("red", fmt.Sprintf("Warning: Failed to clear trace BPF filter: %v", err)) + } + err = clearBPFFilter(k, validatedNode, true) + if err != nil { + printColored("red", fmt.Sprintf("Warning: Failed to clear PCAP BPF filter: %v", err)) + } + + // Remove lock file + printColored("blue", "Removing lock file...") + err = removeCaptureLock(k, validatedNode) + if err != nil { + printColored("red", fmt.Sprintf("Warning: Failed to remove lock file: %v", err)) + } else { + printColored("green", "lock file removed") + } + + printColored("green", "Cleanup completed") + return nil +}