From 624face1bafee666127cb44e1c65996d7317e7f8 Mon Sep 17 00:00:00 2001 From: teascade Date: Wed, 30 Sep 2020 16:07:53 +0300 Subject: [PATCH] Separate into own repository --- .gitignore | 1 + Packets/ByteBuffer.cs | 235 +++++++++++++++++++++ Packets/Packet.cs | 42 ++++ Packets/Protocol.cs | 71 +++++++ Peers/Connection.cs | 83 ++++++++ Peers/ConnectionManager.cs | 403 +++++++++++++++++++++++++++++++++++++ Peers/ListenerThread.cs | 79 ++++++++ Peers/Peer.cs | 235 +++++++++++++++++++++ README.md | 13 ++ 9 files changed, 1162 insertions(+) create mode 100644 .gitignore create mode 100644 Packets/ByteBuffer.cs create mode 100644 Packets/Packet.cs create mode 100644 Packets/Protocol.cs create mode 100644 Peers/Connection.cs create mode 100644 Peers/ConnectionManager.cs create mode 100644 Peers/ListenerThread.cs create mode 100644 Peers/Peer.cs create mode 100644 README.md diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f867683 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.meta \ No newline at end of file diff --git a/Packets/ByteBuffer.cs b/Packets/ByteBuffer.cs new file mode 100644 index 0000000..fceb2c5 --- /dev/null +++ b/Packets/ByteBuffer.cs @@ -0,0 +1,235 @@ +using System.Collections.Generic; +using System; +using System.Text; + +namespace NeonTea.Quakeball.TeaNet.Packets { + /// Contains a stream of bytes for sending or receiving over the internet. + public class ByteBuffer { + public int Size => Bytes.Count; + + private List Bytes; + private int pos = 0; + + /// Creates a new empty ByteBuffer + public ByteBuffer() { + Bytes = new List(); + } + + /// Creates a new ByteBuffer from the given byte[] + public ByteBuffer(byte[] bytes) { + Bytes = new List(bytes); + } + + /// Packs the byte buffer in to a byte[] array. + public byte[] Pack() { + return Bytes.ToArray(); + } + + /// is there a byte to read next? + public bool CanRead() { + return pos < Bytes.Count; + } + + /// Reads a given object with Serializable implemented. Assumes there is one next. + public void ReadSerializable(Serializable s) { + s.Read(this); + } + + /// Read a char on the buffer. Assumes there is one next. + public char ReadChar() { + return BitConverter.ToChar(Read(2), 0); + } + + /// Read a boolean on the buffer. Assumes there is one next. + public bool ReadBool() { + return Read() == 1; + } + + /// Read a double on the buffer. Assumes there is one next. + public double ReadDouble() { + return BitConverter.ToDouble(Read(8), 0); + } + + /// Read a float on the buffer. Assumes there is one next. + public float ReadFloat() { + return BitConverter.ToSingle(Read(4), 0); + } + + /// Read an unsigned long on the buffer. Assumes there is one next. + public ulong ReadULong() { + return BitConverter.ToUInt64(Read(8), 0); + } + + /// Read an unsigned integer on the buffer. Assumes there is one next. + public uint ReadUInt() { + return BitConverter.ToUInt32(Read(4), 0); + } + + /// Read an unsigned short on the buffer. Assumes there is one next. + public ushort ReadUShort() { + return BitConverter.ToUInt16(Read(2), 0); + } + + /// Read a long on the buffer. Assumes there is one next. + public long ReadLong() { + return BitConverter.ToInt64(Read(8), 0); + } + + /// Read an integer on the buffer. Assumes there is one next. + public int ReadInt() { + return BitConverter.ToInt32(Read(4), 0); + } + + /// Read a short on the buffer. Assumes there is one next. + public short ReadShort() { + return BitConverter.ToInt16(Read(2), 0); + } + + /// Read a string on the buffer. Assumes there is one next. + public string ReadString() { + int length = ReadInt(); + string s = Encoding.UTF8.GetString(Read(length)); + return s; + } + + /// Read an integer on the buffer. Assumes there is one next. + public byte[] Read(int amount) { + byte[] bytes = Bytes.GetRange(pos, amount).ToArray(); + pos += amount; + return bytes; + } + + /// Read the next byte on the buffer. Assumes there is one next. + public byte Read() { + return Bytes[pos++]; + } + + /// Write something that implements Serializable. + public void Write(Serializable s) { + s.Write(this); + } + + /// Write a char to the buffer. + public void Write(char c) { + Bytes.AddRange(BitConverter.GetBytes(c)); + } + + /// Write a boolean to the buffer. + public void Write(bool b) { + Write(b ? (byte)0b1 : (byte)0b0); + } + + /// Write a double to the buffer. + public void Write(double d) { + Bytes.AddRange(BitConverter.GetBytes(d)); + } + + /// Write a float to the buffer. + public void Write(float f) { + Bytes.AddRange(BitConverter.GetBytes(f)); + } + + /// Write an unsigned long to the buffer. + public void Write(ulong l) { + Bytes.AddRange(BitConverter.GetBytes(l)); + } + + /// Write an unsigned integer to the buffer. + public void Write(uint i) { + Bytes.AddRange(BitConverter.GetBytes(i)); + } + + /// Write an unsigned short to the buffer. + public void Write(ushort s) { + Bytes.AddRange(BitConverter.GetBytes(s)); + } + + /// Write a long to the buffer. + public void Write(long l) { + Bytes.AddRange(BitConverter.GetBytes(l)); + } + + /// Write an integer to the buffer. + public void Write(int i) { + Bytes.AddRange(BitConverter.GetBytes(i)); + } + + /// Write a short to the buffer. + public void Write(short s) { + Bytes.AddRange(BitConverter.GetBytes(s)); + } + + /// Write a string to the buffer. + public void Write(string s) { + byte[] bytes = Encoding.UTF8.GetBytes(s); + Write(bytes.Length); + Bytes.AddRange(bytes); + } + + /// Write a byte to the buffer. + public void Write(byte b) { + Bytes.Add(b); + } + + /// Read weather the given fingerprint is next on the buffer. + public bool ReadFingerprint(byte[] fingerprint) { + foreach (byte b in fingerprint) { + if (!(CanRead() && Read() == b)) { + return false; + } + } + return true; + } + + public PacketStage ReadStage() { + PacketStage stage = PacketStage.Closed; + switch (Read()) { + case 0: + stage = PacketStage.Establishing; + break; + case 1: + stage = PacketStage.Rejected; + break; + case 2: + stage = PacketStage.Closed; + break; + case 3: + stage = PacketStage.Ready; + break; + } + return stage; + } + + public ClosingReason ReadClosingReason() { + ClosingReason reason = ClosingReason.Unknown; + switch (Read()) { + case 0: + reason = ClosingReason.Unknown; + break; + case 1: + reason = ClosingReason.IncorrectVersion; + break; + } + return reason; + } + + /// Write an entire packet using the protocol to the buffer. + public void WritePacket(Protocol protocol, Packet p) { + int old = Bytes.Count; + Write(protocol.GetPacketTypeID(p)); + p.Write(this); + p.Size = Bytes.Count - old; + } + + /// Read an entire packet using the given protocol from the buffer. + public Packet ReadPacket(Protocol protocol) { + int old = pos; + int packetType = ReadInt(); + Type t = protocol.GetPacketType(packetType); + Packet p = (Packet)Activator.CreateInstance(t); + p.Read(this); + p.Size = pos - old; + return p; + } + } +} \ No newline at end of file diff --git a/Packets/Packet.cs b/Packets/Packet.cs new file mode 100644 index 0000000..5d3585c --- /dev/null +++ b/Packets/Packet.cs @@ -0,0 +1,42 @@ +namespace NeonTea.Quakeball.TeaNet.Packets { + /// A packet for sending stuff over to connections. + public abstract class Packet { + /// Packet meta-information: Is this packet reliable. Set just before sending. + public bool PacketIsReliable = true; + /// Packet meta-information: Id of this packet. Set just before sending. + public int PacketId; + /// Size of this packet in bytes. Only available after the packet has been Read (when received) or Written (when sent). + public int Size; + + /// Write any relevant information about this packet into the buffer. + public abstract void Write(ByteBuffer buffer); + /// Read and assign any relevant information about this packet from the buffer. + public abstract void Read(ByteBuffer buffer); + + /// Make a shallow copy for this packet, copying any primitives but retaining any references to instances. + public Packet ShallowCopy() { + return (Packet)this.MemberwiseClone(); + } + } + + /// Defines something as writeable/readable by the buffer. Useful for creating abstractions within packets. + public interface Serializable { + void Write(ByteBuffer buffer); + void Read(ByteBuffer buffer); + } + + public enum PacketStage { + Establishing = 0, + Rejected = 1, + Closed = 2, + Ready = 3, + } + + public enum ClosingReason { + Unknown = 0, + IncorrectVersion = 1, + Timeout = 2, + Manual = 3, + } + +} \ No newline at end of file diff --git a/Packets/Protocol.cs b/Packets/Protocol.cs new file mode 100644 index 0000000..d4c3f1b --- /dev/null +++ b/Packets/Protocol.cs @@ -0,0 +1,71 @@ +using System.Collections.Generic; +using System; + +using NeonTea.Quakeball.TeaNet.Peers; + + +namespace NeonTea.Quakeball.TeaNet.Packets { + /// Manages a single form of conversation between clients for the Peer. Don't forget to register your packets with + public abstract class Protocol { + private Dictionary PacketToId = new Dictionary(); + private Dictionary IdToPacket = new Dictionary(); + private int PacketIdCounter; + + /// Refers to the peer it is registered to + public Peer Peer; + + /// Unique identifier for the protocol. This should be different for every protocol. + public abstract byte Identifier { get; } + /// Version of the protocol, should be changed if existing packets are changed, new packets are registered or old packets are removed. + public abstract string Version { get; } + + /// Called when the Peer receives a packet from a connection that uses this protocol + public abstract void Receive(Connection conn, Packet packet); + /// Called when a ConnectionStatus is changed for a connection that uses this protocol + public abstract void ConnectionStatusChanged(ConnectionStatus oldStatus, ConnectionStatus newStatus, Connection conn); + /// Called when a connection that uses this protocol is timed out suddenly. + public abstract void Timeout(Connection conn); + + /// Register a packet for sending and receiving. + public int RegisterPacket(Type t) { + if (t.BaseType != typeof(Packet) || PacketToId.ContainsKey(t)) { + return -1; + } + int id = PacketIdCounter++; + PacketToId.Add(t, id); + IdToPacket.Add(id, t); + return id; + } + + public ByteBuffer BuildMessage(Connection connection, int firstId, bool reliable) { + ByteBuffer buffer = new ByteBuffer(); + foreach (byte b in Peer.Fingerprint) { + buffer.Write(b); + } + buffer.Write(Identifier); + if (connection.Status == ConnectionStatus.Establishing) { + buffer.Write((byte)PacketStage.Establishing); + buffer.Write(Version); + } else if (connection.Status == ConnectionStatus.Closed) { + buffer.Write((byte)PacketStage.Closed); + } else if (connection.Status == ConnectionStatus.Rejected) { + buffer.Write((byte)PacketStage.Rejected); + buffer.Write((byte)connection.ClosingReason); + } else if (connection.Status == ConnectionStatus.Ready) { + buffer.Write((byte)PacketStage.Ready); + buffer.Write(connection.Internal.LatestInwardReliable); + buffer.Write(firstId); + buffer.Write(reliable); + } + return buffer; + } + + public int GetPacketTypeID(Packet packet) { + return PacketToId[packet.GetType()]; + } + + public Type GetPacketType(int id) { + return IdToPacket[id]; + } + } +} \ No newline at end of file diff --git a/Peers/Connection.cs b/Peers/Connection.cs new file mode 100644 index 0000000..a94b787 --- /dev/null +++ b/Peers/Connection.cs @@ -0,0 +1,83 @@ +using System.Net; +using System; + +using NeonTea.Quakeball.TeaNet.Packets; + +namespace NeonTea.Quakeball.TeaNet.Peers { + /// Represents a connection to a remot host over the internet. + public class Connection { + + /// The IP end point of the connection + public IPEndPoint Endpoint; + /// The unique identifier of the connection. + public ulong uid; + /// Connection status of the current connection. + public ConnectionStatus Status; + /// Reason why the connection closed. Null if no reason. + public ClosingReason ClosingReason; + + /// Internal data for the connection. Do not touch, unless you know what you're doing. + public ConnectionInternalData Internal = new ConnectionInternalData(); + + public Connection(IPEndPoint endpoint, ConnectionStatus status = ConnectionStatus.Establishing) { + Endpoint = endpoint; + Status = status; + + Internal.LastMessage = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + Internal.LatestInwardReliable = -1; + Internal.LatestInwardUnreliable = -1; + Internal.LatestOutwardReliable = -1; + Internal.LatestOutwardUnreliable = -1; + } + + /// Is the connection ready and established for sending packets. + public bool IsReady() { + return Status == ConnectionStatus.Ready; + } + + /// Is the connection disconnected. Shorthand for weather Status is Rejected, Closed, Stopped or Lost. + public bool IsDisconnected() { + return !(Status == ConnectionStatus.Ready + || Status == ConnectionStatus.Awaiting + || Status == ConnectionStatus.Establishing); + } + } + + public struct ConnectionInternalData { + /// The protocol identifier, which this connection uses. + public byte AssignedProtocol; + /// Last unix timestamp in milliseconds, when this connection was last heard of. + public long LastMessage; + /// Last reliable Packet ID the connection has told us they have + public int LatestOutwardReliable; + /// Last unreliablePacket ID the connection has told us they have + public int LatestOutwardUnreliable; + /// Last reliable Packet ID we've received from the connection + public int LatestInwardReliable; + /// Last unreliable Packet ID we've received from the connection + public int LatestInwardUnreliable; + + /// Reliable Packet ID counter for packets we're sending them + public int ReliablePacketIDCounter; + /// Unreliable Packet ID counter for packets we're sending them + public int UnreliablePacketIDCounter; + } + + /// Initiali + public enum ConnectionStatus { + /// Awaiting the other endpoint to establish the connection. + Awaiting, + /// Attempting to establish the connection. + Establishing, + /// Ready for packet sending + Ready, + /// Rejected connection at endpoint, sending information that it was rejected. + Rejected, + /// Closed the endpoint, and informing the connection that it should stop. + Closed, + /// Connection is stopped and waiting for timeout. + Stopped, + /// Connection Lost + Lost, + } +} diff --git a/Peers/ConnectionManager.cs b/Peers/ConnectionManager.cs new file mode 100644 index 0000000..dc980dd --- /dev/null +++ b/Peers/ConnectionManager.cs @@ -0,0 +1,403 @@ +using System.Collections.Generic; +using System.Collections.Concurrent; +using System.Net; +using System.Threading; +using System; +using NeonTea.Quakeball.TeaNet.Packets; +using UnityEngine; + +namespace NeonTea.Quakeball.TeaNet.Peers { + /// Manages connections for Peer, sends them keepalives and sends and handles incoming messages. + public class ConnectionManager { + private ulong ConnectionCounter; + private Dictionary Connections = new Dictionary(); + private Dictionary IPtoID = new Dictionary(); + private Dictionary> PacketQueue = new Dictionary>(); + private Peer Peer; + + public Dictionary> ProtocolActionQueues = new Dictionary>(); + private Queue ConnectionsToRemove = new Queue(); + + private Thread UpdateThread; + + public long Timeout = 8000; + public long Interval = 100; + + public ConnectionManagerTrafficData TrafficData = new ConnectionManagerTrafficData(); + + public ConnectionManager(Peer peer) { + Peer = peer; + UpdateThread = new Thread(new ThreadStart(UpdateThreadMethod)); + UpdateThread.Start(); + } + + public void StopThread() { + UpdateThread.Abort(); + } + + /// Find a given Connection. Should not be used, unless expecting to establish a connection with the endpoint. + public Connection Find(IPEndPoint endpoint) { + if (IPtoID.ContainsKey(endpoint)) { + return Connections[IPtoID[endpoint]]; + } + Connection conn = new Connection(endpoint, ConnectionStatus.Awaiting); + AddConnection(conn); + return conn; + } + + /// Start establishing a connection to a given endpoint with the given protocol + public bool StartConnection(IPEndPoint endpoint, byte protocolIdent) { + if (IPtoID.ContainsKey(endpoint)) { + return false; + } + Connection conn = new Connection(endpoint); + conn.Internal.AssignedProtocol = protocolIdent; + AddConnection(conn); + return true; + } + + /// Get the connection instance from the given uid, if such exists. Null otherwise. + public Connection GetConnection(ulong uid) { + Connection conn; + Connections.TryGetValue(uid, out conn); + return conn; + } + + /// Soft-closes the connection with the given uid, meaning it will wait for them to acknowledge the closing + public void CloseConnection(ulong uid, ClosingReason reason) { + if (Connections.ContainsKey(uid)) { + Connections[uid].ClosingReason = reason; + Connections[uid].Status = ConnectionStatus.Rejected; + } + } + + /// Add a reliable packet to the packet queue, to be sent on the next update, or when SendPacketQueue is called. + public void AddPacketToQueue(ulong uid, Packet p) { + if (!Connections.ContainsKey(uid)) { + return; + } + p = p.ShallowCopy(); + p.PacketId = Connections[uid].Internal.ReliablePacketIDCounter++; + PacketQueue[uid].Enqueue(p); + } + + /// Send the current packet queue instantly. + public void SendPacketQueue(ulong uid) { + if (!Connections.ContainsKey(uid)) { + return; + } + Connection conn = Connections[uid]; + Protocol protocol = Peer.GetProtocol(conn.Internal.AssignedProtocol); + if (protocol != null && conn.IsReady()) { + Packet[] list = PacketQueue[uid].ToArray(); + int firstId = Int32.MaxValue; + if (list.Length > 0) { + firstId = list[0].PacketId; + } + ByteBuffer buffer = protocol.BuildMessage(conn, firstId, true); + buffer.Write(list.Length); + foreach (Packet p in list) { + buffer.WritePacket(protocol, p); + + // Do the analytics dance! + PerPacketData OldSentByPacket; + TrafficData.SentByPacket.TryGetValue(p.GetType(), out OldSentByPacket); + OldSentByPacket.Bytes += p.Size; + OldSentByPacket.Packets += 1; + TrafficData.SentByPacket[p.GetType()] = OldSentByPacket; + TrafficData.ReliablePacketsSent++; + } + Send(conn, buffer); + } + } + + /// Send a single unreliable packet. + public void SendSingleUnreliable(ulong uid, Packet p) { + if (!Connections.ContainsKey(uid)) { + return; + } + Connection conn = Connections[uid]; + p = p.ShallowCopy(); + p.PacketId = conn.Internal.UnreliablePacketIDCounter++; + p.PacketIsReliable = false; + Protocol protocol = Peer.GetProtocol(conn.Internal.AssignedProtocol); + if (protocol != null && conn.IsReady()) { + ByteBuffer buffer = protocol.BuildMessage(conn, p.PacketId, false); + buffer.Write(1); + buffer.WritePacket(protocol, p); + Send(conn, buffer); + TrafficData.UnreliablePacketsSent++; + + // Do the analytics dance! + PerPacketData OldSentByPacket; + TrafficData.SentByPacket.TryGetValue(p.GetType(), out OldSentByPacket); + OldSentByPacket.Bytes += p.Size; + OldSentByPacket.Packets += 1; + TrafficData.SentByPacket[p.GetType()] = OldSentByPacket; + } + } + + /// Go through queue of networking actions that have happened since last update. + public void Update() { + foreach (byte id in ProtocolActionQueues.Keys) { + Protocol protocol = Peer.GetProtocol(id); + while (ProtocolActionQueues[id].Count > 0) { + ProtocolAction action = ProtocolActionQueues[id].Dequeue(); + if (action is ReceiveAction) { + ReceiveAction receive = (ReceiveAction)action; + protocol.Receive(receive.Connection, receive.Packet); + } else if (action is ConnectionChangedAction) { + ConnectionChangedAction changed = (ConnectionChangedAction)action; + protocol.ConnectionStatusChanged(changed.OldStatus, changed.NewStatus, changed.Connection); + } else if (action is TimeoutAction) { + TimeoutAction changed = (TimeoutAction)action; + protocol.Timeout(changed.Connection); + } + } + } + while (ConnectionsToRemove.Count > 0) { + Connection conn = ConnectionsToRemove.Dequeue(); + RemoveConnection(conn); + } + } + + private void AddConnection(Connection conn) { + conn.uid = ConnectionCounter++; + Connections.Add(conn.uid, conn); + IPtoID.Add(conn.Endpoint, conn.uid); + PacketQueue.Add(conn.uid, new ConcurrentQueue()); + } + + private void RemoveConnection(Connection conn) { + Connections.Remove(conn.uid); + IPtoID.Remove(conn.Endpoint); + PacketQueue.Remove(conn.uid); + } + + private void SendPlain(Connection conn) { + Protocol protocol = Peer.GetProtocol(conn.Internal.AssignedProtocol); + if (protocol != null) { + ByteBuffer buffer = protocol.BuildMessage(conn, Int32.MaxValue, false); + Send(conn, buffer); + } + } + + private void Send(Connection conn, ByteBuffer buffer) { + if (conn.Status == ConnectionStatus.Lost) { + return; + } + TrafficData.BytesSent += buffer.Size; + TrafficData.TotalMessagesSent++; + byte[] bytes = buffer.Pack(); + Peer.ListenerThread.LastSentConnection = conn; + Peer.UdpClient.Send(bytes, bytes.Length, conn.Endpoint); + } + + public void Handle(IPEndPoint endpoint, ByteBuffer buffer) { + + + Connection conn = Find(endpoint); + conn.Internal.LastMessage = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + ConnectionStatus oldStatus = conn.Status; + byte protocolId = buffer.Read(); + Protocol protocol = Peer.GetProtocol(protocolId); + PacketStage stage = buffer.ReadStage(); + switch (stage) { + case PacketStage.Establishing: + if (conn.Status == ConnectionStatus.Awaiting) { + conn.Internal.AssignedProtocol = protocolId; + string version = buffer.ReadString(); + if (protocol == null || !version.Equals(protocol.Version)) { + conn.Status = ConnectionStatus.Rejected; + conn.ClosingReason = ClosingReason.IncorrectVersion; + } else { + conn.Status = ConnectionStatus.Ready; + } + if (protocol != null) { + ProtocolActionQueues[protocol.Identifier].Enqueue(new ConnectionChangedAction(oldStatus, conn.Status, conn)); + } + } + break; + case PacketStage.Rejected: + conn.Status = ConnectionStatus.Closed; + conn.ClosingReason = buffer.ReadClosingReason(); + if (protocol != null) { + ProtocolActionQueues[protocol.Identifier].Enqueue(new ConnectionChangedAction(oldStatus, conn.Status, conn)); + } + break; + case PacketStage.Closed: + if (conn.Status == ConnectionStatus.Stopped) { + break; + } + conn.Status = ConnectionStatus.Stopped; + if (protocol != null) { + ProtocolActionQueues[protocol.Identifier].Enqueue(new ConnectionChangedAction(oldStatus, conn.Status, conn)); + } + break; + case PacketStage.Ready: + if (conn.Internal.AssignedProtocol != protocolId || protocol == null) { + break; + } + if (oldStatus == ConnectionStatus.Establishing) { // Update connection status + conn.Status = ConnectionStatus.Ready; + ProtocolActionQueues[protocol.Identifier].Enqueue(new ConnectionChangedAction(oldStatus, conn.Status, conn)); + } + if (!(oldStatus == ConnectionStatus.Establishing || oldStatus == ConnectionStatus.Ready)) { + break; // No cheating at this table! For realsies this time! + } + + conn.Internal.LatestOutwardReliable = buffer.ReadInt(); + int FirstPacketId = buffer.ReadInt(); + bool Reliable = buffer.ReadBool(); + + ConcurrentQueue queue = PacketQueue[conn.uid]; + Packet peeked; + while (queue.TryPeek(out peeked)) { + if (peeked.PacketId > conn.Internal.LatestOutwardReliable) { + break; + } else { + Packet LastRemoved; + queue.TryDequeue(out LastRemoved); + } + } + PacketQueue[conn.uid] = queue; + + + int PacketAmount = buffer.ReadInt(); + + if (Reliable) { + TrafficData.ReliablePacketsReceived += PacketAmount; + } else { + TrafficData.UnreliablePacketsreceived += PacketAmount; + } + + for (int i = 0; i < PacketAmount; i++) { + Packet p = buffer.ReadPacket(protocol); + p.PacketId = FirstPacketId + i; + p.PacketIsReliable = Reliable; + if (p.PacketIsReliable) { + if (p.PacketId > conn.Internal.LatestInwardReliable) { + conn.Internal.LatestInwardReliable = p.PacketId; + ProtocolActionQueues[protocol.Identifier].Enqueue(new ReceiveAction(conn, p)); + } + } else if (p.PacketId > conn.Internal.LatestInwardUnreliable) { + conn.Internal.LatestInwardUnreliable = p.PacketId; + ProtocolActionQueues[protocol.Identifier].Enqueue(new ReceiveAction(conn, p)); + } + + // Do some analytics! + PerPacketData OldReceivedByPacket; + TrafficData.ReceivedByPacket.TryGetValue(p.GetType(), out OldReceivedByPacket); + OldReceivedByPacket.Bytes += p.Size; + OldReceivedByPacket.Packets += 1; + TrafficData.ReceivedByPacket[p.GetType()] = OldReceivedByPacket; + } + break; + } + } + + private void UpdateThreadMethod() { + try { + while (Thread.CurrentThread.IsAlive) { + long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + List timedOut = new List(); + foreach (ulong uid in Connections.Keys) { + Connection conn = Connections[uid]; + if ((now - conn.Internal.LastMessage) > Timeout || conn.Status == ConnectionStatus.Lost) { + timedOut.Add(uid); + } + if (conn.Status != ConnectionStatus.Awaiting || conn.Status != ConnectionStatus.Stopped) { + if (conn.Status == ConnectionStatus.Ready) { + SendPacketQueue(uid); + } else { + SendPlain(conn); + } + } + } + foreach (ulong uid in timedOut) { + Connection conn = Connections[uid]; + ConnectionsToRemove.Enqueue(conn); + if (conn.Status == ConnectionStatus.Ready + || conn.Status == ConnectionStatus.Establishing + || conn.Status == ConnectionStatus.Awaiting + || conn.Status == ConnectionStatus.Lost) { + Protocol protocol = Peer.GetProtocol(conn.Internal.AssignedProtocol); + if (protocol != null) { + conn.ClosingReason = ClosingReason.Timeout; + ProtocolActionQueues[conn.Internal.AssignedProtocol].Enqueue(new TimeoutAction(conn)); + } + } + } + Thread.Sleep((int)Interval); + } + } catch (ThreadAbortException) { + Peer.MessageListener.Message("Connection Thread Stopped"); + } + } + } + + public abstract class ProtocolAction { }; + + class ReceiveAction : ProtocolAction { + public Connection Connection; + public Packet Packet; + + public ReceiveAction(Connection connection, Packet packet) { + Connection = connection; + Packet = packet; + } + } + + class ConnectionChangedAction : ProtocolAction { + public ConnectionStatus OldStatus; + public ConnectionStatus NewStatus; + public Connection Connection; + + public ConnectionChangedAction(ConnectionStatus old, ConnectionStatus newstatus, Connection connection) { + Connection = connection; + OldStatus = old; + NewStatus = newstatus; + } + } + + class TimeoutAction : ProtocolAction { + public Connection Connection; + + public TimeoutAction(Connection connection) { + Connection = connection; + } + } + + public class ConnectionManagerTrafficData { + /// The amount of bytes sent since the last clear interval from Peer + public int BytesSent; + /// The amount of total IP messages sent since the last clear interval from Peer + public int TotalMessagesSent; + /// The amount of reliable packets sent since the last clear interval from Peer + public int ReliablePacketsSent; + /// The amount of unreliable packets sent since the last clear interval from Peer + public int UnreliablePacketsSent; + /// The amount of reliable packets received since the last clear interval from Peer + public int ReliablePacketsReceived; + /// The amount of unreliable packets received since the last clear interval from Peer + public int UnreliablePacketsreceived; + /// Data relating to outward packet specific traffic + public ConcurrentDictionary SentByPacket = new ConcurrentDictionary(); + /// Data relating to inward packet specific traffic + public ConcurrentDictionary ReceivedByPacket = new ConcurrentDictionary(); + + public void Clear() { + BytesSent = 0; + TotalMessagesSent = 0; + ReliablePacketsSent = 0; + UnreliablePacketsSent = 0; + ReliablePacketsReceived = 0; + UnreliablePacketsreceived = 0; + } + } + + public struct PerPacketData { + public int Bytes; + public int Packets; + } +} diff --git a/Peers/ListenerThread.cs b/Peers/ListenerThread.cs new file mode 100644 index 0000000..2860522 --- /dev/null +++ b/Peers/ListenerThread.cs @@ -0,0 +1,79 @@ +using UnityEngine; +using System.Net; +using System; +using System.Net.Sockets; +using System.Threading; + +using NeonTea.Quakeball.TeaNet.Packets; + +namespace NeonTea.Quakeball.TeaNet.Peers { + /// Manager for the thread that listens from the given endpoint. Initiated with + public class ListenerThread { + private IPEndPoint EndPoint; + private Thread Thread; + private Peer Peer; + + public Connection LastSentConnection; + + private static int[] CONN_LOST_CODES = new int[] { 10054, 10051 }; + + /// The amount of bytes received since the last clear interval from Peer + public int BytesReceived; + /// The amount of bytes received since the last clear interval from Peer + public int MessagesReceived; + + public ListenerThread(Peer peer, IPEndPoint endpoint) { + EndPoint = endpoint; + Peer = peer; + } + + public bool Start() { + if (Thread != null) { + return false; + } + Thread t = new Thread(new ThreadStart(ListenThreadMethod)); + + t.Start(); + Thread = t; + return true; + } + + public bool Stop() { + if (Thread == null) { + return false; + } + Thread.Abort(); + return true; + } + + private void ListenThreadMethod() { + try { + while (Thread.CurrentThread.IsAlive) { + if (Peer.UdpClient.Available > 0) { + IPEndPoint Listened = new IPEndPoint(EndPoint.Address, EndPoint.Port); + ByteBuffer Buffer = new ByteBuffer(); + try { + Buffer = new ByteBuffer(Peer.UdpClient.Receive(ref Listened)); + } catch (SocketException e) { + if (Array.Exists(CONN_LOST_CODES, x => x == e.ErrorCode)) { + if (LastSentConnection != null) { + LastSentConnection.Status = ConnectionStatus.Lost; + Peer.MessageListener.Err($"Connection lost to {LastSentConnection.Endpoint}: {e.ToString()}"); + } + } else { + Peer.MessageListener.Err($"Listener error: {e.ToString()}"); + } + } + if (Buffer.ReadFingerprint(Peer.Fingerprint)) { + Peer.ConnectionManager.Handle(Listened, Buffer); + BytesReceived += Buffer.Size; + MessagesReceived++; + } + } + } + } catch (ThreadAbortException) { + Peer.MessageListener.Message("Listener Thread stopped"); + } + } + } +} \ No newline at end of file diff --git a/Peers/Peer.cs b/Peers/Peer.cs new file mode 100644 index 0000000..619992f --- /dev/null +++ b/Peers/Peer.cs @@ -0,0 +1,235 @@ +using System.Collections.Generic; +using System.Collections.Concurrent; +using System; +using System.Net; +using System.Net.Sockets; +using NeonTea.Quakeball.TeaNet.Packets; + +namespace NeonTea.Quakeball.TeaNet.Peers { + /// Main class for networking. Remember to register a protocol before using. Remember to call Update from a gameobject! + public class Peer : PeerMessageListener { + /// Underlying UdpClient. Do not touch unless you know what you are doing. + public UdpClient UdpClient { get; private set; } + + /// The fingerprint for networking. Used to make sure incoming bytes are from a correct source. + public byte[] Fingerprint { get; private set; } + /// Shorthand for ConnectionManager.Timeout: The amount of milliseconds before a connection is timed out. + public long Timeout { + get { + return ConnectionManager.Timeout; + } + set { + ConnectionManager.Timeout = value; + } + } + /// Shorthand for ConnectionManager.Interval: The interval of updates and rate of re-sending reliable messages. + public long UpdateInterval { + get { + return ConnectionManager.Interval; + } + set { + ConnectionManager.Interval = value; + } + } + /// The interval of traffic analyzed before updating. By default 5000 (5 seconds) + public long TrafficDataInterval = 5000; + /// Traffic Data for this Peer + public TrafficData TrafficData { get; private set; } = new TrafficData(); + /// Whether the Peer is currently doing anything or not.null + public bool Running { get; private set; } + + public ListenerThread ListenerThread; + public ConnectionManager ConnectionManager; + public Dictionary RegisteredProtocols = new Dictionary(); + + /// Listener for messages and errors from within the Peer. + public PeerMessageListener MessageListener; + + private long LastTrafficData; + + /// Creates a new Peer with the given fingerprint. The fingerprint can be anything, it just must be same on both peers. + public Peer(byte[] fingerprint) { + Fingerprint = fingerprint; + ConnectionManager = new ConnectionManager(this); + MessageListener = this; + LastTrafficData = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + } + + /// Starts the UdpClient, but does no networking as is. + public void Start(int sending_port) { + if (Running) { + return; + } + UdpClient = new UdpClient(sending_port); + MessageListener.Message("UdpClient Started"); + Running = true; + } + + /// Abruptly stops the UdpClient and all relevant threads. + public void Stop() { + ConnectionManager.StopThread(); + if (ListenerThread != null) { + ListenerThread.Stop(); + } + UdpClient.Dispose(); + UdpClient.Close(); + Running = false; + } + + /// Start listening to a given address and port. Usually 0.0.0.0, 0 for clients and 0.0.0.0, port for servers + public void StartListen(string address, int port) { + IPEndPoint endpoint = new IPEndPoint(FindAddress(address), port); + StartListen(endpoint); + } + + private void StartListen(IPEndPoint endpoint) { + if (ListenerThread != null) { + return; // Cant listen twice + } + ListenerThread = new ListenerThread(this, endpoint); + ListenerThread.Start(); + MessageListener.Message($"Started listening to {endpoint}"); + } + + /// Connect to a remote host. Will initialize a listener to 0.0.0.0:0 if no listener is started. + public void Connect(string address, int port, byte protocolIdent, bool startListening = true) { + if (startListening) { + IPEndPoint listenEndpoint = (IPEndPoint)UdpClient.Client.LocalEndPoint; + StartListen(listenEndpoint); + } + IPEndPoint endpoint = new IPEndPoint(FindAddress(address), port); + ConnectionManager.StartConnection(endpoint, protocolIdent); + MessageListener.Message($"Connecting to {endpoint}"); + } + + /// Soft-disconnects the connection with the given uid, meaning it will wait until they acknowledge, before timing out.abstract + public void Disconnect(ulong uid) { + ConnectionManager.CloseConnection(uid, ClosingReason.Manual); + } + + /// Send a reliable packet, meaning it will reliably be delivered. + public void SendReliable(ulong uid, Packet packet) { + if (Running) { + ConnectionManager.AddPacketToQueue(uid, packet); + ConnectionManager.SendPacketQueue(uid); + } + } + + /// Add reliable packet to queue, so that it will be sent on the next update. + public void SendReliableLater(ulong uid, Packet packet) { + if (Running) { + ConnectionManager.AddPacketToQueue(uid, packet); + } + } + + /// Send an unreliable packet, meaning its delivery is not reliable. + public void SendUnreliable(ulong uid, Packet packet) { + if (Running) { + ConnectionManager.SendSingleUnreliable(uid, packet); + } + } + + /// Get a Connection instance from the given uid, if such exists. Null otherwise. + public Connection GetConnection(ulong uid) { + return ConnectionManager.GetConnection(uid); + } + + /// Register a given protocol. Returns protocol.Identifier if successful, 0 otherwise. + public byte RegisterProtocol(Protocol protocol) { + byte ident = protocol.Identifier; + if (RegisteredProtocols.ContainsKey(ident)) { + return 0; + } + RegisteredProtocols.Add(ident, protocol); + ConnectionManager.ProtocolActionQueues.Add(ident, new Queue()); + protocol.Peer = this; + return ident; + } + + /// Get protocol instance from the given identifier, if such exists. + public Protocol GetProtocol(byte ident) { + if (RegisteredProtocols.ContainsKey(ident)) { + return RegisteredProtocols[ident]; + } + return null; + } + + /// Shorthand for Peer.ConnectionManager.Update(): Handles network stuff that was received since last update. + public void Update() { + ConnectionManager.Update(); + + long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + if (now - LastTrafficData > TrafficDataInterval) { + LastTrafficData = now; + if (ListenerThread != null) { + TrafficData.Received = ListenerThread.BytesReceived; + TrafficData.TotalMessagesReceived = ListenerThread.MessagesReceived; + ListenerThread.BytesReceived = 0; + ListenerThread.MessagesReceived = 0; + } + if (ConnectionManager != null) { + TrafficData.Sent = ConnectionManager.TrafficData.BytesSent; + TrafficData.TotalMessagesSent = ConnectionManager.TrafficData.TotalMessagesSent; + TrafficData.UnreliableReceived = ConnectionManager.TrafficData.UnreliablePacketsreceived; + TrafficData.ReliableReceived = ConnectionManager.TrafficData.ReliablePacketsReceived; + TrafficData.UnreliableSent = ConnectionManager.TrafficData.UnreliablePacketsSent; + TrafficData.ReliableSent = ConnectionManager.TrafficData.ReliablePacketsSent; + + PerPacketData empty = new PerPacketData(); + foreach (Type t in ConnectionManager.TrafficData.ReceivedByPacket.Keys) { + TrafficData.ReceivedByPacket[t] = ConnectionManager.TrafficData.ReceivedByPacket[t]; + ConnectionManager.TrafficData.ReceivedByPacket[t] = empty; + } + foreach (Type t in ConnectionManager.TrafficData.SentByPacket.Keys) { + TrafficData.SentByPacket[t] = ConnectionManager.TrafficData.SentByPacket[t]; + ConnectionManager.TrafficData.SentByPacket[t] = empty; + } + + ConnectionManager.TrafficData.Clear(); + } + } + } + + public void Message(string msg) { } + public void Err(string msg) { } + + private IPAddress FindAddress(string host) { + IPAddress addr; + try { + addr = Dns.GetHostAddresses(host)[0]; + } catch (ArgumentException) { + addr = IPAddress.Parse(host); + } + return addr; + } + } + + /// Listener for messages and errors from the Peer. + public interface PeerMessageListener { + void Message(string msg); + void Err(string msg); + } + + public class TrafficData { + /// The amount of bytes received in the last TrafficDataIntervel + public int Received; + /// The amount of bytes sent in the last TrafficDataIntervel + public int Sent; + /// The amount of total messages received in the last TrafficDataIntervel + public int TotalMessagesReceived; + /// The amount of total messages sent in the last TrafficDataIntervel + public int TotalMessagesSent; + /// The amount of reliable messages received in the last TrafficDataIntervel + public int ReliableReceived; + /// The amount of reliable messages sent in the last TrafficDataIntervel + public int ReliableSent; + /// The amount of unreliable messages received in the last TrafficDataIntervel + public int UnreliableReceived; + /// The amount of unreliable messages sent in the last TrafficDataIntervel + public int UnreliableSent; + /// The amount of bytes sent in the last TrafficDataIntervel by Packet + public Dictionary SentByPacket = new Dictionary(); + /// The amount of bytes received in the last TrafficDataIntervel by Packet + public Dictionary ReceivedByPacket = new Dictionary(); + } +} diff --git a/README.md b/README.md new file mode 100644 index 0000000..7bb239c --- /dev/null +++ b/README.md @@ -0,0 +1,13 @@ +# TeaNet + +TeaNet is a networking library written in **C#** using .NET Core 3.1 System.Net.Sockets + +TeaNet works with a very low-level networking API to provide a highly performant networking library +that is primarily meant for use in games, but can be used for anything else as well. + +TeaNet was originally written in [Unity][unity] for a game project called [quakeball][quakeball] by [neontea][neontea], +and while the entire TeaNet codebase is written by me, it's commit history can be found in quakeball's repository. + +[unity]: https://unity.com/ +[quakeball]: https://git.teascade.net/neontea/quakeball/ +[neontea]: https://neontea.itch.io/ \ No newline at end of file