Session.cs 7.9 KB
Newer Older
1 2
using System;
using System.Collections.Generic;
3
using System.Net;
4 5 6
using System.Threading;
using System.Threading.Tasks;

7
namespace Model
8
{
T
tanghai 已提交
9 10 11 12 13 14 15 16 17 18 19 20 21 22
	/// <summary>
	/// Lifecycle adapter for <see cref="Session"/>: forwards the Awake and Start
	/// callbacks to the session instance returned by <c>Get()</c>.
	/// </summary>
	[ObjectEvent]
	public class SessionEvent : ObjectEvent<Session>, IAwake<NetworkComponent, AChannel>, IStart
	{
		/// <summary>
		/// Passes the network component and channel through to the session's own Awake.
		/// </summary>
		public void Awake(NetworkComponent network, AChannel channel)
		{
			Session session = this.Get();
			session.Awake(network, channel);
		}

		/// <summary>
		/// Passes the start notification through to the session's own Start.
		/// </summary>
		public void Start()
		{
			Session session = this.Get();
			session.Start();
		}
	}

23
	public sealed class Session : Entity
24
	{
25
		// Static, so shared by every Session instance. CallWithAction and the Call overloads
		// pre-increment it and stamp the new value on the outgoing request as its RpcId.
		private static uint RpcId { get; set; }
T
tanghai 已提交
26 27 28
		// Network component handed to Awake; this class uses it for the message packer,
		// the message dispatcher, opcode lookup and for removing the session by id.
		private NetworkComponent network;
		// Channel handed to Awake; packets are received from it and sent through it.
		private AChannel channel;

29
		// Pending RPC callbacks keyed by RpcId. An entry is added when a request is sent and
		// removed when the matching AResponse is received (or when the call is cancelled).
		private readonly Dictionary<uint, Action<object>> requestCallback = new Dictionary<uint, Action<object>>();
30
		// Two-slot buffer list reused by every SendMessage call:
		// slot 0 receives the opcode bytes, slot 1 the serialized message bytes.
		private readonly List<byte[]> byteses = new List<byte[]>() {new byte[0], new byte[0]};
31
		
T
tanghai 已提交
32 33 34 35 36 37 38 39
		/// <summary>
		/// Binds this session to a network component and a channel, and empties the
		/// table of pending RPC callbacks.
		/// </summary>
		/// <param name="net">Network component stored in the <c>network</c> field.</param>
		/// <param name="c">Channel stored in the <c>channel</c> field.</param>
		public void Awake(NetworkComponent net, AChannel c)
		{
			requestCallback.Clear();
			channel = c;
			network = net;
		}

		/// <summary>
		/// Starts the receive loop for this session.
		/// </summary>
		public void Start() => this.StartRecv();

T
tanghai 已提交
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
		/// <summary>
		/// Disposes the session: runs the base disposal, disposes the channel, removes the
		/// session from the network component and drops all pending RPC callbacks.
		/// Does nothing when <c>Id</c> is already 0.
		/// </summary>
		public override void Dispose()
		{
			// Snapshot the id first: it is needed for the network removal after base.Dispose().
			// NOTE(review): presumably base.Dispose() resets Id to 0 - confirm in Entity.
			long sessionId = this.Id;
			if (sessionId == 0)
			{
				return;
			}

			base.Dispose();

			this.channel.Dispose();
			this.network.Remove(sessionId);
			this.requestCallback.Clear();
		}

60
		/// <summary>
		/// Remote endpoint as reported by the underlying channel.
		/// </summary>
		public IPEndPoint RemoteAddress => this.channel.RemoteAddress;
67

68 69 70 71 72 73 74 75
		/// <summary>
		/// Channel type as reported by the underlying channel.
		/// </summary>
		public ChannelType ChannelType => this.channel.ChannelType;

T
tanghai 已提交
76
		/// <summary>
		/// Receive loop. Repeatedly awaits a packet from the channel, reads the opcode from
		/// its first two bytes and hands the remainder to <c>RunDecompressedBytes</c>.
		/// The loop ends when <c>Id</c> becomes 0 or when a packet shorter than two bytes
		/// arrives (in which case the session is also removed from the network component).
		/// </summary>
		private async void StartRecv()
		{
			// Size of the opcode prefix at the start of every packet.
			const int opcodeLength = sizeof(ushort);

			while (this.Id != 0)
			{
				Packet received;
				try
				{
					received = await this.channel.Recv();

					// Id may have become 0 while the receive was pending.
					if (this.Id == 0)
					{
						return;
					}
				}
				catch (Exception e)
				{
					// A failed receive is logged and retried; the loop condition re-checks Id.
					Log.Error(e.ToString());
					continue;
				}

				// Too short to even hold an opcode: log it, remove the session and stop.
				if (received.Length < opcodeLength)
				{
					Log.Error($"message error length < 2, ip: {this.RemoteAddress}");
					this.network.Remove(this.Id);
					return;
				}

				ushort opcode = BitConverter.ToUInt16(received.Bytes, 0);
				try
				{
					this.RunDecompressedBytes(opcode, received.Bytes, opcodeLength, received.Length);
				}
				catch (Exception e)
				{
					// A failure while handling one message is logged; the loop keeps receiving.
					Log.Error(e.ToString());
				}
			}
		}

119
		/// <summary>
		/// Deserializes one message and routes it. An <c>AResponse</c> is delivered to the
		/// pending RPC callback registered under its RpcId; anything else is passed to the
		/// network component's message dispatcher.
		/// </summary>
		/// <param name="opcode">Raw opcode used to look up the message type.</param>
		/// <param name="messageBytes">Buffer that contains the serialized message.</param>
		/// <param name="offset">Index in <paramref name="messageBytes"/> where the message body starts.</param>
		/// <param name="count">End position of the data; <c>count - offset</c> bytes are deserialized.</param>
		private void RunDecompressedBytes(ushort opcode, byte[] messageBytes, int offset, int count)
		{
			Opcode op;
			object message;

			try
			{
				op = (Opcode)opcode;
				OpcodeTypeComponent opcodeTypes = this.network.Entity.GetComponent<OpcodeTypeComponent>();
				Type messageType = opcodeTypes.GetType(op);
				int bodyLength = count - offset;
				message = this.network.MessagePacker.DeserializeFrom(messageType, messageBytes, offset, bodyLength);
			}
			catch (Exception e)
			{
				// Undecodable message: log it and remove this session from the network component.
				Log.Error($"message deserialize error, ip: {this.RemoteAddress} {opcode} {e}");
				this.network.Remove(this.Id);
				return;
			}

			AResponse response = message as AResponse;
			if (response == null)
			{
				// Not an RPC reply: hand it to the dispatcher.
				this.network.MessageDispatcher.Dispatch(this, op, offset, messageBytes, (AMessage)message);
				return;
			}

			// RPC reply. The callback may be missing because the client may have cancelled the call.
			Action<object> callback;
			if (this.requestCallback.TryGetValue(response.RpcId, out callback))
			{
				this.requestCallback.Remove(response.RpcId);
				callback(message);
			}
		}
T
tanghai 已提交
156 157 158 159
		
		/// <summary>
		/// RPC call with a callback. Assigns a fresh RpcId to the request, registers
		/// <paramref name="action"/> to run when the matching response arrives, and sends the request.
		/// </summary>
		/// <param name="request">Request to send; its <c>RpcId</c> is overwritten.</param>
		/// <param name="action">Invoked with the response. Exceptions it throws are logged, not propagated.</param>
		public void CallWithAction(ARequest request, Action<AResponse> action)
		{
			uint rpcId = ++RpcId;
			request.RpcId = rpcId;

			this.requestCallback[rpcId] = (message) =>
			{
				try
				{
					AResponse response = (AResponse)message;
					action(response);
				}
				catch (Exception e)
				{
					Log.Error(e.ToString());
				}
			};

			try
			{
				this.SendMessage(request);
			}
			catch
			{
				// The send failed and the caller sees the exception, so nobody is waiting on
				// this entry any more; drop it instead of leaving it in the table forever.
				this.requestCallback.Remove(rpcId);
				throw;
			}
		}

T
tanghai 已提交
180 181 182 183 184 185 186 187
		/// <summary>
		/// RPC call: sends a request and returns a task that completes with the matching response.
		/// </summary>
		/// <param name="request">Request to send; its <c>RpcId</c> is overwritten with a fresh id.</param>
		/// <param name="isHotfix">Not used by this method.</param>
		/// <returns>
		/// A task that yields the response, or faults with <c>RpcException</c> when the
		/// response's <c>Error</c> is greater than 100.
		/// </returns>
		public Task<AResponse> Call(ARequest request, bool isHotfix)
		{
			uint rpcId = ++RpcId;
			request.RpcId = rpcId;

			var tcs = new TaskCompletionSource<AResponse>();
			this.requestCallback[rpcId] = (message) =>
			{
				try
				{
					AResponse response = (AResponse)message;
					if (response.Error > 100)
					{
						tcs.SetException(new RpcException(response.Error, response.Message));
						return;
					}
					tcs.SetResult(response);
				}
				catch (Exception e)
				{
					tcs.SetException(new Exception($"Rpc Error: {message.GetType().FullName}", e));
				}
			};

			try
			{
				this.SendMessage(request);
			}
			catch
			{
				// The send failed and the exception goes to the caller instead of a task,
				// so the registered callback would otherwise stay in the table forever.
				this.requestCallback.Remove(rpcId);
				throw;
			}

			return tcs.Task;
		}

		/// <summary>
		/// Cancellable RPC call: sends a request and returns a task that completes with the
		/// matching response, or is cancelled when <paramref name="cancellationToken"/> fires first.
		/// </summary>
		/// <param name="request">Request to send; its <c>RpcId</c> is overwritten with a fresh id.</param>
		/// <param name="isHotfix">Not used by this method.</param>
		/// <param name="cancellationToken">
		/// When cancelled, the pending callback is removed and the returned task moves to the
		/// Canceled state (previously it was left incomplete forever).
		/// </param>
		/// <returns>
		/// A task that yields the response, or faults with <c>RpcException</c> when the
		/// response's <c>Error</c> is greater than 100.
		/// </returns>
		public Task<AResponse> Call(ARequest request, bool isHotfix, CancellationToken cancellationToken)
		{
			// Keep the id in a local: the cancel handler must remove exactly this call's entry
			// even if the request object is later reused and its RpcId changes.
			uint rpcId = ++RpcId;
			request.RpcId = rpcId;

			var tcs = new TaskCompletionSource<AResponse>();

			// Declared before the callback so the callback can release it once the reply arrives.
			CancellationTokenRegistration registration = default(CancellationTokenRegistration);

			this.requestCallback[rpcId] = (message) =>
			{
				// The call is finished; stop holding a registration on the token.
				registration.Dispose();
				try
				{
					AResponse response = (AResponse)message;
					if (response.Error > 100)
					{
						tcs.TrySetException(new RpcException(response.Error, response.Message));
						return;
					}
					tcs.TrySetResult(response);
				}
				catch (Exception e)
				{
					tcs.TrySetException(new Exception($"Rpc Error: {message.GetType().FullName}", e));
				}
			};

			registration = cancellationToken.Register(() =>
			{
				this.requestCallback.Remove(rpcId);
				// Complete the task so awaiters do not hang after cancellation.
				tcs.TrySetCanceled();
			});

			try
			{
				this.SendMessage(request);
			}
			catch
			{
				// The send failed and the exception goes to the caller: undo both registrations.
				this.requestCallback.Remove(rpcId);
				registration.Dispose();
				throw;
			}

			return tcs.Task;
		}

246 247 248
		/// <summary>
		/// Typed RPC call: sends a request and returns a task that completes with the matching
		/// response cast to <typeparamref name="Response"/>.
		/// </summary>
		/// <typeparam name="Response">Expected response type.</typeparam>
		/// <param name="request">Request to send; its <c>RpcId</c> is overwritten with a fresh id.</param>
		/// <returns>
		/// A task that yields the response. It faults with <c>RpcException</c> when the
		/// response's <c>Error</c> is greater than 100, and with a wrapping exception when the
		/// received message cannot be cast to <typeparamref name="Response"/>.
		/// </returns>
		public Task<Response> Call<Response>(ARequest request) where Response : AResponse
		{
			uint rpcId = ++RpcId;
			request.RpcId = rpcId;

			var tcs = new TaskCompletionSource<Response>();
			this.requestCallback[rpcId] = (message) =>
			{
				try
				{
					Response response = (Response)message;
					if (response.Error > 100)
					{
						tcs.SetException(new RpcException(response.Error, response.Message));
						return;
					}
					tcs.SetResult(response);
				}
				catch (Exception e)
				{
					tcs.SetException(new Exception($"Rpc Error: {typeof(Response).FullName}", e));
				}
			};

			try
			{
				this.SendMessage(request);
			}
			catch
			{
				// The send failed and the exception goes to the caller instead of a task,
				// so the registered callback would otherwise stay in the table forever.
				this.requestCallback.Remove(rpcId);
				throw;
			}

			return tcs.Task;
		}

T
tanghai 已提交
278 279 280 281 282 283 284
		/// <summary>
		/// Cancellable typed RPC call: sends a request and returns a task that completes with
		/// the matching response cast to <typeparamref name="Response"/>, or is cancelled when
		/// <paramref name="cancellationToken"/> fires first.
		/// </summary>
		/// <typeparam name="Response">Expected response type.</typeparam>
		/// <param name="request">Request to send; its <c>RpcId</c> is overwritten with a fresh id.</param>
		/// <param name="cancellationToken">
		/// When cancelled, the pending callback is removed and the returned task moves to the
		/// Canceled state (previously it was left incomplete forever).
		/// </param>
		/// <returns>
		/// A task that yields the response. It faults with <c>RpcException</c> when the
		/// response's <c>Error</c> is greater than 100, and with a wrapping exception when the
		/// received message cannot be cast to <typeparamref name="Response"/>.
		/// </returns>
		public Task<Response> Call<Response>(ARequest request, CancellationToken cancellationToken)
			where Response : AResponse
		{
			// Keep the id in a local: the cancel handler must remove exactly this call's entry
			// even if the request object is later reused and its RpcId changes.
			uint rpcId = ++RpcId;
			request.RpcId = rpcId;

			var tcs = new TaskCompletionSource<Response>();

			// Declared before the callback so the callback can release it once the reply arrives.
			CancellationTokenRegistration registration = default(CancellationTokenRegistration);

			this.requestCallback[rpcId] = (message) =>
			{
				// The call is finished; stop holding a registration on the token.
				registration.Dispose();
				try
				{
					Response response = (Response)message;
					if (response.Error > 100)
					{
						tcs.TrySetException(new RpcException(response.Error, response.Message));
						return;
					}
					tcs.TrySetResult(response);
				}
				catch (Exception e)
				{
					tcs.TrySetException(new Exception($"Rpc Error: {typeof(Response).FullName}", e));
				}
			};

			registration = cancellationToken.Register(() =>
			{
				this.requestCallback.Remove(rpcId);
				// Complete the task so awaiters do not hang after cancellation.
				tcs.TrySetCanceled();
			});

			try
			{
				this.SendMessage(request);
			}
			catch
			{
				// The send failed and the exception goes to the caller: undo both registrations.
				this.requestCallback.Remove(rpcId);
				registration.Dispose();
				throw;
			}

			return tcs.Task;
		}

314
		/// <summary>
		/// Sends a one-way message through this session.
		/// </summary>
		/// <param name="message">Message to serialize and send.</param>
		/// <exception cref="Exception">Thrown when <c>Id</c> is 0, i.e. the session has been disposed.</exception>
		public void Send(AMessage message)
		{
			bool disposed = this.Id == 0;
			if (disposed)
			{
				throw new Exception("session已经被Dispose了");
			}

			this.SendMessage(message);
		}

323
		/// <summary>
		/// Sends a response message through this session.
		/// </summary>
		/// <typeparam name="Response">Concrete response type.</typeparam>
		/// <param name="message">Response to serialize and send.</param>
		/// <exception cref="Exception">Thrown when <c>Id</c> is 0, i.e. the session has been disposed.</exception>
		public void Reply<Response>(Response message) where Response : AResponse
		{
			bool disposed = this.Id == 0;
			if (disposed)
			{
				throw new Exception("session已经被Dispose了");
			}

			this.SendMessage(message);
		}

332
		/// <summary>
		/// Serializes a message and sends it as two buffers: the opcode bytes followed by
		/// the message bytes. In SERVER builds running as <c>AppType.AllServer</c> the message
		/// is instead handed directly to the session found for this session's remote address.
		/// </summary>
		/// <param name="message">Message object; its runtime type determines the opcode.</param>
		private void SendMessage(object message)
		{
			OpcodeTypeComponent opcodeTypes = this.network.GetComponent<OpcodeTypeComponent>();
			ushort op = (ushort)opcodeTypes.GetOpcode(message.GetType());
			byte[] messageBytes = this.network.MessagePacker.SerializeToByteArray(message);

#if SERVER
			// In AllServer mode internal messages skip the network and go straight to the
			// target session, so the whole call stack is visible while debugging.
			if (this.network.AppType == AppType.AllServer)
			{
				NetInnerComponent inner = this.network.GetComponent<NetInnerComponent>();
				Session target = inner.Get(this.RemoteAddress.ToString());
				target.RunDecompressedBytes(op, messageBytes, 0, messageBytes.Length);
				return;
			}
#endif

			// Fill the reusable two-slot list: opcode first, payload second.
			this.byteses[0] = BitConverter.GetBytes(op);
			this.byteses[1] = messageBytes;

			this.channel.Send(this.byteses);
		}
	}
}