TChannel.cs 7.5 KB
Newer Older
1 2
using System;
using System.Collections.Generic;
T
tanghai 已提交
3
using System.IO;
4
using System.Linq;
5
using System.Net;
6
using System.Net.Sockets;
7
using Microsoft.IO;
8

9
namespace ETModel
10
{
11 12 13 14
	/// <summary>
	/// Wraps a Socket and pushes its asynchronous completion callbacks onto the main thread for processing.
	/// </summary>
	public sealed class TChannel: AChannel
15
	{
16
		// Underlying TCP socket. Dispose() closes it and sets this field to null; the
		// completion handlers test for null to detect that the channel has been disposed.
		private Socket socket;
17 18
		// innArgs is passed to Socket.ReceiveAsync; outArgs is passed to Socket.ConnectAsync and Socket.SendAsync.
		// Neither is readonly because Dispose() disposes them and then sets both to null.
		private SocketAsyncEventArgs innArgs = new SocketAsyncEventArgs();
		private SocketAsyncEventArgs outArgs = new SocketAsyncEventArgs();
19

20 21
		// recvBuffer is the target of socket receives and the input handed to the PacketParser.
		// sendBuffer is filled by Send() and consumed by StartSend()/OnSendComplete().
		private readonly CircularBuffer recvBuffer = new CircularBuffer();
		private readonly CircularBuffer sendBuffer = new CircularBuffer();
22

23 24
		// Stream obtained from the service's MemoryStreamManager in the constructors, handed to the
		// PacketParser, exposed through the Stream property and disposed in Dispose().
		private readonly MemoryStream memoryStream;

25
		// Set to true by StartSend() when it issues a send, and back to false when StartSend()
		// finds the send buffer empty. Exposed read-only through IsSending.
		private bool isSending;
26

27 28
		// Set to true by Start() just before the first StartRecv() call, so that repeated
		// Start() calls do not issue a second concurrent receive.
		private bool isRecving;

29
		// True from construction for accepted sockets; for outgoing channels it becomes true
		// in OnConnectComplete() once the connect succeeded.
		private bool isConnected;
30

31
		// Parser constructed over recvBuffer and memoryStream; OnRecvComplete() calls Parse()
		// repeatedly and fetches each parsed packet with GetPacket().
		private readonly PacketParser parser;
32

33
		// Scratch buffer of Packet.SizeLength bytes in which Send() builds the length prefix
		// before copying it into sendBuffer.
		private readonly byte[] cache = new byte[Packet.SizeLength];
34
		
35
		/// <summary>
		/// Creates an outgoing (ChannelType.Connect) channel for the given remote endpoint.
		/// Only the socket object is created here; the connection attempt is issued by <see cref="Start"/>.
		/// </summary>
		/// <param name="ipEndPoint">Remote endpoint to connect to; stored in RemoteAddress.</param>
		/// <param name="service">Owning service; its MemoryStreamManager supplies the message stream.</param>
		public TChannel(IPEndPoint ipEndPoint, TService service): base(service, ChannelType.Connect)
		{
			this.InstanceId = IdGenerater.GenerateId();
			this.memoryStream = this.GetService().MemoryStreamManager.GetStream("message", ushort.MaxValue);

			// Match the socket's address family to the endpoint so that IPv6 endpoints can be
			// connected as well. A null endpoint keeps the previous IPv4 default, so construction
			// itself behaves exactly as before for that input.
			AddressFamily family = ipEndPoint?.AddressFamily ?? AddressFamily.InterNetwork;
			this.socket = new Socket(family, SocketType.Stream, ProtocolType.Tcp);
			this.socket.NoDelay = true;
			this.parser = new PacketParser(this.recvBuffer, this.memoryStream);

			// Both argument objects report completion through the same handler.
			this.innArgs.Completed += this.OnComplete;
			this.outArgs.Completed += this.OnComplete;

			this.RemoteAddress = ipEndPoint;

			// Not connected yet: Start() will call ConnectAsync first.
			this.isConnected = false;
			this.isSending = false;
		}
51 52
		
		/// <summary>
		/// Creates an incoming (ChannelType.Accept) channel around a socket that is already connected.
		/// </summary>
		/// <param name="socket">Connected socket; its RemoteEndPoint becomes RemoteAddress.</param>
		/// <param name="service">Owning service; its MemoryStreamManager supplies the message stream.</param>
		public TChannel(Socket socket, TService service): base(service, ChannelType.Accept)
		{
			this.InstanceId = IdGenerater.GenerateId();

			MemoryStream messageStream = this.GetService().MemoryStreamManager.GetStream("message", ushort.MaxValue);
			this.memoryStream = messageStream;

			this.socket = socket;
			socket.NoDelay = true;

			this.parser = new PacketParser(this.recvBuffer, messageStream);

			// Both argument objects report completion through the same handler.
			this.outArgs.Completed += this.OnComplete;
			this.innArgs.Completed += this.OnComplete;

			this.RemoteAddress = (IPEndPoint)socket.RemoteEndPoint;

			// The socket is already connected, so Start() can begin receiving right away.
			this.isSending = false;
			this.isConnected = true;
		}
T
tanghai 已提交
68

69 70
		/// <summary>
		/// Releases the channel: runs the base disposal, closes the socket, disposes both
		/// SocketAsyncEventArgs objects and the message stream. Calling it again is a no-op.
		/// </summary>
		public override void Dispose()
		{
			if (!this.IsDisposed)
			{
				base.Dispose();

				// Close the socket and release both argument objects.
				this.socket.Close();
				this.innArgs.Dispose();
				this.outArgs.Dispose();

				// Drop the references; a null socket is what the completion handlers
				// check to recognise a disposed channel.
				this.outArgs = null;
				this.innArgs = null;
				this.socket = null;

				this.memoryStream.Dispose();
			}
		}
		
		/// <summary>Returns the owning service cast to TService.</summary>
		private TService GetService() => (TService)this.service;
91

T
tanghai 已提交
92 93 94 95
		/// <summary>The message stream this channel obtained in its constructor.</summary>
		public override MemoryStream Stream => this.memoryStream;

100
		/// <summary>
		/// Starts the channel. While not connected it only issues the connect (OnConnectComplete
		/// calls Start again on success). Once connected it starts the receive loop at most once
		/// and notifies the service via MarkNeedStartSend.
		/// </summary>
		public override void Start()
		{
			if (this.isConnected)
			{
				// The flag guards against starting a second receive on a repeated Start().
				if (!this.isRecving)
				{
					this.isRecving = true;
					this.StartRecv();
				}

				// Presumably lets the service flush data queued before the connection was up.
				this.GetService().MarkNeedStartSend(this.Id);
			}
			else
			{
				this.ConnectAsync(this.RemoteAddress);
			}
		}
T
tanghai 已提交
116 117
		
		/// <summary>
		/// Queues a message for sending: writes a length prefix of Packet.SizeLength bytes
		/// followed by the stream into the send buffer, then notifies the service via MarkNeedStartSend.
		/// </summary>
		/// <param name="stream">Message payload; its Length is written as the prefix.</param>
		/// <exception cref="Exception">
		/// The channel is disposed, Packet.SizeLength is neither 2 nor 4, or the payload length
		/// does not fit into the configured length prefix.
		/// </exception>
		public override void Send(MemoryStream stream)
		{
			if (this.IsDisposed)
			{
				throw new Exception("TChannel已经被Dispose, 不能发送消息");
			}

			long length = stream.Length;
			switch (Packet.SizeLength)
			{
				case 4:
					// Guard the narrowing cast: a silently truncated prefix would corrupt the framing.
					if (length > int.MaxValue)
					{
						throw new Exception($"packet too large for a 4-byte length prefix: {length}");
					}
					this.cache.WriteTo(0, (int) length);
					break;
				case 2:
					// Same guard for the 2-byte prefix, which previously wrapped around above 65535 bytes.
					if (length > ushort.MaxValue)
					{
						throw new Exception($"packet too large for a 2-byte length prefix: {length}");
					}
					this.cache.WriteTo(0, (ushort) length);
					break;
				default:
					throw new Exception("packet size must be 2 or 4!");
			}

			// Prefix first, then payload.
			this.sendBuffer.Write(this.cache, 0, this.cache.Length);
			this.sendBuffer.Write(stream);

			this.GetService().MarkNeedStartSend(this.Id);
		}

142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
		/// <summary>
		/// Completed handler shared by innArgs and outArgs. It does not process the result itself;
		/// it posts the matching handler to OneThreadSynchronizationContext with the event args as state.
		/// </summary>
		/// <param name="sender">Event source (unused).</param>
		/// <param name="e">Arguments of the operation that completed.</param>
		/// <exception cref="Exception">The completed operation is not Connect, Receive, Send or Disconnect.</exception>
		private void OnComplete(object sender, SocketAsyncEventArgs e)
		{
			SocketAsyncOperation operation = e.LastOperation;

			if (operation == SocketAsyncOperation.Connect)
			{
				OneThreadSynchronizationContext.Instance.Post(this.OnConnectComplete, e);
			}
			else if (operation == SocketAsyncOperation.Receive)
			{
				OneThreadSynchronizationContext.Instance.Post(this.OnRecvComplete, e);
			}
			else if (operation == SocketAsyncOperation.Send)
			{
				OneThreadSynchronizationContext.Instance.Post(this.OnSendComplete, e);
			}
			else if (operation == SocketAsyncOperation.Disconnect)
			{
				OneThreadSynchronizationContext.Instance.Post(this.OnDisconnectComplete, e);
			}
			else
			{
				throw new Exception($"socket error: {operation}");
			}
		}

		/// <summary>
		/// Issues an asynchronous connect to the given endpoint using outArgs.
		/// </summary>
		/// <param name="ipEndPoint">Endpoint to connect to.</param>
		public void ConnectAsync(IPEndPoint ipEndPoint)
		{
			this.outArgs.RemoteEndPoint = ipEndPoint;

			// A false return means the operation finished synchronously and the Completed
			// event will not be raised, so the result is handled right here.
			bool pending = this.socket.ConnectAsync(this.outArgs);
			if (!pending)
			{
				this.OnConnectComplete(this.outArgs);
			}
		}

		/// <summary>
		/// Handles the result of a connect: reports a socket error through OnError, otherwise
		/// marks the channel connected and calls Start() again.
		/// </summary>
		/// <param name="o">The SocketAsyncEventArgs of the connect operation.</param>
		private void OnConnectComplete(object o)
		{
			// A null socket means Dispose() already ran; ignore the late completion.
			if (this.socket == null)
			{
				return;
			}

			SocketAsyncEventArgs args = (SocketAsyncEventArgs) o;
			SocketError result = args.SocketError;
			if (result != SocketError.Success)
			{
				this.OnError((int)result);
				return;
			}

			// outArgs is reused for sends afterwards, so the connect target is cleared.
			args.RemoteEndPoint = null;
			this.isConnected = true;

			this.Start();
		}

		/// <summary>
		/// Handles a completed disconnect by reporting the operation's SocketError through OnError.
		/// </summary>
		/// <param name="o">The SocketAsyncEventArgs of the disconnect operation.</param>
		private void OnDisconnectComplete(object o)
		{
			// Same disposed guard as the other completion handlers: Dispose() nulls the socket,
			// and a completion arriving afterwards must not raise an error on a disposed channel.
			if (this.socket == null)
			{
				return;
			}

			SocketAsyncEventArgs e = (SocketAsyncEventArgs)o;
			this.OnError((int)e.SocketError);
		}

		/// <summary>
		/// Issues a receive into the last chunk of recvBuffer, starting at LastIndex and
		/// covering the rest of that chunk (ChunkSize - LastIndex bytes).
		/// </summary>
		private void StartRecv()
		{
			int remaining = this.recvBuffer.ChunkSize - this.recvBuffer.LastIndex;
			byte[] chunk = this.recvBuffer.Last;
			this.RecvAsync(chunk, this.recvBuffer.LastIndex, remaining);
		}

		/// <summary>
		/// Issues an asynchronous receive into the given buffer range using innArgs.
		/// </summary>
		/// <param name="buffer">Destination buffer.</param>
		/// <param name="offset">Offset in <paramref name="buffer"/> at which to start writing.</param>
		/// <param name="count">Maximum number of bytes to receive.</param>
		/// <exception cref="Exception">SetBuffer rejected the range; the original exception is the inner exception.</exception>
		public void RecvAsync(byte[] buffer, int offset, int count)
		{
			try
			{
				this.innArgs.SetBuffer(buffer, offset, count);
			}
			catch (Exception exception)
			{
				throw new Exception($"socket set buffer error: {buffer.Length}, {offset}, {count}", exception);
			}

			// A false return means the receive finished synchronously and the Completed
			// event will not be raised, so the result is handled right here.
			bool pending = this.socket.ReceiveAsync(this.innArgs);
			if (!pending)
			{
				this.OnRecvComplete(this.innArgs);
			}
		}

		/// <summary>
		/// Handles a completed receive: reports errors and peer disconnects through OnError,
		/// advances the receive buffer, dispatches every packet the parser can produce to OnRead,
		/// and finally issues the next receive.
		/// </summary>
		/// <param name="o">The SocketAsyncEventArgs of the receive operation.</param>
		private void OnRecvComplete(object o)
		{
			// A null socket means Dispose() already ran; ignore the late completion.
			if (this.socket == null)
			{
				return;
			}
			SocketAsyncEventArgs e = (SocketAsyncEventArgs) o;

			if (e.SocketError != SocketError.Success)
			{
				this.OnError((int)e.SocketError);
				return;
			}

			// A successful receive of zero bytes is reported as a peer disconnect.
			if (e.BytesTransferred == 0)
			{
				this.OnError(ErrorCode.ERR_PeerDisconnect);
				return;
			}

			// Account for the received bytes; when the current chunk is full, append a new one.
			this.recvBuffer.LastIndex += e.BytesTransferred;
			if (this.recvBuffer.LastIndex == this.recvBuffer.ChunkSize)
			{
				this.recvBuffer.AddLast();
				this.recvBuffer.LastIndex = 0;
			}

			// Dispatch every packet that is ready to the read callback.
			while (this.parser.Parse())
			{
				MemoryStream stream = this.parser.GetPacket();
				try
				{
					this.OnRead(stream);
				}
				catch (Exception exception)
				{
					Log.Error(exception);
				}

				// A read handler may have disposed this channel. Dispose() also disposes the
				// stream that was handed to the parser, so stop instead of parsing further.
				if (this.socket == null)
				{
					return;
				}
			}

			if (this.socket == null)
			{
				return;
			}

			this.StartRecv();
		}

277 278 279
		/// <summary>Current value of the isSending flag maintained by StartSend().</summary>
		public bool IsSending
		{
			get
			{
				return this.isSending;
			}
		}

		/// <summary>
		/// Sends the next piece of the send buffer. Does nothing while the channel is not connected;
		/// clears the sending flag when the buffer is empty; otherwise sends from the first chunk,
		/// limited to the end of that chunk and to the number of buffered bytes.
		/// </summary>
		public void StartSend()
		{
			if (!this.isConnected)
			{
				return;
			}

			// Nothing left to send.
			if (this.sendBuffer.Length == 0)
			{
				this.isSending = false;
				return;
			}

			this.isSending = true;

			// One send never crosses a chunk boundary and never exceeds the buffered data.
			int sendSize = (int)Math.Min(this.sendBuffer.ChunkSize - this.sendBuffer.FirstIndex, this.sendBuffer.Length);

			this.SendAsync(this.sendBuffer.First, this.sendBuffer.FirstIndex, sendSize);
		}

304
		/// <summary>
		/// Issues an asynchronous send of the given buffer range using outArgs.
		/// </summary>
		/// <param name="buffer">Source buffer.</param>
		/// <param name="offset">Offset in <paramref name="buffer"/> of the first byte to send.</param>
		/// <param name="count">Number of bytes to send.</param>
		/// <exception cref="Exception">SetBuffer rejected the range; the original exception is the inner exception.</exception>
		public void SendAsync(byte[] buffer, int offset, int count)
		{
			try
			{
				this.outArgs.SetBuffer(buffer, offset, count);
			}
			catch (Exception exception)
			{
				throw new Exception($"socket set buffer error: {buffer.Length}, {offset}, {count}", exception);
			}

			// A false return means the send finished synchronously and the Completed
			// event will not be raised, so the result is handled right here.
			bool pending = this.socket.SendAsync(this.outArgs);
			if (!pending)
			{
				this.OnSendComplete(this.outArgs);
			}
		}

		/// <summary>
		/// Handles a completed send: reports errors through OnError, advances the send buffer
		/// by the number of bytes sent, and calls StartSend() for whatever is still buffered.
		/// </summary>
		/// <param name="o">The SocketAsyncEventArgs of the send operation.</param>
		private void OnSendComplete(object o)
		{
			// A null socket means Dispose() already ran; ignore the late completion.
			if (this.socket == null)
			{
				return;
			}

			SocketAsyncEventArgs args = (SocketAsyncEventArgs) o;
			if (args.SocketError != SocketError.Success)
			{
				this.OnError((int)args.SocketError);
				return;
			}

			// A successful send of zero bytes is reported as a peer disconnect.
			int sent = args.BytesTransferred;
			if (sent == 0)
			{
				this.OnError(ErrorCode.ERR_PeerDisconnect);
				return;
			}

			// Consume the sent bytes; once the first chunk is fully sent, drop it.
			this.sendBuffer.FirstIndex += sent;
			if (this.sendBuffer.FirstIndex == this.sendBuffer.ChunkSize)
			{
				this.sendBuffer.FirstIndex = 0;
				this.sendBuffer.RemoveFirst();
			}

			this.StartSend();
		}
	}
}