diff --git a/proxy-go/proxy-go_test.go b/proxy-go/proxy-go_test.go index ebe4381..3b00078 100644 --- a/proxy-go/proxy-go_test.go +++ b/proxy-go/proxy-go_test.go @@ -227,7 +227,7 @@ func TestBrokerInteractions(t *testing.T) { const sampleAnswer = `{"type":"answer","sdp":` + sampleSDP + `}` Convey("Proxy connections to broker", t, func() { - broker := new(Broker) + broker := new(Remote) broker.url, _ = url.Parse("localhost") //Mock peerConnection @@ -307,7 +307,7 @@ func TestBrokerInteractions(t *testing.T) { } err = broker.sendAnswer("test", pc) So(err, ShouldNotEqual, nil) - So(err.Error(), ShouldResemble, "broker returned 410") + So(err.Error(), ShouldResemble, "error sending answer to broker: remote returned status code 410") //Error if we can't parse broker message broker.transport = &MockTransport{ diff --git a/proxy-go/snowflake.go b/proxy-go/snowflake.go index 5e56842..9e27250 100644 --- a/proxy-go/snowflake.go +++ b/proxy-go/snowflake.go @@ -28,6 +28,7 @@ import ( const defaultBrokerURL = "https://snowflake-broker.bamsoftware.com/" const defaultRelayURL = "wss://snowflake.bamsoftware.com/" +const defaultProbeURL = "http://159.203.63.110:8080" const defaultSTUNURL = "stun:stun.l.google.com:19302" const pollInterval = 5 * time.Second @@ -37,7 +38,7 @@ const dataChannelTimeout = 20 * time.Second const readLimit = 100000 //Maximum number of bytes to be read from an HTTP request -var broker *Broker +var broker *Remote var relayURL string const ( @@ -69,11 +70,6 @@ func remoteIPFromSDP(sdp string) net.IP { return nil } -type Broker struct { - url *url.URL - transport http.RoundTripper -} - type webRTCConn struct { dc *webrtc.DataChannel pc *webrtc.PeerConnection @@ -158,8 +154,32 @@ func limitedRead(r io.Reader, limit int64) ([]byte, error) { return p, err } -func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription { - brokerPath := b.url.ResolveReference(&url.URL{Path: "proxy"}) +type Remote struct { + url *url.URL + transport http.RoundTripper +} + +func (r *Remote) MakePost(path string, payload io.Reader) ([]byte, error) { + + req, err := http.NewRequest("POST", path, payload) + if err != nil { + return nil, err + } + resp, err := r.transport.RoundTrip(req) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("remote returned status code %d", resp.StatusCode) + } + + defer resp.Body.Close() + return limitedRead(resp.Body, readLimit) +} + +func (r *Remote) pollOffer(sid string) *webrtc.SessionDescription { + brokerPath := r.url.ResolveReference(&url.URL{Path: "proxy"}) timeOfNextPoll := time.Now() for { // Sleep until we're scheduled to poll again. @@ -178,56 +198,36 @@ func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription { log.Printf("Error encoding poll message: %s", err.Error()) return nil } - req, _ := http.NewRequest("POST", brokerPath.String(), bytes.NewBuffer(body)) - resp, err := b.transport.RoundTrip(req) + response, err := r.MakePost(brokerPath.String(), bytes.NewBuffer(body)) if err != nil { - log.Printf("error polling broker: %s", err) - } else { - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - log.Printf("broker returns: %d", resp.StatusCode) - } else { - body, err := limitedRead(resp.Body, readLimit) - if err != nil { - log.Printf("error reading broker response: %s", err) - } else { - - offer, err := messages.DecodePollResponse(body) - if err != nil { - log.Printf("error reading broker response: %s", err.Error()) - log.Printf("body: %s", body) - return nil - } - if offer != "" { - return deserializeSessionDescription(offer) - } - } - } + log.Printf("error polling broker: %s", err.Error()) + return nil + } + offer, err := messages.DecodePollResponse(response) + if err != nil { + log.Printf("error reading broker response: %s", err.Error()) + log.Printf("body: %s", body) + return nil + } + if offer != "" { + return deserializeSessionDescription(offer) } } } -func (b *Broker) sendAnswer(sid string, pc *webrtc.PeerConnection) error { - brokerPath := b.url.ResolveReference(&url.URL{Path: "answer"}) +func (r *Remote) sendAnswer(sid string, pc *webrtc.PeerConnection) error { + brokerPath := r.url.ResolveReference(&url.URL{Path: "answer"}) answer := string([]byte(serializeSessionDescription(pc.LocalDescription()))) body, err := messages.EncodeAnswerRequest(answer, sid) if err != nil { return err } - req, _ := http.NewRequest("POST", brokerPath.String(), bytes.NewBuffer(body)) - resp, err := b.transport.RoundTrip(req) + response, err := r.MakePost(brokerPath.String(), bytes.NewBuffer(body)) if err != nil { - return err - } - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("broker returned %d", resp.StatusCode) + return fmt.Errorf("error sending answer to broker: %s", err.Error()) } - body, err = limitedRead(resp.Body, readLimit) - if err != nil { - return fmt.Errorf("error reading broker response: %s", err) - } - success, err := messages.DecodeAnswerResponse(body) + success, err := messages.DecodeAnswerResponse(response) if err != nil { return err } @@ -258,7 +258,7 @@ func CopyLoop(c1 io.ReadWriteCloser, c2 io.ReadWriteCloser) { // conn.RemoteAddr() inside this function, as a workaround for a hang that // otherwise occurs inside of conn.pc.RemoteDescription() (called by // RemoteAddr). https://bugs.torproject.org/18628#comment:8 -func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) { +func datachannelHandler(conn *webRTCConn) { defer conn.Close() defer retToken() @@ -268,10 +268,10 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) { } // Retrieve client IP address - if remoteAddr != nil { + if conn.RemoteAddr() != nil { // Encode client IP address in relay URL q := u.Query() - clientIP := remoteAddr.String() + clientIP := conn.RemoteAddr().String() q.Set("client_ip", clientIP) u.RawQuery = q.Encode() } else { @@ -290,11 +290,24 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) { log.Printf("datachannelHandler ends") } +// Handler for the throughput test +func throughputHandler(conn *webRTCConn) { + defer conn.Close() + + if _, err := io.Copy(conn, conn); err != nil { + log.Printf("io.Copy inside CopyLoop generated an error: %v", err) + } + +} + // Create a PeerConnection from an SDP offer. Blocks until the gathering of ICE // candidates is complete and the answer is available in LocalDescription. // Installs an OnDataChannel callback that creates a webRTCConn and passes it to // datachannelHandler. -func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config webrtc.Configuration, dataChan chan struct{}) (*webrtc.PeerConnection, error) { +func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, + config webrtc.Configuration, dataChan chan struct{}, + handler func(conn *webRTCConn)) (*webrtc.PeerConnection, error) { + pc, err := webrtc.NewPeerConnection(config) if err != nil { return nil, fmt.Errorf("accept: NewPeerConnection: %s", err) @@ -330,7 +343,7 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config webrtc.C } }) - go datachannelHandler(conn, conn.RemoteAddr()) + go handler(conn) }) err = pc.SetRemoteDescription(*sdp) @@ -373,7 +386,7 @@ func runSession(sid string) { return } dataChan := make(chan struct{}) - pc, err := makePeerConnectionFromOffer(offer, config, dataChan) + pc, err := makePeerConnectionFromOffer(offer, config, dataChan, datachannelHandler) if err != nil { log.Printf("error making WebRTC connection: %s", err) retToken() @@ -403,15 +416,76 @@ func runSession(sid string) { } } +func testThroughput(config webrtc.Configuration, probeURL string) { + var err error + var offer SnowflakeOffer + var result SnowflakeResult + + sessionID := genSessionID() + + probe := new(Remote) + probe.transport = http.DefaultTransport.(*http.Transport) + probe.url, err = url.Parse(probeURL) + if err != nil { + log.Printf("Error parsing url: %s", err.Error()) + } + probePath := probe.url.ResolveReference(&url.URL{Path: "api/snowflake-poll"}) + + response, err := probe.MakePost(probePath.String(), bytes.NewBuffer(CreateSnowflakeRequest(sessionID))) + if err != nil { + log.Printf("Error connecting to probe point: %s", err.Error()) + return + } + if err = json.Unmarshal(response, &offer); err != nil { + log.Printf("error reading bridgestrap response: %s", err.Error()) + log.Printf("body: %s", response) + return + } + + sdp := deserializeSessionDescription(offer.Offer) + + // create answer + dataChan := make(chan struct{}) + pc, err := makePeerConnectionFromOffer(sdp, config, dataChan, throughputHandler) + if err != nil { + log.Printf("error making WebRTC connection: %s", err) + retToken() + return + } + + answer := pc.LocalDescription() + + // send answer + testReq := CreateSnowflakeAnswer(sessionID, serializeSessionDescription(answer)) + probePath = probe.url.ResolveReference(&url.URL{Path: "api/snowflake-test"}) + + response, err = probe.MakePost(probePath.String(), bytes.NewBuffer(testReq)) + if err != nil { + log.Printf("Error connecting to probe point: %s", err.Error()) + return + } + if err = json.Unmarshal(response, &result); err != nil { + log.Printf("error reading bridgestrap response: %s", err.Error()) + log.Printf("body: %s", response) + return + } + + log.Printf("Throughput: %f Kbps", result.Throughput) + log.Printf("Latency: %f s", result.Latency) + +} + func main() { var capacity uint var stunURL string var logFilename string var rawBrokerURL string + var probeURL string flag.UintVar(&capacity, "capacity", 10, "maximum concurrent clients") flag.StringVar(&rawBrokerURL, "broker", defaultBrokerURL, "broker URL") flag.StringVar(&relayURL, "relay", defaultRelayURL, "websocket relay URL") + flag.StringVar(&probeURL, "probe", defaultProbeURL, "URL for throughput testing probe") flag.StringVar(&stunURL, "stun", defaultSTUNURL, "stun URL") flag.StringVar(&logFilename, "log", "", "log filename") flag.Parse() @@ -432,7 +506,7 @@ func main() { log.Println("starting") var err error - broker = new(Broker) + broker = new(Remote) broker.url, err = url.Parse(rawBrokerURL) if err != nil { log.Fatalf("invalid broker url: %s", err) @@ -445,6 +519,10 @@ func main() { if err != nil { log.Fatalf("invalid relay url: %s", err) } + _, err = url.Parse(probeURL) + if err != nil { + log.Fatalf("invalid probe url: %s", err) + } broker.transport = http.DefaultTransport.(*http.Transport) config = webrtc.Configuration{ @@ -459,6 +537,15 @@ func main() { tokens <- true } + //Perform a throughput test + testThroughput(config, probeURL) + go func() { + heartbeat := time.Tick(24 * time.Hour) + for range heartbeat { + testThroughput(config, probeURL) + } + }() + for { getToken() sessionID := genSessionID() diff --git a/proxy-go/throughput.go b/proxy-go/throughput.go new file mode 100644 index 0000000..97dd5c3 --- /dev/null +++ b/proxy-go/throughput.go @@ -0,0 +1,43 @@ +package main + +import ( + "encoding/json" +) + +type SnowflakeRequest struct { + SnowflakeID string `json:"snowflake_id"` +} + +type SnowflakeOffer struct { + Offer string `json:"offer"` +} + +type SnowflakeAnswer struct { + SnowflakeID string `json:"snowflake_id"` + Answer string `json:"answer"` +} + +type SnowflakeResult struct { + Throughput float64 `json:"throughput"` + Latency float64 `json:"latency"` + Error string `json:"error"` +} + +func CreateSnowflakeRequest(id string) []byte { + request := &SnowflakeRequest{ + SnowflakeID: id, + } + jsonRequest, _ := json.Marshal(request) + + return jsonRequest +} + +func CreateSnowflakeAnswer(id string, answer string) []byte { + request := &SnowflakeAnswer{ + SnowflakeID: id, + Answer: answer, + } + jsonRequest, _ := json.Marshal(request) + + return jsonRequest +}