From 4871b5b9cd64c3dba6c310b1d419f2cc3773acff Mon Sep 17 00:00:00 2001 From: Ashraf Fouda Date: Thu, 18 Dec 2025 16:44:21 +0200 Subject: [PATCH] make iperf requests against pub servers instead of our nodes Signed-off-by: Ashraf Fouda --- pkg/perf/iperf/iperf_task.go | 298 +++++++++++++++++++++--------- pkg/perf/iperf/iperf_task_test.go | 132 ++++++------- 2 files changed, 264 insertions(+), 166 deletions(-) diff --git a/pkg/perf/iperf/iperf_task.go b/pkg/perf/iperf/iperf_task.go index 06c2e682..fe6d8a9b 100644 --- a/pkg/perf/iperf/iperf_task.go +++ b/pkg/perf/iperf/iperf_task.go @@ -4,50 +4,90 @@ import ( "context" "encoding/json" "fmt" + "io" + "math/rand" "net" + "net/http" "os" "os/exec" "path/filepath" + "strconv" + "strings" "time" "github.com/cenkalti/backoff" "github.com/pkg/errors" "github.com/rs/zerolog/log" - "github.com/threefoldtech/zosbase/pkg/environment" - "github.com/threefoldtech/zosbase/pkg/network/iperf" "github.com/threefoldtech/zosbase/pkg/perf" - "github.com/threefoldtech/zosbase/pkg/perf/exec_wrapper" - "github.com/threefoldtech/zosbase/pkg/perf/graphql" + execwrapper "github.com/threefoldtech/zosbase/pkg/perf/exec_wrapper" ) const ( maxRetries = 3 - initialInterval = 5 * time.Minute - maxInterval = 20 * time.Minute - maxElapsedTime = time.Duration(maxRetries) * maxInterval - iperfTimeout = 30 * time.Second + initialInterval = 10 * time.Second + maxInterval = 90 * time.Second + maxElapsedTime = 7 * time.Minute + iperfTimeout = 90 * time.Second errServerBusy = "the server is busy running a test. try again later" + + iperf3ServersURL = "https://export.iperf3serverlist.net/listed_iperf3_servers.json" ) // IperfTest for iperf tcp/udp tests type IperfTest struct { // Optional dependencies for testing - graphqlClient GraphQLClient - execWrapper execwrapper.ExecWrapper + execWrapper execwrapper.ExecWrapper + httpClient *http.Client + serversURL string // for testing override + skipReachabilityCheck bool // for testing - skip server reachability check } // IperfResult for iperf test results type IperfResult struct { UploadSpeed float64 `json:"upload_speed"` // in bit/sec DownloadSpeed float64 `json:"download_speed"` // in bit/sec - NodeID uint32 `json:"node_id"` - NodeIpv4 string `json:"node_ip"` + ServerHost string `json:"server_host"` + ServerIP string `json:"server_ip"` + ServerPort int `json:"server_port"` TestType string `json:"test_type"` Error string `json:"error"` CpuReport CPUUtilizationPercent `json:"cpu_report"` } +// Iperf3Server represents a public iperf3 server from the list +type Iperf3Server struct { + Host string `json:"IP/HOST"` // IP or hostname + Port int `json:"-"` // Not directly unmarshaled + PortStr string `json:"PORT"` // Port comes as string in JSON +} + +// UnmarshalJSON custom unmarshaler to handle port as string or port range +func (s *Iperf3Server) UnmarshalJSON(data []byte) error { + type Alias Iperf3Server + aux := &struct { + *Alias + }{ + Alias: (*Alias)(s), + } + + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + + // Convert port string to int, handling ranges like "9201-9240" + if s.PortStr != "" { + portStr := strings.Split(s.PortStr, "-")[0] // Take first port if range + port, err := strconv.Atoi(portStr) + if err != nil { + return fmt.Errorf("invalid port value: %s", s.PortStr) + } + s.Port = port + } + + return nil +} + // NewTask creates a new iperf test func NewTask() perf.Task { // because go-iperf left tmp directories with perf binary in it each time @@ -71,7 +111,7 @@ func (t *IperfTest) Cron() string { // Description returns the task description func (t *IperfTest) Description() string { - return "Test public nodes network performance with both UDP and TCP over IPv4 and IPv6" + return "Test network performance against public iperf3 servers with both UDP and TCP" } // Jitter returns the max number of seconds the job can sleep before actual execution. @@ -81,95 +121,160 @@ func (t *IperfTest) Jitter() uint32 { // Run runs the tcp test and returns the result func (t *IperfTest) Run(ctx context.Context) (interface{}, error) { - var g GraphQLClient - var err error - - if t.graphqlClient != nil { - g = t.graphqlClient + // Check if iperf is available + if t.execWrapper != nil { + execWrap := t.execWrapper + _, err := execWrap.LookPath("iperf") + if err != nil { + return nil, errors.Wrap(err, "iperf not found") + } } else { - env := environment.MustGet() - graphqlClient, err := graphql.NewGraphQl(env.GraphQL...) + _, err := exec.LookPath("iperf") if err != nil { - return nil, err + return nil, errors.Wrap(err, "iperf not found") } - g = &graphqlClient } - // get public up nodes - freeFarmNodes, err := g.GetUpNodes(ctx, 0, 1, 0, true, true) + // Fetch a reachable public iperf3 server + server, err := t.fetchIperf3Server(ctx) if err != nil { - return nil, errors.Wrap(err, "failed to list freefarm nodes from graphql") + return nil, errors.Wrap(err, "failed to fetch public iperf3 server") } - nodes, err := g.GetUpNodes(ctx, 12, 0, 1, true, true) - if err != nil { - return nil, errors.Wrap(err, "failed to list random nodes from graphql") + if server == nil { + return nil, errors.New("no public iperf3 server available") } - nodes = append(nodes, freeFarmNodes...) + log.Info().Str("server-host", server.Host).Int("server-port", server.Port).Msg("using iperf3 server for testing") - if t.execWrapper != nil { - execWrap := t.execWrapper - _, err = execWrap.LookPath("iperf") - if err != nil { - return nil, err - } - } else { - _, err = exec.LookPath("iperf") - if err != nil { - return nil, err + var results []IperfResult + + // Run TCP test + res := t.runIperfTest(ctx, *server, true) + results = append(results, res) + + // Run UDP test + res = t.runIperfTest(ctx, *server, false) + results = append(results, res) + + return results, nil +} + +// fetchIperf3Server fetches the list of public iperf3 servers and finds the first reachable one +func (t *IperfTest) fetchIperf3Server(ctx context.Context) (*Iperf3Server, error) { + client := t.httpClient + if client == nil { + client = &http.Client{ + Timeout: 30 * time.Second, } } - var results []IperfResult - for _, node := range nodes { - clientIP, _, err := net.ParseCIDR(node.PublicConfig.Ipv4) - if err != nil { - log.Error().Err(err).Msg("failed to parse ipv4 address") - continue + url := t.serversURL + if url == "" { + url = iperf3ServersURL + } + + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return nil, errors.Wrap(err, "failed to create request") + } + + resp, err := client.Do(req) + if err != nil { + return nil, errors.Wrap(err, "failed to fetch iperf3 servers") + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, errors.Wrap(err, "failed to read response body") + } + + var servers []Iperf3Server + if err := json.Unmarshal(body, &servers); err != nil { + return nil, errors.Wrap(err, "failed to parse iperf3 servers list") + } + + log.Info().Int("count", len(servers)).Msg("fetched public iperf3 servers") + + // For testing, skip reachability check + if t.skipReachabilityCheck { + if len(servers) == 0 { + return nil, errors.New("no iperf3 servers available") } + return &servers[0], nil + } - clientIPv6, _, err := net.ParseCIDR(node.PublicConfig.Ipv6) - if err != nil { - log.Error().Err(err).Msg("failed to parse ipv6 address") - continue + // Find first reachable server by shuffling and checking + reachableServer := t.findFirstReachableServer(ctx, servers) + if reachableServer == nil { + return nil, errors.New("no reachable iperf3 servers found") + } + + log.Info().Str("host", reachableServer.Host).Int("port", reachableServer.Port).Msg("found reachable iperf3 server") + + return reachableServer, nil +} + +// findFirstReachableServer shuffles the server list and returns the first reachable one +func (t *IperfTest) findFirstReachableServer(ctx context.Context, servers []Iperf3Server) *Iperf3Server { + // Shuffle servers to randomize selection + shuffled := make([]Iperf3Server, len(servers)) + copy(shuffled, servers) + rand.Shuffle(len(shuffled), func(i, j int) { + shuffled[i], shuffled[j] = shuffled[j], shuffled[i] + }) + + // Find first reachable server + for _, server := range shuffled { + if t.isServerReachable(ctx, server) { + return &server } + log.Debug().Str("host", server.Host).Int("port", server.Port).Msg("iperf3 server unreachable, trying next") + } - // TCP - res := t.runIperfTest(ctx, clientIP.String(), true) - res.NodeID = node.NodeID - results = append(results, res) + return nil +} - res = t.runIperfTest(ctx, clientIPv6.String(), true) - res.NodeID = node.NodeID - results = append(results, res) +// isServerReachable checks if a server is reachable by attempting a TCP connection +func (t *IperfTest) isServerReachable(ctx context.Context, server Iperf3Server) bool { + // Skip servers with no host/IP or invalid port + if server.Host == "" || server.Port == 0 { + return false + } - // UDP - res = t.runIperfTest(ctx, clientIP.String(), false) - res.NodeID = node.NodeID - results = append(results, res) + address := fmt.Sprintf("%s:%d", server.Host, server.Port) - res = t.runIperfTest(ctx, clientIPv6.String(), false) - res.NodeID = node.NodeID - results = append(results, res) + // Use a short timeout for connectivity check + dialer := &net.Dialer{ + Timeout: 5 * time.Second, } - return results, nil + conn, err := dialer.DialContext(ctx, "tcp", address) + if err != nil { + return false + } + defer conn.Close() + + return true } -func (t *IperfTest) runIperfTest(ctx context.Context, clientIP string, tcp bool) IperfResult { +func (t *IperfTest) runIperfTest(ctx context.Context, server Iperf3Server, tcp bool) IperfResult { opts := make([]string, 0) + opts = append(opts, - "--client", clientIP, - "--port", fmt.Sprint(iperf.IperfPort), - "--interval", "20", - "--bandwidth", "0", // unlimited because udp limit is set to 1M by default - "-R", // doing the test in reverse gives more accurate results + "--client", server.Host, + "--port", fmt.Sprint(server.Port), + "--time", "10", // 10 second test duration "--json", ) if !tcp { - opts = append(opts, "--length", "16B", "--udp") + opts = append(opts, "--udp", "--bandwidth", "10M") // 10 Mbps for UDP } var execWrap execwrapper.ExecWrapper = &execwrapper.RealExecWrapper{} @@ -182,9 +287,9 @@ func (t *IperfTest) runIperfTest(ctx context.Context, clientIP string, tcp bool) timeoutCtx, cancel := context.WithTimeout(ctx, iperfTimeout) defer cancel() - res := runIperfCommand(timeoutCtx, opts, execWrap) - if res.Error == errServerBusy { - return errors.New(errServerBusy) + res := runIperf3Command(timeoutCtx, opts, execWrap) + if res.Error != "" { + return errors.New(res.Error) } report = res @@ -192,7 +297,7 @@ func (t *IperfTest) runIperfTest(ctx context.Context, clientIP string, tcp bool) } notify := func(err error, waitTime time.Duration) { - log.Debug().Err(err).Stringer("retry-in", waitTime).Msg("retrying") + log.Debug().Err(err).Stringer("retry-in", waitTime).Msg("retrying iperf3 test") } bo := backoff.NewExponentialBackOff() @@ -202,9 +307,6 @@ func (t *IperfTest) runIperfTest(ctx context.Context, clientIP string, tcp bool) b := backoff.WithMaxRetries(bo, maxRetries) err := backoff.RetryNotify(operation, b, notify) - if err != nil { - return IperfResult{} - } proto := "tcp" if !tcp { @@ -212,38 +314,52 @@ func (t *IperfTest) runIperfTest(ctx context.Context, clientIP string, tcp bool) } iperfResult := IperfResult{ - UploadSpeed: report.End.SumSent.BitsPerSecond, - DownloadSpeed: report.End.SumReceived.BitsPerSecond, - CpuReport: report.End.CPUUtilizationPercent, - NodeIpv4: clientIP, - TestType: proto, - Error: report.Error, + ServerHost: server.Host, + ServerIP: server.Host, + ServerPort: server.Port, + TestType: proto, } - if !tcp && len(report.End.Streams) > 0 { - iperfResult.DownloadSpeed = report.End.Streams[0].UDP.BitsPerSecond + + if err != nil { + log.Error().Err(err).Str("server", server.Host).Str("type", proto).Msg("iperf3 test failed") + iperfResult.Error = err.Error() + return iperfResult } + iperfResult.CpuReport = report.End.CPUUtilizationPercent + iperfResult.Error = report.Error + + // Both TCP and UDP use sum_sent and sum_received in the end section + iperfResult.UploadSpeed = report.End.SumSent.BitsPerSecond + iperfResult.DownloadSpeed = report.End.SumReceived.BitsPerSecond + + // Log if there's an error in the report + if report.Error != "" { + log.Warn().Str("server", server.Host).Str("type", proto).Str("iperf-error", report.Error).Msg("iperf3 test completed with error") + } + + log.Info().Str("server", server.Host).Str("type", proto).Float64("upload-mbps", iperfResult.UploadSpeed/1000000).Float64("download-mbps", iperfResult.DownloadSpeed/1000000).Msg("iperf3 test completed") + return iperfResult } -func runIperfCommand(ctx context.Context, opts []string, execWrap execwrapper.ExecWrapper) iperfCommandOutput { +func runIperf3Command(ctx context.Context, opts []string, execWrap execwrapper.ExecWrapper) iperfCommandOutput { output, err := execWrap.CommandContext(ctx, "iperf", opts...).CombinedOutput() exitErr := &exec.ExitError{} if err != nil { if ctx.Err() == context.DeadlineExceeded { - log.Warn().Msg("iperf command timed out for node with public IP: " + opts[1]) + log.Warn().Str("target", opts[1]).Msg("iperf3 command timed out") } if !errors.As(err, &exitErr) { - log.Err(err).Msg("failed to run iperf") + log.Error().Err(err).Msg("failed to run iperf3") } return iperfCommandOutput{} } - var report iperfCommandOutput if err := json.Unmarshal(output, &report); err != nil { - log.Err(err).Msg("failed to parse iperf output") + log.Error().Err(err).Str("output", string(output)).Msg("failed to parse iperf3 output") return iperfCommandOutput{} } diff --git a/pkg/perf/iperf/iperf_task_test.go b/pkg/perf/iperf/iperf_task_test.go index 1c44866e..3f9033a7 100644 --- a/pkg/perf/iperf/iperf_task_test.go +++ b/pkg/perf/iperf/iperf_task_test.go @@ -4,11 +4,12 @@ import ( "context" "encoding/json" "errors" + "net/http" + "net/http/httptest" "testing" "github.com/stretchr/testify/assert" execwrapper "github.com/threefoldtech/zosbase/pkg/perf/exec_wrapper" - "github.com/threefoldtech/zosbase/pkg/perf/graphql" "go.uber.org/mock/gomock" ) @@ -16,32 +17,24 @@ func TestIperfTest_Run_Success(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockGraphQL := NewMockGraphQLClient(ctrl) mockExec := execwrapper.NewMockExecWrapper(ctrl) mockCmd := execwrapper.NewMockExecCmd(ctrl) - task := &IperfTest{ - graphqlClient: mockGraphQL, - execWrapper: mockExec, - } - - testNodes := []graphql.Node{ - { - NodeID: 123, - PublicConfig: graphql.PublicConfig{ - Ipv4: "192.168.1.100/24", - Ipv6: "2001:db8::1/64", - }, - }, - } + // Create mock HTTP server - need to create raw JSON with IP/HOST and PORT fields + serversJSON := []byte(`[{"IP/HOST":"192.168.1.100","PORT":"5201"}]`) - mockGraphQL.EXPECT(). - GetUpNodes(gomock.Any(), 0, uint32(1), uint32(0), true, true). - Return([]graphql.Node{testNodes[0]}, nil) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write(serversJSON) + })) + defer server.Close() - mockGraphQL.EXPECT(). - GetUpNodes(gomock.Any(), 12, uint32(0), uint32(1), true, true). - Return([]graphql.Node{}, nil) + task := &IperfTest{ + execWrapper: mockExec, + httpClient: server.Client(), + serversURL: server.URL, + skipReachabilityCheck: true, + } mockExec.EXPECT(). LookPath("iperf"). @@ -56,11 +49,9 @@ func TestIperfTest_Run_Success(t *testing.T) { mockExec.EXPECT(). CommandContext(gomock.Any(), "iperf", gomock.Any()). Return(mockCmd). - Times(4) + Times(2) mockCmd.EXPECT().CombinedOutput().Return(tcpOutputBytes, nil) - mockCmd.EXPECT().CombinedOutput().Return(tcpOutputBytes, nil) - mockCmd.EXPECT().CombinedOutput().Return(udpOutputBytes, nil) mockCmd.EXPECT().CombinedOutput().Return(udpOutputBytes, nil) ctx := context.Background() @@ -71,31 +62,39 @@ func TestIperfTest_Run_Success(t *testing.T) { results, ok := result.([]IperfResult) assert.True(t, ok) - assert.Len(t, results, 4) + assert.Len(t, results, 2) firstResult := results[0] - assert.Equal(t, uint32(123), firstResult.NodeID) - assert.Equal(t, "192.168.1.100", firstResult.NodeIpv4) + assert.Equal(t, "192.168.1.100", firstResult.ServerHost) + assert.Equal(t, "192.168.1.100", firstResult.ServerIP) + assert.Equal(t, 5201, firstResult.ServerPort) assert.Equal(t, "tcp", firstResult.TestType) assert.Equal(t, float64(1000000), firstResult.UploadSpeed) assert.Equal(t, float64(2000000), firstResult.DownloadSpeed) } -func TestIperfTest_Run_GraphQLError(t *testing.T) { +func TestIperfTest_Run_HTTPError(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockGraphQL := NewMockGraphQLClient(ctrl) + mockExec := execwrapper.NewMockExecWrapper(ctrl) + + // Create mock HTTP server that returns error + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() - // Create test task with injected dependencies task := &IperfTest{ - graphqlClient: mockGraphQL, + execWrapper: mockExec, + httpClient: server.Client(), + serversURL: server.URL, + skipReachabilityCheck: true, } - // Mock GraphQL error - mockGraphQL.EXPECT(). - GetUpNodes(gomock.Any(), 0, uint32(1), uint32(0), true, true). - Return(nil, errors.New("graphql connection failed")) + mockExec.EXPECT(). + LookPath("iperf"). + Return("/usr/bin/iperf", nil) // Execute the test ctx := context.Background() @@ -103,30 +102,21 @@ func TestIperfTest_Run_GraphQLError(t *testing.T) { // Verify error assert.Error(t, err) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failed to fetch public iperf3 server") assert.Nil(t, result) - assert.Contains(t, err.Error(), "failed to list freefarm nodes from graphql") } -func TestIperfTest_Run_IperfNotFound(t *testing.T) { +func TestIperfTest_Run_Iperf3NotFound(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockGraphQL := NewMockGraphQLClient(ctrl) mockExec := execwrapper.NewMockExecWrapper(ctrl) task := &IperfTest{ - graphqlClient: mockGraphQL, - execWrapper: mockExec, + execWrapper: mockExec, } - mockGraphQL.EXPECT(). - GetUpNodes(gomock.Any(), 0, uint32(1), uint32(0), true, true). - Return([]graphql.Node{}, nil) - - mockGraphQL.EXPECT(). - GetUpNodes(gomock.Any(), 12, uint32(0), uint32(1), true, true). - Return([]graphql.Node{}, nil) - mockExec.EXPECT(). LookPath("iperf"). Return("", errors.New("executable file not found in $PATH")) @@ -135,39 +125,31 @@ func TestIperfTest_Run_IperfNotFound(t *testing.T) { result, err := task.Run(ctx) assert.Error(t, err) + assert.Error(t, err) + assert.Contains(t, err.Error(), "iperf not found") assert.Nil(t, result) } -func TestIperfTest_Run_InvalidIPAddress(t *testing.T) { +func TestIperfTest_Run_NoServersAvailable(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockGraphQL := NewMockGraphQLClient(ctrl) mockExec := execwrapper.NewMockExecWrapper(ctrl) - task := &IperfTest{ - graphqlClient: mockGraphQL, - execWrapper: mockExec, - } + // Create mock HTTP server that returns empty list + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("[]")) + })) + defer server.Close() - testNodes := []graphql.Node{ - { - NodeID: 123, - PublicConfig: graphql.PublicConfig{ - Ipv4: "invalid-ip", - Ipv6: "invalid-ipv6", - }, - }, + task := &IperfTest{ + execWrapper: mockExec, + httpClient: server.Client(), + serversURL: server.URL, + skipReachabilityCheck: true, } - mockGraphQL.EXPECT(). - GetUpNodes(gomock.Any(), 0, uint32(1), uint32(0), true, true). - Return(testNodes, nil) - - mockGraphQL.EXPECT(). - GetUpNodes(gomock.Any(), 12, uint32(0), uint32(1), true, true). - Return([]graphql.Node{}, nil) - mockExec.EXPECT(). LookPath("iperf"). Return("/usr/bin/iperf", nil) @@ -175,10 +157,10 @@ func TestIperfTest_Run_InvalidIPAddress(t *testing.T) { ctx := context.Background() result, err := task.Run(ctx) - assert.NoError(t, err) - results, ok := result.([]IperfResult) - assert.True(t, ok) - assert.Len(t, results, 0) + assert.Error(t, err) + assert.Error(t, err) + assert.Contains(t, err.Error(), "no iperf3 servers available") + assert.Nil(t, result) } func TestNewTask(t *testing.T) {