Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/calicovppctl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@ require (
github.com/google/go-cmp v0.7.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/moby/spdystream v0.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/onsi/ginkgo/v2 v2.22.2 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/stretchr/testify v1.10.0 // indirect
Expand Down
8 changes: 8 additions & 0 deletions cmd/calicovppctl/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -37,6 +39,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gookit/color v1.5.4 h1:FZmqs7XOyGgCAxmWyPslpiok1k05wmY3SJTytgvYFs0=
github.com/gookit/color v1.5.4/go.mod h1:pZJOeOS8DM43rXbp4AZo1n9zCU2qjpcRko0b6/QJi9w=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
Expand All @@ -52,13 +56,17 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/moby/spdystream v0.5.0 h1:7r0J1Si3QO/kjRitvSLVVFUjxMEb/YLj6S9FF62JBCU=
github.com/moby/spdystream v0.5.0/go.mod h1:xBAYlnt/ay+11ShkdFKNAG7LsyK/tmNBVvVOwrfMgdI=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/onsi/ginkgo/v2 v2.22.2 h1:/3X8Panh8/WwhU/3Ssa6rCKqPLuAkVY2I0RoyDLySlU=
github.com/onsi/ginkgo/v2 v2.22.2/go.mod h1:oeMosUL+8LtarXBHu/c0bx2D/K9zyQ6uX3cTyztHwsk=
github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8=
Expand Down
176 changes: 163 additions & 13 deletions cmd/calicovppctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main
import (
"archive/tar"
"bufio"
"bytes"
"compress/gzip"
"context"
"encoding/json"
Expand All @@ -39,7 +40,10 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
"sigs.k8s.io/yaml"
)

Expand Down Expand Up @@ -68,6 +72,9 @@ const (
// Kubernetes client timeout
kubeClientTimeout = 15 * time.Second

// Number of times capture file transfer is retried on failure
maxTransferRetries = 3

// Capture lock file path (inside VPP pod)
captureLockFile = "/tmp/calicovppctl.lock"
)
Expand All @@ -82,8 +89,9 @@ type CaptureLockInfo struct {
}

type KubeClient struct {
clientset *kubernetes.Clientset
timeout time.Duration
clientset *kubernetes.Clientset
restConfig *restclient.Config
timeout time.Duration
}

func newKubeClient() (*KubeClient, error) {
Expand All @@ -101,7 +109,7 @@ func newKubeClient() (*KubeClient, error) {
return nil, fmt.Errorf("failed to create Kubernetes client: %v", err)
}

return &KubeClient{clientset: clientset, timeout: kubeClientTimeout}, nil
return &KubeClient{clientset: clientset, restConfig: config, timeout: kubeClientTimeout}, nil
}

func (k *KubeClient) getAvailableNodeNames() ([]string, error) {
Expand Down Expand Up @@ -1262,6 +1270,71 @@ func (k *KubeClient) formatNodesWide(nodes []corev1.Node) string {
return output.String()
}

// streamFileFromPod copies the contents of remoteFilePath out of a container
// and into writer. It does so by running `cat <remoteFilePath>` through the
// pod "exec" subresource over a SPDY connection built from k.restConfig.
//
// The remote command's stdout is wired directly to writer; its stderr is
// collected separately and, when non-empty, appended to the returned error so
// the caller sees why `cat` failed. The stream runs under
// context.Background(), i.e. without a deadline of its own.
func (k *KubeClient) streamFileFromPod(namespace, podName, containerName, remoteFilePath string, writer io.Writer) error {
	// Describe the remote command: stdout and stderr only, no stdin.
	execOptions := &corev1.PodExecOptions{
		Container: containerName,
		Command:   []string{"cat", remoteFilePath},
		Stdin:     false,
		Stdout:    true,
		Stderr:    true,
	}

	execRequest := k.clientset.CoreV1().RESTClient().Post().
		Resource("pods").
		Name(podName).
		Namespace(namespace).
		SubResource("exec").
		VersionedParams(execOptions, scheme.ParameterCodec)

	executor, err := remotecommand.NewSPDYExecutor(k.restConfig, "POST", execRequest.URL())
	if err != nil {
		return fmt.Errorf("failed to create SPDY executor: %v", err)
	}

	// Keep stderr apart from the payload so it never ends up in writer.
	var remoteStderr bytes.Buffer
	streamErr := executor.StreamWithContext(context.Background(), remotecommand.StreamOptions{
		Stdout: writer,
		Stderr: &remoteStderr,
	})
	if streamErr == nil {
		return nil
	}

	// Attach whatever the remote side printed on stderr, if anything.
	if detail := strings.TrimSpace(remoteStderr.String()); detail != "" {
		return fmt.Errorf("stream failed: %v: %s", streamErr, detail)
	}
	return fmt.Errorf("stream failed: %v", streamErr)
}

// cleanupPreExistingCaptureFile deletes a capture file left behind in the VPP
// container of the calico-vpp-node pod on nodeName, together with its
// /tmp/<basename>.gz companion, so a new capture starts from a clean state.
//
// It returns nil without doing anything when `test -f remoteFile` fails in
// the pod (treated as "file not present"). It returns an error when the pod
// cannot be located or when the removal command itself fails.
func cleanupPreExistingCaptureFile(k *KubeClient, nodeName, remoteFile string) error {
	vppPod, err := k.findNodePod(nodeName, defaultPod, defaultNamespace)
	if err != nil {
		return fmt.Errorf("could not find calico-vpp-node pod on node '%s': %v", nodeName, err)
	}

	// runInVpp executes a shell snippet in the VPP container and reports only
	// whether it succeeded; the command output is not needed here.
	runInVpp := func(script string) error {
		_, execErr := k.execInPod(defaultNamespace, vppPod, defaultContainerVpp, "sh", "-c", script)
		return execErr
	}

	// Any failure of the existence probe is interpreted as "nothing to remove".
	if probeErr := runInVpp(fmt.Sprintf("test -f %s", remoteFile)); probeErr != nil {
		return nil
	}

	// The file is there: drop it along with its gzip'ed sibling under /tmp.
	gzippedFile := fmt.Sprintf("/tmp/%s.gz", filepath.Base(remoteFile))
	if rmErr := runInVpp(fmt.Sprintf("rm -f %s %s", remoteFile, gzippedFile)); rmErr != nil {
		return fmt.Errorf("failed to remove pre-existing capture file: %v", rmErr)
	}

	printColored("grey", fmt.Sprintf("Cleaned up pre-existing capture file: %s", remoteFile))
	return nil
}

func compressAndSaveRemoteFile(k *KubeClient, nodeName, remoteFile, localFile string) error {
namespace := defaultNamespace
container := defaultContainerVpp
Expand Down Expand Up @@ -1298,26 +1371,70 @@ func compressAndSaveRemoteFile(k *KubeClient, nodeName, remoteFile, localFile st
// Compress remote file
printColored("blue", "Compressing remote file...")
remoteBasename := filepath.Base(remoteFile)
compressCmd := fmt.Sprintf("gzip -c %s > /tmp/%s.gz", remoteFile, remoteBasename)
compressedRemoteFile := fmt.Sprintf("/tmp/%s.gz", remoteBasename)
compressCmd := fmt.Sprintf("gzip -c %s > %s", remoteFile, compressedRemoteFile)
_, err = k.execInPod(namespace, podName, container, "sh", "-c", compressCmd)
if err != nil {
return fmt.Errorf("failed to compress remote file: %v", err)
}

// Copy compressed file
// Stream compressed file to local storage using Kubernetes Go client API with retry logic
printColored("blue", "Copying compressed file...")

copyCmd := exec.Command(kubectlCmd, "cp",
fmt.Sprintf("%s/%s:/tmp/%s.gz", namespace, podName, remoteBasename),
localFile, "-c", container)
err = copyCmd.Run()
if err != nil {
return fmt.Errorf("failed to copy file: %v", err)
var transferErr error
for attempt := 1; attempt <= maxTransferRetries; attempt++ {
if attempt > 1 {
printColored("grey", fmt.Sprintf("Retry attempt %d/%d...", attempt, maxTransferRetries))
}

outFile, err := os.Create(localFile)
if err != nil {
transferErr = fmt.Errorf("failed to create local file: %v", err)
continue
}

err = k.streamFileFromPod(namespace, podName, container, compressedRemoteFile, outFile)
closeErr := outFile.Close()

if err != nil {
os.Remove(localFile)
transferErr = err
printColored("red", fmt.Sprintf("Transfer attempt %d failed: %v", attempt, err))
continue
}

if closeErr != nil {
os.Remove(localFile)
transferErr = fmt.Errorf("failed to write local file: %v", closeErr)
continue
}

// Transfer successful
transferErr = nil
break
}

if transferErr != nil {
// Transfer failed after all retries - print manual recovery instructions,
// but do not clean up remote files so that user can recover them manually
fmt.Println()
printColored("red", "File transfer failed after all retry attempts.")
printColored("red", fmt.Sprintf("Error: %v", transferErr))
fmt.Println()
printColored("blue", "The capture file is still available on the remote pod.")
printColored("blue", "You can manually retrieve it with:")
printColored("grey", fmt.Sprintf(" kubectl cp -n %s -c %s %s:%s %s",
namespace, container, podName, compressedRemoteFile, localFile))
fmt.Println()
printColored("blue", "To clean up remote files after manual retrieval, run:")
printColored("grey", fmt.Sprintf(" calicovppctl capture clear -node %s", nodeName))
fmt.Println()
return fmt.Errorf("file transfer failed: %v", transferErr)
}

// Clean up remote files
// Clean up remote files only on successful transfer
printColored("blue", "Cleaning up remote file...")
cleanupCmd := fmt.Sprintf("rm -f %s /tmp/%s.gz", remoteFile, remoteBasename)
cleanupCmd := fmt.Sprintf("rm -f %s %s", remoteFile, compressedRemoteFile)
_, err = k.execInPod(namespace, podName, container, "sh", "-c", cleanupCmd)
if err != nil {
printColored("red", fmt.Sprintf("Warning: Failed to clean up remote files: %v", err))
Expand Down Expand Up @@ -1462,6 +1579,12 @@ func traceCommand(k *KubeClient, nodeName string, count int, timeout int, interf
return err
}

// Clean up any pre-existing capture file to ensure fresh capture
err = cleanupPreExistingCaptureFile(k, validatedNode, "/tmp/trace.txt")
if err != nil {
printColored("red", fmt.Sprintf("Warning: Failed to clean up pre-existing capture file: %v", err))
}

printColored("green", fmt.Sprintf("Starting packet trace on node '%s'", validatedNode))
printColored("grey", fmt.Sprintf("Packet count: %d", count))
printColored("grey", fmt.Sprintf("Timeout: %d seconds", timeout))
Expand Down Expand Up @@ -1629,6 +1752,12 @@ func pcapCommand(k *KubeClient, nodeName string, count int, timeout int, interfa
}
}()

// Clean up any pre-existing capture file to ensure fresh capture
err = cleanupPreExistingCaptureFile(k, validatedNode, "/tmp/trace.pcap")
if err != nil {
printColored("red", fmt.Sprintf("Warning: Failed to clean up pre-existing capture file: %v", err))
}

// First, let's validate that we can access the VPP interfaces
interfacesOutput, err := k.vppctl(validatedNode, "show", "interface")
if err != nil {
Expand Down Expand Up @@ -1832,6 +1961,12 @@ func dispatchCommand(k *KubeClient, nodeName string, count int, timeout int, int
return err
}

// Clean up any pre-existing capture file to ensure fresh capture
err = cleanupPreExistingCaptureFile(k, validatedNode, "/tmp/dispatch.pcap")
if err != nil {
printColored("red", fmt.Sprintf("Warning: Failed to clean up pre-existing capture file: %v", err))
}

dispatchCmd := []string{
"pcap", "dispatch", "trace", "on",
"max", fmt.Sprintf("%d", count),
Expand Down Expand Up @@ -2261,6 +2396,21 @@ func captureCleanupCommand(k *KubeClient, nodeName string) error {
printColored("red", fmt.Sprintf("Warning: Failed to clear PCAP BPF filter: %v", err))
}

// Clean up remote capture files
printColored("blue", "Cleaning up remote capture files...")
podName, err := k.findNodePod(validatedNode, defaultPod, defaultNamespace)
if err != nil {
printColored("red", fmt.Sprintf("Warning: Failed to find calico-vpp-node pod on node '%s': %v", validatedNode, err))
} else {
cleanupCmd := "rm -f /tmp/trace.txt /tmp/trace.txt.gz /tmp/trace.pcap /tmp/trace.pcap.gz /tmp/dispatch.pcap /tmp/dispatch.pcap.gz"
_, err = k.execInPod(defaultNamespace, podName, defaultContainerVpp, "sh", "-c", cleanupCmd)
if err != nil {
printColored("red", fmt.Sprintf("Warning: Failed to clean up remote files: %v", err))
} else {
printColored("green", "remote capture files cleaned up")
}
}

// Remove lock file
printColored("blue", "Removing lock file...")
err = removeCaptureLock(k, validatedNode)
Expand Down
Loading