Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 49 additions & 16 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,18 @@ const (
ClientTimeout = 10
ProxyTimeout = 10
readLimit = 100000 //Maximum number of bytes to be read from an HTTP request

NATUnknown = "unknown"
NATRestricted = "restricted"
NATUnrestricted = "unrestricted"
)

type BrokerContext struct {
snowflakes *SnowflakeHeap
// Map keeping track of snowflakeIDs required to match SDP answers from
// the second http POST.
snowflakes *SnowflakeHeap
restrictedSnowflakes *SnowflakeHeap
// Maps keeping track of snowflakeIDs required to match SDP answers from
// the second http POST. Restricted snowflakes can only be matched up with
// clients behind an unrestricted NAT.
idToSnowflake map[string]*Snowflake
// Synchronization for the snowflake map and heap
snowflakeLock sync.Mutex
Expand All @@ -47,6 +53,8 @@ type BrokerContext struct {
func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext {
snowflakes := new(SnowflakeHeap)
heap.Init(snowflakes)
rSnowflakes := new(SnowflakeHeap)
heap.Init(rSnowflakes)
metrics, err := NewMetrics(metricsLogger)

if err != nil {
Expand All @@ -58,10 +66,11 @@ func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext {
}

return &BrokerContext{
snowflakes: snowflakes,
idToSnowflake: make(map[string]*Snowflake),
proxyPolls: make(chan *ProxyPoll),
metrics: metrics,
snowflakes: snowflakes,
restrictedSnowflakes: rSnowflakes,
idToSnowflake: make(map[string]*Snowflake),
proxyPolls: make(chan *ProxyPoll),
metrics: metrics,
}
}

Expand Down Expand Up @@ -101,15 +110,17 @@ func (mh MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
type ProxyPoll struct {
id string
proxyType string
natType string
offerChannel chan []byte
}

// Registers a Snowflake and waits for some Client to send an offer,
// as part of the polling logic of the proxy handler.
func (ctx *BrokerContext) RequestOffer(id string, proxyType string) []byte {
func (ctx *BrokerContext) RequestOffer(id string, proxyType string, natType string) []byte {
request := new(ProxyPoll)
request.id = id
request.proxyType = proxyType
request.natType = natType
request.offerChannel = make(chan []byte)
ctx.proxyPolls <- request
// Block until an offer is available, or timeout which sends a nil offer.
Expand All @@ -122,7 +133,7 @@ func (ctx *BrokerContext) RequestOffer(id string, proxyType string) []byte {
// client offer or nil on timeout / none are available.
func (ctx *BrokerContext) Broker() {
for request := range ctx.proxyPolls {
snowflake := ctx.AddSnowflake(request.id, request.proxyType)
snowflake := ctx.AddSnowflake(request.id, request.proxyType, request.natType)
// Wait for a client to avail an offer to the snowflake.
go func(request *ProxyPoll) {
select {
Expand All @@ -133,7 +144,11 @@ func (ctx *BrokerContext) Broker() {
ctx.snowflakeLock.Lock()
defer ctx.snowflakeLock.Unlock()
if snowflake.index != -1 {
heap.Remove(ctx.snowflakes, snowflake.index)
if request.natType == NATRestricted {
heap.Remove(ctx.restrictedSnowflakes, snowflake.index)
} else {
heap.Remove(ctx.snowflakes, snowflake.index)
}
delete(ctx.idToSnowflake, snowflake.id)
close(request.offerChannel)
}
Expand All @@ -145,15 +160,19 @@ func (ctx *BrokerContext) Broker() {
// Create and add a Snowflake to the heap.
// Required to keep track of proxies between providing them
// with an offer and awaiting their second POST with an answer.
func (ctx *BrokerContext) AddSnowflake(id string, proxyType string) *Snowflake {
func (ctx *BrokerContext) AddSnowflake(id string, proxyType string, natType string) *Snowflake {
snowflake := new(Snowflake)
snowflake.id = id
snowflake.clients = 0
snowflake.proxyType = proxyType
snowflake.offerChannel = make(chan []byte)
snowflake.answerChannel = make(chan []byte)
ctx.snowflakeLock.Lock()
heap.Push(ctx.snowflakes, snowflake)
if natType == NATRestricted {
heap.Push(ctx.restrictedSnowflakes, snowflake)
} else {
heap.Push(ctx.snowflakes, snowflake)
}
ctx.snowflakeLock.Unlock()
ctx.idToSnowflake[id] = snowflake
return snowflake
Expand All @@ -170,7 +189,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
return
}

sid, proxyType, err := messages.DecodePollRequest(body)
sid, proxyType, natType, err := messages.DecodePollRequest(body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
Expand All @@ -187,7 +206,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
}

// Wait for a client to avail an offer to the snowflake, or timeout if nil.
offer := ctx.RequestOffer(sid, proxyType)
offer := ctx.RequestOffer(sid, proxyType, natType)
var b []byte
if nil == offer {
ctx.metrics.lock.Lock()
Expand Down Expand Up @@ -226,9 +245,23 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
return
}

natType := r.Header.Get("X-NAT-TYPE")
if natType == "" {
natType = NATUnknown
}

// Only hand out known restricted snowflakes to unrestricted clients
var snowflakeHeap *SnowflakeHeap
if natType == NATUnrestricted {
snowflakeHeap = ctx.restrictedSnowflakes
} else {
snowflakeHeap = ctx.snowflakes
}

// Immediately fail if there are no snowflakes available.
ctx.snowflakeLock.Lock()
numSnowflakes := ctx.snowflakes.Len()
numSnowflakes := snowflakeHeap.Len()
ctx.snowflakeLock.Unlock()
if numSnowflakes <= 0 {
ctx.metrics.lock.Lock()
Expand All @@ -240,7 +273,7 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
// Otherwise, find the most available snowflake proxy, and pass the offer to it.
// Delete must be deferred in order to correctly process answer request later.
ctx.snowflakeLock.Lock()
snowflake := heap.Pop(ctx.snowflakes).(*Snowflake)
snowflake := heap.Pop(snowflakeHeap).(*Snowflake)
ctx.snowflakeLock.Unlock()
snowflake.offerChannel <- offer

Expand Down
14 changes: 7 additions & 7 deletions broker/snowflake-broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestBroker(t *testing.T) {
Convey("Adds Snowflake", func() {
So(ctx.snowflakes.Len(), ShouldEqual, 0)
So(len(ctx.idToSnowflake), ShouldEqual, 0)
ctx.AddSnowflake("foo", "")
ctx.AddSnowflake("foo", "", NATUnknown)
So(ctx.snowflakes.Len(), ShouldEqual, 1)
So(len(ctx.idToSnowflake), ShouldEqual, 1)
})
Expand All @@ -55,7 +55,7 @@ func TestBroker(t *testing.T) {
Convey("Request an offer from the Snowflake Heap", func() {
done := make(chan []byte)
go func() {
offer := ctx.RequestOffer("test", "")
offer := ctx.RequestOffer("test", "", NATUnknown)
done <- offer
}()
request := <-ctx.proxyPolls
Expand All @@ -79,7 +79,7 @@ func TestBroker(t *testing.T) {
Convey("with a proxy answer if available.", func() {
done := make(chan bool)
// Prepare a fake proxy to respond with.
snowflake := ctx.AddSnowflake("fake", "")
snowflake := ctx.AddSnowflake("fake", "", NATUnknown)
go func() {
clientOffers(ctx, w, r)
done <- true
Expand All @@ -97,7 +97,7 @@ func TestBroker(t *testing.T) {
return
}
done := make(chan bool)
snowflake := ctx.AddSnowflake("fake", "")
snowflake := ctx.AddSnowflake("fake", "", NATUnknown)
go func() {
clientOffers(ctx, w, r)
// Takes a few seconds here...
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestBroker(t *testing.T) {
})

Convey("Responds to proxy answers...", func() {
s := ctx.AddSnowflake("test", "")
s := ctx.AddSnowflake("test", "", NATUnknown)
w := httptest.NewRecorder()
data := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"test","Answer":"test"}`))

Expand Down Expand Up @@ -260,7 +260,7 @@ func TestBroker(t *testing.T) {
// Manually do the Broker goroutine action here for full control.
p := <-ctx.proxyPolls
So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp")
s := ctx.AddSnowflake(p.id, "")
s := ctx.AddSnowflake(p.id, "", NATUnknown)
go func() {
offer := <-s.offerChannel
p.offerChannel <- offer
Expand Down Expand Up @@ -537,7 +537,7 @@ func TestMetrics(t *testing.T) {
So(err, ShouldBeNil)

// Prepare a fake proxy to respond with.
snowflake := ctx.AddSnowflake("fake", "")
snowflake := ctx.AddSnowflake("fake", "", NATUnknown)
go func() {
clientOffers(ctx, w, r)
done <- true
Expand Down
20 changes: 13 additions & 7 deletions client/lib/lib_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ import (
. "github.com/smartystreets/goconvey/convey"
)

const (
NATUnknown = "unknown"
NATRestricted = "restricted"
NATUnrestricted = "unrestricted"
)

type MockTransport struct {
statusOverride int
body []byte
Expand Down Expand Up @@ -194,15 +200,15 @@ func TestSnowflakeClient(t *testing.T) {
}

Convey("Construct BrokerChannel with no front domain", func() {
b, err := NewBrokerChannel("test.broker", "", transport, false)
b, err := NewBrokerChannel("test.broker", "", transport, false, NATUnknown)
So(b.url, ShouldNotBeNil)
So(err, ShouldBeNil)
So(b.url.Path, ShouldResemble, "test.broker")
So(b.transport, ShouldNotBeNil)
})

Convey("Construct BrokerChannel *with* front domain", func() {
b, err := NewBrokerChannel("test.broker", "front", transport, false)
b, err := NewBrokerChannel("test.broker", "front", transport, false, NATUnknown)
So(b.url, ShouldNotBeNil)
So(err, ShouldBeNil)
So(b.url.Path, ShouldResemble, "test.broker")
Expand All @@ -211,7 +217,7 @@ func TestSnowflakeClient(t *testing.T) {
})

Convey("BrokerChannel.Negotiate responds with answer", func() {
b, err := NewBrokerChannel("test.broker", "", transport, false)
b, err := NewBrokerChannel("test.broker", "", transport, false, NATUnknown)
So(err, ShouldBeNil)
answer, err := b.Negotiate(fakeOffer)
So(err, ShouldBeNil)
Expand All @@ -222,7 +228,7 @@ func TestSnowflakeClient(t *testing.T) {
Convey("BrokerChannel.Negotiate fails with 503", func() {
b, err := NewBrokerChannel("test.broker", "",
&MockTransport{http.StatusServiceUnavailable, []byte("\n")},
false)
false, NATUnknown)
So(err, ShouldBeNil)
answer, err := b.Negotiate(fakeOffer)
So(err, ShouldNotBeNil)
Expand All @@ -233,7 +239,7 @@ func TestSnowflakeClient(t *testing.T) {
Convey("BrokerChannel.Negotiate fails with 400", func() {
b, err := NewBrokerChannel("test.broker", "",
&MockTransport{http.StatusBadRequest, []byte("\n")},
false)
false, NATUnknown)
So(err, ShouldBeNil)
answer, err := b.Negotiate(fakeOffer)
So(err, ShouldNotBeNil)
Expand All @@ -244,7 +250,7 @@ func TestSnowflakeClient(t *testing.T) {
Convey("BrokerChannel.Negotiate fails with large read", func() {
b, err := NewBrokerChannel("test.broker", "",
&MockTransport{http.StatusOK, make([]byte, 100001, 100001)},
false)
false, NATUnknown)
So(err, ShouldBeNil)
answer, err := b.Negotiate(fakeOffer)
So(err, ShouldNotBeNil)
Expand All @@ -254,7 +260,7 @@ func TestSnowflakeClient(t *testing.T) {

Convey("BrokerChannel.Negotiate fails with unexpected error", func() {
b, err := NewBrokerChannel("test.broker", "",
&MockTransport{123, []byte("")}, false)
&MockTransport{123, []byte("")}, false, NATUnknown)
So(err, ShouldBeNil)
answer, err := b.Negotiate(fakeOffer)
So(err, ShouldNotBeNil)
Expand Down
11 changes: 10 additions & 1 deletion client/lib/rendezvous.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type BrokerChannel struct {
url *url.URL
transport http.RoundTripper // Used to make all requests.
keepLocalAddresses bool
NATType string
}

// We make a copy of DefaultTransport because we want the default Dial
Expand All @@ -50,7 +51,7 @@ func CreateBrokerTransport() http.RoundTripper {
// Construct a new BrokerChannel, where:
// |broker| is the full URL of the facilitating program which assigns proxies
// to clients, and |front| is the option fronting domain.
func NewBrokerChannel(broker string, front string, transport http.RoundTripper, keepLocalAddresses bool) (*BrokerChannel, error) {
func NewBrokerChannel(broker string, front string, transport http.RoundTripper, keepLocalAddresses bool, NATType string) (*BrokerChannel, error) {
targetURL, err := url.Parse(broker)
if err != nil {
return nil, err
Expand All @@ -66,6 +67,7 @@ func NewBrokerChannel(broker string, front string, transport http.RoundTripper,

bc.transport = transport
bc.keepLocalAddresses = keepLocalAddresses
bc.NATType = NATType
return bc, nil
}

Expand Down Expand Up @@ -110,6 +112,8 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) (
if "" != bc.Host { // Set true host if necessary.
request.Host = bc.Host
}
// include NAT-TYPE
request.Header.Set("X-NAT-TYPE", bc.NATType)
resp, err := bc.transport.RoundTrip(request)
if nil != err {
return nil, err
Expand All @@ -133,6 +137,11 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) (
}
}

func (bc *BrokerChannel) SetNATType(NATType string) {
bc.NATType = NATType
log.Printf("NAT Type: %s", NATType)
}

// Implements the |Tongue| interface to catch snowflakes, using BrokerChannel.
type WebRTCDialer struct {
*BrokerChannel
Expand Down
Loading