Session.cs 7.6 KB
Newer Older
1 2
using System;
using System.Collections.Generic;
3
using System.Net;
4 5 6
using System.Threading;
using System.Threading.Tasks;

7
namespace Model
8
{
9
	public sealed class Session : Entity
10
	{
11
		// RPC id counter. It is static, so it is shared by every Session instance; each
		// Call/CallWithAction pre-increments it to tag the outgoing request.
		private static uint RpcId { get; set; }

		// Network component used for opcode lookup, (de)serialization, dispatching and
		// for removing this session by id.
		private readonly NetworkComponent network;

		// Pending RPC callbacks keyed by the request's RpcId. An entry is removed when the
		// matching response is received or when the call's cancellation token fires.
		private readonly Dictionary<uint, Action<object>> requestCallback = new Dictionary<uint, Action<object>>();

		// Channel this session receives packets from and sends buffers to.
		private readonly AChannel channel;

		// Reusable two-slot send list: slot 0 carries the opcode bytes, slot 1 the
		// serialized message bytes (both are overwritten on every SendMessage).
		private readonly List<byte[]> byteses = new List<byte[]> { new byte[0], new byte[0] };
16
		
17
		/// <summary>
		/// Creates a session on top of <paramref name="channel"/> and immediately starts its receive loop.
		/// </summary>
		/// <param name="network">Network component used for packing, dispatching and session removal.</param>
		/// <param name="channel">Channel the session reads packets from and writes messages to.</param>
		public Session(NetworkComponent network, AChannel channel)
		{
			this.channel = channel;
			this.network = network;

			// Fire-and-forget: StartRecv is async void and loops until it observes Id == 0.
			this.StartRecv();
		}

25
		/// <summary>
		/// Remote endpoint as reported by the underlying channel.
		/// </summary>
		public IPEndPoint RemoteAddress => this.channel.RemoteAddress;
32

33 34 35 36 37 38 39 40
		/// <summary>
		/// Channel type as reported by the underlying channel.
		/// </summary>
		public ChannelType ChannelType => this.channel.ChannelType;

T
tanghai 已提交
41
		/// <summary>
		/// Receive loop: awaits packets from the channel, reads the 2-byte opcode prefix and
		/// hands the packet to <see cref="RunDecompressedBytes"/>. The loop ends once
		/// <c>Id == 0</c> is observed (the value Send/Reply treat as "disposed") or when a
		/// packet shorter than the opcode prefix arrives.
		/// </summary>
		private async void StartRecv()
		{
			while (this.Id != 0)
			{
				Packet packet;
				try
				{
					packet = await this.channel.Recv();
				}
				catch (Exception recvError)
				{
					// A failed receive is logged and retried; the loop condition re-checks Id first.
					Log.Error(recvError.ToString());
					continue;
				}

				// The session may have been disposed while the receive was pending.
				if (this.Id == 0)
				{
					return;
				}

				// Every packet must at least contain the 2-byte opcode.
				if (packet.Length < 2)
				{
					Log.Error($"message error length < 2, ip: {this.RemoteAddress}");
					this.network.Remove(this.Id);
					return;
				}

				ushort opcode = BitConverter.ToUInt16(packet.Bytes, 0);
				try
				{
					// The payload starts right after the opcode, hence offset 2.
					this.RunDecompressedBytes(opcode, packet.Bytes, 2, packet.Length);
				}
				catch (Exception handlerError)
				{
					// A failing handler must not stop the receive loop.
					Log.Error(handlerError.ToString());
				}
			}
		}

84
		/// <summary>
		/// Deserializes one message and routes it: a message deriving from <c>AResponse</c>
		/// completes the pending RPC callback registered under its RpcId, anything else is
		/// passed to the network's message dispatcher.
		/// </summary>
		/// <param name="opcode">Raw opcode identifying the message type.</param>
		/// <param name="messageBytes">Buffer containing the serialized message.</param>
		/// <param name="offset">Index in <paramref name="messageBytes"/> where the message body starts.</param>
		/// <param name="count">Number of valid bytes in the buffer including the first <paramref name="offset"/> bytes; the body length is <c>count - offset</c>.</param>
		private void RunDecompressedBytes(ushort opcode, byte[] messageBytes, int offset, int count)
		{
			Opcode op;
			object message;

			try
			{
				op = (Opcode)opcode;
				var opcodeTypes = this.network.Entity.GetComponent<OpcodeTypeComponent>();
				Type messageType = opcodeTypes.GetType(op);
				message = this.network.MessagePacker.DeserializeFrom(messageType, messageBytes, offset, count - offset);
			}
			catch (Exception e)
			{
				// A message that cannot be decoded drops the whole session.
				Log.Error($"message deserialize error, ip: {this.RemoteAddress} {opcode} {e}");
				this.network.Remove(this.Id);
				return;
			}

			AResponse response = message as AResponse;
			if (response == null)
			{
				// Not an RPC response: regular dispatch.
				this.network.MessageDispatcher.Dispatch(this, op, offset, messageBytes, (AMessage)message);
				return;
			}

			// RPC response. The callback may legitimately be missing because the caller
			// can cancel an RPC, which removes its entry.
			Action<object> callback;
			if (this.requestCallback.TryGetValue(response.RpcId, out callback))
			{
				this.requestCallback.Remove(response.RpcId);
				callback(message);
			}
		}
T
tanghai 已提交
121 122 123 124
		
		/// <summary>
		/// RPC call: sends <paramref name="request"/> and invokes <paramref name="action"/>
		/// with the response once it is received.
		/// </summary>
		/// <param name="request">Request to send; its RpcId is overwritten with a fresh id.</param>
		/// <param name="action">Handler for the response. Exceptions it throws (or a response that is not an <c>AResponse</c>) are logged, not rethrown.</param>
		public void CallWithAction(ARequest request, Action<AResponse> action)
		{
			request.RpcId = ++RpcId;

			Action<object> onResponse = reply =>
			{
				try
				{
					action((AResponse)reply);
				}
				catch (Exception e)
				{
					Log.Error(e.ToString());
				}
			};
			this.requestCallback[request.RpcId] = onResponse;

			this.SendMessage(request);
		}

T
tanghai 已提交
145 146 147 148 149 150 151 152
		/// <summary>
		/// RPC call: sends a message and returns a task that completes with the response message.
		/// </summary>
		/// <param name="request">Request to send; its RpcId is overwritten with a fresh id.</param>
		/// <param name="isHotfix">Not read by this method.</param>
		/// <returns>
		/// A task that yields the response, or faults with <c>RpcException</c> when the
		/// response's Error is greater than 100, or with a wrapping <see cref="Exception"/>
		/// when handling the response throws.
		/// </returns>
		public Task<AResponse> Call(ARequest request, bool isHotfix)
		{
			var tcs = new TaskCompletionSource<AResponse>();

			request.RpcId = ++RpcId;
			this.requestCallback[request.RpcId] = (message) =>
			{
				try
				{
					AResponse response = (AResponse)message;
					if (response.Error <= 100)
					{
						tcs.SetResult(response);
					}
					else
					{
						// Error codes above 100 are surfaced to the awaiting caller as an RpcException.
						tcs.SetException(new RpcException(response.Error, response.Message));
					}
				}
				catch (Exception e)
				{
					tcs.SetException(new Exception($"Rpc Error: {message.GetType().FullName}", e));
				}
			};

			this.SendMessage(request);
			return tcs.Task;
		}

		/// <summary>
		/// RPC call with cancellation: sends a message and returns a task that completes with
		/// the response message. Cancelling <paramref name="cancellationToken"/> unregisters the
		/// pending callback so a late response is ignored.
		/// </summary>
		/// <param name="request">Request to send; its RpcId is overwritten with a fresh id.</param>
		/// <param name="isHotfix">Not read by this method.</param>
		/// <param name="cancellationToken">Token whose cancellation drops the pending callback.</param>
		/// <returns>
		/// A task that yields the response, or faults with <c>RpcException</c> when the
		/// response's Error is greater than 100, or with a wrapping <see cref="Exception"/>
		/// when handling the response throws.
		/// </returns>
		public Task<AResponse> Call(ARequest request, bool isHotfix, CancellationToken cancellationToken)
		{
			// Snapshot the id so the cancellation handler removes THIS call's callback even if
			// request.RpcId is reassigned later (e.g. the request object is reused for another call).
			uint rpcId = ++RpcId;
			request.RpcId = rpcId;

			var tcs = new TaskCompletionSource<AResponse>();

			// Assigned after the callback is stored. The lambda captures the variable itself, and
			// Register runs before SendMessage, so the callback always sees the real registration.
			CancellationTokenRegistration registration = default(CancellationTokenRegistration);

			this.requestCallback[rpcId] = (message) =>
			{
				// The call is finished: release the cancellation hook so a long-lived token does
				// not keep this session and request reachable through the registered delegate.
				registration.Dispose();

				try
				{
					AResponse response = (AResponse)message;
					if (response.Error > 100)
					{
						tcs.SetException(new RpcException(response.Error, response.Message));
						return;
					}
					tcs.SetResult(response);
				}
				catch (Exception e)
				{
					tcs.SetException(new Exception($"Rpc Error: {message.GetType().FullName}", e));
				}
			};

			// NOTE(review): on cancellation the callback is only removed; the returned task is
			// never completed, so an awaiting caller stays suspended. Confirm this is intended
			// before changing it to TrySetCanceled, since existing callers may not expect a throw.
			registration = cancellationToken.Register(() => { this.requestCallback.Remove(rpcId); });

			this.SendMessage(request);

			return tcs.Task;
		}

211 212 213
		/// <summary>
		/// RPC call: sends a message and returns a task that completes with the typed response message.
		/// </summary>
		/// <typeparam name="Response">Expected response type.</typeparam>
		/// <param name="request">Request to send; its RpcId is overwritten with a fresh id.</param>
		/// <returns>
		/// A task that yields the response, or faults with <c>RpcException</c> when the
		/// response's Error is greater than 100, or with a wrapping <see cref="Exception"/>
		/// when the response cannot be cast to <typeparamref name="Response"/> or handling it throws.
		/// </returns>
		public Task<Response> Call<Response>(ARequest request) where Response : AResponse
		{
			var tcs = new TaskCompletionSource<Response>();

			request.RpcId = ++RpcId;
			this.requestCallback[request.RpcId] = (message) =>
			{
				try
				{
					Response response = (Response)message;
					if (response.Error <= 100)
					{
						tcs.SetResult(response);
					}
					else
					{
						// Error codes above 100 are surfaced to the awaiting caller as an RpcException.
						tcs.SetException(new RpcException(response.Error, response.Message));
					}
				}
				catch (Exception e)
				{
					tcs.SetException(new Exception($"Rpc Error: {typeof(Response).FullName}", e));
				}
			};

			this.SendMessage(request);
			return tcs.Task;
		}

T
tanghai 已提交
243 244 245 246 247 248 249
		/// <summary>
		/// RPC call with cancellation: sends a message and returns a task that completes with
		/// the typed response message. Cancelling <paramref name="cancellationToken"/>
		/// unregisters the pending callback so a late response is ignored.
		/// </summary>
		/// <typeparam name="Response">Expected response type.</typeparam>
		/// <param name="request">Request to send; its RpcId is overwritten with a fresh id.</param>
		/// <param name="cancellationToken">Token whose cancellation drops the pending callback.</param>
		/// <returns>
		/// A task that yields the response, or faults with <c>RpcException</c> when the
		/// response's Error is greater than 100, or with a wrapping <see cref="Exception"/>
		/// when the response cannot be cast to <typeparamref name="Response"/> or handling it throws.
		/// </returns>
		public Task<Response> Call<Response>(ARequest request, CancellationToken cancellationToken)
			where Response : AResponse
		{
			// Snapshot the id so the cancellation handler removes THIS call's callback even if
			// request.RpcId is reassigned later (e.g. the request object is reused for another call).
			uint rpcId = ++RpcId;
			request.RpcId = rpcId;

			var tcs = new TaskCompletionSource<Response>();

			// Assigned after the callback is stored. The lambda captures the variable itself, and
			// Register runs before SendMessage, so the callback always sees the real registration.
			CancellationTokenRegistration registration = default(CancellationTokenRegistration);

			this.requestCallback[rpcId] = (message) =>
			{
				// The call is finished: release the cancellation hook so a long-lived token does
				// not keep this session and request reachable through the registered delegate.
				registration.Dispose();

				try
				{
					Response response = (Response)message;
					if (response.Error > 100)
					{
						tcs.SetException(new RpcException(response.Error, response.Message));
						return;
					}
					tcs.SetResult(response);
				}
				catch (Exception e)
				{
					tcs.SetException(new Exception($"Rpc Error: {typeof(Response).FullName}", e));
				}
			};

			// NOTE(review): on cancellation the callback is only removed; the returned task is
			// never completed, so an awaiting caller stays suspended. Confirm this is intended
			// before changing it to TrySetCanceled, since existing callers may not expect a throw.
			registration = cancellationToken.Register(() => { this.requestCallback.Remove(rpcId); });

			this.SendMessage(request);

			return tcs.Task;
		}

279
		/// <summary>
		/// Sends a one-way message over this session.
		/// </summary>
		/// <param name="message">Message to serialize and send.</param>
		/// <exception cref="Exception">Thrown when the session's Id is 0, i.e. it has already been disposed.</exception>
		public void Send(AMessage message)
		{
			bool disposed = this.Id == 0;
			if (disposed)
			{
				throw new Exception("session已经被Dispose了");
			}

			this.SendMessage(message);
		}

288
		/// <summary>
		/// Sends a response message back over this session.
		/// </summary>
		/// <typeparam name="Response">Concrete response type.</typeparam>
		/// <param name="message">Response to serialize and send.</param>
		/// <exception cref="Exception">Thrown when the session's Id is 0, i.e. it has already been disposed.</exception>
		public void Reply<Response>(Response message) where Response : AResponse
		{
			if (this.Id != 0)
			{
				this.SendMessage(message);
				return;
			}

			throw new Exception("session已经被Dispose了");
		}

297
		/// <summary>
		/// Serializes <paramref name="message"/> and sends it as two buffers: the 2-byte opcode
		/// followed by the message bytes. In SERVER builds running as AllServer the message is
		/// handed directly to the peer session instead of going through the channel.
		/// </summary>
		/// <param name="message">Message whose runtime type determines the opcode.</param>
		private void SendMessage(object message)
		{
			// NOTE(review): RunDecompressedBytes resolves OpcodeTypeComponent through
			// this.network.Entity, while this method asks this.network directly — confirm both
			// lookups reach the same component.
			var opcodeTypes = this.network.GetComponent<OpcodeTypeComponent>();
			Opcode messageOpcode = opcodeTypes.GetOpcode(message.GetType());
			ushort wireOpcode = (ushort)messageOpcode;
			byte[] body = this.network.MessagePacker.SerializeToByteArray(message);

#if SERVER
			// In AllServer mode internal messages do not go over the network; they are handed
			// straight to the target session so the whole call stack is visible while debugging.
			if (this.network.AppType == AppType.AllServer)
			{
				Session peer = this.network.GetComponent<NetInnerComponent>().Get(this.RemoteAddress.ToString());
				peer.RunDecompressedBytes(wireOpcode, body, 0, body.Length);
				return;
			}
#endif

			// Reuse the two-slot list: slot 0 = opcode bytes, slot 1 = message bytes.
			this.byteses[0] = BitConverter.GetBytes(wireOpcode);
			this.byteses[1] = body;

			channel.Send(this.byteses);
		}

		/// <summary>
		/// Disposes the session: runs the base dispose, disposes the channel and removes the
		/// session from the network component. Calling it when Id is already 0 is a no-op.
		/// </summary>
		public override void Dispose()
		{
			// Capture the id up front; it is still needed for network.Remove after
			// base.Dispose() has run (which presumably resets Id — confirm in Entity).
			long id = this.Id;
			if (id == 0)
			{
				return;
			}

			base.Dispose();

			this.channel.Dispose();
			this.network.Remove(id);
		}
	}
}