From 548a9a1452f39a33764b4ea9c4580ac387d0c11d Mon Sep 17 00:00:00 2001 From: MrHua269 Date: Fri, 21 Mar 2025 19:29:04 +0800 Subject: [PATCH 1/2] Ensure packet process order and removed debug statements --- .../backend/tracker/TrackerProcessor.java | 1 - Freesia-Velocity/build.gradle.kts | 1 + .../meow/kikir/freesia/velocity/Freesia.java | 10 ++++ .../network/ysm/MapperSessionProcessor.java | 51 ++++++++++++++----- .../network/ysm/YsmMapperPayloadManager.java | 11 ++++ .../freesia/velocity/utils/PendingPacket.java | 9 ++++ 6 files changed, 69 insertions(+), 14 deletions(-) create mode 100644 Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/utils/PendingPacket.java diff --git a/Freesia-Backend/src/main/java/meow/kikir/freesia/backend/tracker/TrackerProcessor.java b/Freesia-Backend/src/main/java/meow/kikir/freesia/backend/tracker/TrackerProcessor.java index ded981b..d0a7d34 100644 --- a/Freesia-Backend/src/main/java/meow/kikir/freesia/backend/tracker/TrackerProcessor.java +++ b/Freesia-Backend/src/main/java/meow/kikir/freesia/backend/tracker/TrackerProcessor.java @@ -47,7 +47,6 @@ private void playerTrackedPlayer(@NotNull Player beSeen, @NotNull Player seeing) return; } - FreesiaBackend.INSTANCE.getSLF4JLogger().info("Player {} is tracking player {}", seeing.getName(), beSeen.getName()); this.notifyTrackerUpdate(seeing.getUniqueId(), beSeen.getUniqueId()); } diff --git a/Freesia-Velocity/build.gradle.kts b/Freesia-Velocity/build.gradle.kts index 17a2ff7..8914359 100644 --- a/Freesia-Velocity/build.gradle.kts +++ b/Freesia-Velocity/build.gradle.kts @@ -6,6 +6,7 @@ dependencies { implementation("com.electronwill.night-config:toml:3.6.6") implementation("org.geysermc.mcprotocollib:protocol:1.21-SNAPSHOT") implementation(project(":Freesia-Common")) + implementation("ca.spottedleaf:concurrentutil:0.0.3") annotationProcessor("com.velocitypowered:velocity-api:3.3.0-SNAPSHOT") } diff --git a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/Freesia.java b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/Freesia.java index aac12db..6a9703e 100644 --- a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/Freesia.java +++ b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/Freesia.java @@ -12,6 +12,7 @@ import com.velocitypowered.api.event.connection.DisconnectEvent; import com.velocitypowered.api.event.connection.PluginMessageEvent; import com.velocitypowered.api.event.player.ServerConnectedEvent; +import com.velocitypowered.api.event.player.ServerPostConnectEvent; import com.velocitypowered.api.event.player.ServerPreConnectEvent; import com.velocitypowered.api.event.proxy.ProxyInitializeEvent; import com.velocitypowered.api.plugin.Dependency; @@ -181,4 +182,13 @@ public void onPacketSend(@NotNull PacketSendEvent event) { mapperManager.updateRealPlayerEntityId(target, playerSpawnPacket.getEntityId()); } } + + // We need to push off the packet process of worker because the player's login packet might still not reached to the client when we create the mapper session + @Subscribe + public void onServerPostConnect(@NotNull ServerPostConnectEvent postConnectEvent) { + final Player target = postConnectEvent.getPlayer(); + + // Retire callbacks of worker ysm packet processing + mapperManager.onBackendReady(target); + } } diff --git a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/MapperSessionProcessor.java b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/MapperSessionProcessor.java index 9a30e86..11b4c9d 100644 --- a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/MapperSessionProcessor.java +++ b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/MapperSessionProcessor.java @@ -1,10 +1,12 @@ package meow.kikir.freesia.velocity.network.ysm; +import ca.spottedleaf.concurrentutil.collection.MultiThreadedQueue; import com.velocitypowered.api.proxy.Player; import com.velocitypowered.api.proxy.messages.MinecraftChannelIdentifier; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import meow.kikir.freesia.velocity.Freesia; +import meow.kikir.freesia.velocity.utils.PendingPacket; import net.kyori.adventure.key.Key; import org.geysermc.mcprotocollib.network.Session; import org.geysermc.mcprotocollib.network.event.session.*; @@ -22,6 +24,7 @@ public class MapperSessionProcessor implements SessionListener { private final Player bindPlayer; private final YsmPacketProxy packetProxy; private final YsmMapperPayloadManager mapperPayloadManager; + private final MultiThreadedQueue pendingYsmPacketsInbound = new MultiThreadedQueue<>(); private volatile Session session; private volatile boolean kickMasterWhenDisconnect = true; @@ -66,10 +69,21 @@ public Player getBindPlayer() { return this.bindPlayer; } + public void onBackendReady() { + // Process incoming packets that we had not ready to process before + PendingPacket pendingYsmPacket; + while ((pendingYsmPacket = this.pendingYsmPacketsInbound.pollOrBlockAdds()) != null) { // Destroy(block add operations) the queue + this.processInComingYsmPacket(pendingYsmPacket.channel(), pendingYsmPacket.data()); + } + } + @Override public void packetReceived(Session session, Packet packet) { if (packet instanceof ClientboundLoginPacket loginPacket) { + // Notify entity update to notify the tracker update of the player Freesia.mapperManager.updateWorkerPlayerEntityId(this.bindPlayer, loginPacket.getEntityId()); + // Worker connection callbacks, but we are not using it currently + // Considering to remove it in the future Freesia.mapperManager.onProxyLoggedin(this.bindPlayer, this, ((TcpClientSession) session)); } @@ -77,21 +91,15 @@ public void packetReceived(Session session, Packet packet) { final Key channelKey = payloadPacket.getChannel(); final byte[] packetData = payloadPacket.getData(); + // If the packet is of ysm if (channelKey.toString().equals(YsmMapperPayloadManager.YSM_CHANNEL_KEY_ADVENTURE.toString())) { - final ProxyComputeResult result = this.packetProxy.processS2C(channelKey, Unpooled.wrappedBuffer(packetData)); - - switch (result.result()) { - case MODIFY -> { - final ByteBuf finalData = result.data(); - - finalData.resetReaderIndex(); - - this.packetProxy.sendPluginMessageToOwner(MinecraftChannelIdentifier.create(channelKey.namespace(), channelKey.value()), finalData); - } - - case PASS -> - this.packetProxy.sendPluginMessageToOwner(MinecraftChannelIdentifier.create(channelKey.namespace(), channelKey.value()), packetData); + // Check if we are not ready for the backend side yet(We will block the add operations once the backend is ready for the player) + final PendingPacket pendingPacket = new PendingPacket(channelKey, packetData); + if (!this.pendingYsmPacketsInbound.offer(pendingPacket)) { + // Add is blocked, we'll process it directly + this.processInComingYsmPacket(channelKey, packetData); } + // Otherwise, we push it into the callback queue } } @@ -101,6 +109,23 @@ public void packetReceived(Session session, Packet packet) { } } + private void processInComingYsmPacket(Key channelKey, byte[] packetData) { + final ProxyComputeResult result = this.packetProxy.processS2C(channelKey, Unpooled.wrappedBuffer(packetData)); + + switch (result.result()) { + case MODIFY -> { + final ByteBuf finalData = result.data(); + + finalData.resetReaderIndex(); + + this.packetProxy.sendPluginMessageToOwner(MinecraftChannelIdentifier.create(channelKey.namespace(), channelKey.value()), finalData); + } + + case PASS -> + this.packetProxy.sendPluginMessageToOwner(MinecraftChannelIdentifier.create(channelKey.namespace(), channelKey.value()), packetData); + } + } + @Override public void packetSending(PacketSendingEvent event) { diff --git a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/YsmMapperPayloadManager.java b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/YsmMapperPayloadManager.java index 60e2d9f..2c04d56 100644 --- a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/YsmMapperPayloadManager.java +++ b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/YsmMapperPayloadManager.java @@ -284,6 +284,17 @@ public void onPluginMessageIn(@NotNull Player player, @NotNull MinecraftChannelI mapperSession.processPlayerPluginMessage(packetData); } + public void onBackendReady(Player player) { + final MapperSessionProcessor mapperSession = this.mapperSessions.get(player); + + if (mapperSession == null) { + // Shouldn't be happened + throw new IllegalStateException("???"); + } + + mapperSession.onBackendReady(); + } + public void createMapperSession(@NotNull Player player, @NotNull InetSocketAddress backend) { final TcpClientSession mapperSession = new TcpClientSession( backend.getHostName(), diff --git a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/utils/PendingPacket.java b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/utils/PendingPacket.java new file mode 100644 index 0000000..4fe385b --- /dev/null +++ b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/utils/PendingPacket.java @@ -0,0 +1,9 @@ +package meow.kikir.freesia.velocity.utils; + +import net.kyori.adventure.key.Key; + +public record PendingPacket( + Key channel, + byte[] data +) { +} From f77ceb28a4bf801ed70b23d57ff7cec91b11b7bf Mon Sep 17 00:00:00 2001 From: MrHua269 Date: Fri, 21 Mar 2025 19:47:47 +0800 Subject: [PATCH 2/2] Try fixing race condition in real player ysm entity status updater --- .../ysm/DefaultYsmPacketProxyImpl.java | 38 +++++++++++++++---- .../ysm/VirtualYsmPacketProxyImpl.java | 19 +++++++--- 2 files changed, 44 insertions(+), 13 deletions(-) diff --git a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/DefaultYsmPacketProxyImpl.java b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/DefaultYsmPacketProxyImpl.java index f25f28f..0a478cd 100644 --- a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/DefaultYsmPacketProxyImpl.java +++ b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/DefaultYsmPacketProxyImpl.java @@ -20,12 +20,15 @@ import java.util.Optional; import java.util.UUID; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; public class DefaultYsmPacketProxyImpl implements YsmPacketProxy{ private final Player player; private final NbtRemapper nbtRemapper = new StandardNbtRemapperImpl(); private volatile NBTCompound lastYsmEntityStatus = null; + private final Lock entityStatusWriteLock = new ReentrantLock(true); // We need to keep its order private volatile int playerEntityId = -1; private volatile int workerPlayerEntityId = -1; @@ -80,11 +83,13 @@ private boolean isEntityStateOfSelf(int entityId){ @Override public void sendEntityStateTo(@NotNull Player target){ - final int currentEntityId = this.playerEntityId; // Get current entity id on the server of the player + this.sendEntityStateToInternal(target, this.lastYsmEntityStatus); + } - final NBTCompound lastEntityStatusTemp = this.lastYsmEntityStatus; // Copy the value instead of the reference + private void sendEntityStateToInternal(Player target, NBTCompound entityStatus) { + final int currentEntityId = this.playerEntityId; // Get current entity id on the server of the player - if (lastEntityStatusTemp == null || currentEntityId == -1) { // If no data got or player is not in the backend server currently + if (entityStatus == null || currentEntityId == -1) { // If no data got or player is not in the backend server currently return; } @@ -102,7 +107,7 @@ public void sendEntityStateTo(@NotNull Player target){ wrappedPacketData.writeByte(4); wrappedPacketData.writeVarInt(currentEntityId); - wrappedPacketData.writeBytes(this.nbtRemapper.shouldRemap(targetProtocolVer) ? this.nbtRemapper.remapToMasterVer(lastEntityStatusTemp) : this.nbtRemapper.remapToWorkerVer(lastEntityStatusTemp)); // Remap nbt if needed + wrappedPacketData.writeBytes(this.nbtRemapper.shouldRemap(targetProtocolVer) ? this.nbtRemapper.remapToMasterVer(entityStatus) : this.nbtRemapper.remapToWorkerVer(entityStatus)); // Remap nbt if needed this.sendPluginMessageTo(target, YsmMapperPayloadManager.YSM_CHANNEL_KEY_VELOCITY, wrappedPacketData); } catch (Exception e) { @@ -112,12 +117,24 @@ public void sendEntityStateTo(@NotNull Player target){ @Override public void setEntityDataRaw(NBTCompound data) { - this.lastYsmEntityStatus = data; + this.entityStatusWriteLock.lock(); + try { + this.lastYsmEntityStatus = data; + }finally { + this.entityStatusWriteLock.unlock(); + } } @Override public void refreshToOthers() { - this.sendEntityStateTo(this.player); // Sync to self + final NBTCompound entityStatusCopy = this.lastYsmEntityStatus; // Copy value + + // If the player does not have any data + if (entityStatusCopy == null) { + return; + } + + this.sendEntityStateToInternal(this.player, entityStatusCopy); // Sync to self Freesia.tracker.getCanSee(this.player.getUniqueId()).whenComplete((beingWatched, exception) -> { // Async tracker check request to backend server // Exception handling @@ -137,7 +154,7 @@ public void refreshToOthers() { continue; } - this.sendEntityStateTo(target); // Sync to target + this.sendEntityStateToInternal(target, entityStatusCopy); // Sync to target } } } @@ -165,7 +182,12 @@ public ProxyComputeResult processS2C(Key key, ByteBuf copiedPacketData) { // We process this actions async // TODO : Is here any race condition ? Freesia.PROXY_SERVER.getEventManager().fire(new PlayerEntityStateChangeEvent(this.player,workerEntityId, this.nbtRemapper.readBound(mcBuffer))).thenAccept(result -> { - this.lastYsmEntityStatus = result.getEntityState(); // Read using the protocol version matched for the worker + this.entityStatusWriteLock.lock(); + try { + this.lastYsmEntityStatus = result.getEntityState(); // Read using the protocol version matched for the worker + }finally { + this.entityStatusWriteLock.unlock(); + } this.refreshToOthers(); }); diff --git a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/VirtualYsmPacketProxyImpl.java b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/VirtualYsmPacketProxyImpl.java index 8159497..b51d470 100644 --- a/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/VirtualYsmPacketProxyImpl.java +++ b/Freesia-Velocity/src/main/java/meow/kikir/freesia/velocity/network/ysm/VirtualYsmPacketProxyImpl.java @@ -48,6 +48,13 @@ public void setEntityDataRaw(NBTCompound data) { @Override public void refreshToOthers() { + final NBTCompound entityStatus = this.lastYsmEntityStatus; // Copy the value + + // If the entity dose not have any data + if (entityStatus == null) { + return; + } + Freesia.tracker.getCanSee(this.virtualPlayerUUID).whenComplete((beingWatched, exception) -> { // Async tracker check request to backend server if (beingWatched != null) { // Actually there is impossible to be null for (UUID targetUUID : beingWatched) { @@ -60,7 +67,7 @@ public void refreshToOthers() { continue; } - this.sendEntityStateTo(target); // Sync to target + this.sendEntityStateToInternal(target, entityStatus); // Sync to target } } } @@ -101,11 +108,13 @@ public int getPlayerWorkerEntityId() { @Override public void sendEntityStateTo(@NotNull Player target) { - final int currentEntityId = this.playerEntityId; // Get current entity id on the server of the player + this.sendEntityStateToInternal(target, this.lastYsmEntityStatus); + } - final NBTCompound lastEntityStatusTemp = this.lastYsmEntityStatus; // Copy the value instead of the reference + private void sendEntityStateToInternal(Player target, NBTCompound entityStatus) { + final int currentEntityId = this.playerEntityId; // Get current entity id on the server of the player - if (lastEntityStatusTemp == null || currentEntityId == -1) { // If no data got or player is not in the backend server currently + if (entityStatus == null || currentEntityId == -1) { // If no data got or player is not in the backend server currently return; } @@ -123,7 +132,7 @@ public void sendEntityStateTo(@NotNull Player target) { wrappedPacketData.writeByte(4); wrappedPacketData.writeVarInt(currentEntityId); - wrappedPacketData.writeBytes(this.nbtRemapper.shouldRemap(targetProtocolVer) ? this.nbtRemapper.remapToMasterVer(lastEntityStatusTemp) : this.nbtRemapper.remapToWorkerVer(lastEntityStatusTemp)); // Remap nbt if needed + wrappedPacketData.writeBytes(this.nbtRemapper.shouldRemap(targetProtocolVer) ? this.nbtRemapper.remapToMasterVer(entityStatus) : this.nbtRemapper.remapToWorkerVer(entityStatus)); // Remap nbt if needed this.sendPluginMessageTo(target, YsmMapperPayloadManager.YSM_CHANNEL_KEY_VELOCITY, wrappedPacketData); } catch (Exception e) {