Session.cs 5.3 KB
Newer Older
1 2 3 4
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
5
using Base;
6

7
namespace Model
8
{
9
	public sealed class Session: Entity
10
	{
11
		private static uint RpcId { get; set; }
12
		private readonly Dictionary<uint, Action<byte[], int, int>> requestCallback = new Dictionary<uint, Action<byte[], int, int>>();
13 14 15
		private readonly AChannel channel;

		public Session(AChannel channel) : base(EntityType.Session)
16
		{
17
			this.channel = channel;
T
tanghai 已提交
18 19 20 21 22 23 24 25 26
			this.StartRecv();
		}

		public string RemoteAddress
		{
			get
			{
				return this.channel.RemoteAddress;
			}
27
		}
28

T
tanghai 已提交
29
		private async void StartRecv()
30 31 32
		{
			while (true)
			{
T
tanghai 已提交
33 34 35 36 37
				if (this.Id == 0)
				{
					return;
				}

38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
				byte[] messageBytes;
				try
				{
					messageBytes = await channel.Recv();
				}
				catch (Exception e)
				{
					Log.Error(e.ToString());
					continue;
				}

				if (messageBytes.Length < 6)
				{
					continue;
				}

				ushort opcode = BitConverter.ToUInt16(messageBytes, 0);
				try
				{
					this.Run(opcode, messageBytes);
				}
				catch (Exception e)
				{
					Log.Error(e.ToString());
				}
			}
		}

		private void Run(ushort opcode, byte[] messageBytes)
		{
			int offset = 0;
69 70 71 72
			uint flag = BitConverter.ToUInt32(messageBytes, 2);
			uint rpcFlag = flag & 0x40000000;
			uint rpcId = flag & 0x3fffffff;
			bool isCompressed = (flag & 0x80000000) > 0;
73
			if (isCompressed) // 最高位为1,表示有压缩,需要解压缩
74 75 76 77 78 79 80 81
			{
				messageBytes = ZipHelper.Decompress(messageBytes, 6, messageBytes.Length - 6);
				offset = 0;
			}
			else
			{
				offset = 6;
			}
82

83
			this.RunDecompressedBytes(opcode, rpcId, rpcFlag, messageBytes, offset);
84 85
		}

86
		private void RunDecompressedBytes(ushort opcode, uint rpcId, uint rpcFlag, byte[] messageBytes, int offset)
87
		{
88
			// 普通消息
89
			if (rpcId == 0)
90
			{
T
tanghai 已提交
91
				Game.Scene.GetComponent<MessageDispatherComponent>().Handle(this, opcode, messageBytes, offset, 0);
92
				return;
93 94
			}

95 96
			// rpcFlag>0 表示这是一个rpc响应消息
			if (rpcFlag > 0)
97
			{
98
				Action<byte[], int, int> action;
99 100
				// Rpc回调有找不着的可能,因为client可能取消Rpc调用
				if (!this.requestCallback.TryGetValue(rpcId, out action))
101
				{
102
					return;
103
				}
104 105
				this.requestCallback.Remove(rpcId);
				action(messageBytes, offset, messageBytes.Length - offset);
106 107 108
			}
			else // 这是一个rpc请求消息
			{
T
tanghai 已提交
109
				Game.Scene.GetComponent<MessageDispatherComponent>().Handle(this, opcode, messageBytes, offset, rpcId);
110 111 112
			}
		}

113 114 115
		/// <summary>
		/// Rpc调用
		/// </summary>
116
		public Task<Response> Call<Request, Response>(Request request, CancellationToken cancellationToken)
117 118
			where Request : ARequest
			where Response : AResponse
119
		{
120
			this.SendMessage(++RpcId, request);
121 122 123

			var tcs = new TaskCompletionSource<Response>();

124
			this.requestCallback[RpcId] = (bytes, offset, count) =>
125 126 127 128
			{
				try
				{
					Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
129
					if (response.Error != 0)
130
					{
131
						tcs.SetException(new RpcException(response.Error, response.Message));
132 133 134 135 136 137 138 139 140 141
						return;
					}
					tcs.SetResult(response);
				}
				catch (Exception e)
				{
					tcs.SetException(new Exception($"Rpc Error: {typeof(Response).FullName}", e));
				}
			};

142
			cancellationToken.Register(() => { this.requestCallback.Remove(RpcId); });
143 144 145 146 147 148 149

			return tcs.Task;
		}

		/// <summary>
		/// Rpc调用,发送一个消息,等待返回一个消息
		/// </summary>
150 151
		public Task<Response> Call<Request, Response>(Request request)
			where Request : ARequest
152
			where Response : AResponse
153
		{
154
			this.SendMessage(++RpcId, request);
155

156
			var tcs = new TaskCompletionSource<Response>();
157
			this.requestCallback[RpcId] = (bytes, offset, count) =>
158 159 160 161
			{
				try
				{
					Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
162
					if (response.Error != 0)
163
					{
164
						tcs.SetException(new RpcException(response.Error, response.Message));
165 166 167 168 169 170 171 172 173
						return;
					}
					tcs.SetResult(response);
				}
				catch (Exception e)
				{
					tcs.SetException(new Exception($"Rpc Error: {typeof(Response).FullName}", e));
				}
			};
174

175 176 177
			return tcs.Task;
		}

178
		public void Send<Message>(Message message) where Message : AMessage
179
		{
T
tanghai 已提交
180 181 182 183
			if (this.Id == 0)
			{
				throw new Exception("session已经被Dispose了");
			}
184
			this.SendMessage(0, message);
185 186
		}

187
		public void Reply<Response>(uint rpcId, Response message) where Response : AResponse
188
		{
T
tanghai 已提交
189 190 191 192
			if (this.Id == 0)
			{
				throw new Exception("session已经被Dispose了");
			}
193
			this.SendMessage(rpcId, message, false);
194 195
		}

196
		private void SendMessage(uint rpcId, object message, bool isCall = true)
197
		{
198
			ushort opcode = Game.Scene.GetComponent<MessageDispatherComponent>().GetOpcode(message.GetType());
199
			byte[] opcodeBytes = BitConverter.GetBytes(opcode);
200
			if (!isCall)
201
			{
202
				rpcId = rpcId | 0x40000000;
203
			}
204

205 206
			byte[] messageBytes = MongoHelper.ToBson(message);
			if (messageBytes.Length > 100)
207
			{
208 209 210 211 212 213
				byte[] newMessageBytes = ZipHelper.Compress(messageBytes);
				if (newMessageBytes.Length < messageBytes.Length)
				{
					messageBytes = newMessageBytes;
					rpcId = rpcId | 0x80000000;
				}
214 215
			}

216
			byte[] seqBytes = BitConverter.GetBytes(rpcId);
217

218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
			channel.Send(new List<byte[]> { opcodeBytes, seqBytes, messageBytes });
		}

		public override void Dispose()
		{
			if (this.Id == 0)
			{
				return;
			}

			base.Dispose();

			channel.Dispose();
		}
	}
}