TChannel.cs 7.6 KB
Newer Older
1
using System;
2
using System.Collections.Generic;
T
tanghai 已提交
3
using System.IO;
4
using System.Linq;
5
using System.Net;
6
using System.Net.Sockets;
7
using Microsoft.IO;
8

9
namespace ET
10
{
11 12 13 14
	/// <summary>
	/// Wraps a Socket and pushes its completion callbacks to the main thread for processing.
	/// </summary>
	public sealed class TChannel: AChannel
15
	{
16
		// Underlying TCP socket. Dispose sets it to null, and the completion
		// handlers treat a null socket as "channel already disposed".
		private Socket socket;

		// Event args used for receive operations.
		private SocketAsyncEventArgs innArgs = new SocketAsyncEventArgs();

		// Event args used for the connect operation and for send operations.
		private SocketAsyncEventArgs outArgs = new SocketAsyncEventArgs();

		// Bytes received from the socket, consumed by the packet parser.
		private readonly CircularBuffer recvBuffer = new CircularBuffer();

		// Bytes queued by Send and drained by StartSend / OnSendComplete.
		private readonly CircularBuffer sendBuffer = new CircularBuffer();

		// Message stream obtained from the service's MemoryStreamManager; exposed through Stream.
		private readonly MemoryStream memoryStream;

		// True while StartSend has found data to send; reset when the send buffer is empty.
		private bool isSending;

		// True once the socket is connected (immediately for accepted sockets).
		private bool isConnected;

		// Parser constructed over recvBuffer and memoryStream.
		private readonly PacketParser parser;

		// Scratch buffer that Send fills with the payload length before queuing it.
		private readonly byte[] packetSizeCache;

		// Remote endpoint; Start passes it to ConnectAsync for connect-type channels.
		private readonly IPEndPoint remoteIpEndPoint;
34
		
35
		/// <summary>
		/// Creates a connect-side channel for <paramref name="ipEndPoint"/>.
		/// No connection is attempted here; <see cref="Start"/> begins it.
		/// </summary>
		/// <param name="ipEndPoint">Remote endpoint this channel will connect to.</param>
		/// <param name="service">Owning service; its MemoryStreamManager supplies the message stream.</param>
		public TChannel(IPEndPoint ipEndPoint, TService service): base(service, ChannelType.Connect)
		{
			// Scratch buffer for the length prefix; Send stores the payload length in it as an int.
			this.packetSizeCache = new byte[4];
			this.memoryStream = service.MemoryStreamManager.GetStream("message", ushort.MaxValue);

			// Create the socket with the endpoint's own address family so that IPv6
			// endpoints can be connected as well; a socket hard-coded to
			// AddressFamily.InterNetwork can only reach IPv4 addresses.
			this.socket = new Socket(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
			this.socket.NoDelay = true;
			this.parser = new PacketParser(this.recvBuffer, this.memoryStream);

			// Both event args share one completion handler, which dispatches on LastOperation.
			this.innArgs.Completed += this.OnComplete;
			this.outArgs.Completed += this.OnComplete;

			this.RemoteAddress = ipEndPoint.ToString();
			this.remoteIpEndPoint = ipEndPoint;

			// Not connected yet: StartSend is a no-op until OnConnectComplete flips isConnected.
			this.isConnected = false;
			this.isSending = false;
		}
51 52
		
		/// <summary>
		/// Creates an accept-side channel around an existing <paramref name="socket"/>.
		/// The channel is marked connected immediately.
		/// </summary>
		/// <param name="socket">Socket to wrap; its RemoteEndPoint becomes the channel's remote address.</param>
		/// <param name="service">Owning service; its MemoryStreamManager supplies the message stream.</param>
		public TChannel(Socket socket, TService service): base(service, ChannelType.Accept)
		{
			// Scratch buffer for the length prefix written by Send.
			this.packetSizeCache = new byte[Packet.PacketSizeLength];

			MemoryStream messageStream = service.MemoryStreamManager.GetStream("message", ushort.MaxValue);
			this.memoryStream = messageStream;

			socket.NoDelay = true;
			this.socket = socket;

			this.parser = new PacketParser(this.recvBuffer, messageStream);

			// One shared completion handler for both directions.
			this.innArgs.Completed += this.OnComplete;
			this.outArgs.Completed += this.OnComplete;

			EndPoint remote = socket.RemoteEndPoint;
			this.RemoteAddress = remote.ToString();
			this.remoteIpEndPoint = (IPEndPoint)remote;

			this.isConnected = true;
			this.isSending = false;
		}
T
tanghai 已提交
68

69 70
		/// <summary>
		/// Releases the channel: runs the base dispose, closes the socket, disposes both
		/// event args and the message stream. Calling it on a disposed channel does nothing.
		/// </summary>
		public override void Dispose()
		{
			if (!this.IsDisposed)
			{
				base.Dispose();

				this.socket.Close();
				this.innArgs.Dispose();
				this.outArgs.Dispose();

				// Completion handlers check for a null socket to detect a disposed channel.
				this.innArgs = null;
				this.outArgs = null;
				this.socket = null;

				this.memoryStream.Dispose();
			}
		}
T
tanghai 已提交
86 87 88 89 90 91 92 93 94 95 96 97

		/// <summary>
		/// Starts the channel: accept-type channels begin receiving right away,
		/// every other channel type first connects to the stored remote endpoint.
		/// </summary>
		public override void Start()
		{
			if (this.ChannelType != ChannelType.Accept)
			{
				this.ConnectAsync(this.remoteIpEndPoint);
				return;
			}

			this.StartRecv();
		}
98 99 100
		
		/// <summary>
		/// Returns the owning service cast to <see cref="TService"/>.
		/// </summary>
		private TService GetService() => (TService)this.Service;
103

T
tanghai 已提交
104 105 106 107
		/// <summary>
		/// The message stream this channel obtained at construction time.
		/// </summary>
		public override MemoryStream Stream => this.memoryStream;
		
		/// <summary>
		/// Queues <paramref name="stream"/> for sending, preceded by its length, and
		/// notifies the service through MarkNeedStartSend.
		/// </summary>
		/// <param name="stream">Payload to queue.</param>
		/// <exception cref="Exception">
		/// The channel is disposed, or the payload is longer than ushort.MaxValue * 16 bytes.
		/// </exception>
		public override void Send(MemoryStream stream)
		{
			if (this.IsDisposed)
			{
				throw new Exception("TChannel已经被Dispose, 不能发送消息");
			}

			long payloadLength = stream.Length;
			if (payloadLength > ushort.MaxValue * 16)
			{
				throw new Exception($"send packet too large: {payloadLength}");
			}

			// Length prefix first, then the payload itself.
			byte[] header = this.packetSizeCache;
			header.WriteTo(0, (int) payloadLength);
			this.sendBuffer.Write(header, 0, header.Length);
			this.sendBuffer.Write(stream);

			this.GetService().MarkNeedStartSend(this.Id);
		}

131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
		/// <summary>
		/// Shared Completed handler for both event args. Posts the handler matching
		/// the finished operation to OneThreadSynchronizationContext.
		/// </summary>
		/// <exception cref="Exception">The completed operation is not connect, receive, send or disconnect.</exception>
		private void OnComplete(object sender, SocketAsyncEventArgs e)
		{
			SocketAsyncOperation operation = e.LastOperation;

			if (operation == SocketAsyncOperation.Connect)
			{
				OneThreadSynchronizationContext.Instance.Post(this.OnConnectComplete, e);
				return;
			}

			if (operation == SocketAsyncOperation.Receive)
			{
				OneThreadSynchronizationContext.Instance.Post(this.OnRecvComplete, e);
				return;
			}

			if (operation == SocketAsyncOperation.Send)
			{
				OneThreadSynchronizationContext.Instance.Post(this.OnSendComplete, e);
				return;
			}

			if (operation == SocketAsyncOperation.Disconnect)
			{
				OneThreadSynchronizationContext.Instance.Post(this.OnDisconnectComplete, e);
				return;
			}

			throw new Exception($"socket error: {e.LastOperation}");
		}

		/// <summary>
		/// Begins connecting the socket to <paramref name="ipEndPoint"/> using the outgoing event args.
		/// </summary>
		/// <param name="ipEndPoint">Remote endpoint to connect to.</param>
		public void ConnectAsync(IPEndPoint ipEndPoint)
		{
			this.outArgs.RemoteEndPoint = ipEndPoint;

			bool pending = this.socket.ConnectAsync(this.outArgs);
			if (!pending)
			{
				// Completed synchronously: the Completed event is not raised, so handle it inline.
				this.OnConnectComplete(this.outArgs);
			}
		}

		/// <summary>
		/// Handles the end of a connect operation: reports a failure through OnError,
		/// otherwise marks the channel connected, starts receiving and asks the service to start sending.
		/// </summary>
		/// <param name="o">The <see cref="SocketAsyncEventArgs"/> of the connect operation.</param>
		private void OnConnectComplete(object o)
		{
			// Channel was disposed while the connect was in flight.
			if (this.socket == null)
			{
				return;
			}

			SocketAsyncEventArgs args = (SocketAsyncEventArgs) o;
			SocketError result = args.SocketError;
			if (result != SocketError.Success)
			{
				this.OnError((int)result);
				return;
			}

			args.RemoteEndPoint = null;
			this.isConnected = true;

			this.StartRecv();
			this.GetService().MarkNeedStartSend(this.Id);
		}

		/// <summary>
		/// Handles the end of a disconnect operation by forwarding its socket error code to OnError.
		/// </summary>
		/// <param name="o">The <see cref="SocketAsyncEventArgs"/> of the disconnect operation.</param>
		private void OnDisconnectComplete(object o)
		{
			SocketError result = ((SocketAsyncEventArgs)o).SocketError;
			this.OnError((int)result);
		}

T
tanghai 已提交
188
		/// <summary>
		/// Starts a receive into the unused tail of the receive buffer's last chunk.
		/// </summary>
		public void StartRecv()
		{
			int writeOffset = this.recvBuffer.LastIndex;
			int freeBytes = this.recvBuffer.ChunkSize - writeOffset;
			this.RecvAsync(this.recvBuffer.Last, writeOffset, freeBytes);
		}

		/// <summary>
		/// Starts a receive into <paramref name="buffer"/> using the incoming event args.
		/// </summary>
		/// <param name="buffer">Destination array.</param>
		/// <param name="offset">Index in <paramref name="buffer"/> where received bytes are written.</param>
		/// <param name="count">Maximum number of bytes to receive.</param>
		/// <exception cref="Exception">Setting the buffer on the event args failed; the original error is the inner exception.</exception>
		public void RecvAsync(byte[] buffer, int offset, int count)
		{
			try
			{
				this.innArgs.SetBuffer(buffer, offset, count);
			}
			catch (Exception e)
			{
				throw new Exception($"socket set buffer error: {buffer.Length}, {offset}, {count}", e);
			}

			bool pending = this.socket.ReceiveAsync(this.innArgs);
			if (!pending)
			{
				// Completed synchronously: the Completed event is not raised, so handle it inline.
				this.OnRecvComplete(this.innArgs);
			}
		}

		/// <summary>
		/// Handles the end of a receive operation: accounts for the received bytes,
		/// dispatches every packet the parser can produce through OnRead, then
		/// starts the next receive.
		/// </summary>
		/// <param name="o">The <see cref="SocketAsyncEventArgs"/> of the receive operation.</param>
		private void OnRecvComplete(object o)
		{
			// Channel was disposed while the receive was in flight.
			if (this.socket == null)
			{
				return;
			}
			SocketAsyncEventArgs e = (SocketAsyncEventArgs) o;

			if (e.SocketError != SocketError.Success)
			{
				this.OnError((int)e.SocketError);
				return;
			}

			// A successful receive of zero bytes is reported as a peer disconnect.
			if (e.BytesTransferred == 0)
			{
				this.OnError(ErrorCode.ERR_PeerDisconnect);
				return;
			}

			// Advance the write position; when the last chunk is full, append a fresh one.
			this.recvBuffer.LastIndex += e.BytesTransferred;
			if (this.recvBuffer.LastIndex == this.recvBuffer.ChunkSize)
			{
				this.recvBuffer.AddLast();
				this.recvBuffer.LastIndex = 0;
			}

			// Dispatch received messages.
			while (true)
			{
				// A handler invoked through OnRead may dispose this channel. Dispose
				// nulls the socket and disposes the message stream, so stop here
				// instead of parsing on disposed state and reporting a bogus error.
				if (this.socket == null)
				{
					return;
				}

				try
				{
					if (!this.parser.Parse())
					{
						break;
					}
				}
				catch (Exception ee)
				{
					Log.Error($"ip: {this.RemoteAddress} {ee}");
					this.OnError(ErrorCode.ERR_SocketError);
					return;
				}

				try
				{
					this.OnRead(this.parser.GetPacket());
				}
				catch (Exception ee)
				{
					// A failing handler must not stop the remaining packets from being processed.
					Log.Error(ee);
				}
			}

			if (this.socket == null)
			{
				return;
			}

			this.StartRecv();
		}

274 275 276
		/// <summary>
		/// Whether StartSend last found data to send and has not yet seen the send buffer empty.
		/// </summary>
		public bool IsSending
		{
			get
			{
				return this.isSending;
			}
		}

		/// <summary>
		/// Sends the next slice of the send buffer: at most the rest of the first chunk,
		/// and never more than the total queued length. Does nothing while the channel
		/// is not connected, and clears the sending flag when nothing is queued.
		/// </summary>
		public void StartSend()
		{
			if(!this.isConnected)
			{
				return;
			}

			// Nothing queued: mark the channel idle.
			var queued = this.sendBuffer.Length;
			if (queued == 0)
			{
				this.isSending = false;
				return;
			}

			this.isSending = true;

			// One send never crosses a chunk boundary.
			int chunkRemainder = this.sendBuffer.ChunkSize - this.sendBuffer.FirstIndex;
			int sendSize = chunkRemainder > queued ? (int)queued : chunkRemainder;

			this.SendAsync(this.sendBuffer.First, this.sendBuffer.FirstIndex, sendSize);
		}

301
		/// <summary>
		/// Starts sending <paramref name="count"/> bytes of <paramref name="buffer"/> using the outgoing event args.
		/// </summary>
		/// <param name="buffer">Source array.</param>
		/// <param name="offset">Index of the first byte to send.</param>
		/// <param name="count">Number of bytes to send.</param>
		/// <exception cref="Exception">Setting the buffer on the event args failed; the original error is the inner exception.</exception>
		public void SendAsync(byte[] buffer, int offset, int count)
		{
			try
			{
				this.outArgs.SetBuffer(buffer, offset, count);
			}
			catch (Exception e)
			{
				throw new Exception($"socket set buffer error: {buffer.Length}, {offset}, {count}", e);
			}

			bool pending = this.socket.SendAsync(this.outArgs);
			if (!pending)
			{
				// Completed synchronously: the Completed event is not raised, so handle it inline.
				this.OnSendComplete(this.outArgs);
			}
		}

		/// <summary>
		/// Handles the end of a send operation: reports failures through OnError,
		/// otherwise advances the send buffer past the bytes sent and continues with StartSend.
		/// </summary>
		/// <param name="o">The <see cref="SocketAsyncEventArgs"/> of the send operation.</param>
		private void OnSendComplete(object o)
		{
			// Channel was disposed while the send was in flight.
			if (this.socket == null)
			{
				return;
			}

			SocketAsyncEventArgs args = (SocketAsyncEventArgs) o;
			if (args.SocketError != SocketError.Success)
			{
				this.OnError((int)args.SocketError);
				return;
			}

			int sent = args.BytesTransferred;
			if (sent == 0)
			{
				this.OnError(ErrorCode.ERR_PeerDisconnect);
				return;
			}

			// Consume the sent bytes; drop the first chunk once it has been sent completely.
			this.sendBuffer.FirstIndex += sent;
			bool chunkDrained = this.sendBuffer.FirstIndex == this.sendBuffer.ChunkSize;
			if (chunkDrained)
			{
				this.sendBuffer.FirstIndex = 0;
				this.sendBuffer.RemoveFirst();
			}

			this.StartSend();
		}
	}
}