Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@ public final class FreesiaBackend extends JavaPlugin {
@Override
public void onEnable() {
INSTANCE = this;

// TODO- De-hard-coding?
Bukkit.getMessenger().registerIncomingPluginChannel(this, "freesia:tracker_sync", this.trackerProcessor);
Bukkit.getMessenger().registerOutgoingPluginChannel(this, "freesia:tracker_sync");

// TODO- De-hard-coding?
Bukkit.getMessenger().registerIncomingPluginChannel(this, "freesia:virtual_player_management", this.virtualPlayerManager);
Bukkit.getMessenger().registerOutgoingPluginChannel(this, "freesia:virtual_player_management");


Bukkit.getPluginManager().registerEvents(this.trackerProcessor, this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@ public void onPlayerAddedToWorld(@NotNull EntityAddToWorldEvent event) {
}
}


private void playerTrackedPlayer(@NotNull Player beSeen, @NotNull Player seeing) {
// Fire tracker update events
if (!new CyanidinRealPlayerTrackerUpdateEvent(seeing, beSeen).callEvent()) {
return;
}

// The true tracker update caller
this.notifyTrackerUpdate(seeing.getUniqueId(), beSeen.getUniqueId());
}

Expand All @@ -57,6 +58,7 @@ public void notifyTrackerUpdate(UUID watcher, UUID beWatched) {
wrappedUpdatePacket.writeUUID(beWatched);
wrappedUpdatePacket.writeUUID(watcher);

// Find a payload
final Player payload = Utils.randomPlayerIfNotFound(watcher);

if (payload == null) {
Expand Down Expand Up @@ -90,6 +92,7 @@ public void onPluginMessageReceived(@NotNull String channel, @NotNull Player sen

final CyanidinTrackerScanEvent trackerScanEvent = new CyanidinTrackerScanEvent(result, toScan);

// We need to schedule back to pass the dumb async catchers as it was firing from both netty threads and main threads
sender.getScheduler().execute(
FreesiaBackend.INSTANCE,
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
* A simplified FriendlyByteBuf reimplementation
* Taken from minecraft-stress-test(<a href="https://github.com/PureGero/minecraft-stress-test">...</a>)
*/
public class FriendlyByteBuf extends ByteBuf {
private final ByteBuf source;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package meow.kikir.freesia.common.utils;

import org.jetbrains.annotations.NotNull;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public abstract class TimeExpiringCallbacker implements Delayed {
private static final DelayQueue<TimeExpiringCallbacker> QUEUE = new DelayQueue<>();
public static final Object NIL = new Object();

private static final VarHandle RESULT_HANDLE;

static {
try {
RESULT_HANDLE = MethodHandles.lookup().findVarHandle(
TimeExpiringCallbacker.class,
"result",
Object.class
);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
}

Thread checkerThread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
TimeExpiringCallbacker callbacker = QUEUE.take();
callbacker.handleTimeout();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
e.printStackTrace();
}
}
}, "Freesia-TimeExpiringCallbacker-Checker");

checkerThread.setDaemon(true);
checkerThread.setPriority(Thread.NORM_PRIORITY - 2);
checkerThread.start();
}

private final long expirationDeadline;
private Object result = null;

public TimeExpiringCallbacker(long timeoutMs) {
this.expirationDeadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
QUEUE.add(this);
}

public abstract void onFinished(Object result);

private void handleTimeout() {
if (RESULT_HANDLE.compareAndSet(this, null, NIL)) {
this.onFinished(NIL);
}
}

public void done(Object result) {
if (RESULT_HANDLE.compareAndSet(this, null, result)) {

QUEUE.remove(this);

this.onFinished(result);
}
}

@Override
public long getDelay(@NotNull TimeUnit unit) {
return unit.convert(expirationDeadline - System.nanoTime(), TimeUnit.NANOSECONDS);
}

@Override
public int compareTo(@NotNull Delayed input) {
return Long.compare(this.expirationDeadline, ((TimeExpiringCallbacker) input).expirationDeadline);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,21 @@ public EventTask onPlayerConnected(@NotNull ServerConnectedEvent event) {
final Player targetPlayer = event.getPlayer();

return EventTask.async(() -> {
// On first connect
if (!mapperManager.hasPlayer(targetPlayer)) {
this.logger.info("Initiating mapper session for player {}", targetPlayer.getUsername());

// Create mapper session
mapperManager.firstCreateMapper(targetPlayer);
kickChecker.onPlayerJoin(targetPlayer);

// Add to client kicker
kickChecker.onPlayerJoin(targetPlayer);
return;
}

logger.info("Player {} has changed backend server.Reconnecting mapper session", targetPlayer.getUsername());
// Player might switch its current server
logger.info("Player {} has changed backend server. Reconnecting mapper session", targetPlayer.getUsername());
// So, reconnect mapper session
mapperManager.reconnectWorker(targetPlayer);
});
}
Expand All @@ -166,6 +171,7 @@ public void onChannelMsg(@NotNull PluginMessageEvent event) {
if ((identifier instanceof MinecraftChannelIdentifier mineId) && (event.getSource() instanceof Player player)) {
event.setResult(PluginMessageEvent.ForwardResult.handled());

// TODO Need a packet rate limiter here?
mapperManager.onPluginMessageIn(player, mineId, data);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package meow.kikir.freesia.velocity.events;

import com.github.retrooper.packetevents.protocol.nbt.NBTCompound;
import com.google.common.annotations.Beta;
import com.velocitypowered.api.proxy.Player;

/**
* 当玩家更改模型时或worker设置玩家时该事件会被触发
* 获取到的Nbt是要发送给玩家的
* 注意:修改过后的nbt并不会被持久化即只会在当前进程发生作用而在重启后失效
*/
@Beta
public class PlayerEntityStateChangeEvent {
private final Player actualPlayer;
private final int entityId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@

import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.StampedLock;

public class DefaultYsmPacketProxyImpl implements YsmPacketProxy{
private final Player player;
private final NbtRemapper nbtRemapper = new StandardNbtRemapperImpl();

private volatile NBTCompound lastYsmEntityStatus = null;
private final Lock entityStatusWriteLock = new ReentrantLock(true); // We need to keep its order
private final StampedLock entityStatusWriteLock = new StampedLock(); // Use optimistic locks

private volatile int playerEntityId = -1;
private volatile int workerPlayerEntityId = -1;
Expand Down Expand Up @@ -117,17 +116,31 @@ private void sendEntityStateToInternal(Player target, NBTCompound entityStatus)

@Override
public void setEntityDataRaw(NBTCompound data) {
this.entityStatusWriteLock.lock();
final long stamp = this.entityStatusWriteLock.writeLock();
try {
this.lastYsmEntityStatus = data;
}finally {
this.entityStatusWriteLock.unlock();
this.entityStatusWriteLock.unlockWrite(stamp);
}
}

@Override
public void refreshToOthers() {
final NBTCompound entityStatusCopy = this.lastYsmEntityStatus; // Copy value
NBTCompound entityStatusCopy; // Copy value

// Try optimistic read first
long stamp = this.entityStatusWriteLock.tryOptimisticRead();
if (this.entityStatusWriteLock.validate(stamp)) {
entityStatusCopy = this.lastYsmEntityStatus;
}else {
// Fallback to read lock
try {
stamp = this.entityStatusWriteLock.readLock();
entityStatusCopy = this.lastYsmEntityStatus;
}finally {
this.entityStatusWriteLock.unlockRead(stamp);
}
}

// If the player does not have any data
if (entityStatusCopy == null) {
Expand Down Expand Up @@ -179,18 +192,17 @@ public ProxyComputeResult processS2C(Key key, ByteBuf copiedPacketData) {
return ProxyComputeResult.ofDrop(); // Do not process the entity state if it is not ours
}

// We process this actions async
// TODO : Is here any race condition ?
Freesia.PROXY_SERVER.getEventManager().fire(new PlayerEntityStateChangeEvent(this.player,workerEntityId, this.nbtRemapper.readBound(mcBuffer))).thenAccept(result -> {
this.entityStatusWriteLock.lock();
Freesia.PROXY_SERVER.getEventManager().fire(new PlayerEntityStateChangeEvent(this.player,workerEntityId, this.nbtRemapper.readBound(mcBuffer))).thenAccept(result -> { // Use NbtRemapper for multi version clients
// Acquire write lock first
final long stamp = this.entityStatusWriteLock.writeLock();
try {
this.lastYsmEntityStatus = result.getEntityState(); // Read using the protocol version matched for the worker
this.lastYsmEntityStatus = result.getEntityState(); // Update value to the result
}finally {
this.entityStatusWriteLock.unlock();
this.entityStatusWriteLock.unlockWrite(stamp);
}

this.refreshToOthers();
});
}).join(); // Force blocking as we do not wanna break the sequence of the data
} catch (Exception e) {
Freesia.LOGGER.error("Error while in processing tracker!", e);
return ProxyComputeResult.ofDrop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,19 @@ public void disconnecting(DisconnectingEvent event) {
@Override
public void disconnected(DisconnectedEvent event) {
Freesia.LOGGER.info("Mapper session has disconnected for reason(non-deserialized): {}", event.getReason()); // Log disconnected

// Log exceptions
if (event.getCause() != null) {
Freesia.LOGGER.info("Mapper session has disconnected for throwable: {}", event.getCause().getLocalizedMessage()); // Log errors
}

// Remove callback
this.mapperPayloadManager.onWorkerSessionDisconnect(this, this.kickMasterWhenDisconnect, event.getReason()); // Fire events
this.session = null; //Set session to null to finalize the mapper connection
}

public void waitForDisconnected() {
// We will set the session to null after finishing all disconnect logics
while (this.session != null) {
Thread.onSpinWait(); // Spin wait instead of block waiting
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
public class VirtualYsmPacketProxyImpl implements YsmPacketProxy {
private final UUID virtualPlayerUUID;
private final NbtRemapper nbtRemapper = new StandardNbtRemapperImpl();
private volatile NBTCompound lastYsmEntityStatus = null;
private volatile NBTCompound lastYsmEntityStatus = null; // TODO Need an access lock like DefaultYsmPacketProxyImpl?
private volatile int playerEntityId = -1;

public VirtualYsmPacketProxyImpl(UUID virtualPlayerUUID) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,19 +265,23 @@ public void onPlayerDisconnect(Player player) {
}

protected void onWorkerSessionDisconnect(@NotNull MapperSessionProcessor mapperSession, boolean kickMaster, Component reason) {
// Kick the master it binds
if (kickMaster)
mapperSession.getBindPlayer().disconnect(Freesia.languageManager.i18n(FreesiaConstants.LanguageConstants.WORKER_TERMINATED_CONNECTION, List.of("reason"), List.of(reason)));
// Remove from list
this.mapperSessions.remove(mapperSession.getBindPlayer());
}

public void onPluginMessageIn(@NotNull Player player, @NotNull MinecraftChannelIdentifier channel, byte[] packetData) {
// Check if it is the message of ysm
if (!channel.equals(YSM_CHANNEL_KEY_VELOCITY)) {
return;
}

final MapperSessionProcessor mapperSession = this.mapperSessions.get(player);

if (mapperSession == null) {
// Actually it shouldn't be and never be happened
throw new IllegalStateException("Mapper session not found or ready for player " + player.getUsername());
}

Expand All @@ -296,6 +300,7 @@ public void onBackendReady(Player player) {
}

public void createMapperSession(@NotNull Player player, @NotNull InetSocketAddress backend) {
// Instance new session
final TcpClientSession mapperSession = new TcpClientSession(
backend.getHostName(),
backend.getPort(),
Expand All @@ -307,10 +312,12 @@ public void createMapperSession(@NotNull Player player, @NotNull InetSocketAddre
)
);

// Our packet processor for packet forwarding
final MapperSessionProcessor packetProcessor = new MapperSessionProcessor(player, this.packetProxyCreator.apply(player), this);

mapperSession.addListener(packetProcessor);

// Default as Minecraft client
mapperSession.setFlag(BuiltinFlags.READ_TIMEOUT,30_000);
mapperSession.setFlag(BuiltinFlags.WRITE_TIMEOUT,30_000);

Expand All @@ -320,6 +327,7 @@ public void createMapperSession(@NotNull Player player, @NotNull InetSocketAddre
mapperSession.connect(true,false);
}

@Deprecated
public void onProxyLoggedin(Player player, MapperSessionProcessor packetProcessor, TcpClientSession session){
// TODO : Are we still using this callback ?
}
Expand All @@ -344,10 +352,13 @@ public void onRealPlayerTrackerUpdate(Player beingWatched, Player watcher) {
// so as the result, we could simply pass it down directly
if (mapperSession == null) {
// Should not be happened
throw new IllegalStateException("???");
// We use random player as the payload of custom payload of freesia tracker, so there is a possibility
// that race condition would happen between the disconnect logic and tracker update logic
//throw new IllegalStateException("???");
return;
}

if (this.isPlayerInstalledYsm(watcher)) {
if (this.isPlayerInstalledYsm(watcher)) { // Skip players who don't install ysm
mapperSession.getPacketProxy().sendEntityStateTo(watcher);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

import net.kyori.adventure.key.Key;

/**
* Pending packet object for callback processing
* @see meow.kikir.freesia.velocity.network.ysm.MapperSessionProcessor
* @param channel Channel name of the packet
* @param data Data of the packet
*/
public record PendingPacket(
Key channel,
byte[] data
Expand Down