提交 98ef42a8 编写于 作者: T tanghai

整理UNet代码

上级 95c6125a
using System.Collections.Generic;
namespace Common.Base
{
public class QueueDictionary<T, K>
{
private readonly List<T> list = new List<T>();
private readonly Dictionary<T, K> dictionary = new Dictionary<T, K>();
public void Add(T t, K k)
{
list.Add(t);
this.dictionary.Add(t, k);
}
public bool Remove(T t)
{
this.list.Remove(t);
this.dictionary.Remove(t);
return true;
}
public int Count
{
get
{
return this.list.Count;
}
}
public T FirstKey
{
get
{
return list[0];
}
}
public K this[T t]
{
get
{
return this.dictionary[t];
}
}
}
}
\ No newline at end of file
......@@ -58,6 +58,7 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="Base\QueueDictionary.cs" />
<Compile Include="Base\TimerManager.cs" />
<Compile Include="Base\Entity.cs" />
<Compile Include="Base\Component.cs" />
......
namespace UNet
{
public static class ErrorCode
{
public const int ClientDisconnect = 1;
}
}
\ No newline at end of file
......@@ -3,7 +3,7 @@ using System.Runtime.InteropServices;
namespace UNet
{
public enum EventType
internal enum EventType
{
None = 0,
Connect = 1,
......@@ -11,7 +11,7 @@ namespace UNet
Receive = 3
}
public enum PeerState
internal enum PeerState
{
Uninitialized = -1,
Disconnected = 0,
......@@ -27,14 +27,14 @@ namespace UNet
}
[StructLayout(LayoutKind.Sequential)]
public struct ENetAddress
internal struct ENetAddress
{
public uint Host;
public ushort Port;
}
[StructLayout(LayoutKind.Sequential)]
public struct ENetCallbacks
internal struct ENetCallbacks
{
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate IntPtr MallocCb(IntPtr size);
......@@ -50,8 +50,9 @@ namespace UNet
private IntPtr no_memory;
}
// ENetEvent
[StructLayout(LayoutKind.Sequential)]
public class ENetEvent
internal class ENetEvent
{
public EventType Type;
public IntPtr Peer;
......@@ -61,22 +62,22 @@ namespace UNet
}
[StructLayout(LayoutKind.Sequential)]
public struct ENetHost
internal struct ENetHost
{
}
[StructLayout(LayoutKind.Sequential)]
public class ENetListNode
internal class ENetListNode
{
public IntPtr Next;
public IntPtr Previous;
}
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void ENetPacketFreeCallback(ref ENetPacket param0);
internal delegate void ENetPacketFreeCallback(ref ENetPacket param0);
[StructLayout(LayoutKind.Sequential)]
public struct ENetPacket
internal struct ENetPacket
{
public uint ReferenceCount;
public uint Flags;
......@@ -87,7 +88,7 @@ namespace UNet
}
[StructLayout(LayoutKind.Sequential)]
public struct ENetPeer
internal struct ENetPeer
{
public ENetListNode DispatchList;
public IntPtr Host;
......
......@@ -3,7 +3,7 @@ using System.Net;
namespace UNet
{
public struct UAddress
internal struct UAddress
{
private uint ip;
private ushort port;
......
using System;
namespace UNet
{
public enum EventState
{
CONNECTED = 0,
DISCONNECTED = 1,
}
public class UEvent
{
private readonly ENetEvent ev;
private EventState peerState = EventState.CONNECTED;
public UEvent(ENetEvent ev)
{
this.ev = ev;
}
public EventState EventState
{
get
{
return this.peerState;
}
set
{
this.peerState = value;
}
}
public ENetEvent Ev
{
get
{
return this.ev;
}
}
public IntPtr PacketPtr
{
get
{
return this.Ev.Packet;
}
}
public IntPtr PeerPtr
{
get
{
return this.Ev.Peer;
}
}
public EventType Type
{
get
{
return this.Ev.Type;
}
}
}
}
\ No newline at end of file
......@@ -4,7 +4,7 @@ using System.Runtime.Serialization;
namespace UNet
{
[Serializable]
public class UException: Exception
internal class UException : Exception
{
public UException()
{
......
......@@ -40,8 +40,6 @@
<ItemGroup>
<Compile Include="UAddress.cs" />
<Compile Include="UException.cs" />
<Compile Include="UEvent.cs" />
<Compile Include="ErrorCode.cs" />
<Compile Include="UPoller.cs" />
<Compile Include="Library.cs" />
<Compile Include="NativeMethods.cs" />
......@@ -56,6 +54,10 @@
<Folder Include="Properties\" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Common\Common.csproj">
<Project>{19F8F043-1F99-4550-99DF-DEA5C7D77E55}</Project>
<Name>Common</Name>
</ProjectReference>
<ProjectReference Include="..\Network\Network.csproj">
<Project>{3bd499ff-3c34-4920-8b21-c55fba580843}</Project>
<Name>Network</Name>
......
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using Common.Base;
namespace UNet
{
public sealed class UPoller: IDisposable
internal sealed class UPoller: IDisposable
{
static UPoller()
{
......@@ -12,27 +14,11 @@ namespace UNet
}
private readonly USocketManager uSocketManager = new USocketManager();
private readonly LinkedList<UEvent> connEEvents = new LinkedList<UEvent>();
internal USocketManager USocketManager
{
get
{
return this.uSocketManager;
}
}
internal LinkedList<UEvent> ConnEEvents
{
get
{
return this.connEEvents;
}
}
private readonly QueueDictionary<IntPtr, ENetEvent> connQueue = new QueueDictionary<IntPtr, ENetEvent>();
private IntPtr host;
private readonly object eventsLock = new object();
private Action events;
private readonly USocket acceptor = new USocket(IntPtr.Zero);
private readonly BlockingCollection<Action> blockingCollection = new BlockingCollection<Action>();
......@@ -47,6 +33,8 @@ namespace UNet
{
throw new UException("Host creation call failed.");
}
NativeMethods.EnetHostCompressWithRangeCoder(this.host);
}
public UPoller()
......@@ -83,34 +71,77 @@ namespace UNet
this.host = IntPtr.Zero;
}
public IntPtr HostPtr
public Task<USocket> AcceptAsync()
{
get
if (this.uSocketManager.ContainsKey(IntPtr.Zero))
{
return this.host;
throw new UException("do not accept twice!");
}
}
private UEvent GetEvent()
{
ENetEvent eEvent = new ENetEvent();
int ret = NativeMethods.EnetHostCheckEvents(this.host, eEvent);
if (ret <= 0)
var tcs = new TaskCompletionSource<USocket>();
// 如果有请求连接缓存的包,从缓存中取
if (this.connQueue.Count > 0)
{
return null;
IntPtr ptr = this.connQueue.FirstKey;
this.connQueue.Remove(ptr);
USocket socket = new USocket(ptr);
this.uSocketManager.Add(ptr, socket);
tcs.TrySetResult(socket);
}
else
{
this.uSocketManager.Add(acceptor.PeerPtr, acceptor);
acceptor.Connected = eEvent =>
{
if (eEvent.Type == EventType.Disconnect)
{
tcs.TrySetException(new UException("socket disconnected in accpet"));
}
this.uSocketManager.Remove(IntPtr.Zero);
USocket socket = new USocket(eEvent.Peer);
this.uSocketManager.Add(socket.PeerPtr, socket);
tcs.TrySetResult(socket);
};
}
UEvent u = new UEvent(eEvent);
return u;
return tcs.Task;
}
public void CompressWithRangeCoder()
public Task<USocket> ConnectAsync(string hostName, ushort port)
{
NativeMethods.EnetHostCompressWithRangeCoder(this.host);
var tcs = new TaskCompletionSource<USocket>();
UAddress address = new UAddress { Host = hostName, Port = port };
ENetAddress nativeAddress = address.Struct;
IntPtr ptr = NativeMethods.EnetHostConnect(
this.host, ref nativeAddress, NativeMethods.ENET_PROTOCOL_MAXIMUM_CHANNEL_COUNT, 0);
USocket socket = new USocket(ptr);
if (socket.PeerPtr == IntPtr.Zero)
{
throw new UException("host connect call failed.");
}
this.uSocketManager.Add(socket.PeerPtr, socket);
socket.Connected = eEvent =>
{
if (eEvent.Type == EventType.Disconnect)
{
tcs.TrySetException(new UException("socket disconnected in connect"));
}
tcs.TrySetResult(socket);
};
return tcs.Task;
}
public void DoNotCompress()
private ENetEvent GetEvent()
{
NativeMethods.EnetHostCompress(this.host, IntPtr.Zero);
ENetEvent eNetEvent = new ENetEvent();
if (NativeMethods.EnetHostCheckEvents(this.host, eNetEvent) <= 0)
{
return null;
}
return eNetEvent;
}
public void Flush()
......@@ -118,60 +149,38 @@ namespace UNet
NativeMethods.EnetHostFlush(this.host);
}
public void SetBandwidthLimit(uint incomingBandwidth, uint outgoingBandwidth)
public void Add(Action action)
{
NativeMethods.EnetHostBandwidthLimit(this.host, incomingBandwidth, outgoingBandwidth);
blockingCollection.Add(action);
}
public void SetChannelLimit(uint channelLimit)
private void OnEvents(int timeout)
{
if (channelLimit > NativeMethods.ENET_PROTOCOL_MAXIMUM_CHANNEL_COUNT)
// 处理读写线程的回调
Action action;
if (!this.blockingCollection.TryTake(out action, timeout))
{
throw new ArgumentOutOfRangeException(string.Format("channelLimit: {0}", channelLimit));
return;
}
NativeMethods.EnetHostChannelLimit(this.host, channelLimit);
}
public event Action Events
{
add
{
lock (this.eventsLock)
{
this.events += value;
}
}
remove
var queue = new Queue<Action>();
queue.Enqueue(action);
while (this.blockingCollection.TryTake(out action, 0))
{
lock (this.eventsLock)
{
this.events -= value;
}
queue.Enqueue(action);
}
}
public void OnEvents()
{
Action local = null;
lock (this.eventsLock)
while (queue.Count > 0)
{
if (this.events == null)
{
return;
}
local = this.events;
this.events = null;
Action a = queue.Dequeue();
a();
}
local();
}
private int Service(int timeout)
private int Service()
{
if (timeout < 0)
{
throw new ArgumentOutOfRangeException(string.Format("timeout: {0}", timeout));
}
return NativeMethods.EnetHostService(this.host, null, (uint) timeout);
return NativeMethods.EnetHostService(this.host, null, 0);
}
public void RunOnce(int timeout = 0)
......@@ -181,86 +190,79 @@ namespace UNet
throw new ArgumentOutOfRangeException(string.Format("timeout: {0}", timeout));
}
this.OnEvents();
this.OnEvents(timeout);
if (this.Service(timeout) < 0)
if (this.Service() < 0)
{
return;
}
while (true)
{
UEvent uEvent = this.GetEvent();
if (uEvent == null)
ENetEvent eNetEvent = this.GetEvent();
if (eNetEvent == null)
{
return;
}
switch (uEvent.Type)
switch (eNetEvent.Type)
{
case EventType.Connect:
{
// 这是一个connect peer
if (this.USocketManager.ContainsKey(uEvent.PeerPtr))
if (this.uSocketManager.ContainsKey(eNetEvent.Peer))
{
USocket uSocket = this.USocketManager[uEvent.PeerPtr];
uSocket.OnConnected(uEvent);
USocket uSocket = this.uSocketManager[eNetEvent.Peer];
uSocket.OnConnected(eNetEvent);
break;
}
// accept peer
else
// 这是accept peer
if (this.uSocketManager.ContainsKey(IntPtr.Zero))
{
// 如果server端没有acceptasync,则请求放入队列
if (!this.USocketManager.ContainsKey(IntPtr.Zero))
{
this.connEEvents.AddLast(uEvent);
}
else
{
USocket uSocket = this.USocketManager[IntPtr.Zero];
uSocket.OnConnected(uEvent);
}
USocket uSocket = this.uSocketManager[IntPtr.Zero];
uSocket.OnConnected(eNetEvent);
break;
}
// 如果server端没有acceptasync,则请求放入队列
this.connQueue.Add(eNetEvent.Peer, eNetEvent);
break;
}
case EventType.Receive:
{
USocket uSocket = this.USocketManager[uEvent.PeerPtr];
uSocket.OnReceived(uEvent);
USocket uSocket = this.uSocketManager[eNetEvent.Peer];
uSocket.OnReceived(eNetEvent);
break;
}
case EventType.Disconnect:
{
// 如果链接还在缓存中,则删除
foreach (UEvent connEEvent in this.connEEvents)
if (this.connQueue.Remove(eNetEvent.Peer))
{
if (connEEvent.PeerPtr != uEvent.PeerPtr)
{
continue;
}
this.connEEvents.Remove(connEEvent);
return;
break;
}
// 链接已经被应用层接收
uEvent.EventState = EventState.DISCONNECTED;
USocket uSocket = this.USocketManager[uEvent.PeerPtr];
this.USocketManager.Remove(uEvent.PeerPtr);
USocket uSocket = this.uSocketManager[eNetEvent.Peer];
this.uSocketManager.Remove(eNetEvent.Peer);
// 等待的task将抛出异常
if (uSocket.Connected != null)
{
uSocket.OnConnected(uEvent);
uSocket.OnConnected(eNetEvent);
break;
}
else if (uSocket.Received != null)
if (uSocket.Received != null)
{
uSocket.OnReceived(uEvent);
uSocket.OnReceived(eNetEvent);
break;
}
else if (uSocket.Disconnect != null)
if (uSocket.Disconnect != null)
{
uSocket.OnDisconnect(uEvent);
uSocket.OnDisconnect(eNetEvent);
break;
}
uSocket.OnError(ErrorCode.ClientDisconnect);
break;
}
}
......
......@@ -56,21 +56,12 @@ namespace UNet
public void Add(Action action)
{
this.poller.Events += action;
}
public UPoller Poller
{
get
{
return this.poller;
}
this.poller.Add(action);
}
private async Task<IChannel> ConnectAsync(string host, int port)
{
USocket newSocket = new USocket(this.poller);
await newSocket.ConnectAsync(host, (ushort)port);
USocket newSocket = await this.poller.ConnectAsync(host, (ushort)port);
UChannel channel = new UChannel(newSocket, this);
channels[channel.RemoteAddress] = channel;
return channel;
......@@ -95,8 +86,7 @@ namespace UNet
public async Task<IChannel> GetChannel()
{
USocket socket = new USocket(this.poller);
await socket.AcceptAsync();
USocket socket = await this.poller.AcceptAsync();
UChannel channel = new UChannel(socket, this);
channels[channel.RemoteAddress] = channel;
return channel;
......
......@@ -6,22 +6,16 @@ using Network;
namespace UNet
{
public sealed class USocket: IDisposable
internal sealed class USocket : IDisposable
{
private IntPtr peerPtr = IntPtr.Zero;
private readonly UPoller service;
private IntPtr peerPtr;
private readonly LinkedList<byte[]> recvBuffer = new LinkedList<byte[]>();
public Action<UEvent> Connected { get; private set; }
public Action<UEvent> Received { get; private set; }
public Action<UEvent> Disconnect { get; private set; }
public Action<ENetEvent> Connected { get; set; }
public Action<ENetEvent> Received { get; private set; }
public Action<ENetEvent> Disconnect { get; private set; }
public Action<int> Error { get; set; }
public USocket(UPoller service)
{
this.service = service;
}
private void Dispose(bool disposing)
{
if (this.peerPtr == IntPtr.Zero)
......@@ -33,6 +27,11 @@ namespace UNet
this.peerPtr = IntPtr.Zero;
}
public USocket(IntPtr peerPtr)
{
this.peerPtr = peerPtr;
}
~USocket()
{
Dispose(false);
......@@ -50,10 +49,6 @@ namespace UNet
{
return this.peerPtr;
}
set
{
this.peerPtr = value;
}
}
private ENetPeer Struct
......@@ -92,68 +87,6 @@ namespace UNet
NativeMethods.EnetPeerThrottleConfigure(this.peerPtr, interval, acceleration, deceleration);
}
public Task<bool> ConnectAsync(string hostName, ushort port)
{
var tcs = new TaskCompletionSource<bool>();
UAddress address = new UAddress { Host = hostName, Port = port };
ENetAddress nativeAddress = address.Struct;
this.peerPtr = NativeMethods.EnetHostConnect(
this.service.HostPtr, ref nativeAddress, NativeMethods.ENET_PROTOCOL_MAXIMUM_CHANNEL_COUNT, 0);
if (this.peerPtr == IntPtr.Zero)
{
throw new UException("host connect call failed.");
}
this.service.USocketManager.Add(this.peerPtr, this);
this.Connected = eEvent =>
{
if (eEvent.EventState == EventState.DISCONNECTED)
{
tcs.TrySetException(new UException("socket disconnected in connect"));
}
tcs.TrySetResult(true);
};
return tcs.Task;
}
public Task<bool> AcceptAsync()
{
if (this.service.USocketManager.ContainsKey(IntPtr.Zero))
{
throw new UException("do not accept twice!");
}
var tcs = new TaskCompletionSource<bool>();
// 如果有请求连接缓存的包,从缓存中取
if (this.service.ConnEEvents.Count > 0)
{
UEvent uEvent = this.service.ConnEEvents.First.Value;
this.service.ConnEEvents.RemoveFirst();
this.PeerPtr = uEvent.PeerPtr;
this.service.USocketManager.Add(this.PeerPtr, this);
tcs.TrySetResult(true);
}
else
{
this.service.USocketManager.Add(this.PeerPtr, this);
this.Connected = eEvent =>
{
if (eEvent.EventState == EventState.DISCONNECTED)
{
tcs.TrySetException(new UException("socket disconnected in accpet"));
}
this.service.USocketManager.Remove(IntPtr.Zero);
this.PeerPtr = eEvent.PeerPtr;
this.service.USocketManager.Add(this.PeerPtr, this);
tcs.TrySetResult(true);
};
}
return tcs.Task;
}
public void SendAsync(byte[] data, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
{
UPacket packet = new UPacket(data, flags);
......@@ -178,12 +111,12 @@ namespace UNet
{
this.Received = eEvent =>
{
if (eEvent.EventState == EventState.DISCONNECTED)
if (eEvent.Type == EventType.Disconnect)
{
tcs.TrySetException(new UException("socket disconnected in receive"));
}
using (UPacket packet = new UPacket(eEvent.PacketPtr))
using (UPacket packet = new UPacket(eEvent.Packet))
{
byte[] bytes = packet.Bytes;
tcs.TrySetResult(bytes);
......@@ -197,7 +130,7 @@ namespace UNet
{
NativeMethods.EnetPeerDisconnect(this.peerPtr, data);
// EnetPeerDisconnect会reset Peer,这里设置为0,防止再次Dispose
this.PeerPtr = IntPtr.Zero;
this.peerPtr = IntPtr.Zero;
var tcs = new TaskCompletionSource<bool>();
this.Disconnect = eEvent => tcs.TrySetResult(true);
return tcs.Task;
......@@ -207,7 +140,7 @@ namespace UNet
{
NativeMethods.EnetPeerDisconnectLater(this.peerPtr, data);
// EnetPeerDisconnect会reset Peer,这里设置为0,防止再次Dispose
this.PeerPtr = IntPtr.Zero;
this.peerPtr = IntPtr.Zero;
var tcs = new TaskCompletionSource<bool>();
this.Disconnect = eEvent => tcs.TrySetResult(true);
return tcs.Task;
......@@ -217,27 +150,27 @@ namespace UNet
{
NativeMethods.EnetPeerDisconnectNow(this.peerPtr, data);
// EnetPeerDisconnect会reset Peer,这里设置为0,防止再次Dispose
this.PeerPtr = IntPtr.Zero;
this.peerPtr = IntPtr.Zero;
}
internal void OnConnected(UEvent uEvent)
internal void OnConnected(ENetEvent eNetEvent)
{
if (this.Connected == null)
{
return;
}
Action<UEvent> localConnected = this.Connected;
Action<ENetEvent> localConnected = this.Connected;
this.Connected = null;
// 此调用将让await ConnectAsync返回,所以null必须在此之前设置
localConnected(uEvent);
localConnected(eNetEvent);
}
internal void OnReceived(UEvent uEvent)
internal void OnReceived(ENetEvent eNetEvent)
{
// 如果应用层还未调用readasync则将包放到缓存队列
if (this.Received == null)
{
using (UPacket packet = new UPacket(uEvent.PacketPtr))
using (UPacket packet = new UPacket(eNetEvent.Packet))
{
byte[] bytes = packet.Bytes;
this.recvBuffer.AddLast(bytes);
......@@ -245,29 +178,23 @@ namespace UNet
}
else
{
Action<UEvent> localReceived = this.Received;
Action<ENetEvent> localReceived = this.Received;
this.Received = null;
// 此调用将让await ReadAsync返回,所以null必须在此之前设置
localReceived(uEvent);
localReceived(eNetEvent);
}
}
internal void OnDisconnect(UEvent uEvent)
internal void OnDisconnect(ENetEvent eNetEvent)
{
if (this.Disconnect == null)
{
return;
}
this.Disconnect(uEvent);
}
internal void OnError(int errorCode)
{
if (this.Error == null)
{
return;
}
this.Error(errorCode);
Action<ENetEvent> localDisconnect = this.Disconnect;
this.Disconnect = null;
localDisconnect(eNetEvent);
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册