From 506accd7c7113eb4d031e2b4a4565ef17b388f4b Mon Sep 17 00:00:00 2001 From: tanghai Date: Wed, 31 Dec 2014 15:34:42 +0800 Subject: [PATCH] =?UTF-8?q?TService=E6=B5=8B=E8=AF=95OK?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CSharp/Platform/Network/IService.cs | 4 +- CSharp/Platform/TNet/TBuffer.cs | 2 +- CSharp/Platform/TNet/TChannel.cs | 5 +- CSharp/Platform/TNet/TPoller.cs | 6 +-- CSharp/Platform/TNet/TService.cs | 45 ++++++++++------- CSharp/Platform/TNet/TSocket.cs | 2 + CSharp/Platform/TNetTest/TNetTest.csproj | 5 ++ CSharp/Platform/TNetTest/TServiceTest.cs | 62 ++++++++++++++++++++++++ CSharp/Platform/UNet/EService.cs | 2 +- CSharp/Platform/UNet/UService.cs | 14 +++++- CSharp/Platform/UNet/USocket.cs | 2 +- CSharp/Platform/UNetTest/UNetTest.csproj | 4 ++ CSharp/Platform/UNetTest/UServiceTest.cs | 12 +++-- 13 files changed, 129 insertions(+), 36 deletions(-) create mode 100644 CSharp/Platform/TNetTest/TServiceTest.cs diff --git a/CSharp/Platform/Network/IService.cs b/CSharp/Platform/Network/IService.cs index bacd591c..7ccb788c 100644 --- a/CSharp/Platform/Network/IService.cs +++ b/CSharp/Platform/Network/IService.cs @@ -19,6 +19,8 @@ namespace Network void Remove(IChannel channel); - void Start(); + void RunOnce(int timeout); + + void Run(); } } diff --git a/CSharp/Platform/TNet/TBuffer.cs b/CSharp/Platform/TNet/TBuffer.cs index ae502fe6..f6d77dcf 100644 --- a/CSharp/Platform/TNet/TBuffer.cs +++ b/CSharp/Platform/TNet/TBuffer.cs @@ -3,7 +3,7 @@ using System.Collections.Generic; namespace TNet { - internal class TBuffer + public class TBuffer { public const int ChunkSize = 8096; diff --git a/CSharp/Platform/TNet/TChannel.cs b/CSharp/Platform/TNet/TChannel.cs index dadfa497..929423aa 100644 --- a/CSharp/Platform/TNet/TChannel.cs +++ b/CSharp/Platform/TNet/TChannel.cs @@ -26,7 +26,6 @@ namespace TNet this.socket = socket; this.service = service; this.parser = new PacketParser(recvBuffer); - Start(); } public void Dispose() @@ -42,6 +41,8 @@ namespace TNet public void SendAsync(byte[] buffer, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable) { + byte[] size = BitConverter.GetBytes(buffer.Length); + this.sendBuffer.SendTo(size); this.sendBuffer.SendTo(buffer); if (this.sendTimer == ObjectId.Empty) { @@ -125,7 +126,7 @@ namespace TNet tcs.SetResult(packet); } - private async void Start() + public async void Start() { try { diff --git a/CSharp/Platform/TNet/TPoller.cs b/CSharp/Platform/TNet/TPoller.cs index 271eecdb..a350ec58 100644 --- a/CSharp/Platform/TNet/TPoller.cs +++ b/CSharp/Platform/TNet/TPoller.cs @@ -4,7 +4,7 @@ using System.Collections.Generic; namespace TNet { - internal class TPoller : IPoller + public class TPoller : IPoller { // 线程同步队列,发送接收socket回调都放到该队列,由poll线程统一执行 private readonly BlockingCollection blockingCollection = new BlockingCollection(); @@ -14,10 +14,6 @@ namespace TNet this.blockingCollection.Add(action); } - public void Dispose() - { - } - public void Run(int timeout) { // 处理读写线程的回调 diff --git a/CSharp/Platform/TNet/TService.cs b/CSharp/Platform/TNet/TService.cs index a5a7632e..86508bd1 100644 --- a/CSharp/Platform/TNet/TService.cs +++ b/CSharp/Platform/TNet/TService.cs @@ -8,13 +8,18 @@ namespace TNet { public class TService: IService { - private IPoller poller = new TPoller(); + private readonly IPoller poller = new TPoller(); private TSocket acceptor; - + private readonly Dictionary channels = new Dictionary(); private readonly TimerManager timerManager = new TimerManager(); + /// + /// 用作server端的构造函数 + /// + /// + /// public TService(string host, int port) { this.acceptor = new TSocket(poller); @@ -22,16 +27,22 @@ namespace TNet this.acceptor.Listen(100); } + /// + /// 用作client端的构造函数 + /// + public TService() + { + } + public void Dispose() { - if (this.poller == null) + if (this.acceptor == null) { return; } this.acceptor.Dispose(); this.acceptor = null; - this.poller = null; } public void Add(Action action) @@ -39,32 +50,27 @@ namespace TNet this.poller.Add(action); } - private async void AcceptAsync() - { - while (true) - { - TSocket newSocket = new TSocket(poller); - await this.acceptor.AcceptAsync(newSocket); - TChannel channel = new TChannel(newSocket, this); - channels[newSocket.RemoteAddress] = channel; - } - } - private async Task ConnectAsync(string host, int port) { TSocket newSocket = new TSocket(poller); await newSocket.ConnectAsync(host, port); TChannel channel = new TChannel(newSocket, this); channels[newSocket.RemoteAddress] = channel; + channel.Start(); return channel; } public async Task GetChannel() { + if (this.acceptor == null) + { + throw new Exception(string.Format("service construct must use host and port param")); + } TSocket socket = new TSocket(this.poller); await acceptor.AcceptAsync(socket); TChannel channel = new TChannel(socket, this); channels[channel.RemoteAddress] = channel; + channel.Start(); return channel; } @@ -96,13 +102,16 @@ namespace TNet return await GetChannel(ss[0], port); } - public void Start() + public void RunOnce(int timeout) { - AcceptAsync(); + poller.Run(timeout); + } + public void Run() + { while (true) { - poller.Run(1); + this.RunOnce(1); this.timerManager.Refresh(); } } diff --git a/CSharp/Platform/TNet/TSocket.cs b/CSharp/Platform/TNet/TSocket.cs index dbbaa320..97ef28f4 100644 --- a/CSharp/Platform/TNet/TSocket.cs +++ b/CSharp/Platform/TNet/TSocket.cs @@ -2,6 +2,7 @@ using System.Net; using System.Net.Sockets; using System.Threading.Tasks; +using Common.Logger; namespace TNet { @@ -153,6 +154,7 @@ namespace TNet private static void OnRecvComplete(SocketAsyncEventArgs e) { + Log.Debug("OnRecvComplete: " + e.BytesTransferred); var tcs = (TaskCompletionSource)e.UserToken; e.UserToken = null; if (e.SocketError != SocketError.Success) diff --git a/CSharp/Platform/TNetTest/TNetTest.csproj b/CSharp/Platform/TNetTest/TNetTest.csproj index b27c830c..997a42d6 100644 --- a/CSharp/Platform/TNetTest/TNetTest.csproj +++ b/CSharp/Platform/TNetTest/TNetTest.csproj @@ -53,12 +53,17 @@ + {19f8f043-1f99-4550-99df-dea5c7d77e55} Common + + {3bd499ff-3c34-4920-8b21-c55fba580843} + Network + {b42d431a-3a54-4649-942a-c5356d7f9fbc} TNet diff --git a/CSharp/Platform/TNetTest/TServiceTest.cs b/CSharp/Platform/TNetTest/TServiceTest.cs new file mode 100644 index 00000000..3d3b0358 --- /dev/null +++ b/CSharp/Platform/TNetTest/TServiceTest.cs @@ -0,0 +1,62 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Common.Helper; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Network; +using TNet; + +namespace TServiceTest +{ + [TestClass] + public class TServiceTest + { + private readonly Barrier barrier = new Barrier(3); + + private async void ClientEvent(IService service, string hostName, ushort port) + { + IChannel channel = await service.GetChannel(hostName, port); + channel.SendAsync("0123456789".ToByteArray()); + + byte[] bytes = await channel.RecvAsync(); + CollectionAssert.AreEqual("9876543210".ToByteArray(), bytes); + + barrier.RemoveParticipant(); + } + + private async void ServerEvent(IService service) + { + IChannel channel = await service.GetChannel(); + byte[] bytes = await channel.RecvAsync(); + CollectionAssert.AreEqual("0123456789".ToByteArray(), bytes); + Array.Reverse(bytes); + channel.SendAsync(bytes); + + barrier.RemoveParticipant(); + } + + [TestMethod] + public void ClientSendToServer() + { + const string hostName = "127.0.0.1"; + const ushort port = 8889; + IService clientService = new TService(); + IService serverService = new TService(hostName, 8889); + + Task.Factory.StartNew(() => clientService.Run(), TaskCreationOptions.LongRunning); + Task.Factory.StartNew(() => serverService.Run(), TaskCreationOptions.LongRunning); + + + + // 往server host线程增加事件,accept + serverService.Add(() => ServerEvent(serverService)); + + Thread.Sleep(1000); + + // 往client host线程增加事件,client线程连接server + clientService.Add(() => ClientEvent(clientService, hostName, port)); + + barrier.SignalAndWait(); + } + } +} \ No newline at end of file diff --git a/CSharp/Platform/UNet/EService.cs b/CSharp/Platform/UNet/EService.cs index 55b0268f..d4663884 100644 --- a/CSharp/Platform/UNet/EService.cs +++ b/CSharp/Platform/UNet/EService.cs @@ -270,7 +270,7 @@ namespace UNet } } - public void Start(int timeout = 0) + public void Run(int timeout = 0) { while (this.isRunning) { diff --git a/CSharp/Platform/UNet/UService.cs b/CSharp/Platform/UNet/UService.cs index a2b7a512..3847851f 100644 --- a/CSharp/Platform/UNet/UService.cs +++ b/CSharp/Platform/UNet/UService.cs @@ -16,6 +16,11 @@ namespace UNet this.service = new EService(host, (ushort)port); } + public UService() + { + this.service = new EService(); + } + public void Dispose() { if (service == null) @@ -84,9 +89,14 @@ namespace UNet return await ConnectAsync(host, port); } - public void Start() + public void RunOnce(int timeout) + { + this.service.RunOnce(timeout); + } + + public void Run() { - this.service.Start(); + this.service.Run(); } } } diff --git a/CSharp/Platform/UNet/USocket.cs b/CSharp/Platform/UNet/USocket.cs index 4c49ecd2..cc57341c 100644 --- a/CSharp/Platform/UNet/USocket.cs +++ b/CSharp/Platform/UNet/USocket.cs @@ -102,7 +102,7 @@ namespace UNet } var tcs = new TaskCompletionSource(); - var address = new Address { HostName = hostName, Port = port }; + Address address = new Address { HostName = hostName, Port = port }; ENetAddress nativeAddress = address.Struct; this.peerPtr = NativeMethods.EnetHostConnect(this.service.HostPtr, ref nativeAddress, channelLimit, data); diff --git a/CSharp/Platform/UNetTest/UNetTest.csproj b/CSharp/Platform/UNetTest/UNetTest.csproj index 53e9f8d3..229b3c2a 100644 --- a/CSharp/Platform/UNetTest/UNetTest.csproj +++ b/CSharp/Platform/UNetTest/UNetTest.csproj @@ -65,6 +65,10 @@ {3bd499ff-3c34-4920-8b21-c55fba580843} Network + + {b42d431a-3a54-4649-942a-c5356d7f9fbc} + TNet + {d0b4cfac-a368-4742-9863-68776cfa9938} UNet diff --git a/CSharp/Platform/UNetTest/UServiceTest.cs b/CSharp/Platform/UNetTest/UServiceTest.cs index 68d376ca..9e45c775 100644 --- a/CSharp/Platform/UNetTest/UServiceTest.cs +++ b/CSharp/Platform/UNetTest/UServiceTest.cs @@ -2,9 +2,11 @@ using System.Threading; using System.Threading.Tasks; using Common.Helper; +using Common.Logger; using UNet; using Microsoft.VisualStudio.TestTools.UnitTesting; using Network; +using TNet; namespace UNetTest { @@ -19,7 +21,7 @@ namespace UNetTest channel.SendAsync("0123456789".ToByteArray()); byte[] bytes = await channel.RecvAsync(); - Assert.AreEqual("9876543210".ToByteArray(), bytes); + CollectionAssert.AreEqual("9876543210".ToByteArray(), bytes); barrier.RemoveParticipant(); } @@ -28,7 +30,7 @@ namespace UNetTest { IChannel channel = await service.GetChannel(); byte[] bytes = await channel.RecvAsync(); - Assert.AreEqual("0123456789".ToByteArray(), bytes); + CollectionAssert.AreEqual("0123456789".ToByteArray(), bytes); Array.Reverse(bytes); channel.SendAsync(bytes); @@ -40,11 +42,11 @@ namespace UNetTest { const string hostName = "127.0.0.1"; const ushort port = 8889; - IService clientService = new UService(hostName, 8888); + IService clientService = new UService(); IService serverService = new UService(hostName, 8889); - Task.Factory.StartNew(() => clientService.Start(), TaskCreationOptions.LongRunning); - Task.Factory.StartNew(() => serverService.Start(), TaskCreationOptions.LongRunning); + Task.Factory.StartNew(() => clientService.Run(), TaskCreationOptions.LongRunning); + Task.Factory.StartNew(() => serverService.Run(), TaskCreationOptions.LongRunning); -- GitLab