// TeaNet/Peers/ConnectionManager.cs
//
// 404 lines
// 18 KiB
// C#

using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Net;
using System.Threading;
using System;
using NeonTea.Quakeball.TeaNet.Packets;
using UnityEngine;
namespace NeonTea.Quakeball.TeaNet.Peers {
/// <summary>Manages the connections of a Peer: sends keepalives and queued packets to them, and handles their incoming messages.</summary>
public class ConnectionManager {
/// <summary>Source of connection uids; post-incremented every time AddConnection registers a connection.</summary>
private ulong ConnectionCounter;
/// <summary>Every registered connection, keyed by its uid.</summary>
private Dictionary<ulong, Connection> Connections = new Dictionary<ulong, Connection>();
/// <summary>Reverse lookup from a remote endpoint to the uid of its connection.</summary>
private Dictionary<IPEndPoint, ulong> IPtoID = new Dictionary<IPEndPoint, ulong>();
/// <summary>Per-connection queue of reliable packets. Packets stay queued until Handle sees them acknowledged by the remote side.</summary>
private Dictionary<ulong, ConcurrentQueue<Packet>> PacketQueue = new Dictionary<ulong, ConcurrentQueue<Packet>>();
/// <summary>The peer this manager works for; used to look up protocols and to reach the UDP client, listener thread and message listener.</summary>
private Peer Peer;
/// <summary>
/// Pending actions per protocol identifier. Handle and the update thread enqueue, Update dequeues.
/// NOTE(review): nothing in this file adds entries to this dictionary, so the queues are presumably registered elsewhere (e.g. when a protocol is registered on the Peer) — confirm.
/// </summary>
public Dictionary<ulong, Queue<ProtocolAction>> ProtocolActionQueues = new Dictionary<ulong, Queue<ProtocolAction>>();
/// <summary>Connections the update thread has flagged as timed out; they are removed on the next Update call.</summary>
private Queue<Connection> ConnectionsToRemove = new Queue<Connection>();
/// <summary>Background thread that runs UpdateThreadMethod.</summary>
private Thread UpdateThread;
/// <summary>Milliseconds without an incoming message after which a connection is considered timed out.</summary>
public long Timeout = 8000;
/// <summary>Milliseconds the update thread sleeps between two passes over the connections.</summary>
public long Interval = 100;
/// <summary>Traffic counters that are updated whenever messages and packets are sent or received.</summary>
public ConnectionManagerTrafficData TrafficData = new ConnectionManagerTrafficData();
/// <summary>Creates a manager for the given peer and immediately starts its background update thread.</summary>
/// <param name="peer">The peer whose connections this manager handles.</param>
public ConnectionManager(Peer peer) {
    Peer = peer;
    Thread worker = new Thread(UpdateThreadMethod);
    UpdateThread = worker;
    worker.Start();
}
/// <summary>Stops the background update thread by aborting it.</summary>
public void StopThread() {
// Abort raises a ThreadAbortException inside UpdateThreadMethod, which catches it and logs that the thread stopped.
// NOTE(review): Thread.Abort is not supported on .NET Core / .NET 5+; this relies on the runtime in use still supporting it — confirm.
UpdateThread.Abort();
}
/// <summary>
/// Returns the Connection registered for the endpoint. When there is none, a new Connection in the Awaiting state is created, registered and returned.
/// Should not be used, unless expecting to establish a connection with the endpoint.
/// </summary>
/// <param name="endpoint">Remote endpoint to look up.</param>
/// <returns>The existing or freshly registered connection; never null.</returns>
public Connection Find(IPEndPoint endpoint) {
    if (!IPtoID.ContainsKey(endpoint)) {
        // Unknown endpoint: register a placeholder that waits for the remote side to establish.
        Connection created = new Connection(endpoint, ConnectionStatus.Awaiting);
        AddConnection(created);
        return created;
    }
    ulong knownId = IPtoID[endpoint];
    return Connections[knownId];
}
/// <summary>Start establishing a connection to a given endpoint with the given protocol.</summary>
/// <param name="endpoint">Remote endpoint to connect to.</param>
/// <param name="protocolIdent">Identifier of the protocol to assign to the new connection.</param>
/// <returns>True when a new connection was registered; false when the endpoint already has one.</returns>
public bool StartConnection(IPEndPoint endpoint, byte protocolIdent) {
    bool alreadyKnown = IPtoID.ContainsKey(endpoint);
    if (!alreadyKnown) {
        Connection fresh = new Connection(endpoint);
        fresh.Internal.AssignedProtocol = protocolIdent;
        AddConnection(fresh);
    }
    return !alreadyKnown;
}
/// <summary>Get the connection instance registered under the given uid.</summary>
/// <param name="uid">Unique id of the connection.</param>
/// <returns>The connection, or null when no connection has that uid.</returns>
public Connection GetConnection(ulong uid) {
    Connection found;
    return Connections.TryGetValue(uid, out found) ? found : null;
}
/// <summary>
/// Soft-closes the connection with the given uid, meaning it will wait for them to acknowledge the closing.
/// The connection is marked Rejected and remembers the reason; unknown uids are ignored.
/// </summary>
/// <param name="uid">Unique id of the connection to close.</param>
/// <param name="reason">Reason stored on the connection.</param>
public void CloseConnection(ulong uid, ClosingReason reason) {
    if (!Connections.ContainsKey(uid)) {
        return;
    }
    Connection target = Connections[uid];
    target.ClosingReason = reason;
    target.Status = ConnectionStatus.Rejected;
}
/// <summary>
/// Add a reliable packet to the packet queue, to be sent on the next update, or when SendPacketQueue is called.
/// A shallow copy of the packet is queued and receives the connection's next reliable packet id. Unknown uids are ignored.
/// </summary>
/// <param name="uid">Unique id of the target connection.</param>
/// <param name="p">Packet to queue; the instance passed in is not modified.</param>
public void AddPacketToQueue(ulong uid, Packet p) {
    Connection owner;
    if (!Connections.TryGetValue(uid, out owner)) {
        return;
    }
    Packet queued = p.ShallowCopy();
    queued.PacketId = owner.Internal.ReliablePacketIDCounter++;
    PacketQueue[uid].Enqueue(queued);
}
/// <summary>
/// Send the current packet queue instantly.
/// Nothing is sent when the uid is unknown, the connection's protocol cannot be resolved, or the connection is not ready.
/// The queue itself is left untouched; an empty queue still produces a message with a packet count of zero.
/// </summary>
/// <param name="uid">Unique id of the connection whose queue is sent.</param>
public void SendPacketQueue(ulong uid) {
    Connection conn;
    if (!Connections.TryGetValue(uid, out conn)) {
        return;
    }
    Protocol protocol = Peer.GetProtocol(conn.Internal.AssignedProtocol);
    if (protocol == null || !conn.IsReady()) {
        return;
    }

    // Snapshot of the queue; the id of its first packet goes into the message header.
    Packet[] pending = PacketQueue[uid].ToArray();
    int firstId = pending.Length > 0 ? pending[0].PacketId : Int32.MaxValue;

    ByteBuffer message = protocol.BuildMessage(conn, firstId, true);
    message.Write(pending.Length);
    for (int i = 0; i < pending.Length; i++) {
        Packet packet = pending[i];
        message.WritePacket(protocol, packet);

        // Per-packet-type statistics: read the current totals, bump them, store them back.
        Type packetType = packet.GetType();
        PerPacketData totals;
        TrafficData.SentByPacket.TryGetValue(packetType, out totals);
        totals.Bytes += packet.Size;
        totals.Packets += 1;
        TrafficData.SentByPacket[packetType] = totals;
        TrafficData.ReliablePacketsSent++;
    }
    Send(conn, message);
}
/// <summary>
/// Send a single unreliable packet.
/// A shallow copy of the packet is stamped with the connection's next unreliable packet id and sent right away in its own message.
/// The id counter advances even when the message ends up not being sent (protocol not found or connection not ready). Unknown uids are ignored.
/// </summary>
/// <param name="uid">Unique id of the target connection.</param>
/// <param name="p">Packet to send; the instance passed in is not modified.</param>
public void SendSingleUnreliable(ulong uid, Packet p) {
    Connection conn;
    if (!Connections.TryGetValue(uid, out conn)) {
        return;
    }

    Packet outgoing = p.ShallowCopy();
    outgoing.PacketId = conn.Internal.UnreliablePacketIDCounter++;
    outgoing.PacketIsReliable = false;

    Protocol protocol = Peer.GetProtocol(conn.Internal.AssignedProtocol);
    if (protocol == null || !conn.IsReady()) {
        return;
    }

    ByteBuffer message = protocol.BuildMessage(conn, outgoing.PacketId, false);
    message.Write(1);
    message.WritePacket(protocol, outgoing);
    Send(conn, message);
    TrafficData.UnreliablePacketsSent++;

    // Per-packet-type statistics: read the current totals, bump them, store them back.
    Type packetType = outgoing.GetType();
    PerPacketData totals;
    TrafficData.SentByPacket.TryGetValue(packetType, out totals);
    totals.Bytes += outgoing.Size;
    totals.Packets += 1;
    TrafficData.SentByPacket[packetType] = totals;
}
/// <summary>
/// Go through queue of networking actions that have happened since last update.
/// Every queued action is handed to its protocol (Receive, ConnectionStatusChanged or Timeout), after which all connections flagged for removal are removed.
/// </summary>
public void Update() {
    foreach (byte id in ProtocolActionQueues.Keys) {
        Protocol handler = Peer.GetProtocol(id);
        Queue<ProtocolAction> pending = ProtocolActionQueues[id];
        while (pending.Count > 0) {
            ProtocolAction next = pending.Dequeue();

            ReceiveAction received = next as ReceiveAction;
            if (received != null) {
                handler.Receive(received.Connection, received.Packet);
                continue;
            }

            ConnectionChangedAction statusChange = next as ConnectionChangedAction;
            if (statusChange != null) {
                handler.ConnectionStatusChanged(statusChange.OldStatus, statusChange.NewStatus, statusChange.Connection);
                continue;
            }

            TimeoutAction timedOut = next as TimeoutAction;
            if (timedOut != null) {
                handler.Timeout(timedOut.Connection);
            }
        }
    }

    // Drop the connections the update thread has marked as timed out.
    while (ConnectionsToRemove.Count > 0) {
        RemoveConnection(ConnectionsToRemove.Dequeue());
    }
}
/// <summary>Assigns the next free uid to the connection and registers it in the uid, endpoint and packet-queue lookups.</summary>
/// <param name="conn">Connection to register; its uid is overwritten.</param>
private void AddConnection(Connection conn) {
    ulong assigned = ConnectionCounter++;
    conn.uid = assigned;
    Connections.Add(assigned, conn);
    IPtoID.Add(conn.Endpoint, assigned);
    PacketQueue.Add(assigned, new ConcurrentQueue<Packet>());
}
/// <summary>Unregisters the connection from the uid, endpoint and packet-queue lookups. Removing a connection that is not registered has no effect.</summary>
/// <param name="conn">Connection to forget.</param>
private void RemoveConnection(Connection conn) {
    ulong id = conn.uid;
    Connections.Remove(id);
    IPtoID.Remove(conn.Endpoint);
    PacketQueue.Remove(id);
}
/// <summary>Sends a message that carries no packets, built by the connection's assigned protocol. Does nothing when that protocol cannot be resolved.</summary>
/// <param name="conn">Connection to send the message to.</param>
private void SendPlain(Connection conn) {
    Protocol protocol = Peer.GetProtocol(conn.Internal.AssignedProtocol);
    if (protocol == null) {
        return;
    }
    Send(conn, protocol.BuildMessage(conn, Int32.MaxValue, false));
}
/// <summary>Packs the buffer and sends it to the connection's endpoint over the peer's UDP client, updating the traffic counters. Connections in the Lost state are skipped entirely.</summary>
/// <param name="conn">Destination connection.</param>
/// <param name="buffer">Fully built message to send.</param>
private void Send(Connection conn, ByteBuffer buffer) {
    if (conn.Status != ConnectionStatus.Lost) {
        TrafficData.BytesSent += buffer.Size;
        TrafficData.TotalMessagesSent++;
        byte[] payload = buffer.Pack();
        // Record which connection was written to last before the datagram goes out.
        Peer.ListenerThread.LastSentConnection = conn;
        Peer.UdpClient.Send(payload, payload.Length, conn.Endpoint);
    }
}
/// <summary>
/// Processes one incoming message from the given endpoint.
/// Resolves (or creates) the connection, stamps its last-message time, reads the protocol id and the message stage from the buffer,
/// and then updates the connection status and/or queues actions for the protocol depending on that stage.
/// </summary>
/// <param name="endpoint">Endpoint the message came from.</param>
/// <param name="buffer">Message contents; read sequentially, so the order of the reads below matters.</param>
public void Handle(IPEndPoint endpoint, ByteBuffer buffer) {
// Find registers a new Awaiting connection when the endpoint is not known yet.
Connection conn = Find(endpoint);
// Any incoming message counts as a sign of life for the timeout check in the update thread.
conn.Internal.LastMessage = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
ConnectionStatus oldStatus = conn.Status;
// Message header: protocol identifier first, then the stage.
byte protocolId = buffer.Read();
Protocol protocol = Peer.GetProtocol(protocolId);
PacketStage stage = buffer.ReadStage();
switch (stage) {
case PacketStage.Establishing:
// Only honoured while the connection is still Awaiting; otherwise the message is ignored.
if (conn.Status == ConnectionStatus.Awaiting) {
conn.Internal.AssignedProtocol = protocolId;
string version = buffer.ReadString();
// An unknown protocol or a differing version string rejects the connection.
if (protocol == null || !version.Equals(protocol.Version)) {
conn.Status = ConnectionStatus.Rejected;
conn.ClosingReason = ClosingReason.IncorrectVersion;
} else {
conn.Status = ConnectionStatus.Ready;
}
// The status change can only be reported when there is a protocol to report it to.
if (protocol != null) {
ProtocolActionQueues[protocol.Identifier].Enqueue(new ConnectionChangedAction(oldStatus, conn.Status, conn));
}
}
break;
case PacketStage.Rejected:
// The message carries a closing reason; the connection becomes Closed.
conn.Status = ConnectionStatus.Closed;
conn.ClosingReason = buffer.ReadClosingReason();
if (protocol != null) {
ProtocolActionQueues[protocol.Identifier].Enqueue(new ConnectionChangedAction(oldStatus, conn.Status, conn));
}
break;
case PacketStage.Closed:
// Already Stopped: nothing to change and nothing to report again.
if (conn.Status == ConnectionStatus.Stopped) {
break;
}
conn.Status = ConnectionStatus.Stopped;
if (protocol != null) {
ProtocolActionQueues[protocol.Identifier].Enqueue(new ConnectionChangedAction(oldStatus, conn.Status, conn));
}
break;
case PacketStage.Ready:
// Ignore messages for a protocol other than the one assigned to this connection, or for an unknown protocol.
if (conn.Internal.AssignedProtocol != protocolId || protocol == null) {
break;
}
if (oldStatus == ConnectionStatus.Establishing) { // Update connection status
conn.Status = ConnectionStatus.Ready;
ProtocolActionQueues[protocol.Identifier].Enqueue(new ConnectionChangedAction(oldStatus, conn.Status, conn));
}
// Packets are only accepted from connections that were Establishing or Ready when the message arrived.
if (!(oldStatus == ConnectionStatus.Establishing || oldStatus == ConnectionStatus.Ready)) {
break; // No cheating at this table! For realsies this time!
}
// Rest of the header: a reliable packet id stored as LatestOutwardReliable (used below to trim our own queue),
// the id of the first packet in this message, and whether the packets are reliable.
conn.Internal.LatestOutwardReliable = buffer.ReadInt();
int FirstPacketId = buffer.ReadInt();
bool Reliable = buffer.ReadBool();
ConcurrentQueue<Packet> queue = PacketQueue[conn.uid];
Packet peeked;
// Drop queued reliable packets from the front until one has an id above LatestOutwardReliable.
while (queue.TryPeek(out peeked)) {
if (peeked.PacketId > conn.Internal.LatestOutwardReliable) {
break;
} else {
Packet LastRemoved;
queue.TryDequeue(out LastRemoved);
}
}
// Stores the same queue instance back under the same key; the queue contents are not affected by this line.
PacketQueue[conn.uid] = queue;
int PacketAmount = buffer.ReadInt();
if (Reliable) {
TrafficData.ReliablePacketsReceived += PacketAmount;
} else {
TrafficData.UnreliablePacketsreceived += PacketAmount;
}
for (int i = 0; i < PacketAmount; i++) {
Packet p = buffer.ReadPacket(protocol);
// Packet ids are not read per packet: they count up from FirstPacketId.
p.PacketId = FirstPacketId + i;
p.PacketIsReliable = Reliable;
// Only packets with an id above the latest one seen (tracked separately for reliable and unreliable) are passed on;
// others are skipped but still counted in the statistics below.
if (p.PacketIsReliable) {
if (p.PacketId > conn.Internal.LatestInwardReliable) {
conn.Internal.LatestInwardReliable = p.PacketId;
ProtocolActionQueues[protocol.Identifier].Enqueue(new ReceiveAction(conn, p));
}
} else if (p.PacketId > conn.Internal.LatestInwardUnreliable) {
conn.Internal.LatestInwardUnreliable = p.PacketId;
ProtocolActionQueues[protocol.Identifier].Enqueue(new ReceiveAction(conn, p));
}
// Do some analytics!
PerPacketData OldReceivedByPacket;
TrafficData.ReceivedByPacket.TryGetValue(p.GetType(), out OldReceivedByPacket);
OldReceivedByPacket.Bytes += p.Size;
OldReceivedByPacket.Packets += 1;
TrafficData.ReceivedByPacket[p.GetType()] = OldReceivedByPacket;
}
break;
}
}
/// <summary>
/// Body of the background update thread. Roughly every <c>Interval</c> milliseconds it walks all connections,
/// sends each active one either its reliable packet queue (Ready) or a packet-less message (any other active state),
/// and flags connections that have been silent for longer than <c>Timeout</c> milliseconds, or are Lost, for removal.
/// Runs until the thread is aborted via StopThread.
/// </summary>
private void UpdateThreadMethod() {
    try {
        while (Thread.CurrentThread.IsAlive) {
            long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
            // Collect the Connection objects themselves rather than their uids: Update() may remove a
            // connection from Connections between the two loops, and a second lookup by uid would then
            // throw KeyNotFoundException and silently kill this thread.
            List<Connection> timedOut = new List<Connection>();
            foreach (ulong uid in Connections.Keys) {
                Connection conn = Connections[uid];
                if ((now - conn.Internal.LastMessage) > Timeout || conn.Status == ConnectionStatus.Lost) {
                    timedOut.Add(conn);
                }
                // Awaiting and Stopped connections get no traffic. Both comparisons must hold, hence &&:
                // with || the condition is true for every status and the check filters nothing.
                if (conn.Status != ConnectionStatus.Awaiting && conn.Status != ConnectionStatus.Stopped) {
                    if (conn.Status == ConnectionStatus.Ready) {
                        SendPacketQueue(uid);
                    } else {
                        SendPlain(conn);
                    }
                }
            }
            foreach (Connection conn in timedOut) {
                // Actual removal happens in Update(), on the thread that calls it.
                ConnectionsToRemove.Enqueue(conn);
                // Only connections that were still live in some form report a timeout to their protocol.
                if (conn.Status == ConnectionStatus.Ready
                    || conn.Status == ConnectionStatus.Establishing
                    || conn.Status == ConnectionStatus.Awaiting
                    || conn.Status == ConnectionStatus.Lost) {
                    Protocol protocol = Peer.GetProtocol(conn.Internal.AssignedProtocol);
                    if (protocol != null) {
                        conn.ClosingReason = ClosingReason.Timeout;
                        ProtocolActionQueues[conn.Internal.AssignedProtocol].Enqueue(new TimeoutAction(conn));
                    }
                }
            }
            Thread.Sleep((int)Interval);
        }
    } catch (ThreadAbortException) {
        // Raised by StopThread's Abort call; this is the normal way out of the loop.
        Peer.MessageListener.Message("Connection Thread Stopped");
    }
}
}
/// <summary>Base type of everything that can be placed in a protocol action queue.</summary>
public abstract class ProtocolAction {
}
/// <summary>Queued action: a packet was received on a connection.</summary>
class ReceiveAction : ProtocolAction {
    /// <summary>Connection the packet arrived on.</summary>
    public Connection Connection;
    /// <summary>The received packet.</summary>
    public Packet Packet;

    /// <summary>Pairs a received packet with the connection it arrived on.</summary>
    public ReceiveAction(Connection connection, Packet packet) {
        this.Packet = packet;
        this.Connection = connection;
    }
}
/// <summary>Queued action: the status of a connection changed.</summary>
class ConnectionChangedAction : ProtocolAction {
    /// <summary>Status before the change.</summary>
    public ConnectionStatus OldStatus;
    /// <summary>Status after the change.</summary>
    public ConnectionStatus NewStatus;
    /// <summary>Connection whose status changed.</summary>
    public Connection Connection;

    /// <summary>Records a status transition of the given connection.</summary>
    public ConnectionChangedAction(ConnectionStatus old, ConnectionStatus newstatus, Connection connection) {
        this.OldStatus = old;
        this.NewStatus = newstatus;
        this.Connection = connection;
    }
}
/// <summary>Queued action: a connection timed out.</summary>
class TimeoutAction : ProtocolAction {
    /// <summary>Connection that timed out.</summary>
    public Connection Connection;

    /// <summary>Records a timeout of the given connection.</summary>
    public TimeoutAction(Connection connection) {
        this.Connection = connection;
    }
}
/// <summary>Traffic counters maintained by ConnectionManager. The integer counters are reset by <see cref="Clear"/>.</summary>
public class ConnectionManagerTrafficData {
    /// <summary>Number of bytes sent since the counters were last cleared.</summary>
    public int BytesSent;
    /// <summary>Number of messages sent since the counters were last cleared.</summary>
    public int TotalMessagesSent;
    /// <summary>Number of reliable packets sent since the counters were last cleared.</summary>
    public int ReliablePacketsSent;
    /// <summary>Number of unreliable packets sent since the counters were last cleared.</summary>
    public int UnreliablePacketsSent;
    /// <summary>Number of reliable packets received since the counters were last cleared.</summary>
    public int ReliablePacketsReceived;
    /// <summary>Number of unreliable packets received since the counters were last cleared.</summary>
    public int UnreliablePacketsreceived;
    /// <summary>Outgoing traffic totals, broken down by packet type.</summary>
    public ConcurrentDictionary<Type, PerPacketData> SentByPacket = new ConcurrentDictionary<Type, PerPacketData>();
    /// <summary>Incoming traffic totals, broken down by packet type.</summary>
    public ConcurrentDictionary<Type, PerPacketData> ReceivedByPacket = new ConcurrentDictionary<Type, PerPacketData>();

    /// <summary>Resets the six integer counters to zero. The per-packet-type dictionaries are left as they are.</summary>
    public void Clear() {
        BytesSent = TotalMessagesSent = 0;
        ReliablePacketsSent = UnreliablePacketsSent = 0;
        ReliablePacketsReceived = UnreliablePacketsreceived = 0;
    }
}
/// <summary>Running traffic totals for a single packet type, as stored in the per-packet dictionaries of ConnectionManagerTrafficData.</summary>
public struct PerPacketData {
/// <summary>Accumulated Size of every counted packet of this type.</summary>
public int Bytes;
/// <summary>Number of packets of this type that have been counted.</summary>
public int Packets;
}
}