提交 dfc4c483 编写于 作者: T tanghai

增加服务端代码,网络库既可以做服务端可以做客户端连接

上级 870f80fd
......@@ -32,3 +32,4 @@ _ReSharper.CSharp/
/Unity/Assets/Res/Code/Controller.dll.mdb.bytes
/Unity/Assets/Res/Code/Controller.dll.mdb.bytes.meta
/Unity/CSharp60Support/compilation log.txt
/CSharp/CSharp.VC.opendb
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
namespace TNet
{
public class TSocket
{
private readonly IPoller poller;
private Socket socket;
private readonly SocketAsyncEventArgs innArgs = new SocketAsyncEventArgs();
private readonly SocketAsyncEventArgs outArgs = new SocketAsyncEventArgs();
public TSocket(IPoller poller)
{
this.poller = poller;
this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
this.innArgs.Completed += this.OnComplete;
this.outArgs.Completed += this.OnComplete;
}
public IPoller Poller
{
get
{
return this.poller;
}
}
public string RemoteAddress
{
get
{
IPEndPoint ipEndPoint = (IPEndPoint) this.socket.RemoteEndPoint;
return ipEndPoint.Address + ":" + ipEndPoint.Port;
}
}
public Socket Socket
{
get
{
return this.socket;
}
}
protected virtual void Dispose(bool disposing)
{
if (this.socket == null)
{
return;
}
if (disposing)
{
this.socket.Dispose();
}
this.socket = null;
}
~TSocket()
{
this.Dispose(false);
}
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
public void Bind(string host, int port)
{
this.socket.Bind(new IPEndPoint(IPAddress.Parse(host), port));
}
public void Listen(int backlog)
{
this.socket.Listen(backlog);
}
private void OnComplete(object sender, SocketAsyncEventArgs e)
{
Action action;
switch (e.LastOperation)
{
case SocketAsyncOperation.Accept:
action = () => OnAcceptComplete(e);
break;
case SocketAsyncOperation.Connect:
action = () => OnConnectComplete(e);
break;
case SocketAsyncOperation.Disconnect:
action = () => OnDisconnectComplete(e);
break;
case SocketAsyncOperation.Receive:
action = () => OnRecvComplete(e);
break;
case SocketAsyncOperation.Send:
action = () => OnSendComplete(e);
break;
default:
throw new ArgumentOutOfRangeException();
}
this.poller.Add(action);
}
public Task<bool> ConnectAsync(string host, int port)
{
var tcs = new TaskCompletionSource<bool>();
this.outArgs.UserToken = tcs;
this.outArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(host), port);
if (!this.socket.ConnectAsync(this.outArgs))
{
OnConnectComplete(this.outArgs);
}
return tcs.Task;
}
private static void OnConnectComplete(SocketAsyncEventArgs e)
{
var tcs = (TaskCompletionSource<bool>) e.UserToken;
e.UserToken = null;
if (e.SocketError != SocketError.Success)
{
tcs.SetException(new Exception($"socket error: {e.SocketError}"));
return;
}
tcs.SetResult(true);
}
public Task<bool> AcceptAsync(TSocket accpetSocket)
{
var tcs = new TaskCompletionSource<bool>();
this.innArgs.UserToken = tcs;
this.innArgs.AcceptSocket = accpetSocket.socket;
if (!this.socket.AcceptAsync(this.innArgs))
{
OnAcceptComplete(this.innArgs);
}
return tcs.Task;
}
private static void OnAcceptComplete(SocketAsyncEventArgs e)
{
var tcs = (TaskCompletionSource<bool>) e.UserToken;
e.UserToken = null;
if (e.SocketError != SocketError.Success)
{
tcs.SetException(new Exception($"socket error: {e.SocketError}"));
return;
}
tcs.SetResult(true);
}
public Task<int> RecvAsync(byte[] buffer, int offset, int count)
{
var tcs = new TaskCompletionSource<int>();
this.innArgs.UserToken = tcs;
this.innArgs.SetBuffer(buffer, offset, count);
if (!this.socket.ReceiveAsync(this.innArgs))
{
OnRecvComplete(this.innArgs);
}
return tcs.Task;
}
private static void OnRecvComplete(SocketAsyncEventArgs e)
{
var tcs = (TaskCompletionSource<int>) e.UserToken;
e.UserToken = null;
if (e.SocketError != SocketError.Success)
{
tcs.SetException(new Exception($"socket error: {e.SocketError}"));
return;
}
tcs.SetResult(e.BytesTransferred);
}
public Task<int> SendAsync(byte[] buffer, int offset, int count)
{
var tcs = new TaskCompletionSource<int>();
this.outArgs.UserToken = tcs;
this.outArgs.SetBuffer(buffer, offset, count);
if (!this.socket.SendAsync(this.outArgs))
{
OnSendComplete(this.outArgs);
}
return tcs.Task;
}
private static void OnSendComplete(SocketAsyncEventArgs e)
{
var tcs = (TaskCompletionSource<int>) e.UserToken;
e.UserToken = null;
if (e.SocketError != SocketError.Success)
{
tcs.SetException(new Exception($"socket error: {e.SocketError}"));
return;
}
tcs.SetResult(e.BytesTransferred);
}
public Task<bool> DisconnectAsync()
{
var tcs = new TaskCompletionSource<bool>();
this.outArgs.UserToken = tcs;
if (!this.socket.DisconnectAsync(this.outArgs))
{
OnDisconnectComplete(this.outArgs);
}
return tcs.Task;
}
private static void OnDisconnectComplete(SocketAsyncEventArgs e)
{
var tcs = (TaskCompletionSource<bool>) e.UserToken;
e.UserToken = null;
if (e.SocketError != SocketError.Success)
{
tcs.SetException(new Exception($"socket error: {e.SocketError}"));
return;
}
tcs.SetResult(true);
}
}
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
namespace TNet
{
public class TSocket
{
private readonly IPoller poller;
private Socket socket;
private readonly SocketAsyncEventArgs innArgs = new SocketAsyncEventArgs();
private readonly SocketAsyncEventArgs outArgs = new SocketAsyncEventArgs();
public TSocket(IPoller poller)
{
this.poller = poller;
this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
this.innArgs.Completed += this.OnComplete;
this.outArgs.Completed += this.OnComplete;
}
public IPoller Poller
{
get
{
return this.poller;
}
}
public string RemoteAddress
{
get
{
IPEndPoint ipEndPoint = (IPEndPoint) this.socket.RemoteEndPoint;
return ipEndPoint.Address + ":" + ipEndPoint.Port;
}
}
public Socket Socket
{
get
{
return this.socket;
}
}
protected virtual void Dispose(bool disposing)
{
if (this.socket == null)
{
return;
}
if (disposing)
{
this.socket.Dispose();
}
this.socket = null;
}
~TSocket()
{
this.Dispose(false);
}
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
public void Bind(string host, int port)
{
this.socket.Bind(new IPEndPoint(IPAddress.Parse(host), port));
}
public void Listen(int backlog)
{
this.socket.Listen(backlog);
}
private void OnComplete(object sender, SocketAsyncEventArgs e)
{
Action action;
switch (e.LastOperation)
{
case SocketAsyncOperation.Accept:
action = () => OnAcceptComplete(e);
break;
case SocketAsyncOperation.Connect:
action = () => OnConnectComplete(e);
break;
case SocketAsyncOperation.Disconnect:
action = () => OnDisconnectComplete(e);
break;
case SocketAsyncOperation.Receive:
action = () => OnRecvComplete(e);
break;
case SocketAsyncOperation.Send:
action = () => OnSendComplete(e);
break;
default:
throw new ArgumentOutOfRangeException();
}
this.poller.Add(action);
}
public Task<bool> ConnectAsync(string host, int port)
{
var tcs = new TaskCompletionSource<bool>();
this.outArgs.UserToken = tcs;
this.outArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(host), port);
if (!this.socket.ConnectAsync(this.outArgs))
{
OnConnectComplete(this.outArgs);
}
return tcs.Task;
}
private static void OnConnectComplete(SocketAsyncEventArgs e)
{
var tcs = (TaskCompletionSource<bool>) e.UserToken;
e.UserToken = null;
if (e.SocketError != SocketError.Success)
{
tcs.SetException(new Exception($"socket error: {e.SocketError}"));
return;
}
tcs.SetResult(true);
}
public Task<bool> AcceptAsync(TSocket accpetSocket)
{
var tcs = new TaskCompletionSource<bool>();
this.innArgs.UserToken = tcs;
this.innArgs.AcceptSocket = accpetSocket.socket;
if (!this.socket.AcceptAsync(this.innArgs))
{
OnAcceptComplete(this.innArgs);
}
return tcs.Task;
}
private static void OnAcceptComplete(SocketAsyncEventArgs e)
{
var tcs = (TaskCompletionSource<bool>) e.UserToken;
e.UserToken = null;
if (e.SocketError != SocketError.Success)
{
tcs.SetException(new Exception($"socket error: {e.SocketError}"));
return;
}
tcs.SetResult(true);
}
public Task<int> RecvAsync(byte[] buffer, int offset, int count)
{
var tcs = new TaskCompletionSource<int>();
this.innArgs.UserToken = tcs;
this.innArgs.SetBuffer(buffer, offset, count);
if (!this.socket.ReceiveAsync(this.innArgs))
{
OnRecvComplete(this.innArgs);
}
return tcs.Task;
}
private static void OnRecvComplete(SocketAsyncEventArgs e)
{
var tcs = (TaskCompletionSource<int>) e.UserToken;
e.UserToken = null;
if (e.SocketError != SocketError.Success)
{
tcs.SetException(new Exception($"socket error: {e.SocketError}"));
return;
}
tcs.SetResult(e.BytesTransferred);
}
public Task<int> SendAsync(byte[] buffer, int offset, int count)
{
var tcs = new TaskCompletionSource<int>();
this.outArgs.UserToken = tcs;
this.outArgs.SetBuffer(buffer, offset, count);
if (!this.socket.SendAsync(this.outArgs))
{
OnSendComplete(this.outArgs);
}
return tcs.Task;
}
private static void OnSendComplete(SocketAsyncEventArgs e)
{
var tcs = (TaskCompletionSource<int>) e.UserToken;
e.UserToken = null;
if (e.SocketError != SocketError.Success)
{
tcs.SetException(new Exception($"socket error: {e.SocketError}"));
return;
}
tcs.SetResult(e.BytesTransferred);
}
public Task<bool> DisconnectAsync()
{
var tcs = new TaskCompletionSource<bool>();
this.outArgs.UserToken = tcs;
if (!this.socket.DisconnectAsync(this.outArgs))
{
OnDisconnectComplete(this.outArgs);
}
return tcs.Task;
}
private static void OnDisconnectComplete(SocketAsyncEventArgs e)
{
var tcs = (TaskCompletionSource<bool>) e.UserToken;
e.UserToken = null;
if (e.SocketError != SocketError.Success)
{
tcs.SetException(new Exception($"socket error: {e.SocketError}"));
return;
}
tcs.SetResult(true);
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Threading.Tasks;
namespace Base
{
......@@ -22,14 +25,16 @@ namespace Base
public abstract AChannel GetChannel(string host, int port);
public abstract AChannel GetChannel(string address);
public abstract Task<AChannel> GetChannel();
public abstract void Remove(long channelId);
public abstract void Update();
public Action<long, SocketError> OnError;
public void OnChannelError(long channelId, SocketError error)
protected void OnChannelError(long channelId, SocketError error)
{
this.OnError?.Invoke(channelId, error);
this.Remove(channelId);
......
......@@ -14,16 +14,24 @@ namespace Base
private bool isSending;
private readonly PacketParser parser;
private readonly string remoteAddress;
private bool isConnected;
public Action<long, SocketError> OnError;
public string RemoteAddress { get; }
public TChannel(TSocket socket, string host, int port, TService service) : base(service)
{
this.socket = socket;
this.parser = new PacketParser(this.recvBuffer);
this.remoteAddress = host + ":" + port;
this.RemoteAddress = host + ":" + port;
}
public TChannel(TSocket socket, TService service) : base(service)
{
this.socket = socket;
this.parser = new PacketParser(this.recvBuffer);
this.RemoteAddress = socket.RemoteAddress;
}
public override void Dispose()
......@@ -43,7 +51,7 @@ namespace Base
public override void ConnectAsync()
{
string[] ss = this.remoteAddress.Split(':');
string[] ss = this.RemoteAddress.Split(':');
int port = int.Parse(ss[1]);
bool result = this.socket.ConnectAsync(ss[0], port);
if (!result)
......
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Base
{
public sealed class TService: AService
{
private TPoller poller = new TPoller();
private readonly TSocket acceptor;
private readonly Dictionary<long, TChannel> idChannels = new Dictionary<long, TChannel>();
private void Dispose(bool disposing)
private readonly Dictionary<string, TChannel> addressChannels = new Dictionary<string, TChannel>();
/// <summary>
/// 即可做client也可做server
/// </summary>
/// <param name="host"></param>
/// <param name="port"></param>
public TService(string host, int port)
{
this.acceptor = new TSocket(this.poller, host, port);
}
public TService()
{
}
public override void Dispose()
{
if (this.poller == null)
{
return;
}
if (disposing)
foreach (long id in this.idChannels.Keys.ToArray())
{
foreach (long id in this.idChannels.Keys.ToArray())
{
TChannel channel = this.idChannels[id];
channel.Dispose();
}
TChannel channel = this.idChannels[id];
channel.Dispose();
}
this.poller = null;
}
public override void Dispose()
{
this.Dispose(true);
}
public override void Add(Action action)
{
this.poller.Add(action);
......@@ -46,6 +55,20 @@ namespace Base
return channel;
}
public override async Task<AChannel> GetChannel()
{
if (this.acceptor == null)
{
throw new Exception("service construct must use host and port param");
}
TSocket socket = new TSocket(this.poller);
await this.acceptor.AcceptAsync(socket);
TChannel channel = new TChannel(socket, this);
this.addressChannels[channel.RemoteAddress] = channel;
this.idChannels[channel.Id] = channel;
return channel;
}
public override void Remove(long id)
{
TChannel channel;
......@@ -63,19 +86,26 @@ namespace Base
public override AChannel GetChannel(string host, int port)
{
TChannel channel = null;
TSocket newSocket = new TSocket(this.poller);
channel = new TChannel(newSocket, host, port, this);
channel.OnError += this.OnChannelError;
this.idChannels[channel.Id] = channel;
return channel;
string address = $"{host}:{port}";
return this.GetChannel(address);
}
public override AChannel GetChannel(string address)
{
TChannel channel = null;
if (this.addressChannels.TryGetValue(address, out channel))
{
return channel;
}
string[] ss = address.Split(':');
string host = ss[0];
int port = int.Parse(ss[1]);
return this.GetChannel(ss[0], port);
TSocket newSocket = new TSocket(this.poller);
channel = new TChannel(newSocket, host, port, this);
channel.OnError += this.OnChannelError;
this.idChannels[channel.Id] = channel;
return channel;
}
public override void Update()
......
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
namespace Base
{
......@@ -18,7 +19,6 @@ namespace Base
public Action<int, SocketError> OnRecv;
public Action<int, SocketError> OnSend;
public Action<SocketError> OnDisconnect;
private string remoteAddress;
public TSocket(TPoller poller)
{
......@@ -28,14 +28,14 @@ namespace Base
this.outArgs.Completed += this.OnComplete;
}
public string RemoteAddress
public TSocket(TPoller poller, string host, int port): this(poller)
{
get
{
return remoteAddress;
}
this.Bind(host, port);
this.Listen(100);
}
public string RemoteAddress { get; private set; }
public Socket Socket
{
get
......@@ -44,30 +44,50 @@ namespace Base
}
}
protected void Dispose(bool disposing)
public void Dispose()
{
if (this.socket == null)
{
return;
}
if (disposing)
{
this.socket.Close();
}
this.socket.Close();
this.socket = null;
}
~TSocket()
private void Bind(string host, int port)
{
this.Dispose(false);
this.socket.Bind(new IPEndPoint(IPAddress.Parse(host), port));
}
public void Dispose()
private void Listen(int backlog)
{
this.Dispose(true);
GC.SuppressFinalize(this);
this.socket.Listen(backlog);
}
public Task<bool> AcceptAsync(TSocket accpetSocket)
{
var tcs = new TaskCompletionSource<bool>();
this.innArgs.UserToken = tcs;
this.innArgs.AcceptSocket = accpetSocket.socket;
if (!this.socket.AcceptAsync(this.innArgs))
{
OnAcceptComplete(this.innArgs);
}
return tcs.Task;
}
private static void OnAcceptComplete(SocketAsyncEventArgs e)
{
var tcs = (TaskCompletionSource<bool>)e.UserToken;
e.UserToken = null;
if (e.SocketError != SocketError.Success)
{
tcs.SetException(new Exception($"socket error: {e.SocketError}"));
return;
}
tcs.SetResult(true);
}
private void OnComplete(object sender, SocketAsyncEventArgs e)
......@@ -87,6 +107,9 @@ namespace Base
case SocketAsyncOperation.Disconnect:
action = () => OnDisconnectComplete(e);
break;
case SocketAsyncOperation.Accept:
action = () => OnAcceptComplete(e);
break;
default:
throw new Exception($"socket error: {e.LastOperation}");
}
......@@ -97,7 +120,7 @@ namespace Base
public bool ConnectAsync(string host, int port)
{
remoteAddress = $"{host}:{port}";
this.RemoteAddress = $"{host}:{port}";
this.outArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(host), port);
if (this.socket.ConnectAsync(this.outArgs))
{
......
......@@ -6,7 +6,7 @@ namespace Base
{
internal class UChannel: AChannel
{
private USocket socket;
private readonly USocket socket;
private readonly string remoteAddress;
public UChannel(USocket socket, string host, int port, UService service): base(service)
......@@ -16,6 +16,13 @@ namespace Base
this.remoteAddress = host + ":" + port;
}
public UChannel(USocket socket, UService service) : base(service)
{
this.socket = socket;
this.service = service;
this.remoteAddress = socket.RemoteAddress;
}
public override void Dispose()
{
if (this.Id == 0)
......@@ -28,6 +35,14 @@ namespace Base
this.socket.Dispose();
}
public string RemoteAddress
{
get
{
return this.remoteAddress;
}
}
public override void ConnectAsync()
{
string[] ss = this.remoteAddress.Split(':');
......
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Base
{
......@@ -10,8 +11,14 @@ namespace Base
Library.Initialize();
}
private readonly USocketManager uSocketManager = new USocketManager();
private readonly QueueDictionary<IntPtr, ENetEvent> connQueue = new QueueDictionary<IntPtr, ENetEvent>();
private IntPtr host;
private readonly USocket acceptor;
// 线程同步队列,发送接收socket回调都放到该队列,由poll线程统一执行
private Queue<Action> concurrentQueue = new Queue<Action>();
......@@ -21,28 +28,34 @@ namespace Base
private readonly object lockObject = new object();
public UPoller()
public UPoller(string hostName, ushort port)
{
this.USocketManager = new USocketManager();
this.host = NativeMethods.ENetHostCreate(IntPtr.Zero, NativeMethods.ENET_PROTOCOL_MAXIMUM_PEER_ID, 0, 0, 0);
this.acceptor = new USocket(IntPtr.Zero, this);
UAddress address = new UAddress(hostName, port);
ENetAddress nativeAddress = address.Struct;
this.host = NativeMethods.ENetHostCreate(ref nativeAddress,
NativeMethods.ENET_PROTOCOL_MAXIMUM_PEER_ID, 0, 0, 0);
if (this.host == IntPtr.Zero)
{
throw new Exception("Host creation call failed.");
}
}
~UPoller()
{
this.Dispose(false);
NativeMethods.ENetHostCompressWithRangeCoder(this.host);
}
public void Dispose()
public UPoller()
{
this.Dispose(true);
this.uSocketManager = new USocketManager();
this.host = NativeMethods.ENetHostCreate(IntPtr.Zero, NativeMethods.ENET_PROTOCOL_MAXIMUM_PEER_ID, 0, 0, 0);
if (this.host == IntPtr.Zero)
{
throw new Exception("Host creation call failed.");
}
}
private void Dispose(bool disposing)
public void Dispose()
{
if (this.host == IntPtr.Zero)
{
......@@ -54,7 +67,13 @@ namespace Base
this.host = IntPtr.Zero;
}
public USocketManager USocketManager { get; }
public USocketManager USocketManager
{
get
{
return this.uSocketManager;
}
}
public IntPtr Host
{
......@@ -64,7 +83,7 @@ namespace Base
}
}
private ENetEvent GetEvent()
private ENetEvent TryGetEvent()
{
if (this.eNetEventCache == null)
{
......@@ -86,7 +105,37 @@ namespace Base
public void Add(Action action)
{
this.concurrentQueue.Enqueue(action);
lock (lockObject)
{
this.concurrentQueue.Enqueue(action);
}
}
public Task<USocket> AcceptAsync()
{
if (this.uSocketManager.ContainsKey(IntPtr.Zero))
{
throw new Exception("do not accept twice!");
}
var tcs = new TaskCompletionSource<USocket>();
// 如果有请求连接缓存的包,从缓存中取
if (this.connQueue.Count > 0)
{
IntPtr ptr = this.connQueue.FirstKey;
this.connQueue.Remove(ptr);
USocket socket = new USocket(ptr, this);
this.uSocketManager.Add(ptr, socket);
tcs.TrySetResult(socket);
}
else
{
this.uSocketManager.Add(this.acceptor.PeerPtr, this.acceptor);
this.acceptor.AcceptTcs = tcs;
}
return tcs.Task;
}
private void OnEvents()
......@@ -121,7 +170,7 @@ namespace Base
while (true)
{
ENetEvent eNetEvent = this.GetEvent();
ENetEvent eNetEvent = this.TryGetEvent();
if (eNetEvent == null)
{
return;
......@@ -132,11 +181,23 @@ namespace Base
case EventType.Connect:
{
// 这是一个connect peer
if (this.USocketManager.ContainsKey(eNetEvent.Peer))
if (this.uSocketManager.ContainsKey(eNetEvent.Peer))
{
USocket uSocket = this.USocketManager[eNetEvent.Peer];
uSocket.OnConnected(eNetEvent);
USocket uSocket = this.uSocketManager[eNetEvent.Peer];
uSocket.OnConnected();
break;
}
// 这是accept peer
if (this.uSocketManager.ContainsKey(IntPtr.Zero))
{
USocket uSocket = this.uSocketManager[IntPtr.Zero];
uSocket.OnAccepted(eNetEvent);
break;
}
// 如果server端没有acceptasync,则请求放入队列
this.connQueue.Add(eNetEvent.Peer, eNetEvent);
break;
}
case EventType.Receive:
......
......@@ -2,14 +2,26 @@
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Threading.Tasks;
namespace Base
{
public sealed class UService: AService
{
private UPoller poller;
private readonly Dictionary<long, UChannel> idChannels = new Dictionary<long, UChannel>();
private readonly Dictionary<string, UChannel> addressChannels = new Dictionary<string, UChannel>();
/// <summary>
/// 即可做client也可做server
/// </summary>
/// <param name="host"></param>
/// <param name="port"></param>
public UService(string host, int port)
{
this.poller = new UPoller(host, (ushort)port);
}
/// <summary>
/// 只能做client
......@@ -19,31 +31,22 @@ namespace Base
this.poller = new UPoller();
}
private void Dispose(bool disposing)
public override void Dispose()
{
if (this.poller == null)
{
return;
}
if (disposing)
foreach (long id in this.idChannels.Keys.ToArray())
{
foreach (long id in this.idChannels.Keys.ToArray())
{
UChannel channel = this.idChannels[id];
channel.Dispose();
}
this.poller.Dispose();
UChannel channel = this.idChannels[id];
channel.Dispose();
}
this.poller = null;
}
public override void Dispose()
{
this.Dispose(true);
}
public override void Add(Action action)
{
this.poller.Add(action);
......@@ -51,20 +54,33 @@ namespace Base
public override AChannel GetChannel(string host, int port)
{
UChannel channel = null;
return this.GetChannel($"{host}:{port}");
}
public override AChannel GetChannel(string address)
{
UChannel channel = null;
if (this.addressChannels.TryGetValue(address, out channel))
{
return channel;
}
USocket newSocket = new USocket(this.poller);
string[] ss = address.Split(':');
int port = int.Parse(ss[1]);
string host = ss[0];
channel = new UChannel(newSocket, host, port, this);
newSocket.Disconnect += () => this.OnChannelError(channel.Id, SocketError.SocketError);
this.idChannels[channel.Id] = channel;
return channel;
}
public override AChannel GetChannel(string address)
public override async Task<AChannel> GetChannel()
{
string[] ss = address.Split(':');
int port = int.Parse(ss[1]);
return this.GetChannel(ss[0], port);
USocket socket = await this.poller.AcceptAsync();
UChannel channel = new UChannel(socket, this);
this.addressChannels[channel.RemoteAddress] = channel;
this.idChannels[channel.Id] = channel;
return channel;
}
public override AChannel GetChannel(long id)
......
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Base
{
......@@ -16,8 +17,8 @@ namespace Base
private readonly Queue<byte[]> recvQueue = new Queue<byte[]>();
private readonly Queue<BufferInfo> sendQueue = new Queue<BufferInfo>();
private bool isConnected;
private string remoteAddress;
public Action Disconnect;
public TaskCompletionSource<USocket> AcceptTcs { private get; set; }
public USocket(IntPtr peerPtr, UPoller poller)
{
......@@ -30,7 +31,7 @@ namespace Base
this.poller = poller;
}
private void Dispose(bool disposing)
public void Dispose()
{
if (this.PeerPtr == IntPtr.Zero)
{
......@@ -41,26 +42,10 @@ namespace Base
NativeMethods.ENetPeerDisconnectNow(this.PeerPtr, 0);
this.PeerPtr = IntPtr.Zero;
}
~USocket()
{
this.Dispose(false);
}
public void Dispose()
{
this.Dispose(true);
}
public IntPtr PeerPtr { get; set; }
public string RemoteAddress
{
get
{
return remoteAddress;
}
}
public string RemoteAddress { get; private set; }
public Queue<byte[]> RecvQueue
{
......@@ -72,7 +57,7 @@ namespace Base
public void ConnectAsync(string host, ushort port)
{
this.remoteAddress = host + ":" + port;
this.RemoteAddress = host + ":" + port;
UAddress address = new UAddress(host, port);
ENetAddress nativeAddress = address.Struct;
......@@ -97,7 +82,7 @@ namespace Base
packet.PacketPtr = IntPtr.Zero;
}
internal void OnConnected(ENetEvent eNetEvent)
internal void OnConnected()
{
isConnected = true;
while (this.sendQueue.Count > 0)
......@@ -107,6 +92,20 @@ namespace Base
}
}
internal void OnAccepted(ENetEvent eEvent)
{
isConnected = true;
if (eEvent.Type == EventType.Disconnect)
{
this.AcceptTcs.TrySetException(new Exception("socket disconnected in accpet"));
}
this.poller.USocketManager.Remove(IntPtr.Zero);
USocket socket = new USocket(eEvent.Peer, this.poller);
this.poller.USocketManager.Add(socket.PeerPtr, socket);
this.AcceptTcs.TrySetResult(socket);
}
internal void OnReceived(ENetEvent eNetEvent)
{
// 将包放到缓存队列
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册