-
Notifications
You must be signed in to change notification settings - Fork 189
Add WriteBatch for batch packet sending #853
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
JoTurk
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you.
| type candidateBase struct { | ||
| id string | ||
| networkType NetworkType | ||
| candidateType CandidateType | ||
|
|
||
| component uint16 | ||
| address string | ||
| port int | ||
| relatedAddress *CandidateRelatedAddress | ||
| tcpType TCPType | ||
|
|
||
| resolvedAddr net.Addr | ||
|
|
||
| lastSent atomic.Int64 | ||
| lastReceived atomic.Int64 | ||
| conn net.PacketConn | ||
| ipv4Conn *ipv4.PacketConn | ||
| ipv6Conn *ipv6.PacketConn | ||
|
|
||
| currAgent *Agent | ||
| closeCh chan struct{} | ||
| closedCh chan struct{} | ||
|
|
||
| foundationOverride string | ||
| priorityOverride uint32 | ||
|
|
||
| remoteCandidateCaches map[AddrPort]Candidate | ||
| isLocationTracked bool | ||
| extensions []CandidateExtension | ||
| } | ||
|
|
||
| // Save a time reference to calculate monotonic time for candidate last sent/received. | ||
| // nolint: gochecknoglobals | ||
| var timeRef = time.Now() | ||
|
|
||
| // getMonoNanos returns the monotonic nanoseconds of a time t since timeRef. | ||
| func getMonoNanos(t time.Time) int64 { | ||
| return t.Sub(timeRef).Nanoseconds() | ||
| } | ||
|
|
||
| // getMonoTime returns a time.Time based on monotonic nanos since timeRef. | ||
| func getMonoTime(nanos int64) time.Time { | ||
| return timeRef.Add(time.Duration(nanos)) | ||
| } | ||
|
|
||
| // Done implements context.Context. | ||
| func (c *candidateBase) Done() <-chan struct{} { | ||
| return c.closeCh | ||
| } | ||
|
|
||
| // Err implements context.Context. | ||
| func (c *candidateBase) Err() error { | ||
| select { | ||
| case <-c.closedCh: | ||
| return ErrRunCanceled | ||
| default: | ||
| return nil | ||
| } | ||
| } | ||
|
|
||
| // Deadline implements context.Context. | ||
| func (c *candidateBase) Deadline() (deadline time.Time, ok bool) { | ||
| return time.Time{}, false | ||
| } | ||
|
|
||
| // Value implements context.Context. | ||
| func (c *candidateBase) Value(any) any { | ||
| return nil | ||
| } | ||
|
|
||
| // ID returns Candidate ID. | ||
| func (c *candidateBase) ID() string { | ||
| return c.id | ||
| } | ||
|
|
||
| func (c *candidateBase) Foundation() string { | ||
| if c.foundationOverride != "" { | ||
| return c.foundationOverride | ||
| } | ||
|
|
||
| return fmt.Sprintf("%d", crc32.ChecksumIEEE([]byte(c.Type().String()+c.address+c.networkType.String()))) | ||
| } | ||
|
|
||
| // Address returns Candidate Address. | ||
| func (c *candidateBase) Address() string { | ||
| return c.address | ||
| } | ||
|
|
||
| // Port returns Candidate Port. | ||
| func (c *candidateBase) Port() int { | ||
| return c.port | ||
| } | ||
|
|
||
| // Type returns candidate type. | ||
| func (c *candidateBase) Type() CandidateType { | ||
| return c.candidateType | ||
| } | ||
|
|
||
| // NetworkType returns candidate NetworkType. | ||
| func (c *candidateBase) NetworkType() NetworkType { | ||
| return c.networkType | ||
| } | ||
|
|
||
| // Component returns candidate component. | ||
| func (c *candidateBase) Component() uint16 { | ||
| return c.component | ||
| } | ||
|
|
||
| func (c *candidateBase) SetComponent(component uint16) { | ||
| c.component = component | ||
| } | ||
|
|
||
| // LocalPreference returns the local preference for this candidate. | ||
| func (c *candidateBase) LocalPreference() uint16 { //nolint:cyclop | ||
| if c.NetworkType().IsTCP() { | ||
| // RFC 6544, section 4.2 | ||
| // | ||
| // In Section 4.1.2.1 of [RFC5245], a recommended formula for UDP ICE | ||
| // candidate prioritization is defined. For TCP candidates, the same | ||
| // formula and candidate type preferences SHOULD be used, and the | ||
| // RECOMMENDED type preferences for the new candidate types defined in | ||
| // this document (see Section 5) are 105 for NAT-assisted candidates and | ||
| // 75 for UDP-tunneled candidates. | ||
| // | ||
| // (...) | ||
| // | ||
| // With TCP candidates, the local preference part of the recommended | ||
| // priority formula is updated to also include the directionality | ||
| // (active, passive, or simultaneous-open) of the TCP connection. The | ||
| // RECOMMENDED local preference is then defined as: | ||
| // | ||
| // local preference = (2^13) * direction-pref + other-pref | ||
| // | ||
| // The direction-pref MUST be between 0 and 7 (both inclusive), with 7 | ||
| // being the most preferred. The other-pref MUST be between 0 and 8191 | ||
| // (both inclusive), with 8191 being the most preferred. It is | ||
| // RECOMMENDED that the host, UDP-tunneled, and relayed TCP candidates | ||
| // have the direction-pref assigned as follows: 6 for active, 4 for | ||
| // passive, and 2 for S-O. For the NAT-assisted and server reflexive | ||
| // candidates, the RECOMMENDED values are: 6 for S-O, 4 for active, and | ||
| // 2 for passive. | ||
| // | ||
| // (...) | ||
| // | ||
| // If any two candidates have the same type-preference and direction- | ||
| // pref, they MUST have a unique other-pref. With this specification, | ||
| // this usually only happens with multi-homed hosts, in which case | ||
| // other-pref is the preference for the particular IP address from which | ||
| // the candidate was obtained. When there is only a single IP address, | ||
| // this value SHOULD be set to the maximum allowed value (8191). | ||
| var otherPref uint16 = 8191 | ||
|
|
||
| directionPref := func() uint16 { | ||
| switch c.Type() { | ||
| case CandidateTypeHost, CandidateTypeRelay: | ||
| switch c.tcpType { | ||
| case TCPTypeActive: | ||
| return 6 | ||
| case TCPTypePassive: | ||
| return 4 | ||
| case TCPTypeSimultaneousOpen: | ||
| return 2 | ||
| case TCPTypeUnspecified: | ||
| return 0 | ||
| } | ||
| case CandidateTypePeerReflexive, CandidateTypeServerReflexive: | ||
| switch c.tcpType { | ||
| case TCPTypeSimultaneousOpen: | ||
| return 6 | ||
| case TCPTypeActive: | ||
| return 4 | ||
| case TCPTypePassive: | ||
| return 2 | ||
| case TCPTypeUnspecified: | ||
| return 0 | ||
| } | ||
| case CandidateTypeUnspecified: | ||
| return 0 | ||
| } | ||
|
|
||
| return 0 | ||
| }() | ||
|
|
||
| return (1<<13)*directionPref + otherPref | ||
| } | ||
|
|
||
| return defaultLocalPreference | ||
| } | ||
|
|
||
| // RelatedAddress returns *CandidateRelatedAddress. | ||
| func (c *candidateBase) RelatedAddress() *CandidateRelatedAddress { | ||
| return c.relatedAddress | ||
| } | ||
|
|
||
| func (c *candidateBase) TCPType() TCPType { | ||
| return c.tcpType | ||
| } | ||
|
|
||
| // start runs the candidate using the provided connection. | ||
| func (c *candidateBase) start(a *Agent, conn net.PacketConn, initializedCh <-chan struct{}) { | ||
| if c.conn != nil { | ||
| c.agent().log.Warn("Can't start already started candidateBase") | ||
|
|
||
| return | ||
| } | ||
| c.currAgent = a | ||
| c.conn = conn | ||
| c.closeCh = make(chan struct{}) | ||
| c.closedCh = make(chan struct{}) | ||
|
|
||
| if c.networkType.IsIPv6() { | ||
| c.ipv6Conn = ipv6.NewPacketConn(conn) | ||
| } else { | ||
| c.ipv4Conn = ipv4.NewPacketConn(conn) | ||
| } | ||
|
|
||
| go c.recvLoop(initializedCh) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello, did you see my response to you in the discord about why the batching implemented in #608 wasn't merged yet [1]?
pion/ice isn't the correct layer to add this, and we have implementation for batching packet conn in pion transport, that can be used today with a mux, Maybe we can add it to pion webrtc and make it optional?, Maybe we can make pion switch to batching when it detects high throughput?
you can rebase @cnderrauber change from #608 but please look at the conversation in that PR and why we got push back when we tried to add batching by default.
https://discord.com/channels/1352636971591274548/1352636972614680659/1450454270305636382
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @joeturki, I may have jumped the gun here after missing the discord message.
I took a minute to review the discussion in #608, and believe that my motivation maybe a bit different. Full disclosure, I don't have the architectural depth of pion yet, so please don't hesitate to educate me.
My understanding of #608 is that batching can improve sending packets over multiple connections (udp mux) to multiple peers. I also heard people being concerned about the buffers and fixed time intervals causing extra latency.
In my use case I am mostly concerned about two peers and reducing latency to the max. Here, I'd like to give the user the choice to batch. In my wishful thinking, datachannels could accept multiple messages in the send method. For our application, we actually have a fixed time interval at the high level and call send many many times. So we know when we want to batch or not.
Very open to other ideas. I think #608 wouldn't solve our use case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello.
UDPMux is still typically one underlying socket multiplexing many remote addresses/ufrags, not multiple connections. in #608 there is a WriteBatchInterval to configure intervals.
either ways #608 does multiple things. not just batching, and it can be configured for many uses. but it centralizes batching policy inside the mux rather than at the call site. ICE shouldn't be super aware that we do batching or not. We should abstract it in pion/transport like how #608 did.
I think once this is cleaned we can get it merged.
|
@wrangelvid would you like an invite to the org? If you're planning to contribute more changes, I see that you're interested at many things. this will give you direct access to work on branches, run the CI. |
I really appreciate the warm welcome! Being able to run CI would be sweet and our organization is betting on pion so much that I'd like to offer support where we can. Though, I would feel better earning the seat after at least one contribution. |
Description
This adds a
WriteBatchfunction to the candidatepair that usesWriteBatchipv4/ipv6. Under the hood, on linux systems this leveragessendmmsgto send multiple packets at ones, reducing CPU cost.Note: on other platforms, we fall back to looping through the packets as
WriteBatchwill only send a single packet at a time.Reference issue
Closes #128
FYI
This is my first public PR to the open Go community. Please do not spare me in the review. I'd appreciate thorough feedback and am open to learning!