From 2d1788dfe1dc3470870f210463d8ccbe5ac82a9d Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Wed, 10 Jun 2020 17:16:12 -0400 Subject: [PATCH 1/5] Implement NAT discovery (RFC 5780) at the client Snowflake clients will now attempt NAT discovery using the provided STUN servers and report their NAT type to the Snowflake broker for matching. The three possibilities for NAT types are: - unknown (the client was unable to determine their NAT type), - restricted (the client has a restrictive NAT and can only be paired with unrestricted NATs) - unrestricted (the client can be paired with any other NAT). --- client/lib/rendezvous.go | 11 +- client/snowflake.go | 32 ++++- common/nat/nat.go | 245 +++++++++++++++++++++++++++++++++++++++ go.mod | 1 + go.sum | 2 + 5 files changed, 289 insertions(+), 2 deletions(-) create mode 100644 common/nat/nat.go diff --git a/client/lib/rendezvous.go b/client/lib/rendezvous.go index ca15d35..04dbdde 100644 --- a/client/lib/rendezvous.go +++ b/client/lib/rendezvous.go @@ -36,6 +36,7 @@ type BrokerChannel struct { url *url.URL transport http.RoundTripper // Used to make all requests. keepLocalAddresses bool + NATType string } // We make a copy of DefaultTransport because we want the default Dial @@ -50,7 +51,7 @@ func CreateBrokerTransport() http.RoundTripper { // Construct a new BrokerChannel, where: // |broker| is the full URL of the facilitating program which assigns proxies // to clients, and |front| is the option fronting domain. -func NewBrokerChannel(broker string, front string, transport http.RoundTripper, keepLocalAddresses bool) (*BrokerChannel, error) { +func NewBrokerChannel(broker string, front string, transport http.RoundTripper, keepLocalAddresses bool, NATType string) (*BrokerChannel, error) { targetURL, err := url.Parse(broker) if err != nil { return nil, err @@ -66,6 +67,7 @@ func NewBrokerChannel(broker string, front string, transport http.RoundTripper, bc.transport = transport bc.keepLocalAddresses = keepLocalAddresses + bc.NATType = NATType return bc, nil } @@ -110,6 +112,8 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) ( if "" != bc.Host { // Set true host if necessary. request.Host = bc.Host } + // include NAT-TYPE + request.Header.Set("X-NAT-TYPE", bc.NATType) resp, err := bc.transport.RoundTrip(request) if nil != err { return nil, err @@ -133,6 +137,11 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) ( } } +func (bc *BrokerChannel) SetNATType(NATType string) { + bc.NATType = NATType + log.Printf("NAT Type: %s", NATType) +} + // Implements the |Tongue| interface to catch snowflakes, using BrokerChannel. type WebRTCDialer struct { *BrokerChannel diff --git a/client/snowflake.go b/client/snowflake.go index d66225d..f7da590 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -16,12 +16,16 @@ import ( pt "git.torproject.org/pluggable-transports/goptlib.git" sf "git.torproject.org/pluggable-transports/snowflake.git/client/lib" + "git.torproject.org/pluggable-transports/snowflake.git/common/nat" "git.torproject.org/pluggable-transports/snowflake.git/common/safelog" "github.com/pion/webrtc/v2" ) const ( DefaultSnowflakeCapacity = 1 + NATUnknown = "unknown" + NATRestricted = "restricted" + NATUnrestricted = "unrestricted" ) // Maintain |SnowflakeCapacity| number of available WebRTC connections, to @@ -154,10 +158,13 @@ func main() { // Use potentially domain-fronting broker to rendezvous. broker, err := sf.NewBrokerChannel( *brokerURL, *frontDomain, sf.CreateBrokerTransport(), - *keepLocalAddresses || *oldKeepLocalAddresses) + *keepLocalAddresses || *oldKeepLocalAddresses, + NATUnknown) if err != nil { log.Fatalf("parsing broker URL: %v", err) } + go updateNATType(iceServers, broker) + snowflakes.Tongue = sf.NewWebRTCDialer(broker, iceServers) // Use a real logger to periodically output how much traffic is happening. @@ -219,3 +226,26 @@ func main() { snowflakes.End() log.Println("snowflake is done.") } + +// loop through all provided STUN servers until we exhaust the list or find +// one that is compatable with RFC 5780 +func updateNATType(servers []webrtc.ICEServer, broker *sf.BrokerChannel) { + + var restrictedNAT bool + var err error + for _, server := range servers { + addr := strings.TrimPrefix(server.URLs[0], "stun:") + restrictedNAT, err = nat.CheckIfRestrictedNAT(addr) + if err == nil { + if restrictedNAT { + broker.SetNATType(NATRestricted) + } else { + broker.SetNATType(NATUnrestricted) + } + break + } + } + if err != nil { + broker.SetNATType(NATUnknown) + } +} diff --git a/common/nat/nat.go b/common/nat/nat.go new file mode 100644 index 0000000..75b99f5 --- /dev/null +++ b/common/nat/nat.go @@ -0,0 +1,245 @@ +/* +The majority of this code is taken from a utility I wrote for pion/stun +https://github.com/pion/stun/blob/master/cmd/stun-nat-behaviour/main.go + +Copyright 2018 Pion LLC + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +package nat + +import ( + "errors" + "fmt" + "github.com/pion/stun" + "log" + "net" + "time" +) + +var ErrTimedOut = errors.New("timed out waiting for response") + +// This function checks the NAT mapping and filtering +// behaviour and returns true if the NAT is restrictive +// (address-dependent mapping and/or port-dependent filtering) +// and false if the NAT is unrestrictive (meaning it +// will work with most other NATs), +func CheckIfRestrictedNAT(server string) (bool, error) { + result, err := isRestrictedMapping(server) + if err != nil { + return false, err + } + if !result { + // if the mapping is unrestrictive, we still need to check whether + // the filtering is restrictive + result, err = isRestrictedFiltering(server) + } + return result, err +} + +// Performs two tests from RFC 5780 to determine whether the mapping type +// of the client's NAT is address-independent or address-dependent +// Returns true if the mapping is address-dependent and false otherwise +func isRestrictedMapping(addrStr string) (bool, error) { + var xorAddr1 stun.XORMappedAddress + var xorAddr2 stun.XORMappedAddress + + mapTestConn, err := connect(addrStr) + if err != nil { + log.Printf("Error creating STUN connection: %s", err.Error()) + return false, err + } + + defer mapTestConn.Close() + + // Test I: Regular binding request + message := stun.MustBuild(stun.TransactionID, stun.BindingRequest) + + resp, err := mapTestConn.RoundTrip(message, mapTestConn.PrimaryAddr) + if err == ErrTimedOut { + log.Printf("Error: no response from server") + return false, err + } + if err != nil { + log.Printf("Error receiving response from server: %s", err.Error()) + return false, err + } + + // Decoding XOR-MAPPED-ADDRESS attribute from message. + if err = xorAddr1.GetFrom(resp); err != nil { + log.Printf("Error retrieving XOR-MAPPED-ADDRESS resonse: %s", err.Error()) + return false, err + } + + // Decoding OTHER-ADDRESS attribute from message. + var otherAddr stun.OtherAddress + if err = otherAddr.GetFrom(resp); err != nil { + log.Println("NAT discovery feature not supported by this server") + return false, err + } + + if err = mapTestConn.AddOtherAddr(otherAddr.String()); err != nil { + log.Printf("Failed to resolve address %s\t", otherAddr.String()) + return false, err + } + + // Test II: Send binding request to other address + resp, err = mapTestConn.RoundTrip(message, mapTestConn.OtherAddr) + if err == ErrTimedOut { + log.Printf("Error: no response from server") + return false, err + } + if err != nil { + log.Printf("Error retrieving server response: %s", err.Error()) + return false, err + } + + // Decoding XOR-MAPPED-ADDRESS attribute from message. + if err = xorAddr2.GetFrom(resp); err != nil { + log.Printf("Error retrieving XOR-MAPPED-ADDRESS resonse: %s", err.Error()) + return false, err + } + + return xorAddr1.String() != xorAddr2.String(), nil + +} + +// Performs two tests from RFC 5780 to determine whether the filtering type +// of the client's NAT is port-dependent. +// Returns true if the filtering is port-dependent and false otherwise +func isRestrictedFiltering(addrStr string) (bool, error) { + var xorAddr stun.XORMappedAddress + + mapTestConn, err := connect(addrStr) + if err != nil { + log.Printf("Error creating STUN connection: %s", err.Error()) + return false, err + } + + defer mapTestConn.Close() + + // Test I: Regular binding request + message := stun.MustBuild(stun.TransactionID, stun.BindingRequest) + + resp, err := mapTestConn.RoundTrip(message, mapTestConn.PrimaryAddr) + if err == ErrTimedOut { + log.Printf("Error: no response from server") + return false, err + } + if err != nil { + log.Printf("Error: %s", err.Error()) + return false, err + } + + // Decoding XOR-MAPPED-ADDRESS attribute from message. + if err = xorAddr.GetFrom(resp); err != nil { + log.Printf("Error retrieving XOR-MAPPED-ADDRESS from resonse: %s", err.Error()) + return false, err + } + + // Test III: Request port change + message.Add(stun.AttrChangeRequest, []byte{0x00, 0x00, 0x00, 0x02}) + + _, err = mapTestConn.RoundTrip(message, mapTestConn.PrimaryAddr) + if err != ErrTimedOut && err != nil { + // something else went wrong + log.Printf("Error reading response from server: %s", err.Error()) + return false, err + } + + return err == ErrTimedOut, nil +} + +// Given an address string, returns a StunServerConn +func connect(addrStr string) (*StunServerConn, error) { + // Creating a "connection" to STUN server. + addr, err := net.ResolveUDPAddr("udp4", addrStr) + if err != nil { + log.Printf("Error resolving address: %s\n", err.Error()) + return nil, err + } + + c, err := net.ListenUDP("udp4", nil) + if err != nil { + return nil, err + } + + mChan := listen(c) + + return &StunServerConn{ + conn: c, + PrimaryAddr: addr, + messageChan: mChan, + }, nil +} + +type StunServerConn struct { + conn net.PacketConn + PrimaryAddr *net.UDPAddr + OtherAddr *net.UDPAddr + messageChan chan *stun.Message +} + +func (c *StunServerConn) Close() { + c.conn.Close() +} + +func (c *StunServerConn) RoundTrip(msg *stun.Message, addr net.Addr) (*stun.Message, error) { + _, err := c.conn.WriteTo(msg.Raw, addr) + if err != nil { + return nil, err + } + + // Wait for response or timeout + select { + case m, ok := <-c.messageChan: + if !ok { + return nil, fmt.Errorf("error reading from messageChan") + } + return m, nil + case <-time.After(10 * time.Second): + return nil, ErrTimedOut + } +} + +func (c *StunServerConn) AddOtherAddr(addrStr string) error { + addr2, err := net.ResolveUDPAddr("udp4", addrStr) + if err != nil { + return err + } + c.OtherAddr = addr2 + return nil +} + +// taken from https://github.com/pion/stun/blob/master/cmd/stun-traversal/main.go +func listen(conn *net.UDPConn) chan *stun.Message { + messages := make(chan *stun.Message) + go func() { + for { + buf := make([]byte, 1024) + + n, _, err := conn.ReadFromUDP(buf) + if err != nil { + close(messages) + return + } + buf = buf[:n] + + m := new(stun.Message) + m.Raw = buf + err = m.Decode() + if err != nil { + close(messages) + return + } + + messages <- m + } + }() + return messages +} diff --git a/go.mod b/go.mod index 07c49a2..2ba1b2d 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/golang/protobuf v1.3.1 // indirect github.com/gorilla/websocket v1.4.1 github.com/pion/sdp/v2 v2.3.4 + github.com/pion/stun v0.3.5 github.com/pion/webrtc/v2 v2.2.2 github.com/smartystreets/goconvey v1.6.4 github.com/xtaci/kcp-go/v5 v5.5.12 diff --git a/go.sum b/go.sum index 6768e02..9ccfb30 100644 --- a/go.sum +++ b/go.sum @@ -64,6 +64,8 @@ github.com/pion/srtp v1.2.7 h1:UYyLs5MXwbFtXWduBA5+RUWhaEBX7GmetXDZSKP+uPM= github.com/pion/srtp v1.2.7/go.mod h1:KIgLSadhg/ioogO/LqIkRjZrwuJo0c9RvKIaGQj4Yew= github.com/pion/stun v0.3.3 h1:brYuPl9bN9w/VM7OdNzRSLoqsnwlyNvD9MVeJrHjDQw= github.com/pion/stun v0.3.3/go.mod h1:xrCld6XM+6GWDZdvjPlLMsTU21rNxnO6UO8XsAvHr/M= +github.com/pion/stun v0.3.5 h1:uLUCBCkQby4S1cf6CGuR9QrVOKcvUwFeemaC865QHDg= +github.com/pion/stun v0.3.5/go.mod h1:gDMim+47EeEtfWogA37n6qXZS88L5V6LqFcf+DZA2UA= github.com/pion/transport v0.6.0/go.mod h1:iWZ07doqOosSLMhZ+FXUTq+TamDoXSllxpbGcfkCmbE= github.com/pion/transport v0.8.10 h1:lTiobMEw2PG6BH/mgIVqTV2mBp/mPT+IJLaN8ZxgdHk= github.com/pion/transport v0.8.10/go.mod h1:tBmha/UCjpum5hqTWhfAEs3CO4/tHSg0MYRhSzR+CZ8= From 3adad4f02f875a9b9cb7387a97928e741f89521e Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Tue, 16 Jun 2020 17:15:17 -0400 Subject: [PATCH 2/5] Fixed client lib calls to NewBrokerChannel --- client/lib/lib_test.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/client/lib/lib_test.go b/client/lib/lib_test.go index ebcf284..34419a8 100644 --- a/client/lib/lib_test.go +++ b/client/lib/lib_test.go @@ -12,6 +12,12 @@ import ( . "github.com/smartystreets/goconvey/convey" ) +const ( + NATUnknown = "unknown" + NATRestricted = "restricted" + NATUnrestricted = "unrestricted" +) + type MockTransport struct { statusOverride int body []byte @@ -194,7 +200,7 @@ func TestSnowflakeClient(t *testing.T) { } Convey("Construct BrokerChannel with no front domain", func() { - b, err := NewBrokerChannel("test.broker", "", transport, false) + b, err := NewBrokerChannel("test.broker", "", transport, false, NATUnknown) So(b.url, ShouldNotBeNil) So(err, ShouldBeNil) So(b.url.Path, ShouldResemble, "test.broker") @@ -202,7 +208,7 @@ func TestSnowflakeClient(t *testing.T) { }) Convey("Construct BrokerChannel *with* front domain", func() { - b, err := NewBrokerChannel("test.broker", "front", transport, false) + b, err := NewBrokerChannel("test.broker", "front", transport, false, NATUnknown) So(b.url, ShouldNotBeNil) So(err, ShouldBeNil) So(b.url.Path, ShouldResemble, "test.broker") @@ -211,7 +217,7 @@ func TestSnowflakeClient(t *testing.T) { }) Convey("BrokerChannel.Negotiate responds with answer", func() { - b, err := NewBrokerChannel("test.broker", "", transport, false) + b, err := NewBrokerChannel("test.broker", "", transport, false, NATUnknown) So(err, ShouldBeNil) answer, err := b.Negotiate(fakeOffer) So(err, ShouldBeNil) @@ -222,7 +228,7 @@ func TestSnowflakeClient(t *testing.T) { Convey("BrokerChannel.Negotiate fails with 503", func() { b, err := NewBrokerChannel("test.broker", "", &MockTransport{http.StatusServiceUnavailable, []byte("\n")}, - false) + false, NATUnknown) So(err, ShouldBeNil) answer, err := b.Negotiate(fakeOffer) So(err, ShouldNotBeNil) @@ -233,7 +239,7 @@ func TestSnowflakeClient(t *testing.T) { Convey("BrokerChannel.Negotiate fails with 400", func() { b, err := NewBrokerChannel("test.broker", "", &MockTransport{http.StatusBadRequest, []byte("\n")}, - false) + false, NATUnknown) So(err, ShouldBeNil) answer, err := b.Negotiate(fakeOffer) So(err, ShouldNotBeNil) @@ -244,7 +250,7 @@ func TestSnowflakeClient(t *testing.T) { Convey("BrokerChannel.Negotiate fails with large read", func() { b, err := NewBrokerChannel("test.broker", "", &MockTransport{http.StatusOK, make([]byte, 100001, 100001)}, - false) + false, NATUnknown) So(err, ShouldBeNil) answer, err := b.Negotiate(fakeOffer) So(err, ShouldNotBeNil) @@ -254,7 +260,7 @@ func TestSnowflakeClient(t *testing.T) { Convey("BrokerChannel.Negotiate fails with unexpected error", func() { b, err := NewBrokerChannel("test.broker", "", - &MockTransport{123, []byte("")}, false) + &MockTransport{123, []byte("")}, false, NATUnknown) So(err, ShouldBeNil) answer, err := b.Negotiate(fakeOffer) So(err, ShouldNotBeNil) From 8b52e58c3b9180f48ea0ec487b14f39ba6e7e171 Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Tue, 16 Jun 2020 17:10:56 -0400 Subject: [PATCH 3/5] Implement NAT discover for go standalone proxies --- broker/broker.go | 2 +- common/messages/proxy.go | 30 +++++++++++++++++++----------- proxy/snowflake.go | 36 +++++++++++++++++++++++++++++++++++- 3 files changed, 55 insertions(+), 13 deletions(-) diff --git a/broker/broker.go b/broker/broker.go index d9ef111..2d3cd4b 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -170,7 +170,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { return } - sid, proxyType, err := messages.DecodePollRequest(body) + sid, proxyType, _, err := messages.DecodePollRequest(body) if err != nil { w.WriteHeader(http.StatusBadRequest) return diff --git a/common/messages/proxy.go b/common/messages/proxy.go index 89dd43c..923189b 100644 --- a/common/messages/proxy.go +++ b/common/messages/proxy.go @@ -9,15 +9,16 @@ import ( "strings" ) -const version = "1.1" +const version = "1.2" -/* Version 1.1 specification: +/* Version 1.2 specification: == ProxyPollRequest == { Sid: [generated session id of proxy], - Version: 1.1, + Version: 1.2, Type: ["badge"|"webext"|"standalone"] + NAT: ["unknown"|"restricted"|"unrestricted"] } == ProxyPollResponse == @@ -44,7 +45,7 @@ HTTP 400 BadRequest == ProxyAnswerRequest == { Sid: [generated session id of proxy], - Version: 1.1, + Version: 1.2, Answer: { type: answer, @@ -76,37 +77,44 @@ type ProxyPollRequest struct { Sid string Version string Type string + NAT string } -func EncodePollRequest(sid string, proxyType string) ([]byte, error) { +func EncodePollRequest(sid string, proxyType string, natType string) ([]byte, error) { return json.Marshal(ProxyPollRequest{ Sid: sid, Version: version, Type: proxyType, + NAT: natType, }) } // Decodes a poll message from a snowflake proxy and returns the // sid and proxy type of the proxy on success and an error if it failed -func DecodePollRequest(data []byte) (string, string, error) { +func DecodePollRequest(data []byte) (string, string, string, error) { var message ProxyPollRequest err := json.Unmarshal(data, &message) if err != nil { - return "", "", err + return "", "", "", err } majorVersion := strings.Split(message.Version, ".")[0] if majorVersion != "1" { - return "", "", fmt.Errorf("using unknown version") + return "", "", "", fmt.Errorf("using unknown version") } // Version 1.x requires an Sid if message.Sid == "" { - return "", "", fmt.Errorf("no supplied session id") + return "", "", "", fmt.Errorf("no supplied session id") } - return message.Sid, message.Type, nil + natType := message.NAT + if natType == "" { + natType = "unknown" + } + + return message.Sid, message.Type, natType, nil } type ProxyPollResponse struct { @@ -159,7 +167,7 @@ type ProxyAnswerRequest struct { func EncodeAnswerRequest(answer string, sid string) ([]byte, error) { return json.Marshal(ProxyAnswerRequest{ - Version: "1.1", + Version: version, Sid: sid, Answer: answer, }) diff --git a/proxy/snowflake.go b/proxy/snowflake.go index 4877e6f..ac67748 100644 --- a/proxy/snowflake.go +++ b/proxy/snowflake.go @@ -19,6 +19,7 @@ import ( "time" "git.torproject.org/pluggable-transports/snowflake.git/common/messages" + "git.torproject.org/pluggable-transports/snowflake.git/common/nat" "git.torproject.org/pluggable-transports/snowflake.git/common/safelog" "git.torproject.org/pluggable-transports/snowflake.git/common/util" "git.torproject.org/pluggable-transports/snowflake.git/common/websocketconn" @@ -30,6 +31,11 @@ const defaultBrokerURL = "https://snowflake-broker.bamsoftware.com/" const defaultRelayURL = "wss://snowflake.bamsoftware.com/" const defaultSTUNURL = "stun:stun.l.google.com:19302" const pollInterval = 5 * time.Second +const ( + NATUnknown = "unknown" + NATRestricted = "restricted" + NATUnrestricted = "unrestricted" +) //amount of time after sending an SDP answer before the proxy assumes the //client is not going to connect @@ -40,6 +46,8 @@ const readLimit = 100000 //Maximum number of bytes to be read from an HTTP reque var broker *Broker var relayURL string +var currentNATType = NATUnknown + const ( sessionIDLength = 16 ) @@ -174,7 +182,7 @@ func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription { timeOfNextPoll = now } - body, err := messages.EncodePollRequest(sid, "standalone") + body, err := messages.EncodePollRequest(sid, "standalone", currentNATType) if err != nil { log.Printf("Error encoding poll message: %s", err.Error()) return nil @@ -485,9 +493,35 @@ func main() { tokens <- true } + // determine NAT type before polling + updateNATType(config.ICEServers) + log.Printf("NAT type: %s", currentNATType) + for { getToken() sessionID := genSessionID() runSession(sessionID) } } + +// use provided STUN server(s) to determine NAT type +func updateNATType(servers []webrtc.ICEServer) { + + var restrictedNAT bool + var err error + for _, server := range servers { + addr := strings.TrimPrefix(server.URLs[0], "stun:") + restrictedNAT, err = nat.CheckIfRestrictedNAT(addr) + if err == nil { + if restrictedNAT { + currentNATType = NATRestricted + } else { + currentNATType = NATUnrestricted + } + break + } + } + if err != nil { + currentNATType = NATUnknown + } +} From aeb38e9b55e162f753078ae8a82a1de569ea2757 Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Tue, 16 Jun 2020 17:49:39 -0400 Subject: [PATCH 4/5] Add a new heap at the broker for restricted flakes Now when proxies poll, they provide their NAT type to the broker. This introduces a new snowflake heap of just restricted snowflakes that the broker can pull from if the client has a known, unrestricted NAT. All other clients will pull from a heap of snowflakes with unrestricted or unknown NAT topologies. --- broker/broker.go | 65 +++++++++++++++++++++++++-------- broker/snowflake-broker_test.go | 14 +++---- 2 files changed, 56 insertions(+), 23 deletions(-) diff --git a/broker/broker.go b/broker/broker.go index 2d3cd4b..939ebd3 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -31,12 +31,18 @@ const ( ClientTimeout = 10 ProxyTimeout = 10 readLimit = 100000 //Maximum number of bytes to be read from an HTTP request + + NATUnknown = "unknown" + NATRestricted = "restricted" + NATUnrestricted = "unrestricted" ) type BrokerContext struct { - snowflakes *SnowflakeHeap - // Map keeping track of snowflakeIDs required to match SDP answers from - // the second http POST. + snowflakes *SnowflakeHeap + restrictedSnowflakes *SnowflakeHeap + // Maps keeping track of snowflakeIDs required to match SDP answers from + // the second http POST. Restricted snowflakes can only be matched up with + // clients behind an unrestricted NAT. idToSnowflake map[string]*Snowflake // Synchronization for the snowflake map and heap snowflakeLock sync.Mutex @@ -47,6 +53,8 @@ type BrokerContext struct { func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext { snowflakes := new(SnowflakeHeap) heap.Init(snowflakes) + rSnowflakes := new(SnowflakeHeap) + heap.Init(rSnowflakes) metrics, err := NewMetrics(metricsLogger) if err != nil { @@ -58,10 +66,11 @@ func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext { } return &BrokerContext{ - snowflakes: snowflakes, - idToSnowflake: make(map[string]*Snowflake), - proxyPolls: make(chan *ProxyPoll), - metrics: metrics, + snowflakes: snowflakes, + restrictedSnowflakes: rSnowflakes, + idToSnowflake: make(map[string]*Snowflake), + proxyPolls: make(chan *ProxyPoll), + metrics: metrics, } } @@ -101,15 +110,17 @@ func (mh MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { type ProxyPoll struct { id string proxyType string + natType string offerChannel chan []byte } // Registers a Snowflake and waits for some Client to send an offer, // as part of the polling logic of the proxy handler. -func (ctx *BrokerContext) RequestOffer(id string, proxyType string) []byte { +func (ctx *BrokerContext) RequestOffer(id string, proxyType string, natType string) []byte { request := new(ProxyPoll) request.id = id request.proxyType = proxyType + request.natType = natType request.offerChannel = make(chan []byte) ctx.proxyPolls <- request // Block until an offer is available, or timeout which sends a nil offer. @@ -122,7 +133,7 @@ func (ctx *BrokerContext) RequestOffer(id string, proxyType string) []byte { // client offer or nil on timeout / none are available. func (ctx *BrokerContext) Broker() { for request := range ctx.proxyPolls { - snowflake := ctx.AddSnowflake(request.id, request.proxyType) + snowflake := ctx.AddSnowflake(request.id, request.proxyType, request.natType) // Wait for a client to avail an offer to the snowflake. go func(request *ProxyPoll) { select { @@ -133,7 +144,11 @@ func (ctx *BrokerContext) Broker() { ctx.snowflakeLock.Lock() defer ctx.snowflakeLock.Unlock() if snowflake.index != -1 { - heap.Remove(ctx.snowflakes, snowflake.index) + if request.natType == NATRestricted { + heap.Remove(ctx.restrictedSnowflakes, snowflake.index) + } else { + heap.Remove(ctx.snowflakes, snowflake.index) + } delete(ctx.idToSnowflake, snowflake.id) close(request.offerChannel) } @@ -145,7 +160,7 @@ func (ctx *BrokerContext) Broker() { // Create and add a Snowflake to the heap. // Required to keep track of proxies between providing them // with an offer and awaiting their second POST with an answer. -func (ctx *BrokerContext) AddSnowflake(id string, proxyType string) *Snowflake { +func (ctx *BrokerContext) AddSnowflake(id string, proxyType string, natType string) *Snowflake { snowflake := new(Snowflake) snowflake.id = id snowflake.clients = 0 @@ -153,7 +168,11 @@ func (ctx *BrokerContext) AddSnowflake(id string, proxyType string) *Snowflake { snowflake.offerChannel = make(chan []byte) snowflake.answerChannel = make(chan []byte) ctx.snowflakeLock.Lock() - heap.Push(ctx.snowflakes, snowflake) + if natType == NATRestricted { + heap.Push(ctx.restrictedSnowflakes, snowflake) + } else { + heap.Push(ctx.snowflakes, snowflake) + } ctx.snowflakeLock.Unlock() ctx.idToSnowflake[id] = snowflake return snowflake @@ -170,7 +189,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { return } - sid, proxyType, _, err := messages.DecodePollRequest(body) + sid, proxyType, natType, err := messages.DecodePollRequest(body) if err != nil { w.WriteHeader(http.StatusBadRequest) return @@ -187,7 +206,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { } // Wait for a client to avail an offer to the snowflake, or timeout if nil. - offer := ctx.RequestOffer(sid, proxyType) + offer := ctx.RequestOffer(sid, proxyType, natType) var b []byte if nil == offer { ctx.metrics.lock.Lock() @@ -226,9 +245,23 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) return } + + natType := r.Header.Get("X-NAT-TYPE") + if natType == "" { + natType = NATUnknown + } + + // Only hand out known restricted snowflakes to unrestricted clients + var snowflakeHeap *SnowflakeHeap + if natType == NATUnrestricted { + snowflakeHeap = ctx.restrictedSnowflakes + } else { + snowflakeHeap = ctx.snowflakes + } + // Immediately fail if there are no snowflakes available. ctx.snowflakeLock.Lock() - numSnowflakes := ctx.snowflakes.Len() + numSnowflakes := snowflakeHeap.Len() ctx.snowflakeLock.Unlock() if numSnowflakes <= 0 { ctx.metrics.lock.Lock() @@ -240,7 +273,7 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { // Otherwise, find the most available snowflake proxy, and pass the offer to it. // Delete must be deferred in order to correctly process answer request later. ctx.snowflakeLock.Lock() - snowflake := heap.Pop(ctx.snowflakes).(*Snowflake) + snowflake := heap.Pop(snowflakeHeap).(*Snowflake) ctx.snowflakeLock.Unlock() snowflake.offerChannel <- offer diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go index 18b83dd..91383a1 100644 --- a/broker/snowflake-broker_test.go +++ b/broker/snowflake-broker_test.go @@ -29,7 +29,7 @@ func TestBroker(t *testing.T) { Convey("Adds Snowflake", func() { So(ctx.snowflakes.Len(), ShouldEqual, 0) So(len(ctx.idToSnowflake), ShouldEqual, 0) - ctx.AddSnowflake("foo", "") + ctx.AddSnowflake("foo", "", NATUnknown) So(ctx.snowflakes.Len(), ShouldEqual, 1) So(len(ctx.idToSnowflake), ShouldEqual, 1) }) @@ -55,7 +55,7 @@ func TestBroker(t *testing.T) { Convey("Request an offer from the Snowflake Heap", func() { done := make(chan []byte) go func() { - offer := ctx.RequestOffer("test", "") + offer := ctx.RequestOffer("test", "", NATUnknown) done <- offer }() request := <-ctx.proxyPolls @@ -79,7 +79,7 @@ func TestBroker(t *testing.T) { Convey("with a proxy answer if available.", func() { done := make(chan bool) // Prepare a fake proxy to respond with. - snowflake := ctx.AddSnowflake("fake", "") + snowflake := ctx.AddSnowflake("fake", "", NATUnknown) go func() { clientOffers(ctx, w, r) done <- true @@ -97,7 +97,7 @@ func TestBroker(t *testing.T) { return } done := make(chan bool) - snowflake := ctx.AddSnowflake("fake", "") + snowflake := ctx.AddSnowflake("fake", "", NATUnknown) go func() { clientOffers(ctx, w, r) // Takes a few seconds here... @@ -147,7 +147,7 @@ func TestBroker(t *testing.T) { }) Convey("Responds to proxy answers...", func() { - s := ctx.AddSnowflake("test", "") + s := ctx.AddSnowflake("test", "", NATUnknown) w := httptest.NewRecorder() data := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"test","Answer":"test"}`)) @@ -260,7 +260,7 @@ func TestBroker(t *testing.T) { // Manually do the Broker goroutine action here for full control. p := <-ctx.proxyPolls So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp") - s := ctx.AddSnowflake(p.id, "") + s := ctx.AddSnowflake(p.id, "", NATUnknown) go func() { offer := <-s.offerChannel p.offerChannel <- offer @@ -537,7 +537,7 @@ func TestMetrics(t *testing.T) { So(err, ShouldBeNil) // Prepare a fake proxy to respond with. - snowflake := ctx.AddSnowflake("fake", "") + snowflake := ctx.AddSnowflake("fake", "", NATUnknown) go func() { clientOffers(ctx, w, r) done <- true From cc8a56fd503031e29e94d846f34a60dc8b56e56b Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Wed, 17 Jun 2020 10:37:18 -0400 Subject: [PATCH 5/5] Fix unittests for common/messages library These unittests need to be updated for the new Version 1.2 messaging specification. --- common/messages/proxy_test.go | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/common/messages/proxy_test.go b/common/messages/proxy_test.go index 1570d4f..3aa67fb 100644 --- a/common/messages/proxy_test.go +++ b/common/messages/proxy_test.go @@ -13,6 +13,7 @@ func TestDecodeProxyPollRequest(t *testing.T) { for _, test := range []struct { sid string proxyType string + natType string data string err error }{ @@ -20,6 +21,7 @@ func TestDecodeProxyPollRequest(t *testing.T) { //Version 1.0 proxy message "ymbcCMto7KHNGYlp", "", + "unknown", `{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`, nil, }, @@ -27,44 +29,59 @@ func TestDecodeProxyPollRequest(t *testing.T) { //Version 1.1 proxy message "ymbcCMto7KHNGYlp", "standalone", + "unknown", `{"Sid":"ymbcCMto7KHNGYlp","Version":"1.1","Type":"standalone"}`, nil, }, + { + //Version 1.2 proxy message + "ymbcCMto7KHNGYlp", + "standalone", + "restricted", + `{"Sid":"ymbcCMto7KHNGYlp","Version":"1.2","Type":"standalone", "NAT":"restricted"}`, + nil, + }, { //Version 0.X proxy message: "", "", - "ymbcCMto7KHNGYlp", + "", + "", &json.SyntaxError{}, }, { + "", "", "", `{"Sid":"ymbcCMto7KHNGYlp"}`, fmt.Errorf(""), }, { + "", "", "", "{}", fmt.Errorf(""), }, { + "", "", "", `{"Version":"1.0"}`, fmt.Errorf(""), }, { + "", "", "", `{"Version":"2.0"}`, fmt.Errorf(""), }, } { - sid, proxyType, err := DecodePollRequest([]byte(test.data)) + sid, proxyType, natType, err := DecodePollRequest([]byte(test.data)) So(sid, ShouldResemble, test.sid) So(proxyType, ShouldResemble, test.proxyType) + So(natType, ShouldResemble, test.natType) So(err, ShouldHaveSameTypeAs, test.err) } @@ -73,11 +90,12 @@ func TestDecodeProxyPollRequest(t *testing.T) { func TestEncodeProxyPollRequests(t *testing.T) { Convey("Context", t, func() { - b, err := EncodePollRequest("ymbcCMto7KHNGYlp", "standalone") + b, err := EncodePollRequest("ymbcCMto7KHNGYlp", "standalone", "unknown") So(err, ShouldEqual, nil) - sid, proxyType, err := DecodePollRequest(b) + sid, proxyType, natType, err := DecodePollRequest(b) So(sid, ShouldEqual, "ymbcCMto7KHNGYlp") So(proxyType, ShouldEqual, "standalone") + So(natType, ShouldEqual, "unknown") So(err, ShouldEqual, nil) }) }