diff --git a/cmd/calicovppctl/go.mod b/cmd/calicovppctl/go.mod index b4c742004..d3c70bbb4 100644 --- a/cmd/calicovppctl/go.mod +++ b/cmd/calicovppctl/go.mod @@ -29,12 +29,15 @@ require ( github.com/google/go-cmp v0.7.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/mailru/easyjson v0.7.7 // indirect + github.com/moby/spdystream v0.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect github.com/onsi/ginkgo/v2 v2.22.2 // indirect github.com/spf13/pflag v1.0.6 // indirect github.com/stretchr/testify v1.10.0 // indirect diff --git a/cmd/calicovppctl/go.sum b/cmd/calicovppctl/go.sum index 1ee3c1506..5a9337fe0 100644 --- a/cmd/calicovppctl/go.sum +++ b/cmd/calicovppctl/go.sum @@ -1,3 +1,5 @@ +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -37,6 +39,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gookit/color v1.5.4 h1:FZmqs7XOyGgCAxmWyPslpiok1k05wmY3SJTytgvYFs0= github.com/gookit/color v1.5.4/go.mod h1:pZJOeOS8DM43rXbp4AZo1n9zCU2qjpcRko0b6/QJi9w= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -52,6 +56,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/moby/spdystream v0.5.0 h1:7r0J1Si3QO/kjRitvSLVVFUjxMEb/YLj6S9FF62JBCU= +github.com/moby/spdystream v0.5.0/go.mod h1:xBAYlnt/ay+11ShkdFKNAG7LsyK/tmNBVvVOwrfMgdI= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -59,6 +65,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/onsi/ginkgo/v2 v2.22.2 h1:/3X8Panh8/WwhU/3Ssa6rCKqPLuAkVY2I0RoyDLySlU= github.com/onsi/ginkgo/v2 v2.22.2/go.mod h1:oeMosUL+8LtarXBHu/c0bx2D/K9zyQ6uX3cTyztHwsk= github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8= diff --git a/cmd/calicovppctl/main.go b/cmd/calicovppctl/main.go index d633b0687..3010d75e4 100755 --- a/cmd/calicovppctl/main.go +++ b/cmd/calicovppctl/main.go @@ -18,6 +18,7 @@ package main import ( "archive/tar" "bufio" + "bytes" "compress/gzip" "context" "encoding/json" @@ -39,7 +40,10 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/remotecommand" "sigs.k8s.io/yaml" ) @@ -68,6 +72,9 @@ const ( // Kubernetes client timeout kubeClientTimeout = 15 * time.Second + // Number of times capture file transfer is retried on failure + maxTransferRetries = 3 + // Capture lock file path (inside VPP pod) captureLockFile = "/tmp/calicovppctl.lock" ) @@ -82,8 +89,9 @@ type CaptureLockInfo struct { } type KubeClient struct { - clientset *kubernetes.Clientset - timeout time.Duration + clientset *kubernetes.Clientset + restConfig *restclient.Config + timeout time.Duration } func newKubeClient() (*KubeClient, error) { @@ -101,7 +109,7 @@ func newKubeClient() (*KubeClient, error) { return nil, fmt.Errorf("failed to create Kubernetes client: %v", err) } - return &KubeClient{clientset: clientset, timeout: kubeClientTimeout}, nil + return &KubeClient{clientset: clientset, restConfig: config, timeout: kubeClientTimeout}, nil } func (k *KubeClient) getAvailableNodeNames() ([]string, error) { @@ -1262,6 +1270,71 @@ func (k *KubeClient) formatNodesWide(nodes []corev1.Node) string { return output.String() } +// streamFileFromPod streams a file from a pod to the provided writer using the Kubernetes Go client API +func (k *KubeClient) streamFileFromPod(namespace, podName, containerName, remoteFilePath string, writer io.Writer) error { + req := k.clientset.CoreV1().RESTClient().Post(). + Resource("pods"). + Name(podName). + Namespace(namespace). + SubResource("exec"). + VersionedParams(&corev1.PodExecOptions{ + Container: containerName, + Command: []string{"cat", remoteFilePath}, + Stdin: false, + Stdout: true, + Stderr: true, + }, scheme.ParameterCodec) + + exec, err := remotecommand.NewSPDYExecutor(k.restConfig, "POST", req.URL()) + if err != nil { + return fmt.Errorf("failed to create SPDY executor: %v", err) + } + + var stderr bytes.Buffer + err = exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ + Stdout: writer, + Stderr: &stderr, + }) + if err != nil { + errMsg := strings.TrimSpace(stderr.String()) + if errMsg != "" { + return fmt.Errorf("stream failed: %v: %s", err, errMsg) + } + return fmt.Errorf("stream failed: %v", err) + } + + return nil +} + +// cleanupPreExistingCaptureFile removes any pre-existing capture file before starting a new +// capture so that we do not append to or use stale data from a previous capture +func cleanupPreExistingCaptureFile(k *KubeClient, nodeName, remoteFile string) error { + podName, err := k.findNodePod(nodeName, defaultPod, defaultNamespace) + if err != nil { + return fmt.Errorf("could not find calico-vpp-node pod on node '%s': %v", nodeName, err) + } + + // Check if the file exists + checkCmd := fmt.Sprintf("test -f %s", remoteFile) + _, err = k.execInPod(defaultNamespace, podName, defaultContainerVpp, "sh", "-c", checkCmd) + if err != nil { + // File does not exist, nothing to clean up + return nil + } + + // File exists, remove it and its compressed version + remoteBasename := filepath.Base(remoteFile) + compressedFile := fmt.Sprintf("/tmp/%s.gz", remoteBasename) + cleanupCmd := fmt.Sprintf("rm -f %s %s", remoteFile, compressedFile) + _, err = k.execInPod(defaultNamespace, podName, defaultContainerVpp, "sh", "-c", cleanupCmd) + if err != nil { + return fmt.Errorf("failed to remove pre-existing capture file: %v", err) + } + + printColored("grey", fmt.Sprintf("Cleaned up pre-existing capture file: %s", remoteFile)) + return nil +} + func compressAndSaveRemoteFile(k *KubeClient, nodeName, remoteFile, localFile string) error { namespace := defaultNamespace container := defaultContainerVpp @@ -1298,26 +1371,70 @@ func compressAndSaveRemoteFile(k *KubeClient, nodeName, remoteFile, localFile st // Compress remote file printColored("blue", "Compressing remote file...") remoteBasename := filepath.Base(remoteFile) - compressCmd := fmt.Sprintf("gzip -c %s > /tmp/%s.gz", remoteFile, remoteBasename) + compressedRemoteFile := fmt.Sprintf("/tmp/%s.gz", remoteBasename) + compressCmd := fmt.Sprintf("gzip -c %s > %s", remoteFile, compressedRemoteFile) _, err = k.execInPod(namespace, podName, container, "sh", "-c", compressCmd) if err != nil { return fmt.Errorf("failed to compress remote file: %v", err) } - // Copy compressed file + // Stream compressed file to local storage using Kubernetes Go client API with retry logic printColored("blue", "Copying compressed file...") - copyCmd := exec.Command(kubectlCmd, "cp", - fmt.Sprintf("%s/%s:/tmp/%s.gz", namespace, podName, remoteBasename), - localFile, "-c", container) - err = copyCmd.Run() - if err != nil { - return fmt.Errorf("failed to copy file: %v", err) + var transferErr error + for attempt := 1; attempt <= maxTransferRetries; attempt++ { + if attempt > 1 { + printColored("grey", fmt.Sprintf("Retry attempt %d/%d...", attempt, maxTransferRetries)) + } + + outFile, err := os.Create(localFile) + if err != nil { + transferErr = fmt.Errorf("failed to create local file: %v", err) + continue + } + + err = k.streamFileFromPod(namespace, podName, container, compressedRemoteFile, outFile) + closeErr := outFile.Close() + + if err != nil { + os.Remove(localFile) + transferErr = err + printColored("red", fmt.Sprintf("Transfer attempt %d failed: %v", attempt, err)) + continue + } + + if closeErr != nil { + os.Remove(localFile) + transferErr = fmt.Errorf("failed to write local file: %v", closeErr) + continue + } + + // Transfer successful + transferErr = nil + break + } + + if transferErr != nil { + // Transfer failed after all retries - print manual recovery instructions, + // but do not clean up remote files so that user can recover them manually + fmt.Println() + printColored("red", "File transfer failed after all retry attempts.") + printColored("red", fmt.Sprintf("Error: %v", transferErr)) + fmt.Println() + printColored("blue", "The capture file is still available on the remote pod.") + printColored("blue", "You can manually retrieve it with:") + printColored("grey", fmt.Sprintf(" kubectl cp -n %s -c %s %s:%s %s", + namespace, container, podName, compressedRemoteFile, localFile)) + fmt.Println() + printColored("blue", "To clean up remote files after manual retrieval, run:") + printColored("grey", fmt.Sprintf(" calicovppctl capture clear -node %s", nodeName)) + fmt.Println() + return fmt.Errorf("file transfer failed: %v", transferErr) } - // Clean up remote files + // Clean up remote files only on successful transfer printColored("blue", "Cleaning up remote file...") - cleanupCmd := fmt.Sprintf("rm -f %s /tmp/%s.gz", remoteFile, remoteBasename) + cleanupCmd := fmt.Sprintf("rm -f %s %s", remoteFile, compressedRemoteFile) _, err = k.execInPod(namespace, podName, container, "sh", "-c", cleanupCmd) if err != nil { printColored("red", fmt.Sprintf("Warning: Failed to clean up remote files: %v", err)) @@ -1462,6 +1579,12 @@ func traceCommand(k *KubeClient, nodeName string, count int, timeout int, interf return err } + // Clean up any pre-existing capture file to ensure fresh capture + err = cleanupPreExistingCaptureFile(k, validatedNode, "/tmp/trace.txt") + if err != nil { + printColored("red", fmt.Sprintf("Warning: Failed to clean up pre-existing capture file: %v", err)) + } + printColored("green", fmt.Sprintf("Starting packet trace on node '%s'", validatedNode)) printColored("grey", fmt.Sprintf("Packet count: %d", count)) printColored("grey", fmt.Sprintf("Timeout: %d seconds", timeout)) @@ -1629,6 +1752,12 @@ func pcapCommand(k *KubeClient, nodeName string, count int, timeout int, interfa } }() + // Clean up any pre-existing capture file to ensure fresh capture + err = cleanupPreExistingCaptureFile(k, validatedNode, "/tmp/trace.pcap") + if err != nil { + printColored("red", fmt.Sprintf("Warning: Failed to clean up pre-existing capture file: %v", err)) + } + // First, let's validate that we can access the VPP interfaces interfacesOutput, err := k.vppctl(validatedNode, "show", "interface") if err != nil { @@ -1832,6 +1961,12 @@ func dispatchCommand(k *KubeClient, nodeName string, count int, timeout int, int return err } + // Clean up any pre-existing capture file to ensure fresh capture + err = cleanupPreExistingCaptureFile(k, validatedNode, "/tmp/dispatch.pcap") + if err != nil { + printColored("red", fmt.Sprintf("Warning: Failed to clean up pre-existing capture file: %v", err)) + } + dispatchCmd := []string{ "pcap", "dispatch", "trace", "on", "max", fmt.Sprintf("%d", count), @@ -2261,6 +2396,21 @@ func captureCleanupCommand(k *KubeClient, nodeName string) error { printColored("red", fmt.Sprintf("Warning: Failed to clear PCAP BPF filter: %v", err)) } + // Clean up remote capture files + printColored("blue", "Cleaning up remote capture files...") + podName, err := k.findNodePod(validatedNode, defaultPod, defaultNamespace) + if err != nil { + printColored("red", fmt.Sprintf("Warning: Failed to find calico-vpp-node pod on node '%s': %v", validatedNode, err)) + } else { + cleanupCmd := "rm -f /tmp/trace.txt /tmp/trace.txt.gz /tmp/trace.pcap /tmp/trace.pcap.gz /tmp/dispatch.pcap /tmp/dispatch.pcap.gz" + _, err = k.execInPod(defaultNamespace, podName, defaultContainerVpp, "sh", "-c", cleanupCmd) + if err != nil { + printColored("red", fmt.Sprintf("Warning: Failed to clean up remote files: %v", err)) + } else { + printColored("green", "remote capture files cleaned up") + } + } + // Remove lock file printColored("blue", "Removing lock file...") err = removeCaptureLock(k, validatedNode)