1818package com .velocitypowered .proxy .network ;
1919
2020import com .google .common .base .Preconditions ;
21+ import com .google .common .collect .HashMultimap ;
22+ import com .google .common .collect .Multimap ;
2123import com .velocitypowered .api .event .proxy .ListenerBoundEvent ;
2224import com .velocitypowered .api .event .proxy .ListenerCloseEvent ;
2325import com .velocitypowered .api .network .ListenerType ;
2830import io .netty .bootstrap .Bootstrap ;
2931import io .netty .bootstrap .ServerBootstrap ;
3032import io .netty .channel .Channel ;
33+ import io .netty .channel .ChannelFuture ;
3134import io .netty .channel .ChannelFutureListener ;
3235import io .netty .channel .ChannelOption ;
3336import io .netty .channel .EventLoopGroup ;
3437import io .netty .channel .WriteBufferWaterMark ;
38+ import io .netty .channel .unix .UnixChannelOption ;
3539import io .netty .util .concurrent .GlobalEventExecutor ;
40+ import io .netty .util .concurrent .MultithreadEventExecutorGroup ;
3641import java .net .InetSocketAddress ;
3742import java .net .http .HttpClient ;
38- import java .util .HashMap ;
43+ import java .util .Collection ;
3944import java .util .Map ;
4045import org .apache .logging .log4j .LogManager ;
4146import org .apache .logging .log4j .Logger ;
@@ -50,7 +55,7 @@ public final class ConnectionManager {
5055 private static final WriteBufferWaterMark SERVER_WRITE_MARK = new WriteBufferWaterMark (1 << 20 ,
5156 1 << 21 );
5257 private static final Logger LOGGER = LogManager .getLogger (ConnectionManager .class , new ParameterizedMessageFactory ());
53- private final Map <InetSocketAddress , Endpoint > endpoints = new HashMap <> ();
58+ private final Multimap <InetSocketAddress , Endpoint > endpoints = HashMultimap . create ();
5459 private final TransportType transportType ;
5560 private final EventLoopGroup bossGroup ;
5661 private final EventLoopGroup workerGroup ;
@@ -92,7 +97,6 @@ public void logChannelInformation() {
9297 public void bind (final InetSocketAddress address ) {
9398 final ServerBootstrap bootstrap = new ServerBootstrap ()
9499 .channelFactory (this .transportType .serverSocketChannelFactory )
95- .group (this .bossGroup , this .workerGroup )
96100 .childOption (ChannelOption .WRITE_BUFFER_WATER_MARK , SERVER_WRITE_MARK )
97101 .childHandler (this .serverChannelInitializer .get ())
98102 .childOption (ChannelOption .TCP_NODELAY , true )
@@ -103,26 +107,50 @@ public void bind(final InetSocketAddress address) {
103107 bootstrap .option (ChannelOption .TCP_FASTOPEN , 3 );
104108 }
105109
106- bootstrap .bind ()
107- .addListener ((ChannelFutureListener ) future -> {
108- final Channel channel = future .channel ();
109- if (future .isSuccess ()) {
110- this .endpoints .put (address , new Endpoint (channel , ListenerType .MINECRAFT ));
111-
112- // Warn people with console access that HAProxy is in use, see PR: #1436
113- if (this .server .getConfiguration ().isProxyProtocol ()) {
114- LOGGER .warn ("Using HAProxy and listening on {}, please ensure this listener is adequately firewalled." , channel .localAddress ());
115- }
110+ if (server .getConfiguration ().isEnableReusePort ()) {
111+ // We don't need a boss group, since each worker will bind to the socket
112+ bootstrap .option (UnixChannelOption .SO_REUSEPORT , true )
113+ .group (this .workerGroup );
114+ } else {
115+ bootstrap .group (this .bossGroup , this .workerGroup );
116+ }
116117
117- LOGGER .info ("Listening on {}" , channel .localAddress ());
118+ final int binds = server .getConfiguration ().isEnableReusePort ()
119+ ? ((MultithreadEventExecutorGroup ) this .workerGroup ).executorCount () : 1 ;
118120
119- // Fire the proxy bound event after the socket is bound
120- server .getEventManager ().fireAndForget (
121- new ListenerBoundEvent (address , ListenerType .MINECRAFT ));
122- } else {
123- LOGGER .error ("Can't bind to {}" , address , future .cause ());
124- }
125- });
121+ for (int bind = 0 ; bind < binds ; bind ++) {
122+ // Wait for each bind to open. If we encounter any errors, don't try to bind again.
123+ int finalBind = bind ;
124+ ChannelFuture f = bootstrap .bind ()
125+ .addListener ((ChannelFutureListener ) future -> {
126+ final Channel channel = future .channel ();
127+ if (future .isSuccess ()) {
128+ this .endpoints .put (address , new Endpoint (channel , ListenerType .MINECRAFT ));
129+
130+ LOGGER .info ("Listening on {}" , channel .localAddress ());
131+
132+ if (finalBind == 0 ) {
133+ // Warn people with console access that HAProxy is in use, see PR: #1436
134+ if (this .server .getConfiguration ().isProxyProtocol ()) {
135+ LOGGER .warn (
136+ "Using HAProxy and listening on {}, please ensure this listener is adequately firewalled." ,
137+ channel .localAddress ());
138+ }
139+
140+ // Fire the proxy bound event after the socket is bound
141+ server .getEventManager ().fireAndForget (
142+ new ListenerBoundEvent (address , ListenerType .MINECRAFT ));
143+ }
144+ } else {
145+ LOGGER .error ("Can't bind to {}" , address , future .cause ());
146+ }
147+ });
148+ f .syncUninterruptibly ();
149+
150+ if (!f .isSuccess ()) {
151+ break ;
152+ }
153+ }
126154 }
127155
128156 /**
@@ -180,17 +208,20 @@ public Bootstrap createWorker(@Nullable final EventLoopGroup group) {
180208 * @param oldBind the endpoint to close
181209 */
182210 public void close (final InetSocketAddress oldBind ) {
183- Endpoint endpoint = endpoints .remove (oldBind );
211+ Collection <Endpoint > endpoints = this .endpoints .removeAll (oldBind );
212+ Preconditions .checkState (!endpoints .isEmpty (), "Endpoint was not registered" );
213+
214+ ListenerType type = endpoints .iterator ().next ().type ();
184215
185216 // Fire proxy close event to notify plugins of socket close. We block since plugins
186217 // should have a chance to be notified before the server stops accepting connections.
187- server .getEventManager ().fire (new ListenerCloseEvent (oldBind , endpoint .type ())).join ();
188-
189- Channel serverChannel = endpoint .channel ();
218+ server .getEventManager ().fire (new ListenerCloseEvent (oldBind , type )).join ();
190219
191- Preconditions .checkState (serverChannel != null , "Endpoint %s not registered" , oldBind );
192- LOGGER .info ("Closing endpoint {}" , serverChannel .localAddress ());
193- serverChannel .close ().syncUninterruptibly ();
220+ for (Endpoint endpoint : endpoints ) {
221+ Channel serverChannel = endpoint .channel ();
222+ LOGGER .info ("Closing endpoint {}" , serverChannel .localAddress ());
223+ serverChannel .close ().syncUninterruptibly ();
224+ }
194225 }
195226
196227 /**
@@ -199,24 +230,28 @@ public void close(final InetSocketAddress oldBind) {
199230 * @param interrupt should closing forward interruptions
200231 */
201232 public void closeEndpoints (final boolean interrupt ) {
202- for (final Map .Entry <InetSocketAddress , Endpoint > entry : this .endpoints .entrySet ()) {
233+ for (final Map .Entry <InetSocketAddress , Collection <Endpoint >> entry : this .endpoints .asMap ()
234+ .entrySet ()) {
203235 final InetSocketAddress address = entry .getKey ();
204- final Endpoint endpoint = entry .getValue ();
236+ final Collection <Endpoint > endpoints = entry .getValue ();
237+ ListenerType type = endpoints .iterator ().next ().type ();
205238
206239 // Fire proxy close event to notify plugins of socket close. We block since plugins
207240 // should have a chance to be notified before the server stops accepting connections.
208- server .getEventManager ().fire (new ListenerCloseEvent (address , endpoint .type ())).join ();
209-
210- LOGGER .info ("Closing endpoint {}" , address );
211- if (interrupt ) {
212- try {
213- endpoint .channel ().close ().sync ();
214- } catch (final InterruptedException e ) {
215- LOGGER .info ("Interrupted whilst closing endpoint" , e );
216- Thread .currentThread ().interrupt ();
241+ server .getEventManager ().fire (new ListenerCloseEvent (address , type )).join ();
242+
243+ for (Endpoint endpoint : endpoints ) {
244+ LOGGER .info ("Closing endpoint {}" , address );
245+ if (interrupt ) {
246+ try {
247+ endpoint .channel ().close ().sync ();
248+ } catch (final InterruptedException e ) {
249+ LOGGER .info ("Interrupted whilst closing endpoint" , e );
250+ Thread .currentThread ().interrupt ();
251+ }
252+ } else {
253+ endpoint .channel ().close ().syncUninterruptibly ();
217254 }
218- } else {
219- endpoint .channel ().close ().syncUninterruptibly ();
220255 }
221256 }
222257 this .endpoints .clear ();
@@ -246,8 +281,8 @@ public ServerChannelInitializerHolder getServerChannelInitializer() {
246281 */
247282 public HttpClient createHttpClient () {
248283 return HttpClient .newBuilder ()
249- .executor (this .workerGroup )
250- .build ();
284+ .executor (this .workerGroup )
285+ .build ();
251286 }
252287
253288 public BackendChannelInitializerHolder getBackendChannelInitializer () {
0 commit comments