Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,53 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Consumer;

public class MasterServerMessageHandler extends NettyServerChannelHandlerLayer {
private final Map<Integer, Consumer<String>> pendingCommandDispatches = Maps.newConcurrentMap();
private final AtomicInteger traceIdGenerator = new AtomicInteger(0);

private volatile UUID workerUUID;
private volatile String workerName;

private volatile boolean commandDispatcherRetired = false;
private final StampedLock commandDispatchCallbackLock = new StampedLock();

public void dispatchCommandToWorker(String command, Consumer<Component> onDispatched) {
final int traceId = this.traceIdGenerator.getAndIncrement();
final Consumer<String> wrappedDecoder = json -> {
try {
final Component decoded = LegacyComponentSerializer.builder().build().deserialize(json);
onDispatched.accept(decoded);
} catch (Exception e) {
EntryPoint.LOGGER_INST.error("Failed to decode command result from worker", e);
final long stamp = this.commandDispatchCallbackLock.readLock();
try {
// We were retired during connection
if (this.commandDispatcherRetired) {
onDispatched.accept(null);
return;
}
};

this.pendingCommandDispatches.put(traceId, wrappedDecoder);
this.sendMessage(new M2WDispatchCommandMessage(traceId, command));
final int traceId = this.traceIdGenerator.getAndIncrement();

final Consumer<String> wrappedDecoder = json -> {
try {
// We were retired during disconnection
if (json == null) {
onDispatched.accept(null);
return;
}

final Component decoded = LegacyComponentSerializer.builder().build().deserialize(json);
onDispatched.accept(decoded);
} catch (Exception e) {
EntryPoint.LOGGER_INST.error("Failed to decode command result from worker", e);
onDispatched.accept(null);
}
};

this.pendingCommandDispatches.put(traceId, wrappedDecoder);
this.sendMessage(new M2WDispatchCommandMessage(traceId, command));
}finally {
this.commandDispatchCallbackLock.unlockRead(stamp);
}
}

@Nullable
Expand All @@ -54,13 +79,29 @@ public String getWorkerName() {

@Override
public void channelInactive(@NotNull ChannelHandlerContext ctx) {
this.retireAllCommandDispatchCallbacks();

if (this.workerUUID == null) {
return;
}

Freesia.registedWorkers.remove(this.workerUUID);
}

private void retireAllCommandDispatchCallbacks() {
final long stamp = this.commandDispatchCallbackLock.writeLock();
try {
this.commandDispatcherRetired = true;
for (Map.Entry<Integer, Consumer<String>> entry : this.pendingCommandDispatches.entrySet()) {
entry.getValue().accept(null);
}

this.pendingCommandDispatches.clear();
}finally {
this.commandDispatchCallbackLock.unlockWrite(stamp);
}
}

@Override
public CompletableFuture<byte[]> readPlayerData(UUID playerUUID) {
final CompletableFuture<byte[]> callback = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,27 @@ public CompletableFuture<Set<UUID>> getCanSee(@NotNull UUID target) {

final Optional<Player> targetPlayerNullable = Freesia.PROXY_SERVER.getPlayer(target);

final boolean[] cancelCallbackAdd = {false};
if (targetPlayerNullable.isPresent()) {
final Player targetPlayer = targetPlayerNullable.get();

targetPlayer.getCurrentServer().ifPresentOrElse(
server -> server.getServer().sendPluginMessage(SYNC_CHANNEL_KEY, callbackRequest.getBytes()),
() -> {
cancelCallbackAdd[0] = true;
callback.complete(null); // Maybe at the early stage
} // Throw exception when we didn't find that server
);
} else {
cancelCallbackAdd[0] = true;
callback.complete(null);
}

// If we didn't find the server, we need to remove the callback
if (cancelCallbackAdd[0]) {
this.pendingCanSeeTasks.remove(callbackId);
}

return callback;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import meow.kikir.freesia.velocity.Freesia;
import meow.kikir.freesia.velocity.utils.PendingPacket;
import net.kyori.adventure.key.Key;
import net.kyori.adventure.text.Component;
import org.geysermc.mcprotocollib.network.Session;
import org.geysermc.mcprotocollib.network.event.session.*;
import org.geysermc.mcprotocollib.network.packet.Packet;
Expand All @@ -17,6 +18,7 @@
import org.geysermc.mcprotocollib.protocol.packet.common.serverbound.ServerboundCustomPayloadPacket;
import org.geysermc.mcprotocollib.protocol.packet.common.serverbound.ServerboundPongPacket;
import org.geysermc.mcprotocollib.protocol.packet.ingame.clientbound.ClientboundLoginPacket;
import org.jetbrains.annotations.Nullable;

import java.lang.invoke.VarHandle;
import java.util.Optional;
Expand Down Expand Up @@ -65,6 +67,17 @@ protected void retireTrackerCallbacks(){
}
}

public boolean sendPacket(Packet packet) {
final Session sessionObject = (Session) SESSION_HANDLE.getVolatile(this);

if (sessionObject == null) {
return false;
}

sessionObject.send(packet);
return true;
}

protected YsmPacketProxy getPacketProxy() {
return this.packetProxy;
}
Expand Down Expand Up @@ -185,16 +198,33 @@ public void disconnecting(DisconnectingEvent event) {

@Override
public void disconnected(DisconnectedEvent event) {
Freesia.LOGGER.info("Mapper session has disconnected for reason(non-deserialized): {}", event.getReason()); // Log disconnected
this.detachFromManager(true, event);
}

// Log exceptions
if (event.getCause() != null) {
Freesia.LOGGER.info("Mapper session has disconnected for throwable: {}", event.getCause().getLocalizedMessage()); // Log errors
// Sometimes the callback would not be called when we destroy an non-connected mapper,
// so we separated the disconnect logics into here and manual call this in that cases
protected void detachFromManager(boolean updateSession, @Nullable DisconnectedEvent disconnectedEvent) {
Component reason = null;

// Log disconnects if we disconnected it non-manually
if (disconnectedEvent != null) {
reason = disconnectedEvent.getReason();

Freesia.LOGGER.info("Mapper session has disconnected for reason(non-deserialized): {}", reason); // Log disconnected

final Throwable thr = disconnectedEvent.getCause();

if (thr != null) {
Freesia.LOGGER.error("Mapper session has disconnected for throwable", thr); // Log errors
}
}

// Remove callback
this.mapperPayloadManager.onWorkerSessionDisconnect(this, (boolean) KICK_MASTER_HANDLE.getVolatile(this), event.getReason()); // Fire events
SESSION_HANDLE.setVolatile(this, null); //Set session to null to finalize the mapper connection
this.mapperPayloadManager.onWorkerSessionDisconnect(this, (boolean) KICK_MASTER_HANDLE.getVolatile(this), reason); // Fire events

if (updateSession) {
SESSION_HANDLE.setVolatile(this, null); //Set session to null to finalize the mapper connection
}
}

protected void setSession(Session session) {
Expand All @@ -214,6 +244,11 @@ public void destroyAndAwaitDisconnected() {
// Destroy the session
if (sessionObject != null) {
sessionObject.disconnect("DESTROYED");
}else {
// Disconnecting a non initialized session
// Manual call remove callbacks
// Remember: HERE SHOULDN'T BE ANY RACE CONDITION
this.detachFromManager(false, null);
}

// Wait for fully disconnected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,15 @@ public void onPlayerDisconnect(@NotNull Player player) {
}
}

protected void onWorkerSessionDisconnect(@NotNull MapperSessionProcessor mapperSession, boolean kickMaster, Component reason) {
protected void onWorkerSessionDisconnect(@NotNull MapperSessionProcessor mapperSession, boolean kickMaster, @Nullable Component reason) {
// Kick the master it binds
if (kickMaster)
mapperSession.getBindPlayer().disconnect(Freesia.languageManager.i18n(FreesiaConstants.LanguageConstants.WORKER_TERMINATED_CONNECTION, List.of("reason"), List.of(reason)));
mapperSession.getBindPlayer().disconnect(Freesia.languageManager.i18n(
FreesiaConstants.LanguageConstants.WORKER_TERMINATED_CONNECTION,
List.of("reason"),
List.of(reason == null ? Component.text("DISCONNECTED MANUAL") : reason)
));

// Remove from list
this.mapperSessions.remove(mapperSession.getBindPlayer());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,16 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Consumer;

public class WorkerMessageHandlerImpl extends NettyClientChannelHandlerLayer {
private final AtomicInteger traceIdGenerator = new AtomicInteger(0);
private final Map<Integer, Consumer<byte[]>> playerDataGetCallbacks = Maps.newConcurrentMap();

private volatile boolean playerDataFetchCallbackRetired = false;
private final StampedLock playerDataFetchCallbackLock = new StampedLock();

@Override
public void channelActive(@NotNull ChannelHandlerContext ctx) {
super.channelActive(ctx);
Expand All @@ -45,32 +49,63 @@ public void channelActive(@NotNull ChannelHandlerContext ctx) {

@Override
public void channelInactive(@NotNull ChannelHandlerContext ctx) {
this.retirePlayerFetchCallbacks();
super.channelInactive(ctx);
ServerLoader.SERVER_INST.execute(ServerLoader::connectToBackend);
}

public void getPlayerData(UUID playerUUID, Consumer<CompoundTag> onGot) {
final int generatedTraceId = this.traceIdGenerator.getAndIncrement();
final Consumer<byte[]> wrappedDecoder = content -> {
CompoundTag decoded = null;
private void retirePlayerFetchCallbacks() {
final long stamp = this.playerDataFetchCallbackLock.writeLock();
try {
this.playerDataFetchCallbackRetired = true;

for (Map.Entry<Integer, Consumer<byte[]>> entry : this.playerDataGetCallbacks.entrySet()) {
try {
entry.getValue().accept(null);
} catch (Exception e) {
EntryPoint.LOGGER_INST.error("Failed to fire player data callback!", e);
}
}

if (content == null) {
this.playerDataGetCallbacks.clear();
}finally {
this.playerDataFetchCallbackLock.unlockWrite(stamp);
}
}

public void getPlayerData(UUID playerUUID, Consumer<CompoundTag> onGot) {
final long stamp = this.playerDataFetchCallbackLock.readLock();
try {
if (this.playerDataFetchCallbackRetired) {
onGot.accept(null);
return;
}

try {
decoded = (CompoundTag) NbtIo.readAnyTag(new DataInputStream(new ByteArrayInputStream(content)), NbtAccounter.unlimitedHeap());
} catch (Exception e) {
EntryPoint.LOGGER_INST.error("Failed to decode nbt!", e);
}
final int generatedTraceId = this.traceIdGenerator.getAndIncrement();

onGot.accept(decoded);
};
final Consumer<byte[]> wrappedDecoder = content -> {
CompoundTag decoded = null;

if (content == null) {
onGot.accept(null);
return;
}

this.playerDataGetCallbacks.put(generatedTraceId, wrappedDecoder);
try {
decoded = (CompoundTag) NbtIo.readAnyTag(new DataInputStream(new ByteArrayInputStream(content)), NbtAccounter.unlimitedHeap());
} catch (Exception e) {
EntryPoint.LOGGER_INST.error("Failed to decode nbt!", e);
}

onGot.accept(decoded);
};

this.playerDataGetCallbacks.put(generatedTraceId, wrappedDecoder);

ServerLoader.clientInstance.sendToMaster(new W2MPlayerDataGetRequestMessage(playerUUID, generatedTraceId));
ServerLoader.clientInstance.sendToMaster(new W2MPlayerDataGetRequestMessage(playerUUID, generatedTraceId));
}finally {
this.playerDataFetchCallbackLock.unlockRead(stamp);
}
}

@Override
Expand Down Expand Up @@ -100,6 +135,7 @@ public CompletableFuture<String> dispatchCommand(String command) {

Runnable scheduledCommand = () -> {
CommandDispatcher<CommandSourceStack> commandDispatcher = ServerLoader.SERVER_INST.getCommands().getDispatcher();

final ParseResults<CommandSourceStack> parsed = commandDispatcher.parse(command, ServerLoader.SERVER_INST.createCommandSourceStack().withSource(new CommandSource() {
@Override
public void sendSystemMessage(Component component) {
Expand Down