Skip to content
This repository was archived by the owner on Aug 15, 2022. It is now read-only.
Open

Alloy #396

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ public void SendReliableMessage(IMessage message)

public void SendReliableMessage(IMessage message, INetPlayer player)
{
MessageBus.SendMessage(message, SocketFacade.ManagedSocket, player.EndPoint);
MessageBus.SendReliableMessage(message, SocketFacade.ManagedSocket, player.EndPoint);
}

public void SendReliableMessage(IMessage message, EndPoint endpoint)
{
MessageBus.SendMessage(message, SocketFacade.ManagedSocket, endpoint);
MessageBus.SendReliableMessage(message, SocketFacade.ManagedSocket, endpoint);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Forge.Serialization;
using Forge.Serialization;

namespace Forge.Networking.Messaging
{
Expand All @@ -9,8 +9,17 @@ public abstract class ForgeMessage : IMessage
public abstract IMessageInterpreter Interpreter { get; }
public abstract void Serialize(BMSByte buffer);
public abstract void Deserialize(BMSByte buffer);
public bool IsPooled { get; set; } = false;
public bool IsBuffered { get; set; } = false;
public bool IsSent { get; set; } = false;
public void Sent()
{
IsSent = true;
OnMessageSent?.Invoke(this);
}
public void Unbuffered()
{
IsBuffered = false;
OnMessageSent?.Invoke(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public void SendMessage(IMessage message, ISocket sender, EndPoint receiver)
public IMessageReceiptSignature SendReliableMessage(IMessage message, ISocket sender, EndPoint receiver)
{
message.Receipt = AbstractFactory.Get<INetworkTypeFactory>().GetNew<IMessageReceiptSignature>();
SendMessage(message, sender, receiver);
_messageRepeater.AddMessageToRepeat(message, receiver);
SendMessage(message, sender, receiver);
return message.Receipt;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,20 @@ public static int GetCodeFromType(Type type)
return code;
}

/// <summary>
/// Instantiate a message of Type T
/// </summary>
/// <example>
/// var dieMsg = ForgeMessageCodes.Instantiate<NetDieClientMessage>();
/// </example>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static T Instantiate<T>()
{
int code = GetCodeFromType(typeof(T));
return (T)Instantiate(code);
}

public static void Clear()
{
_messageTypes.Clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public void Clear()
lock (_messages)
{
_messages.Clear();
// TODO: Need to unbuffer the messages, otherwise they will never return to message pool
}
}

Expand Down Expand Up @@ -74,6 +75,7 @@ public void AddMessage(IMessage message, EndPoint sender)
kv = new Dictionary<IMessageReceiptSignature, IMessage>();
_messages.Add(sender, kv);
}
message.IsBuffered = true;
kv.Add(message.Receipt, message);
}
}
Expand Down Expand Up @@ -104,11 +106,24 @@ public void AddMessage(IMessage message, EndPoint sender, int ttlMilliseconds)

public void RemoveAllFor(EndPoint sender)
{
var copy = new List<IMessage>();

lock (_messages)
{
var removals = new List<IMessageReceiptSignature>();
if (_messages.TryGetValue(sender, out var kv))
{
foreach (var mkv in kv)
copy.Add(mkv.Value);
}
_messages.Remove(sender);
}

try
{
foreach (var m in copy) m.Unbuffered();
}
catch { }
}

public void RemoveMessage(EndPoint sender, IMessage message)
Expand All @@ -127,7 +142,14 @@ private void RemoveFromMessageLookup(EndPoint sender, IMessageReceiptSignature r
lock (_messages)
{
if (_messages.TryGetValue(sender, out var kv))
{
try
{
kv[receipt].Unbuffered();
}
catch { } // Catch just in case message already removed
kv.Remove(receipt);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Forge.Serialization;
using Forge.Serialization;

namespace Forge.Networking.Messaging
{
Expand All @@ -12,5 +12,9 @@ public interface IMessage
void Serialize(BMSByte buffer);
void Deserialize(BMSByte buffer);
void Sent();
void Unbuffered();
bool IsPooled { get; set; }
bool IsBuffered { get; set; }
bool IsSent { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;

namespace Forge.Networking.Messaging
{
public class MessagePoolMulti
{
private readonly Dictionary<Type, Queue<IMessage>> _messagePools = new Dictionary<Type, Queue<IMessage>>();
private readonly Dictionary<Type, ConcurrentQueue<IMessage>> _messagePools = new Dictionary<Type, ConcurrentQueue<IMessage>>();

public IMessage Get(Type t)
{
var pool = GetPool(t);
if (pool.Count == 0)
return CreateNewMessageForPool(t, pool);
else
return pool.Dequeue();
{
// Try to dequeue, but if locked default to create new
IMessage item;
if (pool.TryDequeue(out item))
{
item.IsPooled = false;
return item;
}
else return CreateNewMessageForPool(t, pool);
}
}

public T Get<T>() where T : IMessage, new()
Expand All @@ -22,27 +32,36 @@ public IMessage Get(Type t)
if (pool.Count == 0)
return CreateNewMessageForPool<T>(pool);
else
return (T)pool.Dequeue();
{
// Try to dequeue, but if locked default to create new
IMessage item;
if (pool.TryDequeue(out item))
{
item.IsPooled = false;
return (T)item;
}
else return CreateNewMessageForPool<T>(pool);
}
}

private Queue<IMessage> GetPool(Type type)
private ConcurrentQueue<IMessage> GetPool(Type type)
{
if (!_messagePools.TryGetValue(type, out var pool))
{
pool = new Queue<IMessage>();
pool = new ConcurrentQueue<IMessage>();
_messagePools.Add(type, pool);
}
return pool;
}

private T CreateNewMessageForPool<T>(Queue<IMessage> pool) where T : IMessage, new()
private T CreateNewMessageForPool<T>(ConcurrentQueue<IMessage> pool) where T : IMessage, new()
{
T m = new T();
m.OnMessageSent += Release;
return m;
}

private IMessage CreateNewMessageForPool(Type t, Queue<IMessage> pool)
private IMessage CreateNewMessageForPool(Type t, ConcurrentQueue<IMessage> pool)
{
IMessage m = (IMessage)Activator.CreateInstance(t);
m.OnMessageSent += Release;
Expand All @@ -51,7 +70,12 @@ private IMessage CreateNewMessageForPool(Type t, Queue<IMessage> pool)

private void Release(IMessage message)
{
Queue<IMessage> pool = GetPool(message.GetType());
if (message.IsPooled) return; // Message has already been returned to pool
if (!message.IsSent) return; // Message has not been sent, not ready to return to pool
if (message.IsBuffered) return; // Message is still buffered, not ready to return to pool
message.IsSent = false;
message.IsPooled = true;
ConcurrentQueue<IMessage> pool = GetPool(message.GetType());
pool.Enqueue(message);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,28 @@ public override void ShutDown()
base.ShutDown();
}

public void ChallengeSuccess(INetworkMediator netContainer, EndPoint endpoint)
public void ChallengeSuccess(INetworkMediator netContainer, EndPoint endpoint)
{
INetPlayer player;
ForgeNetworkIdentityMessage netIdentity;
ForgeNetworkIdentityMessage netIdentity = null;
lock (_challengedPlayers)
{
player = _challengedPlayers.GetPlayer(endpoint);
netContainer.PlayerRepository.AddPlayer(player);
netIdentity = new ForgeNetworkIdentityMessage
if (_challengedPlayers.Exists(endpoint)) // CHANGE: Trap mallicious client sending multiple creates
{
Identity = player.Id
};
_challengedPlayers.RemovePlayer(player);
player = _challengedPlayers.GetPlayer(endpoint);
if (!netContainer.PlayerRepository.Exists(endpoint)) // CHANGE: Check for duplicate
{
netContainer.PlayerRepository.AddPlayer(player);
netIdentity = new ForgeNetworkIdentityMessage
{
Identity = player.Id
};
}
_challengedPlayers.RemovePlayer(player);
}
}
netContainer.MessageBus.SendReliableMessage(netIdentity, ManagedSocket, endpoint);
if (netIdentity != null)
netContainer.MessageBus.SendReliableMessage(netIdentity, ManagedSocket, endpoint);
}

protected override void ProcessMessageRead(BMSByte buffer, EndPoint sender)
Expand All @@ -83,13 +90,26 @@ protected override void ProcessMessageRead(BMSByte buffer, EndPoint sender)

private void CreatePlayer(object state)
{
bool sendChallenge = false;
var sender = (EndPoint)state;
var newPlayer = AbstractFactory.Get<INetworkTypeFactory>().GetNew<INetPlayer>();
newPlayer.EndPoint = sender;
newPlayer.LastCommunication = DateTime.Now;
_challengedPlayers.AddPlayer(newPlayer);
var challengeMessage = AbstractFactory.Get<INetworkTypeFactory>().GetNew<IChallengeMessage>();
networkMediator.MessageBus.SendReliableMessage(challengeMessage, ManagedSocket, sender);

lock (_challengedPlayers) // CHANGE: Need to lock because this runs on different thread
{
if (!_challengedPlayers.Exists(sender)) // CHANGE: ProcessMessageRead may run multiple times CreatePlayer and call this multiple times before the player gets created.
{
var newPlayer = AbstractFactory.Get<INetworkTypeFactory>().GetNew<INetPlayer>();
newPlayer.EndPoint = sender;
newPlayer.LastCommunication = DateTime.Now;
_challengedPlayers.AddPlayer(newPlayer);
sendChallenge = true;
}
}

if (sendChallenge)
{
var challengeMessage = AbstractFactory.Get<INetworkTypeFactory>().GetNew<IChallengeMessage>();
networkMediator.MessageBus.SendReliableMessage(challengeMessage, ManagedSocket, sender);
}
}

protected void ProcessPlayerMessageRead(INetPlayer player, BMSByte buffer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,51 @@ public class BMSBytePool
private const int APPROX_SIZE_ZONE = 128;
private readonly List<BMSByte> _availableBuffers = new List<BMSByte>();
private readonly List<BMSByte> _inUseBuffers = new List<BMSByte>();
private System.Object syncLock = new object();

public BMSByte Get(int size)
{
if (_availableBuffers.Count == 0)
return CreateNewBuffer(size);
else
BMSByte buff;

lock (syncLock)
{
BMSByte buff = GetAvailableBuffer(size);
if (buff.byteArr.Length < size)
buff.SetArraySize(size);
return buff;
if (_availableBuffers.Count == 0)
buff = CreateNewBuffer(size);
else
{
buff = GetAvailableBuffer(size);
if (buff.byteArr.Length < size)
buff.SetArraySize(size);
}
}
return buff;
}

public void Release(BMSByte buffer)
{
// TODO: Figure out why _inUseBuffers.Remove(buffer) is returning false
bool found = false;
for (int i = 0; i < _inUseBuffers.Count; i++)

lock (syncLock)
{
if (_inUseBuffers[i] == buffer)
for (int i = 0; i < _inUseBuffers.Count; i++)
{
_inUseBuffers.RemoveAt(i);
found = true;
break;
if (_inUseBuffers[i] == buffer)
{
_inUseBuffers.RemoveAt(i);
found = true;
break;
}
}
if (found)
{
_availableBuffers.Add(buffer);
buffer.Clear();
}
}
if (!found)
throw new BMSBytePoolReleaseUnmanagedBufferException();
_availableBuffers.Add(buffer);
buffer.Clear();

}

private BMSByte GetAvailableBuffer(int size)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// <autogenerated />
using System;
using System.Reflection;
[assembly: global::System.Runtime.Versioning.TargetFrameworkAttribute(".NETCoreApp,Version=v3.0", FrameworkDisplayName = "")]
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//------------------------------------------------------------------------------
// <auto-generated>
// This code was generated by a tool.
// Runtime Version:4.0.30319.42000
//
// Changes to this file may cause incorrect behavior and will be lost if
// the code is regenerated.
// </auto-generated>
//------------------------------------------------------------------------------

using System;
using System.Reflection;

[assembly: System.Reflection.AssemblyCompanyAttribute("ForgeNatHolePunch")]
[assembly: System.Reflection.AssemblyConfigurationAttribute("Release")]
[assembly: System.Reflection.AssemblyFileVersionAttribute("1.0.0.0")]
[assembly: System.Reflection.AssemblyInformationalVersionAttribute("1.0.0")]
[assembly: System.Reflection.AssemblyProductAttribute("ForgeNatHolePunch")]
[assembly: System.Reflection.AssemblyTitleAttribute("ForgeNatHolePunch")]
[assembly: System.Reflection.AssemblyVersionAttribute("1.0.0.0")]

// Generated by the MSBuild WriteCodeFragment class.

Loading