diff --git a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/backend/MasterServerMessageHandler.java b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/backend/MasterServerMessageHandler.java index e139813..821bb8d 100644 --- a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/backend/MasterServerMessageHandler.java +++ b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/backend/MasterServerMessageHandler.java @@ -17,28 +17,53 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.StampedLock; import java.util.function.Consumer; public class MasterServerMessageHandler extends NettyServerChannelHandlerLayer { private final Map> pendingCommandDispatches = Maps.newConcurrentMap(); private final AtomicInteger traceIdGenerator = new AtomicInteger(0); + private volatile UUID workerUUID; private volatile String workerName; + private volatile boolean commandDispatcherRetired = false; + private final StampedLock commandDispatchCallbackLock = new StampedLock(); + public void dispatchCommandToWorker(String command, Consumer onDispatched) { - final int traceId = this.traceIdGenerator.getAndIncrement(); - final Consumer wrappedDecoder = json -> { - try { - final Component decoded = LegacyComponentSerializer.builder().build().deserialize(json); - onDispatched.accept(decoded); - } catch (Exception e) { - EntryPoint.LOGGER_INST.error("Failed to decode command result from worker", e); + final long stamp = this.commandDispatchCallbackLock.readLock(); + try { + // We were retired during connection + if (this.commandDispatcherRetired) { + onDispatched.accept(null); + return; } - }; - this.pendingCommandDispatches.put(traceId, wrappedDecoder); - this.sendMessage(new M2WDispatchCommandMessage(traceId, command)); + final int traceId = this.traceIdGenerator.getAndIncrement(); + + final Consumer wrappedDecoder = json -> { + try { + // We were retired during disconnection + if (json == null) { + onDispatched.accept(null); + return; + } + + final Component decoded = LegacyComponentSerializer.builder().build().deserialize(json); + onDispatched.accept(decoded); + } catch (Exception e) { + EntryPoint.LOGGER_INST.error("Failed to decode command result from worker", e); + onDispatched.accept(null); + } + }; + + this.pendingCommandDispatches.put(traceId, wrappedDecoder); + this.sendMessage(new M2WDispatchCommandMessage(traceId, command)); + }finally { + this.commandDispatchCallbackLock.unlockRead(stamp); + } } @Nullable @@ -54,6 +79,8 @@ public String getWorkerName() { @Override public void channelInactive(@NotNull ChannelHandlerContext ctx) { + this.retireAllCommandDispatchCallbacks(); + if (this.workerUUID == null) { return; } @@ -61,6 +88,20 @@ public void channelInactive(@NotNull ChannelHandlerContext ctx) { Freesia.registedWorkers.remove(this.workerUUID); } + private void retireAllCommandDispatchCallbacks() { + final long stamp = this.commandDispatchCallbackLock.writeLock(); + try { + this.commandDispatcherRetired = true; + for (Map.Entry> entry : this.pendingCommandDispatches.entrySet()) { + entry.getValue().accept(null); + } + + this.pendingCommandDispatches.clear(); + }finally { + this.commandDispatchCallbackLock.unlockWrite(stamp); + } + } + @Override public CompletableFuture readPlayerData(UUID playerUUID) { final CompletableFuture callback = new CompletableFuture<>(); diff --git a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/mc/FreesiaPlayerTracker.java b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/mc/FreesiaPlayerTracker.java index b18cbed..6bdf2e8 100644 --- a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/mc/FreesiaPlayerTracker.java +++ b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/mc/FreesiaPlayerTracker.java @@ -114,19 +114,27 @@ public CompletableFuture> getCanSee(@NotNull UUID target) { final Optional targetPlayerNullable = Freesia.PROXY_SERVER.getPlayer(target); + final boolean[] cancelCallbackAdd = {false}; if (targetPlayerNullable.isPresent()) { final Player targetPlayer = targetPlayerNullable.get(); targetPlayer.getCurrentServer().ifPresentOrElse( server -> server.getServer().sendPluginMessage(SYNC_CHANNEL_KEY, callbackRequest.getBytes()), () -> { + cancelCallbackAdd[0] = true; callback.complete(null); // Maybe at the early stage } // Throw exception when we didn't find that server ); } else { + cancelCallbackAdd[0] = true; callback.complete(null); } + // If we didn't find the server, we need to remove the callback + if (cancelCallbackAdd[0]) { + this.pendingCanSeeTasks.remove(callbackId); + } + return callback; } diff --git a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/MapperSessionProcessor.java b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/MapperSessionProcessor.java index 8bd9975..91f0ae1 100644 --- a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/MapperSessionProcessor.java +++ b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/MapperSessionProcessor.java @@ -9,6 +9,7 @@ import meow.kikir.freesia.velocity.Freesia; import meow.kikir.freesia.velocity.utils.PendingPacket; import net.kyori.adventure.key.Key; +import net.kyori.adventure.text.Component; import org.geysermc.mcprotocollib.network.Session; import org.geysermc.mcprotocollib.network.event.session.*; import org.geysermc.mcprotocollib.network.packet.Packet; @@ -17,6 +18,7 @@ import org.geysermc.mcprotocollib.protocol.packet.common.serverbound.ServerboundCustomPayloadPacket; import org.geysermc.mcprotocollib.protocol.packet.common.serverbound.ServerboundPongPacket; import org.geysermc.mcprotocollib.protocol.packet.ingame.clientbound.ClientboundLoginPacket; +import org.jetbrains.annotations.Nullable; import java.lang.invoke.VarHandle; import java.util.Optional; @@ -65,6 +67,17 @@ protected void retireTrackerCallbacks(){ } } + public boolean sendPacket(Packet packet) { + final Session sessionObject = (Session) SESSION_HANDLE.getVolatile(this); + + if (sessionObject == null) { + return false; + } + + sessionObject.send(packet); + return true; + } + protected YsmPacketProxy getPacketProxy() { return this.packetProxy; } @@ -185,16 +198,33 @@ public void disconnecting(DisconnectingEvent event) { @Override public void disconnected(DisconnectedEvent event) { - Freesia.LOGGER.info("Mapper session has disconnected for reason(non-deserialized): {}", event.getReason()); // Log disconnected + this.detachFromManager(true, event); + } - // Log exceptions - if (event.getCause() != null) { - Freesia.LOGGER.info("Mapper session has disconnected for throwable: {}", event.getCause().getLocalizedMessage()); // Log errors + // Sometimes the callback would not be called when we destroy an non-connected mapper, + // so we separated the disconnect logics into here and manual call this in that cases + protected void detachFromManager(boolean updateSession, @Nullable DisconnectedEvent disconnectedEvent) { + Component reason = null; + + // Log disconnects if we disconnected it non-manually + if (disconnectedEvent != null) { + reason = disconnectedEvent.getReason(); + + Freesia.LOGGER.info("Mapper session has disconnected for reason(non-deserialized): {}", reason); // Log disconnected + + final Throwable thr = disconnectedEvent.getCause(); + + if (thr != null) { + Freesia.LOGGER.error("Mapper session has disconnected for throwable", thr); // Log errors + } } // Remove callback - this.mapperPayloadManager.onWorkerSessionDisconnect(this, (boolean) KICK_MASTER_HANDLE.getVolatile(this), event.getReason()); // Fire events - SESSION_HANDLE.setVolatile(this, null); //Set session to null to finalize the mapper connection + this.mapperPayloadManager.onWorkerSessionDisconnect(this, (boolean) KICK_MASTER_HANDLE.getVolatile(this), reason); // Fire events + + if (updateSession) { + SESSION_HANDLE.setVolatile(this, null); //Set session to null to finalize the mapper connection + } } protected void setSession(Session session) { @@ -214,6 +244,11 @@ public void destroyAndAwaitDisconnected() { // Destroy the session if (sessionObject != null) { sessionObject.disconnect("DESTROYED"); + }else { + // Disconnecting a non initialized session + // Manual call remove callbacks + // Remember: HERE SHOULDN'T BE ANY RACE CONDITION + this.detachFromManager(false, null); } // Wait for fully disconnected diff --git a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/YsmMapperPayloadManager.java b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/YsmMapperPayloadManager.java index af52499..7d72e56 100644 --- a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/YsmMapperPayloadManager.java +++ b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/YsmMapperPayloadManager.java @@ -244,10 +244,15 @@ public void onPlayerDisconnect(@NotNull Player player) { } } - protected void onWorkerSessionDisconnect(@NotNull MapperSessionProcessor mapperSession, boolean kickMaster, Component reason) { + protected void onWorkerSessionDisconnect(@NotNull MapperSessionProcessor mapperSession, boolean kickMaster, @Nullable Component reason) { // Kick the master it binds if (kickMaster) - mapperSession.getBindPlayer().disconnect(Freesia.languageManager.i18n(FreesiaConstants.LanguageConstants.WORKER_TERMINATED_CONNECTION, List.of("reason"), List.of(reason))); + mapperSession.getBindPlayer().disconnect(Freesia.languageManager.i18n( + FreesiaConstants.LanguageConstants.WORKER_TERMINATED_CONNECTION, + List.of("reason"), + List.of(reason == null ? Component.text("DISCONNECTED MANUAL") : reason) + )); + // Remove from list this.mapperSessions.remove(mapperSession.getBindPlayer()); } diff --git a/Freesia-Worker/src/main/java/meow/kikir/freesia/worker/impl/WorkerMessageHandlerImpl.java b/Freesia-Worker/src/main/java/meow/kikir/freesia/worker/impl/WorkerMessageHandlerImpl.java index 3d3dec0..f48ad42 100644 --- a/Freesia-Worker/src/main/java/meow/kikir/freesia/worker/impl/WorkerMessageHandlerImpl.java +++ b/Freesia-Worker/src/main/java/meow/kikir/freesia/worker/impl/WorkerMessageHandlerImpl.java @@ -28,12 +28,16 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.StampedLock; import java.util.function.Consumer; public class WorkerMessageHandlerImpl extends NettyClientChannelHandlerLayer { private final AtomicInteger traceIdGenerator = new AtomicInteger(0); private final Map> playerDataGetCallbacks = Maps.newConcurrentMap(); + private volatile boolean playerDataFetchCallbackRetired = false; + private final StampedLock playerDataFetchCallbackLock = new StampedLock(); + @Override public void channelActive(@NotNull ChannelHandlerContext ctx) { super.channelActive(ctx); @@ -45,32 +49,63 @@ public void channelActive(@NotNull ChannelHandlerContext ctx) { @Override public void channelInactive(@NotNull ChannelHandlerContext ctx) { + this.retirePlayerFetchCallbacks(); super.channelInactive(ctx); ServerLoader.SERVER_INST.execute(ServerLoader::connectToBackend); } - public void getPlayerData(UUID playerUUID, Consumer onGot) { - final int generatedTraceId = this.traceIdGenerator.getAndIncrement(); - final Consumer wrappedDecoder = content -> { - CompoundTag decoded = null; + private void retirePlayerFetchCallbacks() { + final long stamp = this.playerDataFetchCallbackLock.writeLock(); + try { + this.playerDataFetchCallbackRetired = true; + + for (Map.Entry> entry : this.playerDataGetCallbacks.entrySet()) { + try { + entry.getValue().accept(null); + } catch (Exception e) { + EntryPoint.LOGGER_INST.error("Failed to fire player data callback!", e); + } + } - if (content == null) { + this.playerDataGetCallbacks.clear(); + }finally { + this.playerDataFetchCallbackLock.unlockWrite(stamp); + } + } + + public void getPlayerData(UUID playerUUID, Consumer onGot) { + final long stamp = this.playerDataFetchCallbackLock.readLock(); + try { + if (this.playerDataFetchCallbackRetired) { onGot.accept(null); return; } - try { - decoded = (CompoundTag) NbtIo.readAnyTag(new DataInputStream(new ByteArrayInputStream(content)), NbtAccounter.unlimitedHeap()); - } catch (Exception e) { - EntryPoint.LOGGER_INST.error("Failed to decode nbt!", e); - } + final int generatedTraceId = this.traceIdGenerator.getAndIncrement(); - onGot.accept(decoded); - }; + final Consumer wrappedDecoder = content -> { + CompoundTag decoded = null; + + if (content == null) { + onGot.accept(null); + return; + } - this.playerDataGetCallbacks.put(generatedTraceId, wrappedDecoder); + try { + decoded = (CompoundTag) NbtIo.readAnyTag(new DataInputStream(new ByteArrayInputStream(content)), NbtAccounter.unlimitedHeap()); + } catch (Exception e) { + EntryPoint.LOGGER_INST.error("Failed to decode nbt!", e); + } + + onGot.accept(decoded); + }; + + this.playerDataGetCallbacks.put(generatedTraceId, wrappedDecoder); - ServerLoader.clientInstance.sendToMaster(new W2MPlayerDataGetRequestMessage(playerUUID, generatedTraceId)); + ServerLoader.clientInstance.sendToMaster(new W2MPlayerDataGetRequestMessage(playerUUID, generatedTraceId)); + }finally { + this.playerDataFetchCallbackLock.unlockRead(stamp); + } } @Override @@ -100,6 +135,7 @@ public CompletableFuture dispatchCommand(String command) { Runnable scheduledCommand = () -> { CommandDispatcher commandDispatcher = ServerLoader.SERVER_INST.getCommands().getDispatcher(); + final ParseResults parsed = commandDispatcher.parse(command, ServerLoader.SERVER_INST.createCommandSourceStack().withSource(new CommandSource() { @Override public void sendSystemMessage(Component component) {