diff --git a/toversok/actors/a_conn.go b/toversok/actors/a_conn.go index e92cb2e..1d3bad4 100644 --- a/toversok/actors/a_conn.go +++ b/toversok/actors/a_conn.go @@ -275,13 +275,15 @@ func (ic *InConn) Ctx() context.Context { // This prevents routers from blocking when the conn is shutting down, // or if its blocked otherwise. func (ic *InConn) ForwardPacket(pkt []byte) { - select { - case ic.pktCh <- pkt: - default: - // TODO maybe convert dropping to timeout? - // making lots of timers would be costly though - // TODO log? metric? - } + //select { + //case ic.pktCh <- pkt: + //default: + // // TODO maybe convert dropping to timeout? + // // making lots of timers would be costly though + // // TODO log? metric? + //} + + ic.pktCh <- pkt } // Bump the activity timer. diff --git a/toversok/actors/a_sockrecv.go b/toversok/actors/a_sockrecv.go index 374da4a..3f4eb33 100644 --- a/toversok/actors/a_sockrecv.go +++ b/toversok/actors/a_sockrecv.go @@ -1,13 +1,13 @@ package actors import ( + "bytes" "context" "errors" "log/slog" "net" "net/netip" "runtime/debug" - "slices" "time" "github.com/edup2p/common/types" @@ -52,7 +52,9 @@ func (r *SockRecv) Run() { } }() - buf := make([]byte, 1<<16) + const MaxBuf = 1 << 16 + + var buf = [MaxBuf]byte{0} for { if r.ctx.Err() != nil { @@ -65,7 +67,7 @@ func (r *SockRecv) Run() { return } - n, ap, err := r.Conn.ReadFromUDPAddrPort(buf) + n, ap, err := r.Conn.ReadFromUDPAddrPort(buf[:]) ts := time.Now() @@ -92,7 +94,7 @@ func (r *SockRecv) Run() { continue } - pkt := slices.Clone(buf[:n]) + pkt := bytes.Clone(buf[:n]) if r.ctx.Err() != nil { return diff --git a/usrwg/channel_conn.go b/usrwg/channel_conn.go index 89581df..5d3259a 100644 --- a/usrwg/channel_conn.go +++ b/usrwg/channel_conn.go @@ -43,15 +43,19 @@ func (cc *ChannelConn) SetReadDeadline(t time.Time) error { func (cc *ChannelConn) ReadFromUDPAddrPort(b []byte) (n int, addr netip.AddrPort, err error) { var val []byte - if cc.currentReadDeadline == (time.Time{}) { - // Block until value received. - val = <-cc.incoming - } else { - // Block until value or timeout. - select { - case val = <-cc.incoming: - case <-time.After(time.Until(cc.currentReadDeadline)): - return 0, netip.AddrPort{}, context.DeadlineExceeded + select { + case val = <-cc.incoming: + default: + if cc.currentReadDeadline == (time.Time{}) { + // Block until value received. + val = <-cc.incoming + } else { + // Block until value or timeout. + select { + case val = <-cc.incoming: + case <-time.After(time.Until(cc.currentReadDeadline)): + return 0, netip.AddrPort{}, context.DeadlineExceeded + } } } @@ -110,8 +114,13 @@ func (cc *ChannelConn) putIn(pkt []byte, d time.Duration) (ok bool) { select { case cc.incoming <- pkt: ok = true - case <-time.After(d): - ok = false + default: + select { + case cc.incoming <- pkt: + ok = true + case <-time.After(d): + ok = false + } } return