diff --git a/Freesia-Backend/src/main/java/meow/kikir/freesia/backend/FreesiaBackend.java b/Freesia-Backend/src/main/java/meow/kikir/freesia/backend/FreesiaBackend.java index 7e85b96..68d31aa 100644 --- a/Freesia-Backend/src/main/java/meow/kikir/freesia/backend/FreesiaBackend.java +++ b/Freesia-Backend/src/main/java/meow/kikir/freesia/backend/FreesiaBackend.java @@ -14,11 +14,16 @@ public final class FreesiaBackend extends JavaPlugin { @Override public void onEnable() { INSTANCE = this; + + // TODO- De-hard-coding? Bukkit.getMessenger().registerIncomingPluginChannel(this, "freesia:tracker_sync", this.trackerProcessor); Bukkit.getMessenger().registerOutgoingPluginChannel(this, "freesia:tracker_sync"); + + // TODO- De-hard-coding? Bukkit.getMessenger().registerIncomingPluginChannel(this, "freesia:virtual_player_management", this.virtualPlayerManager); Bukkit.getMessenger().registerOutgoingPluginChannel(this, "freesia:virtual_player_management"); + Bukkit.getPluginManager().registerEvents(this.trackerProcessor, this); } diff --git a/Freesia-Backend/src/main/java/meow/kikir/freesia/backend/tracker/TrackerProcessor.java b/Freesia-Backend/src/main/java/meow/kikir/freesia/backend/tracker/TrackerProcessor.java index d0a7d34..22b4b90 100644 --- a/Freesia-Backend/src/main/java/meow/kikir/freesia/backend/tracker/TrackerProcessor.java +++ b/Freesia-Backend/src/main/java/meow/kikir/freesia/backend/tracker/TrackerProcessor.java @@ -41,12 +41,13 @@ public void onPlayerAddedToWorld(@NotNull EntityAddToWorldEvent event) { } } - private void playerTrackedPlayer(@NotNull Player beSeen, @NotNull Player seeing) { + // Fire tracker update events if (!new CyanidinRealPlayerTrackerUpdateEvent(seeing, beSeen).callEvent()) { return; } + // The true tracker update caller this.notifyTrackerUpdate(seeing.getUniqueId(), beSeen.getUniqueId()); } @@ -57,6 +58,7 @@ public void notifyTrackerUpdate(UUID watcher, UUID beWatched) { wrappedUpdatePacket.writeUUID(beWatched); wrappedUpdatePacket.writeUUID(watcher); + // Find a payload final Player payload = Utils.randomPlayerIfNotFound(watcher); if (payload == null) { @@ -90,6 +92,7 @@ public void onPluginMessageReceived(@NotNull String channel, @NotNull Player sen final CyanidinTrackerScanEvent trackerScanEvent = new CyanidinTrackerScanEvent(result, toScan); + // We need to schedule back to pass the dumb async catchers as it was firing from both netty threads and main threads sender.getScheduler().execute( FreesiaBackend.INSTANCE, () -> { diff --git a/Freesia-Backend/src/main/java/meow/kikir/freesia/backend/utils/FriendlyByteBuf.java b/Freesia-Backend/src/main/java/meow/kikir/freesia/backend/utils/FriendlyByteBuf.java index 0e744f9..93358b1 100644 --- a/Freesia-Backend/src/main/java/meow/kikir/freesia/backend/utils/FriendlyByteBuf.java +++ b/Freesia-Backend/src/main/java/meow/kikir/freesia/backend/utils/FriendlyByteBuf.java @@ -19,6 +19,10 @@ import java.nio.charset.StandardCharsets; import java.util.UUID; +/** + * A simplified FriendlyByteBuf reimplementation + * Taken from minecraft-stress-test(...) + */ public class FriendlyByteBuf extends ByteBuf { private final ByteBuf source; diff --git a/Freesia-Common/src/main/java/meow/kikir/freesia/common/utils/TimeExpiringCallbacker.java b/Freesia-Common/src/main/java/meow/kikir/freesia/common/utils/TimeExpiringCallbacker.java new file mode 100644 index 0000000..91d08ab --- /dev/null +++ b/Freesia-Common/src/main/java/meow/kikir/freesia/common/utils/TimeExpiringCallbacker.java @@ -0,0 +1,81 @@ +package meow.kikir.freesia.common.utils; + +import org.jetbrains.annotations.NotNull; + +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +public abstract class TimeExpiringCallbacker implements Delayed { + private static final DelayQueue QUEUE = new DelayQueue<>(); + public static final Object NIL = new Object(); + + private static final VarHandle RESULT_HANDLE; + + static { + try { + RESULT_HANDLE = MethodHandles.lookup().findVarHandle( + TimeExpiringCallbacker.class, + "result", + Object.class + ); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException(e); + } + + Thread checkerThread = new Thread(() -> { + while (!Thread.currentThread().isInterrupted()) { + try { + TimeExpiringCallbacker callbacker = QUEUE.take(); + callbacker.handleTimeout(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } catch (Exception e) { + e.printStackTrace(); + } + } + }, "Freesia-TimeExpiringCallbacker-Checker"); + + checkerThread.setDaemon(true); + checkerThread.setPriority(Thread.NORM_PRIORITY - 2); + checkerThread.start(); + } + + private final long expirationDeadline; + private Object result = null; + + public TimeExpiringCallbacker(long timeoutMs) { + this.expirationDeadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs); + QUEUE.add(this); + } + + public abstract void onFinished(Object result); + + private void handleTimeout() { + if (RESULT_HANDLE.compareAndSet(this, null, NIL)) { + this.onFinished(NIL); + } + } + + public void done(Object result) { + if (RESULT_HANDLE.compareAndSet(this, null, result)) { + + QUEUE.remove(this); + + this.onFinished(result); + } + } + + @Override + public long getDelay(@NotNull TimeUnit unit) { + return unit.convert(expirationDeadline - System.nanoTime(), TimeUnit.NANOSECONDS); + } + + @Override + public int compareTo(@NotNull Delayed input) { + return Long.compare(this.expirationDeadline, ((TimeExpiringCallbacker) input).expirationDeadline); + } +} \ No newline at end of file diff --git a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/Freesia.java b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/Freesia.java index 6a9703e..9f5c6b2 100644 --- a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/Freesia.java +++ b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/Freesia.java @@ -139,16 +139,21 @@ public EventTask onPlayerConnected(@NotNull ServerConnectedEvent event) { final Player targetPlayer = event.getPlayer(); return EventTask.async(() -> { + // On first connect if (!mapperManager.hasPlayer(targetPlayer)) { this.logger.info("Initiating mapper session for player {}", targetPlayer.getUsername()); + // Create mapper session mapperManager.firstCreateMapper(targetPlayer); - kickChecker.onPlayerJoin(targetPlayer); + // Add to client kicker + kickChecker.onPlayerJoin(targetPlayer); return; } - logger.info("Player {} has changed backend server.Reconnecting mapper session", targetPlayer.getUsername()); + // Player might switch its current server + logger.info("Player {} has changed backend server. Reconnecting mapper session", targetPlayer.getUsername()); + // So, reconnect mapper session mapperManager.reconnectWorker(targetPlayer); }); } @@ -166,6 +171,7 @@ public void onChannelMsg(@NotNull PluginMessageEvent event) { if ((identifier instanceof MinecraftChannelIdentifier mineId) && (event.getSource() instanceof Player player)) { event.setResult(PluginMessageEvent.ForwardResult.handled()); + // TODO Need a packet rate limiter here? mapperManager.onPluginMessageIn(player, mineId, data); } } diff --git a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/events/PlayerEntityStateChangeEvent.java b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/events/PlayerEntityStateChangeEvent.java index 6e728ef..0531f87 100644 --- a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/events/PlayerEntityStateChangeEvent.java +++ b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/events/PlayerEntityStateChangeEvent.java @@ -1,6 +1,7 @@ package meow.kikir.freesia.velocity.events; import com.github.retrooper.packetevents.protocol.nbt.NBTCompound; +import com.google.common.annotations.Beta; import com.velocitypowered.api.proxy.Player; /** @@ -8,6 +9,7 @@ * 获取到的Nbt是要发送给玩家的 * 注意:修改过后的nbt并不会被持久化即只会在当前进程发生作用而在重启后失效 */ +@Beta public class PlayerEntityStateChangeEvent { private final Player actualPlayer; private final int entityId; diff --git a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/DefaultYsmPacketProxyImpl.java b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/DefaultYsmPacketProxyImpl.java index 0a478cd..771c325 100644 --- a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/DefaultYsmPacketProxyImpl.java +++ b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/DefaultYsmPacketProxyImpl.java @@ -20,15 +20,14 @@ import java.util.Optional; import java.util.UUID; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.StampedLock; public class DefaultYsmPacketProxyImpl implements YsmPacketProxy{ private final Player player; private final NbtRemapper nbtRemapper = new StandardNbtRemapperImpl(); private volatile NBTCompound lastYsmEntityStatus = null; - private final Lock entityStatusWriteLock = new ReentrantLock(true); // We need to keep its order + private final StampedLock entityStatusWriteLock = new StampedLock(); // Use optimistic locks private volatile int playerEntityId = -1; private volatile int workerPlayerEntityId = -1; @@ -117,17 +116,31 @@ private void sendEntityStateToInternal(Player target, NBTCompound entityStatus) @Override public void setEntityDataRaw(NBTCompound data) { - this.entityStatusWriteLock.lock(); + final long stamp = this.entityStatusWriteLock.writeLock(); try { this.lastYsmEntityStatus = data; }finally { - this.entityStatusWriteLock.unlock(); + this.entityStatusWriteLock.unlockWrite(stamp); } } @Override public void refreshToOthers() { - final NBTCompound entityStatusCopy = this.lastYsmEntityStatus; // Copy value + NBTCompound entityStatusCopy; // Copy value + + // Try optimistic read first + long stamp = this.entityStatusWriteLock.tryOptimisticRead(); + if (this.entityStatusWriteLock.validate(stamp)) { + entityStatusCopy = this.lastYsmEntityStatus; + }else { + // Fallback to read lock + try { + stamp = this.entityStatusWriteLock.readLock(); + entityStatusCopy = this.lastYsmEntityStatus; + }finally { + this.entityStatusWriteLock.unlockRead(stamp); + } + } // If the player does not have any data if (entityStatusCopy == null) { @@ -179,18 +192,17 @@ public ProxyComputeResult processS2C(Key key, ByteBuf copiedPacketData) { return ProxyComputeResult.ofDrop(); // Do not process the entity state if it is not ours } - // We process this actions async - // TODO : Is here any race condition ? - Freesia.PROXY_SERVER.getEventManager().fire(new PlayerEntityStateChangeEvent(this.player,workerEntityId, this.nbtRemapper.readBound(mcBuffer))).thenAccept(result -> { - this.entityStatusWriteLock.lock(); + Freesia.PROXY_SERVER.getEventManager().fire(new PlayerEntityStateChangeEvent(this.player,workerEntityId, this.nbtRemapper.readBound(mcBuffer))).thenAccept(result -> { // Use NbtRemapper for multi version clients + // Acquire write lock first + final long stamp = this.entityStatusWriteLock.writeLock(); try { - this.lastYsmEntityStatus = result.getEntityState(); // Read using the protocol version matched for the worker + this.lastYsmEntityStatus = result.getEntityState(); // Update value to the result }finally { - this.entityStatusWriteLock.unlock(); + this.entityStatusWriteLock.unlockWrite(stamp); } this.refreshToOthers(); - }); + }).join(); // Force blocking as we do not wanna break the sequence of the data } catch (Exception e) { Freesia.LOGGER.error("Error while in processing tracker!", e); return ProxyComputeResult.ofDrop(); diff --git a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/MapperSessionProcessor.java b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/MapperSessionProcessor.java index 11b4c9d..df3d940 100644 --- a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/MapperSessionProcessor.java +++ b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/MapperSessionProcessor.java @@ -154,14 +154,19 @@ public void disconnecting(DisconnectingEvent event) { @Override public void disconnected(DisconnectedEvent event) { Freesia.LOGGER.info("Mapper session has disconnected for reason(non-deserialized): {}", event.getReason()); // Log disconnected + + // Log exceptions if (event.getCause() != null) { Freesia.LOGGER.info("Mapper session has disconnected for throwable: {}", event.getCause().getLocalizedMessage()); // Log errors } + + // Remove callback this.mapperPayloadManager.onWorkerSessionDisconnect(this, this.kickMasterWhenDisconnect, event.getReason()); // Fire events this.session = null; //Set session to null to finalize the mapper connection } public void waitForDisconnected() { + // We will set the session to null after finishing all disconnect logics while (this.session != null) { Thread.onSpinWait(); // Spin wait instead of block waiting } diff --git a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/VirtualYsmPacketProxyImpl.java b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/VirtualYsmPacketProxyImpl.java index b51d470..9caba3d 100644 --- a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/VirtualYsmPacketProxyImpl.java +++ b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/VirtualYsmPacketProxyImpl.java @@ -19,7 +19,7 @@ public class VirtualYsmPacketProxyImpl implements YsmPacketProxy { private final UUID virtualPlayerUUID; private final NbtRemapper nbtRemapper = new StandardNbtRemapperImpl(); - private volatile NBTCompound lastYsmEntityStatus = null; + private volatile NBTCompound lastYsmEntityStatus = null; // TODO Need an access lock like DefaultYsmPacketProxyImpl? private volatile int playerEntityId = -1; public VirtualYsmPacketProxyImpl(UUID virtualPlayerUUID) { diff --git a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/YsmMapperPayloadManager.java b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/YsmMapperPayloadManager.java index 2c04d56..8b4d916 100644 --- a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/YsmMapperPayloadManager.java +++ b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/YsmMapperPayloadManager.java @@ -265,12 +265,15 @@ public void onPlayerDisconnect(Player player) { } protected void onWorkerSessionDisconnect(@NotNull MapperSessionProcessor mapperSession, boolean kickMaster, Component reason) { + // Kick the master it binds if (kickMaster) mapperSession.getBindPlayer().disconnect(Freesia.languageManager.i18n(FreesiaConstants.LanguageConstants.WORKER_TERMINATED_CONNECTION, List.of("reason"), List.of(reason))); + // Remove from list this.mapperSessions.remove(mapperSession.getBindPlayer()); } public void onPluginMessageIn(@NotNull Player player, @NotNull MinecraftChannelIdentifier channel, byte[] packetData) { + // Check if it is the message of ysm if (!channel.equals(YSM_CHANNEL_KEY_VELOCITY)) { return; } @@ -278,6 +281,7 @@ public void onPluginMessageIn(@NotNull Player player, @NotNull MinecraftChannelI final MapperSessionProcessor mapperSession = this.mapperSessions.get(player); if (mapperSession == null) { + // Actually it shouldn't be and never be happened throw new IllegalStateException("Mapper session not found or ready for player " + player.getUsername()); } @@ -296,6 +300,7 @@ public void onBackendReady(Player player) { } public void createMapperSession(@NotNull Player player, @NotNull InetSocketAddress backend) { + // Instance new session final TcpClientSession mapperSession = new TcpClientSession( backend.getHostName(), backend.getPort(), @@ -307,10 +312,12 @@ public void createMapperSession(@NotNull Player player, @NotNull InetSocketAddre ) ); + // Our packet processor for packet forwarding final MapperSessionProcessor packetProcessor = new MapperSessionProcessor(player, this.packetProxyCreator.apply(player), this); mapperSession.addListener(packetProcessor); + // Default as Minecraft client mapperSession.setFlag(BuiltinFlags.READ_TIMEOUT,30_000); mapperSession.setFlag(BuiltinFlags.WRITE_TIMEOUT,30_000); @@ -320,6 +327,7 @@ public void createMapperSession(@NotNull Player player, @NotNull InetSocketAddre mapperSession.connect(true,false); } + @Deprecated public void onProxyLoggedin(Player player, MapperSessionProcessor packetProcessor, TcpClientSession session){ // TODO : Are we still using this callback ? } @@ -344,10 +352,13 @@ public void onRealPlayerTrackerUpdate(Player beingWatched, Player watcher) { // so as the result, we could simply pass it down directly if (mapperSession == null) { // Should not be happened - throw new IllegalStateException("???"); + // We use random player as the payload of custom payload of freesia tracker, so there is a possibility + // that race condition would happen between the disconnect logic and tracker update logic + //throw new IllegalStateException("???"); + return; } - if (this.isPlayerInstalledYsm(watcher)) { + if (this.isPlayerInstalledYsm(watcher)) { // Skip players who don't install ysm mapperSession.getPacketProxy().sendEntityStateTo(watcher); } } diff --git a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/utils/PendingPacket.java b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/utils/PendingPacket.java index 4fe385b..00803f2 100644 --- a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/utils/PendingPacket.java +++ b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/utils/PendingPacket.java @@ -2,6 +2,12 @@ import net.kyori.adventure.key.Key; +/** + * Pending packet object for callback processing + * @see meow.kikir.freesia.velocity.network.ysm.MapperSessionProcessor + * @param channel Channel name of the packet + * @param data Data of the packet + */ public record PendingPacket( Key channel, byte[] data