using UnityEngine;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System;
using NeonTea.Quakeball.TeaNet.Packets;

namespace NeonTea.Quakeball.TeaNet.Peers {
    /// <summary>
    /// Keeps track of every <see cref="Connection"/> known to a <see cref="Peer"/>,
    /// queues outgoing packets per connection, interprets incoming messages and
    /// runs a background thread that periodically sends to each connection and
    /// drops the ones that have timed out.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the update thread enumerates <c>Connections</c> and the
    /// per-connection packet lists while <see cref="Find"/>,
    /// <see cref="StartConnection"/>, <see cref="AddPacketToQueue"/> and
    /// <see cref="Handle"/> mutate them. If those are invoked from other threads
    /// (callers are not visible here), the collections need synchronization.
    /// </remarks>
    public class ConnectionManager {
        private Dictionary<IPEndPoint, Connection> Connections = new Dictionary<IPEndPoint, Connection>();
        private Dictionary<Connection, List<Packet>> PacketQueue = new Dictionary<Connection, List<Packet>>();
        private Peer Peer;

        private Thread UpdateThread;

        /// <summary>
        /// Milliseconds that may pass since a connection's last received message
        /// before the update thread removes it as timed out.
        /// </summary>
        public long Timeout = 8000;

        /// <summary>Milliseconds the update thread sleeps between two passes.</summary>
        public long Interval = 100;

        /// <summary>
        /// Creates the manager for <paramref name="peer"/> and immediately starts
        /// the background update thread.
        /// </summary>
        /// <param name="peer">Peer whose protocols and UDP client are used.</param>
        public ConnectionManager(Peer peer) {
            Peer = peer;
            UpdateThread = new Thread(new ThreadStart(UpdateThreadMethod));
            UpdateThread.Start();
        }

        /// <summary>Aborts the background update thread.</summary>
        /// <remarks>
        /// NOTE(review): relies on <see cref="Thread.Abort()"/>, which is not
        /// supported on .NET Core / .NET 5+; fine on runtimes that still
        /// implement it.
        /// </remarks>
        public void StopThread() {
            UpdateThread.Abort();
        }

        /// <summary>
        /// Returns the connection registered for <paramref name="endpoint"/>. If
        /// none exists, a new one in the <c>Awaiting</c> status is created and
        /// registered together with an empty packet queue.
        /// </summary>
        /// <param name="endpoint">Remote endpoint to look up.</param>
        /// <returns>The existing or newly created connection.</returns>
        public Connection Find(IPEndPoint endpoint) {
            Connection conn;
            if (Connections.TryGetValue(endpoint, out conn)) {
                return conn;
            }
            conn = new Connection(endpoint, ConnectionStatus.Awaiting);
            Connections.Add(endpoint, conn);
            PacketQueue.Add(conn, new List<Packet>());
            return conn;
        }

        /// <summary>
        /// Registers a new outgoing connection to <paramref name="endpoint"/>
        /// using the protocol identified by <paramref name="protocolIdent"/>.
        /// </summary>
        /// <param name="endpoint">Remote endpoint to connect to.</param>
        /// <param name="protocolIdent">Identifier of the protocol to assign.</param>
        /// <returns>
        /// <c>false</c> if a connection for the endpoint already exists,
        /// otherwise <c>true</c>.
        /// </returns>
        public bool StartConnection(IPEndPoint endpoint, byte protocolIdent) {
            if (Connections.ContainsKey(endpoint)) {
                return false;
            }
            Connection conn = new Connection(endpoint);
            conn.AssignedProtocol = protocolIdent;
            Connections.Add(endpoint, conn);
            PacketQueue.Add(conn, new List<Packet>());
            return true;
        }

        /// <summary>
        /// Assigns the next reliable packet id of <paramref name="conn"/> to
        /// <paramref name="p"/> and appends it to the connection's queue. Queued
        /// packets are re-sent on every update pass until the remote side
        /// acknowledges them (see <see cref="Handle"/>).
        /// </summary>
        /// <param name="conn">Connection the packet is queued for; must be registered.</param>
        /// <param name="p">Packet to queue.</param>
        public void AddPacketToQueue(Connection conn, Packet p) {
            p.Id = conn.ReliablePacketIDCounter++;
            PacketQueue[conn].Add(p);
        }

        /// <summary>
        /// Sends one message containing every packet currently queued for
        /// <paramref name="conn"/>. Does nothing if the connection's assigned
        /// protocol is not known to the peer.
        /// </summary>
        /// <param name="conn">Connection whose queue is sent.</param>
        public void SendPacketQueue(Connection conn) {
            Protocol protocol = Peer.GetProtocol(conn.AssignedProtocol);
            if (protocol == null) {
                return;
            }
            ByteBuffer buffer = protocol.BuildMessage(conn);
            List<Packet> queued = PacketQueue[conn];
            buffer.WriteInt(queued.Count);
            foreach (Packet p in queued) {
                buffer.WritePacket(protocol, p);
            }
            Send(conn, buffer);
        }

        /// <summary>
        /// Immediately sends <paramref name="p"/> on its own as an unreliable
        /// packet, bypassing the queue. Does nothing if the connection's assigned
        /// protocol is not known to the peer.
        /// </summary>
        /// <param name="conn">Connection to send to.</param>
        /// <param name="p">Packet to send; its id and reliability flag are overwritten.</param>
        public void SendSingleUnreliable(Connection conn, Packet p) {
            p.Id = conn.UnreliablePacketIDCounter++;
            p.Reliable = false;
            Protocol protocol = Peer.GetProtocol(conn.AssignedProtocol);
            if (protocol == null) {
                return;
            }
            ByteBuffer buffer = protocol.BuildMessage(conn);
            buffer.WriteInt(1);
            buffer.WritePacket(protocol, p);
            Send(conn, buffer);
        }

        /// <summary>
        /// Sends only the message built by the protocol, without any packet
        /// payload appended.
        /// </summary>
        private void SendPlain(Connection conn) {
            Protocol protocol = Peer.GetProtocol(conn.AssignedProtocol);
            if (protocol == null) {
                return;
            }
            ByteBuffer buffer = protocol.BuildMessage(conn);
            Send(conn, buffer);
        }

        /// <summary>
        /// Packs <paramref name="buffer"/> and sends it over the peer's UDP
        /// client, unless the connection is already marked <c>Lost</c>.
        /// </summary>
        private void Send(Connection conn, ByteBuffer buffer) {
            if (conn.Status == ConnectionStatus.Lost) {
                return;
            }
            byte[] bytes = buffer.Pack();
            Peer.ListenerThread.LastSentConnection = conn;
            Peer.UdpClient.Send(bytes, bytes.Length, conn.Endpoint);
        }

        /// <summary>
        /// Processes one incoming message from <paramref name="endpoint"/>:
        /// refreshes the connection's last-message timestamp, advances the
        /// connection status according to the message stage and, for
        /// <c>Ready</c> messages, acknowledges sent packets and hands received
        /// packets to the protocol.
        /// </summary>
        /// <param name="endpoint">Sender of the message.</param>
        /// <param name="buffer">Message contents, positioned at the protocol id.</param>
        public void Handle(IPEndPoint endpoint, ByteBuffer buffer) {
            Connection conn = Find(endpoint);
            conn.LastMessage = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
            ConnectionStatus oldStatus = conn.Status;

            byte protocolId = buffer.Read();
            Protocol protocol = Peer.GetProtocol(protocolId);
            PacketStage stage = buffer.ReadStage();

            switch (stage) {
                case PacketStage.Establishing:
                    // Only a connection that has not been negotiated yet may be established.
                    if (conn.Status == ConnectionStatus.Awaiting) {
                        conn.AssignedProtocol = protocolId;
                        string version = buffer.ReadString();
                        if (protocol == null || !version.Equals(protocol.Version)) {
                            conn.Status = ConnectionStatus.Rejected;
                            conn.ClosingReason = ClosingReason.IncorrectVersion;
                        } else {
                            conn.Status = ConnectionStatus.Ready;
                        }
                        if (protocol != null) {
                            protocol.ConnectionStatusChanged(oldStatus, conn.Status, conn);
                        }
                    }
                    break;

                case PacketStage.Rejected:
                    conn.Status = ConnectionStatus.Closed;
                    conn.ClosingReason = buffer.ReadClosingReason();
                    if (protocol != null) {
                        protocol.ConnectionStatusChanged(oldStatus, conn.Status, conn);
                    }
                    break;

                case PacketStage.Closed:
                    // Already stopped: do not notify the protocol a second time.
                    if (conn.Status == ConnectionStatus.Stopped) {
                        break;
                    }
                    conn.Status = ConnectionStatus.Stopped;
                    if (protocol != null) {
                        protocol.ConnectionStatusChanged(oldStatus, conn.Status, conn);
                    }
                    break;

                case PacketStage.Ready:
                    // Ignore messages for a different or unknown protocol.
                    if (conn.AssignedProtocol != protocolId || protocol == null) {
                        break;
                    }
                    if (oldStatus == ConnectionStatus.Establishing) {
                        // The remote side accepted us: the connection is now usable.
                        conn.Status = ConnectionStatus.Ready;
                        protocol.ConnectionStatusChanged(oldStatus, conn.Status, conn);
                    }

                    // Everything up to the acknowledged id has arrived; stop re-sending it.
                    conn.LatestOutwardReliable = buffer.ReadInt();
                    PacketQueue[conn].RemoveAll(p => p.Id <= conn.LatestOutwardReliable);

                    int packetAmount = buffer.ReadInt();
                    for (int i = 0; i < packetAmount; i++) {
                        Packet p = buffer.ReadPacket(protocol);
                        // Only packets newer than the latest one seen are delivered,
                        // tracked separately for reliable and unreliable packets.
                        if (p.Reliable) {
                            if (p.Id > conn.LatestInwardReliable) {
                                conn.LatestInwardReliable = p.Id;
                                protocol.Receive(conn, p);
                            }
                        } else if (p.Id > conn.LatestInwardUnreliable) {
                            conn.LatestInwardUnreliable = p.Id;
                            protocol.Receive(conn, p);
                        }
                    }
                    break;
            }
        }

        /// <summary>
        /// Body of the background thread. Each pass sends to every connection
        /// that is neither <c>Awaiting</c> nor <c>Stopped</c> (the packet queue
        /// when <c>Ready</c>, a plain message otherwise), then removes
        /// connections that are <c>Lost</c> or have been silent for longer than
        /// <see cref="Timeout"/>, and finally sleeps for <see cref="Interval"/>.
        /// </summary>
        private void UpdateThreadMethod() {
            try {
                while (Thread.CurrentThread.IsAlive) {
                    long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

                    // Removal is deferred: the dictionary cannot be modified while enumerating it.
                    List<KeyValuePair<IPEndPoint, Connection>> timedOut = new List<KeyValuePair<IPEndPoint, Connection>>();
                    foreach (KeyValuePair<IPEndPoint, Connection> pair in Connections) {
                        Connection conn = pair.Value;
                        if ((now - conn.LastMessage) > Timeout || conn.Status == ConnectionStatus.Lost) {
                            timedOut.Add(pair);
                        }
                        // Was `||`, which is always true; both statuses must be excluded.
                        if (conn.Status != ConnectionStatus.Awaiting && conn.Status != ConnectionStatus.Stopped) {
                            if (conn.Status == ConnectionStatus.Ready) {
                                SendPacketQueue(conn);
                            } else {
                                SendPlain(conn);
                            }
                        }
                    }

                    foreach (KeyValuePair<IPEndPoint, Connection> pair in timedOut) {
                        Connection conn = pair.Value;
                        Connections.Remove(pair.Key);
                        PacketQueue.Remove(conn);

                        // Only these statuses are reported to the protocol as a timeout.
                        bool notify = conn.Status == ConnectionStatus.Ready
                            || conn.Status == ConnectionStatus.Establishing
                            || conn.Status == ConnectionStatus.Awaiting
                            || conn.Status == ConnectionStatus.Lost;
                        if (notify) {
                            Protocol protocol = Peer.GetProtocol(conn.AssignedProtocol);
                            if (protocol != null) {
                                protocol.Timeout(conn);
                            }
                        }
                    }

                    Thread.Sleep((int)Interval);
                }
            } catch (ThreadAbortException) {
                Peer.MessageListener.Message("Connection Thread Stopped");
            }
        }
    }
}