Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions proxy-go/proxy-go_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func TestBrokerInteractions(t *testing.T) {
const sampleAnswer = `{"type":"answer","sdp":` + sampleSDP + `}`

Convey("Proxy connections to broker", t, func() {
broker := new(Broker)
broker := new(Remote)
broker.url, _ = url.Parse("localhost")

//Mock peerConnection
Expand Down Expand Up @@ -307,7 +307,7 @@ func TestBrokerInteractions(t *testing.T) {
}
err = broker.sendAnswer("test", pc)
So(err, ShouldNotEqual, nil)
So(err.Error(), ShouldResemble, "broker returned 410")
So(err.Error(), ShouldResemble, "error sending answer to broker: remote returned status code 410")

//Error if we can't parse broker message
broker.transport = &MockTransport{
Expand Down
191 changes: 139 additions & 52 deletions proxy-go/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

const defaultBrokerURL = "https://snowflake-broker.bamsoftware.com/"
const defaultRelayURL = "wss://snowflake.bamsoftware.com/"
const defaultProbeURL = "http://159.203.63.110:8080"
const defaultSTUNURL = "stun:stun.l.google.com:19302"
const pollInterval = 5 * time.Second

Expand All @@ -37,7 +38,7 @@ const dataChannelTimeout = 20 * time.Second

const readLimit = 100000 //Maximum number of bytes to be read from an HTTP request

var broker *Broker
var broker *Remote
var relayURL string

const (
Expand Down Expand Up @@ -69,11 +70,6 @@ func remoteIPFromSDP(sdp string) net.IP {
return nil
}

type Broker struct {
url *url.URL
transport http.RoundTripper
}

type webRTCConn struct {
dc *webrtc.DataChannel
pc *webrtc.PeerConnection
Expand Down Expand Up @@ -158,8 +154,32 @@ func limitedRead(r io.Reader, limit int64) ([]byte, error) {
return p, err
}

func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription {
brokerPath := b.url.ResolveReference(&url.URL{Path: "proxy"})
type Remote struct {
url *url.URL
transport http.RoundTripper
}

func (r *Remote) MakePost(path string, payload io.Reader) ([]byte, error) {

req, err := http.NewRequest("POST", path, payload)
if err != nil {
return nil, err
}
resp, err := r.transport.RoundTrip(req)
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("remote returned status code %d", resp.StatusCode)
}

defer resp.Body.Close()
return limitedRead(resp.Body, readLimit)
}

func (r *Remote) pollOffer(sid string) *webrtc.SessionDescription {
brokerPath := r.url.ResolveReference(&url.URL{Path: "proxy"})
timeOfNextPoll := time.Now()
for {
// Sleep until we're scheduled to poll again.
Expand All @@ -178,56 +198,36 @@ func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription {
log.Printf("Error encoding poll message: %s", err.Error())
return nil
}
req, _ := http.NewRequest("POST", brokerPath.String(), bytes.NewBuffer(body))
resp, err := b.transport.RoundTrip(req)
response, err := r.MakePost(brokerPath.String(), bytes.NewBuffer(body))
if err != nil {
log.Printf("error polling broker: %s", err)
} else {
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.Printf("broker returns: %d", resp.StatusCode)
} else {
body, err := limitedRead(resp.Body, readLimit)
if err != nil {
log.Printf("error reading broker response: %s", err)
} else {

offer, err := messages.DecodePollResponse(body)
if err != nil {
log.Printf("error reading broker response: %s", err.Error())
log.Printf("body: %s", body)
return nil
}
if offer != "" {
return deserializeSessionDescription(offer)
}
}
}
log.Printf("error polling broker: %s", err.Error())
return nil
}
offer, err := messages.DecodePollResponse(response)
if err != nil {
log.Printf("error reading broker response: %s", err.Error())
log.Printf("body: %s", body)
return nil
}
if offer != "" {
return deserializeSessionDescription(offer)
}
}
}

func (b *Broker) sendAnswer(sid string, pc *webrtc.PeerConnection) error {
brokerPath := b.url.ResolveReference(&url.URL{Path: "answer"})
func (r *Remote) sendAnswer(sid string, pc *webrtc.PeerConnection) error {
brokerPath := r.url.ResolveReference(&url.URL{Path: "answer"})
answer := string([]byte(serializeSessionDescription(pc.LocalDescription())))
body, err := messages.EncodeAnswerRequest(answer, sid)
if err != nil {
return err
}
req, _ := http.NewRequest("POST", brokerPath.String(), bytes.NewBuffer(body))
resp, err := b.transport.RoundTrip(req)
response, err := r.MakePost(brokerPath.String(), bytes.NewBuffer(body))
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("broker returned %d", resp.StatusCode)
return fmt.Errorf("error sending answer to broker: %s", err.Error())
}

body, err = limitedRead(resp.Body, readLimit)
if err != nil {
return fmt.Errorf("error reading broker response: %s", err)
}
success, err := messages.DecodeAnswerResponse(body)
success, err := messages.DecodeAnswerResponse(response)
if err != nil {
return err
}
Expand Down Expand Up @@ -258,7 +258,7 @@ func CopyLoop(c1 io.ReadWriteCloser, c2 io.ReadWriteCloser) {
// conn.RemoteAddr() inside this function, as a workaround for a hang that
// otherwise occurs inside of conn.pc.RemoteDescription() (called by
// RemoteAddr). https://bugs.torproject.org/18628#comment:8
func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) {
func datachannelHandler(conn *webRTCConn) {
defer conn.Close()
defer retToken()

Expand All @@ -268,10 +268,10 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) {
}

// Retrieve client IP address
if remoteAddr != nil {
if conn.RemoteAddr() != nil {
// Encode client IP address in relay URL
q := u.Query()
clientIP := remoteAddr.String()
clientIP := conn.RemoteAddr().String()
q.Set("client_ip", clientIP)
u.RawQuery = q.Encode()
} else {
Expand All @@ -290,11 +290,24 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) {
log.Printf("datachannelHandler ends")
}

// Handler for the throughput test
func throughputHandler(conn *webRTCConn) {
defer conn.Close()

if _, err := io.Copy(conn, conn); err != nil {
log.Printf("io.Copy inside CopyLoop generated an error: %v", err)
}

}

// Create a PeerConnection from an SDP offer. Blocks until the gathering of ICE
// candidates is complete and the answer is available in LocalDescription.
// Installs an OnDataChannel callback that creates a webRTCConn and passes it to
// datachannelHandler.
func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config webrtc.Configuration, dataChan chan struct{}) (*webrtc.PeerConnection, error) {
func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription,
config webrtc.Configuration, dataChan chan struct{},
handler func(conn *webRTCConn)) (*webrtc.PeerConnection, error) {

pc, err := webrtc.NewPeerConnection(config)
if err != nil {
return nil, fmt.Errorf("accept: NewPeerConnection: %s", err)
Expand Down Expand Up @@ -330,7 +343,7 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config webrtc.C
}
})

go datachannelHandler(conn, conn.RemoteAddr())
go handler(conn)
})

err = pc.SetRemoteDescription(*sdp)
Expand Down Expand Up @@ -373,7 +386,7 @@ func runSession(sid string) {
return
}
dataChan := make(chan struct{})
pc, err := makePeerConnectionFromOffer(offer, config, dataChan)
pc, err := makePeerConnectionFromOffer(offer, config, dataChan, datachannelHandler)
if err != nil {
log.Printf("error making WebRTC connection: %s", err)
retToken()
Expand Down Expand Up @@ -403,15 +416,76 @@ func runSession(sid string) {
}
}

func testThroughput(config webrtc.Configuration, probeURL string) {
var err error
var offer SnowflakeOffer
var result SnowflakeResult

sessionID := genSessionID()

probe := new(Remote)
probe.transport = http.DefaultTransport.(*http.Transport)
probe.url, err = url.Parse(probeURL)
if err != nil {
log.Printf("Error parsing url: %s", err.Error())
}
probePath := probe.url.ResolveReference(&url.URL{Path: "api/snowflake-poll"})

response, err := probe.MakePost(probePath.String(), bytes.NewBuffer(CreateSnowflakeRequest(sessionID)))
if err != nil {
log.Printf("Error connecting to probe point: %s", err.Error())
return
}
if err = json.Unmarshal(response, &offer); err != nil {
log.Printf("error reading bridgestrap response: %s", err.Error())
log.Printf("body: %s", response)
return
}

sdp := deserializeSessionDescription(offer.Offer)

// create answer
dataChan := make(chan struct{})
pc, err := makePeerConnectionFromOffer(sdp, config, dataChan, throughputHandler)
if err != nil {
log.Printf("error making WebRTC connection: %s", err)
retToken()
return
}

answer := pc.LocalDescription()

// send answer
testReq := CreateSnowflakeAnswer(sessionID, serializeSessionDescription(answer))
probePath = probe.url.ResolveReference(&url.URL{Path: "api/snowflake-test"})

response, err = probe.MakePost(probePath.String(), bytes.NewBuffer(testReq))
if err != nil {
log.Printf("Error connecting to probe point: %s", err.Error())
return
}
if err = json.Unmarshal(response, &result); err != nil {
log.Printf("error reading bridgestrap response: %s", err.Error())
log.Printf("body: %s", response)
return
}

log.Printf("Throughput: %f Kbps", result.Throughput)
log.Printf("Latency: %f s", result.Latency)

}

func main() {
var capacity uint
var stunURL string
var logFilename string
var rawBrokerURL string
var probeURL string

flag.UintVar(&capacity, "capacity", 10, "maximum concurrent clients")
flag.StringVar(&rawBrokerURL, "broker", defaultBrokerURL, "broker URL")
flag.StringVar(&relayURL, "relay", defaultRelayURL, "websocket relay URL")
flag.StringVar(&probeURL, "probe", defaultProbeURL, "URL for throughput testing probe")
flag.StringVar(&stunURL, "stun", defaultSTUNURL, "stun URL")
flag.StringVar(&logFilename, "log", "", "log filename")
flag.Parse()
Expand All @@ -432,7 +506,7 @@ func main() {
log.Println("starting")

var err error
broker = new(Broker)
broker = new(Remote)
broker.url, err = url.Parse(rawBrokerURL)
if err != nil {
log.Fatalf("invalid broker url: %s", err)
Expand All @@ -445,6 +519,10 @@ func main() {
if err != nil {
log.Fatalf("invalid relay url: %s", err)
}
_, err = url.Parse(probeURL)
if err != nil {
log.Fatalf("invalid probe url: %s", err)
}

broker.transport = http.DefaultTransport.(*http.Transport)
config = webrtc.Configuration{
Expand All @@ -459,6 +537,15 @@ func main() {
tokens <- true
}

//Perform a throughput test
testThroughput(config, probeURL)
go func() {
heartbeat := time.Tick(24 * time.Hour)
for range heartbeat {
testThroughput(config, probeURL)
}
}()

for {
getToken()
sessionID := genSessionID()
Expand Down
43 changes: 43 additions & 0 deletions proxy-go/throughput.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package main

import (
"encoding/json"
)

type SnowflakeRequest struct {
SnowflakeID string `json:"snowflake_id"`
}

type SnowflakeOffer struct {
Offer string `json:"offer"`
}

type SnowflakeAnswer struct {
SnowflakeID string `json:"snowflake_id"`
Answer string `json:"answer"`
}

type SnowflakeResult struct {
Throughput float64 `json:"throughput"`
Latency float64 `json:"latency"`
Error string `json:"error"`
}

func CreateSnowflakeRequest(id string) []byte {
request := &SnowflakeRequest{
SnowflakeID: id,
}
jsonRequest, _ := json.Marshal(request)

return jsonRequest
}

func CreateSnowflakeAnswer(id string, answer string) []byte {
request := &SnowflakeAnswer{
SnowflakeID: id,
Answer: answer,
}
jsonRequest, _ := json.Marshal(request)

return jsonRequest
}