提交 d7738643 编写于 作者: T tanghai

1.实现异步Socket,与系统自带的NetworkStream不同的是,TSocket回调会回到poll线程,NetworkStream回调会回到不确定的线程.

2.TSocket发送接收以及回调都在poll线程处理.游戏逻辑也到poll线程执行.
3.压力测试1s可以收发15000个封包,对于游戏服务器应该足够了,用linux C++也不过40000个包左右
上级 7f36a36e
......@@ -168,7 +168,7 @@ namespace ENet
var bytes = this.recvBuffer.First.Value;
this.recvBuffer.RemoveFirst();
tcs.TrySetResult(bytes);
}
}
// 没有缓存封包,设置回调等待
else
{
......@@ -207,7 +207,7 @@ namespace ENet
var bytes = this.recvBuffer.First.Value;
this.recvBuffer.RemoveFirst();
action(bytes, 0);
}
}
// 没有缓存封包,设置回调等待
else
{
......
using System;
namespace TNet
{
public interface IPoller
{
void Add(Action action);
void RunOnce(int timeout);
void Run();
}
}
......@@ -58,11 +58,11 @@ namespace TNet
{
throw new Exception(string.Format("bufferList size < n, bufferList: {0} n: {1}", this.Count, n));
}
int alreadyCopyCount = n;
int alreadyCopyCount = 0;
while (alreadyCopyCount < n)
{
if (ChunkSize - this.FirstIndex > n - alreadyCopyCount)
{
{
Array.Copy(this.bufferList.First.Value, this.FirstIndex, buffer, alreadyCopyCount,
n - alreadyCopyCount);
this.FirstIndex += n - alreadyCopyCount;
......
......@@ -42,6 +42,7 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="IPoller.cs" />
<Compile Include="TPoller.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="TBuffer.cs" />
......
......@@ -4,7 +4,7 @@ using System.Collections.Generic;
namespace TNet
{
public class TPoller
public class TPoller : IPoller
{
// 线程同步队列,发送接收socket回调都放到该队列,由poll线程统一执行
private readonly BlockingCollection<Action> blockingCollection = new BlockingCollection<Action>();
......@@ -50,7 +50,7 @@ namespace TNet
{
while (true)
{
this.RunOnce(10);
this.RunOnce(1);
}
}
}
......
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
namespace TNet
{
public class TSocket: IDisposable
public class TSocket
{
private Socket socket;
private readonly TPoller poller;
private readonly SocketAsyncEventArgs innSocketAsyncEventArgs = new SocketAsyncEventArgs();
private readonly SocketAsyncEventArgs outSocketAsyncEventArgs = new SocketAsyncEventArgs();
private readonly TBuffer recvBuffer = new TBuffer();
private readonly TBuffer sendBuffer = new TBuffer();
public Action RecvAction { get; set; }
public Action<TSocket> AcceptAction { get; set; }
private IPoller poller;
private readonly Socket socket;
private readonly SocketAsyncEventArgs socketAsyncEventArgs = new SocketAsyncEventArgs();
public TSocket(TPoller poller)
public TSocket(IPoller poller)
{
this.poller = poller;
this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
this.outSocketAsyncEventArgs.Completed += this.OnComplete;
this.innSocketAsyncEventArgs.Completed += this.OnComplete;
this.socketAsyncEventArgs.Completed += this.OnComplete;
}
public TSocket(TPoller poller, Socket socket)
public Socket Socket
{
this.poller = poller;
this.socket = socket;
this.outSocketAsyncEventArgs.Completed += this.OnComplete;
this.innSocketAsyncEventArgs.Completed += this.OnComplete;
}
public void Dispose()
{
if (this.socket == null)
get
{
return;
return this.socket;
}
socket.Dispose();
this.socket = null;
}
public void Connect(string host, int port)
public void Dispose()
{
if (socket.ConnectAsync(this.innSocketAsyncEventArgs))
if (this.poller == null)
{
return;
}
this.poller.Add(this.OnConnComplete);
}
public void Accept(int port)
{
this.socket.Bind(new IPEndPoint(IPAddress.Any, port));
this.socket.Listen(100);
this.BeginAccept();
this.socket.Dispose();
this.poller = null;
}
public bool Recv(byte[] buffer)
public void Bind(string host, int port)
{
if (buffer.Length > this.RecvSize)
{
return false;
}
this.recvBuffer.RecvFrom(buffer);
return true;
this.socket.Bind(new IPEndPoint(IPAddress.Parse(host), port));
}
public void Send(byte[] buffer)
public void Listen(int backlog)
{
bool needBeginSend = this.sendBuffer.Count == 0;
this.sendBuffer.SendTo(buffer);
if (needBeginSend)
{
this.BeginSend();
}
}
public int RecvSize
{
get
{
return this.recvBuffer.Count;
}
this.socket.Listen(backlog);
}
private void OnComplete(object sender, SocketAsyncEventArgs e)
......@@ -92,157 +52,119 @@ namespace TNet
switch (e.LastOperation)
{
case SocketAsyncOperation.Accept:
action = () => this.OnAcceptComplete(e.AcceptSocket);
action = () => OnAcceptComplete(e);
e.AcceptSocket = null;
break;
case SocketAsyncOperation.Connect:
action = this.OnConnComplete;
action = () => OnConnectComplete(e);
break;
case SocketAsyncOperation.Disconnect:
action = this.OnDisconnect;
action = () => OnDisconnectComplete(e);
break;
case SocketAsyncOperation.Receive:
action = () => this.OnRecvComplete(e.BytesTransferred);
action = () => OnRecvComplete(e);
break;
case SocketAsyncOperation.Send:
action = () => this.OnSendComplete(e.BytesTransferred);
action = () => OnSendComplete(e);
break;
default:
throw new ArgumentOutOfRangeException();
}
this.poller.Add(action);
}
private void OnDisconnect()
{
this.Dispose();
this.poller.Add(action);
}
private void OnAcceptComplete(Socket sock)
public Task<bool> ConnectAsync(string host, int port)
{
if (this.socket == null)
{
return;
}
TSocket newSocket = new TSocket(poller, sock);
if (this.AcceptAction != null)
var tcs = new TaskCompletionSource<bool>();
this.socketAsyncEventArgs.UserToken = tcs;
this.socketAsyncEventArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(host), port);
if (!this.socket.ConnectAsync(this.socketAsyncEventArgs))
{
this.AcceptAction(newSocket);
this.poller.Add(() => { OnConnectComplete(this.socketAsyncEventArgs); });
}
this.BeginAccept();
return tcs.Task;
}
private void OnConnComplete()
private static void OnConnectComplete(SocketAsyncEventArgs e)
{
if (this.socket == null)
{
return;
}
this.BeginRecv();
var tcs = (TaskCompletionSource<bool>)e.UserToken;
tcs.SetResult(true);
}
private void OnRecvComplete(int bytesTransferred)
public Task<bool> AcceptAsync(TSocket accpetSocket)
{
if (this.socket == null)
var tcs = new TaskCompletionSource<bool>();
this.socketAsyncEventArgs.UserToken = tcs;
this.socketAsyncEventArgs.AcceptSocket = accpetSocket.socket;
if (!this.socket.AcceptAsync(this.socketAsyncEventArgs))
{
return;
}
this.recvBuffer.LastIndex += bytesTransferred;
if (this.recvBuffer.LastIndex == TBuffer.ChunkSize)
{
this.recvBuffer.LastIndex = 0;
this.recvBuffer.AddLast();
}
this.BeginRecv();
if (this.RecvAction != null)
{
this.RecvAction();
Action action = () => OnAcceptComplete(this.socketAsyncEventArgs);
this.poller.Add(action);
}
return tcs.Task;
}
private void OnSendComplete(int bytesTransferred)
private static void OnAcceptComplete(SocketAsyncEventArgs e)
{
if (this.socket == null)
{
return;
}
this.sendBuffer.FirstIndex += bytesTransferred;
if (this.sendBuffer.FirstIndex == TBuffer.ChunkSize)
{
this.sendBuffer.FirstIndex = 0;
this.sendBuffer.RemoveFirst();
}
// 如果没有数据可以发送,则返回
if (this.sendBuffer.Count == 0)
{
return;
}
// 继续发送数据
this.BeginSend();
var tcs = (TaskCompletionSource<bool>)e.UserToken;
tcs.SetResult(true);
}
private void BeginAccept()
public Task<int> RecvAsync(byte[] buffer, int offset, int count)
{
if (this.socket == null)
var tcs = new TaskCompletionSource<int>();
this.socketAsyncEventArgs.UserToken = tcs;
this.socketAsyncEventArgs.SetBuffer(buffer, offset, count);
if (!this.socket.ReceiveAsync(this.socketAsyncEventArgs))
{
return;
Action action = () => OnRecvComplete(this.socketAsyncEventArgs);
this.poller.Add(action);
}
if (this.socket.AcceptAsync(this.innSocketAsyncEventArgs))
{
return;
}
Action action = () => this.OnAcceptComplete(this.innSocketAsyncEventArgs.AcceptSocket);
this.poller.Add(action);
return tcs.Task;
}
private void BeginRecv()
private static void OnRecvComplete(SocketAsyncEventArgs e)
{
if (this.socket == null)
{
return;
}
var tcs = (TaskCompletionSource<int>)e.UserToken;
tcs.SetResult(e.BytesTransferred);
}
this.innSocketAsyncEventArgs.SetBuffer(this.recvBuffer.Last, this.recvBuffer.LastIndex, TBuffer.ChunkSize - this.recvBuffer.LastIndex);
if (this.socket.ReceiveAsync(this.innSocketAsyncEventArgs))
public Task<int> SendAsync(byte[] buffer, int offset, int count)
{
var tcs = new TaskCompletionSource<int>();
this.socketAsyncEventArgs.UserToken = tcs;
this.socketAsyncEventArgs.SetBuffer(buffer, offset, count);
if (!this.socket.SendAsync(this.socketAsyncEventArgs))
{
return;
Action action = () => OnSendComplete(this.socketAsyncEventArgs);
this.poller.Add(action);
}
return tcs.Task;
}
Action action = () => this.OnRecvComplete(this.innSocketAsyncEventArgs.BytesTransferred);
this.poller.Add(action);
private static void OnSendComplete(SocketAsyncEventArgs e)
{
var tcs = (TaskCompletionSource<int>)e.UserToken;
tcs.SetResult(e.BytesTransferred);
}
private void BeginSend()
public Task<bool> DisconnectAsync()
{
if (this.socket == null)
var tcs = new TaskCompletionSource<bool>();
this.socketAsyncEventArgs.UserToken = tcs;
if (!this.socket.DisconnectAsync(this.socketAsyncEventArgs))
{
return;
Action action = () => OnDisconnectComplete(this.socketAsyncEventArgs);
this.poller.Add(action);
}
return tcs.Task;
}
int count = 0;
if (TBuffer.ChunkSize - this.sendBuffer.FirstIndex < this.sendBuffer.Count)
{
count = TBuffer.ChunkSize - this.sendBuffer.FirstIndex;
}
else
{
count = this.sendBuffer.Count;
}
this.outSocketAsyncEventArgs.SetBuffer(this.sendBuffer.First, this.sendBuffer.FirstIndex, count);
if (this.socket.SendAsync(outSocketAsyncEventArgs))
{
return;
}
Action action = () => this.OnSendComplete(this.outSocketAsyncEventArgs.BytesTransferred);
this.poller.Add(action);
private static void OnDisconnectComplete(SocketAsyncEventArgs e)
{
var tcs = (TaskCompletionSource<bool>)e.UserToken;
tcs.SetResult(true);
}
}
}
......@@ -52,6 +52,7 @@
</Choose>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="TSocketTest.cs" />
<Compile Include="TcpListenerTest.cs" />
</ItemGroup>
<ItemGroup>
......
using System;
using System.Threading;
using System.Threading.Tasks;
using Common.Helper;
using Common.Logger;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using TNet;
namespace TNetTest
{
[TestClass]
public class TSocketTest
{
private Barrier barrier;
private const int clientNum = 10;
[TestMethod]
public void SendRecv()
{
barrier = new Barrier(clientNum + 2);
IPoller poller = new TPoller();
Task.Factory.StartNew(() => poller.Run(), TaskCreationOptions.LongRunning);
poller.Add(() => Server(poller));
Thread.Sleep(500);
for (int i = 0; i < clientNum; ++i)
{
poller.Add(() => Request(poller));
}
this.barrier.SignalAndWait();
}
private async void Server(IPoller poller)
{
TSocket acceptor = new TSocket(poller);
acceptor.Bind("127.0.0.1", 10000);
acceptor.Listen(100);
for (int i = 0; i < clientNum; i++)
{
TSocket socket = new TSocket(poller);
await acceptor.AcceptAsync(socket);
Response(socket);
}
this.barrier.RemoveParticipant();
}
private static async void Response(TSocket socket)
{
byte[] buffer = new byte[10];
for (int i = 0; i < 10000; i++)
{
await socket.RecvAsync(buffer, 0, buffer.Length);
Array.Reverse(buffer);
await socket.SendAsync(buffer, 0, buffer.Length);
}
await socket.DisconnectAsync();
}
private async void Request(IPoller poller)
{
TSocket client = new TSocket(poller);
for (int i = 0; i < 10000; i++)
{
await client.ConnectAsync("127.0.0.1", 10000);
byte[] buffer = "0123456789".ToByteArray();
await client.SendAsync(buffer, 0, buffer.Length);
await client.RecvAsync(buffer, 0, buffer.Length);
Assert.AreEqual("9876543210", buffer.ToStr());
}
Log.Debug("1111111111111111111111111111111111111");
this.barrier.RemoveParticipant();
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册