diff --git a/codec/gcm.go b/codec/gcm.go index 5c5bde9..0c1b12b 100644 --- a/codec/gcm.go +++ b/codec/gcm.go @@ -12,22 +12,29 @@ import ( "github.com/renproject/id" ) +// gcmNone is the nonce used by the gcm encoder to encrypt/decrypt data +// It is represented as a struct with a 32 bit uint and a 64 bit uint, +// together representing the top 32 bits and bottom 64 bits of a 96 bit +// unsigned integer. +// The struct also contains a boolean countDown that represents whether +// the nonce counts up from 0 or counts down from 7.9228163e+28 type gcmNonce struct { - // top and bottom together represent the top 32 bits and bottom 64 bits of a 96 bit unsigned integer + // top uint32 bottom uint64 countDown bool } +// next is a method on gcmNonce that counts up or down depending on the countDown flag func (nonce gcmNonce) next() { if nonce.countDown { nonce.pred() } else { nonce.succ() } - } +// succ is a method in gcmNonce that represents a successor function func (nonce gcmNonce) succ() { nonce.bottom++ // If bottom overflows, increment top by 1 @@ -36,6 +43,7 @@ func (nonce gcmNonce) succ() { } } +// pred is a method on gcmNonce that represents a predecessor function func (nonce gcmNonce) pred() { nonce.bottom-- // If bottom underflows, decrement top by 1 diff --git a/dht/table.go b/dht/table.go index b55c0f3..5c0b912 100644 --- a/dht/table.go +++ b/dht/table.go @@ -109,7 +109,7 @@ func (table *InMemTable) AddPeer(peerID id.Signatory, peerAddr wire.Address) { table.addrsBySignatory[peerID] = peerAddr // Insert into the sorted signatories list based on its XOR distance from our - // own address. + // own address. Signatory is added only if it's not present in the sorted list if !ok { i := sort.Search(len(table.sorted), func(i int) bool { return table.isCloser(peerID, table.sorted[i]) @@ -179,6 +179,9 @@ func (table *InMemTable) RandomPeers(n int) []id.Signatory { // unsurprising alternative. return []id.Signatory{} } + + // If sorted list of signatories is smaller than required number `n`, + // return a copy of the list as is if n >= m { sigs := make([]id.Signatory, m) table.sortedMu.Lock() diff --git a/handshake/once.go b/handshake/once.go index 7670cac..f2a019b 100644 --- a/handshake/once.go +++ b/handshake/once.go @@ -49,6 +49,10 @@ func NewOncePool(opts OncePoolOptions) OncePool { } } +// Once returns a handshake that only maintains one connection of a particular signatory +// at any given time. If a second connection is attempted, the peer with the larger signatory +// value decides to persist/kill the new connection based on whether the previous connection +// has expired its minimum expiry age func Once(self id.Signatory, pool *OncePool, h Handshake) Handshake { return func(conn net.Conn, enc codec.Encoder, dec codec.Decoder) (codec.Encoder, codec.Decoder, id.Signatory, error) { enc, dec, remote, err := h(conn, enc, dec) diff --git a/peer/gossip.go b/peer/gossip.go index 178ae6e..c2b18c5 100644 --- a/peer/gossip.go +++ b/peer/gossip.go @@ -41,6 +41,7 @@ func NewGossiper(opts GossiperOptions, filter *channel.SyncFilter, transport *tr } } +// Resolve accepts a resolver and associates it with the gossiper func (g *Gossiper) Resolve(resolver dht.ContentResolver) { g.resolverMu.Lock() defer g.resolverMu.Unlock() @@ -48,6 +49,7 @@ func (g *Gossiper) Resolve(resolver dht.ContentResolver) { g.resolver = resolver } + func (g *Gossiper) Gossip(ctx context.Context, contentID []byte, subnet *id.Hash) { if subnet == nil { subnet = &DefaultSubnet @@ -72,6 +74,8 @@ func (g *Gossiper) Gossip(ctx context.Context, contentID []byte, subnet *id.Hash } } +// DidReceiveMessage accepts a signatory (sender's ID) and a message. It is a +// callback method that is used to check the type of message and invoke relevant methods. func (g *Gossiper) DidReceiveMessage(from id.Signatory, msg wire.Msg) error { switch msg.Type { case wire.MsgTypePush: @@ -88,6 +92,10 @@ func (g *Gossiper) DidReceiveMessage(from id.Signatory, msg wire.Msg) error { return nil } +// didReceivePush accepts a signatory (sender's ID) and a message. It is a +// callback method that is used to a an initial gossiped message. If the content is +// is relevant, it sends a pull request back to the sender and waits for a sync message +// within a certain time period. func (g *Gossiper) didReceivePush(from id.Signatory, msg wire.Msg) { if len(msg.Data) == 0 { return @@ -145,6 +153,10 @@ func (g *Gossiper) didReceivePush(from id.Signatory, msg wire.Msg) { } } +// didReceivePull accepts a signatory (sender's ID) and a message. It is a +// callback method that is used to respond to a pull request. If the content is +// is present in associated ContentResolver, it sends a sync request peer that sent the pull request +// with the relevant information (content). func (g *Gossiper) didReceivePull(from id.Signatory, msg wire.Msg) { if len(msg.Data) == 0 { return @@ -181,6 +193,10 @@ func (g *Gossiper) didReceivePull(from id.Signatory, msg wire.Msg) { return } +// didReceiveSync accepts a signatory (sender's ID) and a message. It is a +// callback method that is used to accept a sync message. A sync message is only accepted +// if the filter is open for the particular contentID. If accepted, the content is inserted +// into the associated ContentResolver func (g *Gossiper) didReceiveSync(from id.Signatory, msg wire.Msg) { g.resolverMu.RLock() if g.resolver == nil { diff --git a/peer/peer.go b/peer/peer.go index 31b9440..3adc1d1 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -19,6 +19,8 @@ var ( DefaultTimeout = time.Second ) +// ErrPeerNotFound represents the error raised when a designated recipient of a message +// is not present in the peer table var ( ErrPeerNotFound = errors.New("peer not found") ) @@ -42,50 +44,79 @@ func New(opts Options, transport *transport.Transport) *Peer { } } +// ID return the signatory value associated with the peer func (p *Peer) ID() id.Signatory { return p.opts.PrivKey.Signatory() } +// Syncer method return a pointer to the syncer struct associated with the peer func (p *Peer) Syncer() *Syncer { return p.syncer } +// Gossiper method return a pointer to the gossiper struct associated with the peer func (p *Peer) Gossiper() *Gossiper { return p.gossiper } +// Transport returns a pointer to the transport layer associated with the peer func (p *Peer) Transport() *transport.Transport { return p.transport } +// Link accepts a signatory registers a peer as a trusted individual for +// persistent connections func (p *Peer) Link(remote id.Signatory) { p.transport.Link(remote) } +// Link accepts a signatory and de-registers a peer as a trusted individual +// for persistent connections func (p *Peer) Unlink(remote id.Signatory) { p.transport.Unlink(remote) } -func (p *Peer) Ping(ctx context.Context) error { +// Ping accepts a context and a signatory and is used to send a ping message to a peer. +// It can be used to check liveness of a peer +func (p *Peer) Ping(ctx context.Context, to id.Signatory) error { return fmt.Errorf("unimplemented") } +// Send accept a context, the signatory ID of a recipient peer, and a message. It tries to send +// the message to the recipient peer using the transport layer until the context expires func (p *Peer) Send(ctx context.Context, to id.Signatory, msg wire.Msg) error { return p.transport.Send(ctx, to, msg) } +// Sync accepts a context, an array of bytes (contentID) and a pointer to a signatory (hint). +// A call to Sync is used to request content with a particular contentID from peers. +// A hint can be supplied to try syncing from a particular peer as well. +// A nil hint will indicate that the content can be attempted to be synced any alpha random peers, +// where alpha is defined in SyncerOptions struct func (p *Peer) Sync(ctx context.Context, contentID []byte, hint *id.Signatory) ([]byte, error) { return p.syncer.Sync(ctx, contentID, hint) } +// Gossip accepts a context, an array of bytes (contentID) and a pointer to a id.Hash (subnet) +// A call to Gossip is used to gossip information (content) with a particular contentID with peers. +// A subnet can be supplied to try gossip with a particular subnet of peers. +// If the supplied subnet id DefaultSubnet or nil, content will be gossiped with alpha random peers, +// where alpha is defined in GossiperOptions struct. func (p *Peer) Gossip(ctx context.Context, contentID []byte, subnet *id.Hash) { p.gossiper.Gossip(ctx, contentID, subnet) } +// DiscoverPeers accepts a context and is used to start discovering new peers using the +// peer discovery client. DiscoverPeers is a blocking method, ideally it would be called +// as `go p.DiscoverPeers(...)` func (p *Peer) DiscoverPeers(ctx context.Context) { p.discoveryClient.DiscoverPeers(ctx) } +// Run accepts a context and sets up the default receive function the features callbacks +// for syncer, gossiper and peer discovery. Finally, it calls the Run method on the transport +// layer, which then starts to listen for and accept connections. +// A call to Run is a blocking method, ideally it would be called as `go p.Run(...)` func (p *Peer) Run(ctx context.Context) { p.transport.Receive(ctx, func(from id.Signatory, packet wire.Packet) error { // TODO(ross): Think about merging the syncer and the gossiper. @@ -103,10 +134,16 @@ func (p *Peer) Run(ctx context.Context) { p.transport.Run(ctx) } +// Receive takes a context and a custom receive handler. It is used to execute custom +// logic on messages. Every message, along with the ID (signatory) of the sender is +// passed to this function in a sequential manner. func (p *Peer) Receive(ctx context.Context, f func(id.Signatory,wire.Packet) error) { p.transport.Receive(ctx, f) } +// Resolve takes in a context and a ContentResolver and associates it with +// the gossiper. Gossiper / Sync logic will not function until a valid +// ContentResolver is provided to a call to Resolve func (p *Peer) Resolve(ctx context.Context, contentResolver dht.ContentResolver) { p.gossiper.Resolve(contentResolver) }