diff --git a/internal/experiment/riseupvpn/riseupvpn.go b/internal/experiment/riseupvpn/riseupvpn.go index 37061826cd..ffc95bcd5a 100644 --- a/internal/experiment/riseupvpn/riseupvpn.go +++ b/internal/experiment/riseupvpn/riseupvpn.go @@ -7,9 +7,11 @@ import ( "context" "encoding/json" "errors" + "fmt" "time" "github.com/ooni/probe-cli/v3/internal/experiment/urlgetter" + "github.com/ooni/probe-cli/v3/internal/measurex" "github.com/ooni/probe-cli/v3/internal/model" "github.com/ooni/probe-cli/v3/internal/netxlite" "github.com/ooni/probe-cli/v3/internal/tracex" @@ -36,6 +38,7 @@ type GatewayV3 struct { } Host string IPAddress string `json:"ip_address"` + Location string `json:"location"` } // TransportV3 describes a transport. @@ -43,7 +46,12 @@ type TransportV3 struct { Type string Protocols []string Ports []string - Options map[string]string + Options OptionsV3 +} + +type OptionsV3 struct { + Cert string `json:"cert"` + IatMode string `json:"iatMode"` } // GatewayConnection describes the connection to a riseupvpn gateway. @@ -53,19 +61,47 @@ type GatewayConnection struct { TransportType string `json:"transport_type"` } +// GatewayLoad describes the load of a single Gateway. +type GatewayLoad struct { + Host string `json:"host"` + Fullness float64 `json:"fullness"` + Overload bool `json:"overload"` +} + +// GeoService represents the geoService API (also known as menshen) json response +type GeoService struct { + IPAddress string `json:"ip"` + Country string `json:"cc"` + City string `json:"city"` + Latitude float64 `json:"lat"` + Longitude float64 `json:"lon"` + Gateways []string `json:"gateways"` + SortedGateways []GatewayLoad `json:"sortedGateways"` +} + // Config contains the riseupvpn experiment config. type Config struct { urlgetter.Config } +// TargetResults contains the results of measuring a target. +type Obfs4TargetResults struct { + Failure *string `json:"failure"` + NetworkEvents []*measurex.ArchivalNetworkEvent `json:"network_events"` + TargetAddress string `json:"target_address"` + TCPConnect []*measurex.ArchivalTCPConnect `json:"tcp_connect"` + TLSHandshakes []*measurex.ArchivalQUICTLSHandshakeEvent `json:"tls_handshakes"` +} + // TestKeys contains riseupvpn test keys. type TestKeys struct { urlgetter.TestKeys - APIFailure *string `json:"api_failure"` - APIStatus string `json:"api_status"` - CACertStatus bool `json:"ca_cert_status"` - FailingGateways []GatewayConnection `json:"failing_gateways"` - TransportStatus map[string]string `json:"transport_status"` + Obfs4HandshakeResults []Obfs4TargetResults `json:"obfs4_handshake_results"` + APIFailure []string `json:"api_failure"` + APIStatus string `json:"api_status"` + CACertStatus bool `json:"ca_cert_status"` + FailingGateways []GatewayConnection `json:"failing_gateways"` + TransportStatus map[string]string `json:"transport_status"` } // NewTestKeys creates new riseupvpn TestKeys. @@ -86,12 +122,9 @@ func (tk *TestKeys) UpdateProviderAPITestKeys(v urlgetter.MultiOutput) { tk.Requests = append(tk.Requests, v.TestKeys.Requests...) tk.TCPConnect = append(tk.TCPConnect, v.TestKeys.TCPConnect...) tk.TLSHandshakes = append(tk.TLSHandshakes, v.TestKeys.TLSHandshakes...) - if tk.APIStatus != "ok" { - return // we already flipped the state - } if v.TestKeys.Failure != nil { tk.APIStatus = "blocked" - tk.APIFailure = v.TestKeys.Failure + tk.APIFailure = append(tk.APIFailure, *v.TestKeys.Failure) return } } @@ -110,6 +143,28 @@ func (tk *TestKeys) AddGatewayConnectTestKeys(v urlgetter.MultiOutput, transport } } +// AddGatewayConnectTestKeys updates the TestKeys using the given MultiOutput +// result of gateway connectivity testing. Sets TransportStatus to "ok" if +// any successful TCP connection could be made +func (tk *TestKeys) AddGatewayObfs4HandshakeTestKeys(targetAddress string, measurement measurex.ArchivalMeasurement, failure *string) { + obfs4TargetResult := Obfs4TargetResults{ + Failure: failure, + NetworkEvents: measurement.NetworkEvents, + TCPConnect: measurement.TCPConnect, + TLSHandshakes: measurement.TLSHandshakes, + TargetAddress: targetAddress, + } + + // TODO: check if failure != nil + tk.Obfs4HandshakeResults = append(tk.Obfs4HandshakeResults, obfs4TargetResult) + for _, tcpConnect := range obfs4TargetResult.TCPConnect { + if !tcpConnect.Status.Success { + gatewayConnection := newObfs4GatewayConnection(*tcpConnect) + tk.FailingGateways = append(tk.FailingGateways, *gatewayConnection) + } + } +} + func (tk *TestKeys) updateTransportStatus(openvpnGatewayCount, obfs4GatewayCount int) { failingOpenvpnGateways, failingObfs4Gateways := 0, 0 for _, gw := range tk.FailingGateways { @@ -140,6 +195,15 @@ func newGatewayConnection( } } +func newObfs4GatewayConnection( + tcpConnect measurex.ArchivalTCPConnect) *GatewayConnection { + return &GatewayConnection{ + IP: tcpConnect.IP, + Port: int(tcpConnect.Port), + TransportType: "obfs4", + } +} + // AddCACertFetchTestKeys adds generic urlgetter.Get() testKeys to riseupvpn specific test keys func (tk *TestKeys) AddCACertFetchTestKeys(testKeys urlgetter.TestKeys) { tk.NetworkEvents = append(tk.NetworkEvents, testKeys.NetworkEvents...) @@ -147,11 +211,6 @@ func (tk *TestKeys) AddCACertFetchTestKeys(testKeys urlgetter.TestKeys) { tk.Requests = append(tk.Requests, testKeys.Requests...) tk.TCPConnect = append(tk.TCPConnect, testKeys.TCPConnect...) tk.TLSHandshakes = append(tk.TLSHandshakes, testKeys.TLSHandshakes...) - if testKeys.Failure != nil { - tk.APIStatus = "blocked" - tk.APIFailure = tk.Failure - tk.CACertStatus = false - } } // Measurer performs the measurement. @@ -204,21 +263,19 @@ func (m Measurer) Run(ctx context.Context, args *model.ExperimentArgs) error { FailOnHTTPError: true, }}, } - for entry := range multi.CollectOverall(ctx, inputs, 0, 50, "riseupvpn", callbacks) { + for entry := range multi.CollectOverall(ctx, inputs, 0, 20, "riseupvpn", callbacks) { tk := entry.TestKeys testkeys.AddCACertFetchTestKeys(tk) if tk.Failure != nil { - // TODO(bassosimone,cyberta): should we update the testkeys - // in this case (e.g., APIFailure?) - // See https://github.com/ooni/probe/issues/1432. - return nil - } - if ok := certPool.AppendCertsFromPEM([]byte(tk.HTTPResponseBody)); !ok { testkeys.CACertStatus = false testkeys.APIStatus = "blocked" - errorValue := "invalid_ca" - testkeys.APIFailure = &errorValue - return nil + testkeys.APIFailure = append(testkeys.APIFailure, *tk.Failure) + certPool = nil + } else if ok := certPool.AppendCertsFromPEM([]byte(tk.HTTPResponseBody)); !ok { + testkeys.CACertStatus = false + testkeys.APIStatus = "blocked" + testkeys.APIFailure = append(testkeys.APIFailure, "invalid_ca") + certPool = nil } } @@ -230,28 +287,42 @@ func (m Measurer) Run(ctx context.Context, args *model.ExperimentArgs) error { CertPool: certPool, Method: "GET", FailOnHTTPError: true, + NoTLSVerify: !testkeys.CACertStatus, }}, {Target: eipServiceURL, Config: urlgetter.Config{ CertPool: certPool, Method: "GET", FailOnHTTPError: true, + NoTLSVerify: !testkeys.CACertStatus, }}, {Target: geoServiceURL, Config: urlgetter.Config{ CertPool: certPool, Method: "GET", FailOnHTTPError: true, + NoTLSVerify: !testkeys.CACertStatus, }}, } - for entry := range multi.CollectOverall(ctx, inputs, 1, 50, "riseupvpn", callbacks) { + + for entry := range multi.CollectOverall(ctx, inputs, 1, 20, "riseupvpn", callbacks) { testkeys.UpdateProviderAPITestKeys(entry) } + if testkeys.APIStatus == "blocked" { + for _, input := range inputs { + input.Config.Tunnel = "torsf" + } + for entry := range multi.CollectOverall(ctx, inputs, 1, 20, "riseupvpn", callbacks) { + testkeys.UpdateProviderAPITestKeys(entry) + } + } + // test gateways now testkeys.TransportStatus = map[string]string{} gateways := parseGateways(testkeys) openvpnEndpoints := generateMultiInputs(gateways, "openvpn") - obfs4Endpoints := generateMultiInputs(gateways, "obfs4") - overallCount := 1 + len(inputs) + len(openvpnEndpoints) + len(obfs4Endpoints) + //obfs4Endpoints := generateMultiInputs(gateways, "obfs4") + obfs4HandshakeTargets := generateObfs4HandshakeInputs(gateways) + overallCount := 1 + len(inputs) + len(openvpnEndpoints) + len(obfs4HandshakeTargets) // measure openvpn in parallel for entry := range multi.CollectOverall( @@ -259,17 +330,18 @@ func (m Measurer) Run(ctx context.Context, args *model.ExperimentArgs) error { testkeys.AddGatewayConnectTestKeys(entry, "openvpn") } - // measure obfs4 in parallel - // TODO(bassosimone): when urlgetter is able to do obfs4 handshakes, here - // can possibly also test for the obfs4 handshake. - // See https://github.com/ooni/probe/issues/1463. - for entry := range multi.CollectOverall( - ctx, obfs4Endpoints, 1+len(inputs)+len(openvpnEndpoints), overallCount, "riseupvpn", callbacks) { - testkeys.AddGatewayConnectTestKeys(entry, "obfs4") + obfs4Measurer := measurex.NewMeasurerWithDefaultSettings() + obfs4Measurer.Begin = measurement.MeasurementStartTimeSaved + const timeout = 15 * time.Second + + for _, target := range obfs4HandshakeTargets { + meas, failure := obfs4Measurer.EasyOBFS4ConnectAndHandshake( + ctx, timeout, target.Address, sess.TempDir(), target.Params) + testkeys.AddGatewayObfs4HandshakeTestKeys(target.Address, *meas, failure) } // set transport status based on gateway test results - testkeys.updateTransportStatus(len(openvpnEndpoints), len(obfs4Endpoints)) + testkeys.updateTransportStatus(len(openvpnEndpoints), len(obfs4HandshakeTargets)) return nil } @@ -298,19 +370,142 @@ func generateMultiInputs(gateways []GatewayV3, transportType string) []urlgetter return gatewayInputs } +func generateObfs4HandshakeInputs(gateways []GatewayV3) []model.OOAPITorTarget { + var results []model.OOAPITorTarget = nil + for _, gateway := range gateways { + for _, transport := range gateway.Capabilities.Transport { + if transport.Type != "obfs4" { + continue + } + for _, port := range transport.Ports { + target := model.OOAPITorTarget{ + Address: fmt.Sprintf("%s:%s", gateway.IPAddress, port), + Params: map[string][]string{ + "cert": { + transport.Options.Cert, + }, + "iat-mode": {transport.Options.IatMode}, + }, + Protocol: "obfs4", + } + results = append(results, target) + } + } + } + return results +} + func parseGateways(testKeys *TestKeys) []GatewayV3 { + var eipService *EipService = nil + var geoService *GeoService = nil for _, requestEntry := range testKeys.Requests { if requestEntry.Request.URL == eipServiceURL && requestEntry.Failure == nil { - // TODO(bassosimone,cyberta): is it reasonable that we discard - // the error when the JSON we fetched cannot be parsed? - // See https://github.com/ooni/probe/issues/1432 - eipService, err := DecodeEIP3(requestEntry.Response.Body.Value) - if err == nil { - return eipService.Gateways + var err error = nil + eipService, err = DecodeEIP3(requestEntry.Response.Body.Value) + if err != nil { + testKeys.APIFailure = append(testKeys.APIFailure, "invalid_eipservice_response") + return nil + } + } else if requestEntry.Request.URL == geoServiceURL && requestEntry.Failure == nil { + var err error = nil + geoService, err = DecodeGeoService(requestEntry.Response.Body.Value) + if err != nil { + testKeys.APIFailure = append(testKeys.APIFailure, "invalid_geoservice_response") } } } - return nil + return filterGateways(eipService, geoService) +} + +// filterGateways selects a subset of available gateways supporting obfs4 +func filterGateways(eipService *EipService, geoService *GeoService) []GatewayV3 { + var result []GatewayV3 = nil + if eipService != nil { + locations := getLocationsUnderTest(eipService, geoService) + for _, gateway := range eipService.Gateways { + if !gateway.hasTransport("obfs4") || + !gateway.isLocationUnderTest(locations) || + geoService != nil && !geoService.isHealthyGateway(gateway) { + continue + } + result = append(result, gateway) + if len(result) == 3 { + return result + } + } + } + return result +} + +// getLocationsUnderTest parses all gateways supporting obfs4 and returns the two locations having most obfs4 bridges +func getLocationsUnderTest(eipService *EipService, geoService *GeoService) []string { + var result []string = nil + if eipService != nil { + locationMap := map[string]int{} + locations := []string{} + for _, gateway := range eipService.Gateways { + if !gateway.hasTransport("obfs4") { + continue + } + if _, ok := locationMap[gateway.Location]; !ok { + locations = append(locations, gateway.Location) + } + locationMap[gateway.Location] += 1 + } + + location1 := "" + location2 := "" + for _, location := range locations { + if locationMap[location] > locationMap[location1] { + location2 = location1 + location1 = location + } else if locationMap[location] > locationMap[location2] { + location2 = location + } + } + if location1 != "" { + result = append(result, location1) + } + if location2 != "" { + result = append(result, location2) + } + } + + return result +} + +func (gateway *GatewayV3) hasTransport(s string) bool { + for _, transport := range gateway.Capabilities.Transport { + if s == transport.Type { + return true + } + } + return false +} + +func (gateway *GatewayV3) isLocationUnderTest(locations []string) bool { + for _, location := range locations { + if location == gateway.Location { + return true + } + } + return false +} + +func (geoService *GeoService) isHealthyGateway(gateway GatewayV3) bool { + if geoService.SortedGateways == nil { + // Earlier versions of the geoservice don't include the sorted gateway list containing the load info, + // so we can't say anything about the load of a gateway in that case. + // We assume it's an healthy location. Riseup will switch to the updated API soon *fingers crossed* + return true + } + for _, gatewayLoad := range geoService.SortedGateways { + if gatewayLoad.Host == gateway.Host { + return !gatewayLoad.Overload + } + } + + return false } // DecodeEIP3 decodes eip-service.json version 3 @@ -323,6 +518,16 @@ func DecodeEIP3(body string) (*EipService, error) { return &eip, nil } +// DecodeGeoService decodes geoService json +func DecodeGeoService(body string) (*GeoService, error) { + var gs GeoService + err := json.Unmarshal([]byte(body), &gs) + if err != nil { + return nil, err + } + return &gs, nil +} + // NewExperimentMeasurer creates a new ExperimentMeasurer. func NewExperimentMeasurer(config Config) model.ExperimentMeasurer { return Measurer{Config: config} diff --git a/internal/experiment/riseupvpn/riseupvpn_test.go b/internal/experiment/riseupvpn/riseupvpn_test.go index 9af91c9699..d82b4d2eec 100644 --- a/internal/experiment/riseupvpn/riseupvpn_test.go +++ b/internal/experiment/riseupvpn/riseupvpn_test.go @@ -291,7 +291,7 @@ func TestUpdateWithMixedResults(t *testing.T) { if tk.APIStatus != "blocked" { t.Fatal("ApiStatus should be blocked") } - if *tk.APIFailure != netxlite.FailureEOFError { + if len(tk.APIFailure) > 0 && tk.APIFailure[0] != netxlite.FailureEOFError { t.Fatal("invalid ApiFailure") } if tk.FailingGateways != nil { @@ -344,11 +344,24 @@ func TestInvalidCaCert(t *testing.T) { if tk.APIStatus != "blocked" { t.Fatal("ApiStatus should be blocked") } - if tk.FailingGateways != nil { - t.Fatal("invalid FailingGateways") + + if tk.FailingGateways == nil || len(tk.FailingGateways) != 1 { + t.Fatal("invalid length of FailingGateways") } - if tk.TransportStatus != nil { - t.Fatal("invalid TransportStatus") + + gw := tk.FailingGateways[0] + if gw.IP != "234.345.234.345" { + t.Fatal("invalid failed gateway ip: " + fmt.Sprint(gw.IP)) + } + if gw.Port != 443 { + t.Fatal("invalid failed gateway port: " + fmt.Sprint(gw.Port)) + } + if gw.TransportType != "openvpn" { + t.Fatal("invalid failed transport type: " + fmt.Sprint(gw.TransportType)) + } + + if tk.TransportStatus == nil || tk.TransportStatus["openvpn"] != "ok" { + t.Fatal("invalid TransportStatus: " + fmt.Sprint(tk.TransportStatus)) } } @@ -371,17 +384,17 @@ func TestFailureCaCertFetch(t *testing.T) { t.Fatal("invalid ApiStatus") } - if tk.APIFailure != nil { - t.Fatal("ApiFailure should be null") + if tk.APIFailure == nil || len(tk.APIFailure) != 1 || tk.APIFailure[0] != io.EOF.Error() { + t.Fatal("ApiFailure should not be null" + fmt.Sprint(tk.APIFailure)) } - if len(tk.Requests) > 1 { - t.Fatal("Unexpected requests") + if len(tk.Requests) == 1 { + t.Fatal("Too less requests, expected to run all API requests") } if tk.FailingGateways != nil { t.Fatal("invalid FailingGateways") } - if tk.TransportStatus != nil { - t.Fatal("invalid TransportStatus") + if tk.TransportStatus == nil || tk.TransportStatus["openvpn"] != "ok" || tk.TransportStatus["obfs4"] != "ok" { + t.Fatal("invalid TransportStatus: " + fmt.Sprint(tk.TransportStatus)) } }