// ActorProxy.cs
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson.Serialization.Attributes;

namespace Model
{
	/// <summary>
	/// Base type for one unit of work queued on an <see cref="ActorProxy"/>:
	/// a message plus the proxy that will send it.
	/// </summary>
	public abstract class ActorTask
	{
		/// <summary>The proxy used to send this task. Excluded from BSON serialization.</summary>
		[BsonIgnore]
		public ActorProxy proxy;

		/// <summary>The message carried by this task. Included in BSON serialization.</summary>
		[BsonElement]
		public AMessage message;

		/// <summary>
		/// Sends the message through the proxy.
		/// </summary>
		/// <returns>The response produced by the send.</returns>
		public abstract Task<AResponse> Run();

		/// <summary>
		/// Invoked by the proxy when it gives up on this task without a successful send.
		/// </summary>
		/// <param name="error">Error code of the response that caused the proxy to give up.</param>
		public abstract void RunFail(int error);
	}

	/// <summary>
	/// Ordinary message: the sender does not need a response.
	/// </summary>
	public class ActorMessageTask: ActorTask
	{
		/// <summary>
		/// Binds a message to the proxy that will send it.
		/// </summary>
		/// <param name="proxy">Proxy used to send the message.</param>
		/// <param name="message">Message to send.</param>
		public ActorMessageTask(ActorProxy proxy, AMessage message)
		{
			this.proxy = proxy;
			this.message = message;
		}

		/// <summary>
		/// Wraps the message in an <c>ActorRequest</c> addressed to the proxy's Id and sends it
		/// with the proxy's current cancellation token.
		/// </summary>
		/// <returns>The generic <c>ActorResponse</c> returned by the call.</returns>
		public override async Task<AResponse> Run()
		{
			ActorRequest request = new ActorRequest() { Id = this.proxy.Id, AMessage = this.message };
			ActorResponse response = await this.proxy.RealCall<ActorResponse>(request, this.proxy.CancellationTokenSource.Token);
			return response;
		}

		/// <summary>
		/// Intentionally empty: this task exposes nothing for a sender to await, so there is
		/// nobody to notify on failure.
		/// </summary>
		/// <param name="error">Ignored.</param>
		public override void RunFail(int error)
		{
		}
	}

	/// <summary>
	/// RPC message: the caller waits for the reply through <see cref="Tcs"/>.
	/// </summary>
	/// <typeparam name="Response">Concrete response type expected from the call.</typeparam>
	public class ActorRpcTask<Response> : ActorTask where Response: AResponse
	{
		/// <summary>Completed with the reply, or faulted when the call fails. Excluded from BSON serialization.</summary>
		[BsonIgnore]
		public readonly TaskCompletionSource<Response> Tcs = new TaskCompletionSource<Response>();

		/// <summary>
		/// Binds a request to the proxy that will send it.
		/// </summary>
		/// <param name="proxy">Proxy used to send the request.</param>
		/// <param name="message">Request to send.</param>
		public ActorRpcTask(ActorProxy proxy, ARequest message)
		{
			this.proxy = proxy;
			this.message = message;
		}

		/// <summary>
		/// Sends the request and, unless the reply is ERR_NotFoundActor, completes <see cref="Tcs"/> with it.
		/// </summary>
		/// <returns>The reply, so the proxy can inspect its error code.</returns>
		public override async Task<AResponse> Run()
		{
			ActorRequest request = new ActorRequest() { Id = this.proxy.Id, AMessage = this.message };
			Response response;
			try
			{
				response = await this.proxy.RealCall<Response>(request, this.proxy.CancellationTokenSource.Token);
			}
			catch (Exception e) when (!(e is OperationCanceledException))
			{
				// Without this the caller awaiting Tcs.Task would wait forever, because the
				// exception is only observed by the proxy. Cancellation is skipped: the proxy
				// cancels in-flight calls when it re-queues them for a retry.
				this.Tcs.TrySetException(e);
				throw;
			}

			// ERR_NotFoundActor leaves Tcs pending: the proxy either retries this task
			// or eventually calls RunFail.
			if (response.Error != ErrorCode.ERR_NotFoundActor)
			{
				this.Tcs.TrySetResult(response);
			}
			return response;
		}

		/// <summary>
		/// Faults <see cref="Tcs"/> with an <c>RpcException</c> carrying the given error code.
		/// </summary>
		/// <param name="error">Error code to report to the caller.</param>
		public override void RunFail(int error)
		{
			// Try* so that a task which was already completed does not throw here.
			this.Tcs.TrySetException(new RpcException(error, ""));
		}
	}


	/// <summary>
	/// Event adapter that forwards the Awake and Start events to the object returned by <c>Get()</c>.
	/// </summary>
	[ObjectEvent]
	public class ActorProxyEvent : ObjectEvent<ActorProxy>, IAwake, IStart
	{
		/// <summary>Forwards Awake to the proxy.</summary>
		public void Awake() => this.Get().Awake();

		/// <summary>Forwards Start to the proxy.</summary>
		public void Start() => this.Get().Start();
	}

	/// <summary>
	/// Sends messages to a single actor. Tasks are queued in <see cref="WaitingTasks"/>, moved to
	/// <see cref="RunningTasks"/> when dispatched (at most <see cref="WindowSize"/> at a time), and
	/// retried with a refreshed address when the reply is ERR_NotFoundActor.
	/// </summary>
	public sealed class ActorProxy : Entity
	{
		// Address of the actor; looked up through LocationProxyComponent when null.
		public string Address;

		// Tasks that have been sent and are waiting for a response.
		public Queue<ActorTask> RunningTasks;

		// Tasks that have not been sent yet.
		public Queue<ActorTask> WaitingTasks;

		// Send window size: how many tasks may be in RunningTasks at once.
		public int WindowSize = 1;

		// Maximum window size.
		public const int MaxWindowSize = 1;

		// Pending hand-off to the dispatch loop; non-null only while the loop is waiting for a task.
		private TaskCompletionSource<ActorTask> tcs;

		// Source of the token passed to every call; cancelled and replaced when a retry starts.
		public CancellationTokenSource CancellationTokenSource;

		// Number of ERR_NotFoundActor replies since the last successful send.
		private int failTimes;

		/// <summary>
		/// Creates the queues and the cancellation source and resets the window to 1.
		/// </summary>
		public void Awake()
		{
			this.RunningTasks = new Queue<ActorTask>();
			this.WaitingTasks = new Queue<ActorTask>();
			this.WindowSize = 1;
			this.CancellationTokenSource = new CancellationTokenSource();
		}

		/// <summary>
		/// Starts the dispatch loop.
		/// </summary>
		public void Start()
		{
			this.UpdateAsync();
		}

		/// <summary>
		/// Queues a task and dispatches it immediately if the loop is idle and the window has room.
		/// </summary>
		private void Add(ActorTask task)
		{
			this.WaitingTasks.Enqueue(task);
			this.AllowGet();
		}

		/// <summary>
		/// Drops the oldest running task after a successful send and lets the next waiting task through.
		/// </summary>
		private void Remove()
		{
			this.RunningTasks.Dequeue();
			this.AllowGet();
		}

		/// <summary>
		/// Hands the next waiting task to the dispatch loop, provided the loop is waiting for one,
		/// a task is available, and the send window is not full.
		/// </summary>
		private void AllowGet()
		{
			if (this.tcs == null || this.WaitingTasks.Count <= 0 || this.RunningTasks.Count >= this.WindowSize)
			{
				return;
			}

			ActorTask task = this.WaitingTasks.Dequeue();
			this.RunningTasks.Enqueue(task);

			// Clear the field before completing: the loop's continuation may run inside
			// SetResult and install a fresh tcs through GetAsync, which must not be overwritten.
			var t = this.tcs;
			this.tcs = null;
			t.SetResult(task);
		}

		/// <summary>
		/// Returns the next task to send: immediately when one is waiting and the window has room,
		/// otherwise a pending task that <see cref="AllowGet"/> completes later.
		/// </summary>
		private Task<ActorTask> GetAsync()
		{
			// The window check mirrors AllowGet. Without it every waiting task was dispatched
			// at once and the send window was never enforced on this path.
			if (this.WaitingTasks.Count > 0 && this.RunningTasks.Count < this.WindowSize)
			{
				ActorTask task = this.WaitingTasks.Dequeue();
				this.RunningTasks.Enqueue(task);
				return Task.FromResult(task);
			}

			this.tcs = new TaskCompletionSource<ActorTask>();
			return this.tcs.Task;
		}

		/// <summary>
		/// Looks up the app currently hosting this actor and stores its inner address in <see cref="Address"/>.
		/// </summary>
		private async Task RefreshAddressAsync()
		{
			int appId = await Game.Scene.GetComponent<LocationProxyComponent>().Get(this.Id);
			this.Address = Game.Scene.GetComponent<StartConfigComponent>().Get(appId).GetComponent<InnerConfig>().Address;
		}

		/// <summary>
		/// Dispatch loop: resolves the address if needed, then repeatedly takes the next task and
		/// starts it without waiting for it to finish.
		/// </summary>
		private async void UpdateAsync()
		{
			if (this.Address == null)
			{
				await this.RefreshAddressAsync();
			}

			while (true)
			{
				ActorTask actorTask = await this.GetAsync();
				this.RunTask(actorTask);
			}
		}

		/// <summary>
		/// Runs one task and handles its outcome: on success the task is removed from the running
		/// queue; on ERR_NotFoundActor every outstanding task is re-queued and retried after a delay,
		/// or failed once there have been more than 3 failures in a row.
		/// </summary>
		/// <param name="task">Task to run.</param>
		private async void RunTask(ActorTask task)
		{
			try
			{
				AResponse response = await task.Run();

				// Actor not found: shrink the send window to 1 and retry.
				if (response.Error == ErrorCode.ERR_NotFoundActor)
				{
					this.CancellationTokenSource.Cancel();
					this.WindowSize = 1;
					++this.failTimes;

					// Append the waiting tasks behind the running ones, then swap the queues so
					// everything is waiting again with the already-sent tasks first.
					while (this.WaitingTasks.Count > 0)
					{
						ActorTask actorTask = this.WaitingTasks.Dequeue();
						this.RunningTasks.Enqueue(actorTask);
					}
					ObjectHelper.Swap(ref this.RunningTasks, ref this.WaitingTasks);

					// After more than 3 failures, empty the send queue and report failure for each task.
					if (this.failTimes > 3)
					{
						while (this.WaitingTasks.Count > 0)
						{
							ActorTask actorTask = this.WaitingTasks.Dequeue();
							actorTask.RunFail(response.Error);
						}
						return;
					}

					// Wait a while before sending again; the delay grows with each failure
					// (presumably milliseconds; confirm against TimerComponent.WaitAsync).
					// NOTE(review): a Send/Call arriving during this wait can reach AllowGet and
					// dispatch with the old address and the cancelled token; confirm whether
					// that needs guarding.
					await this.Parent.GetComponent<TimerComponent>().WaitAsync(this.failTimes * 500);
					await this.RefreshAddressAsync();
					this.CancellationTokenSource = new CancellationTokenSource();
					this.AllowGet();
					return;
				}

				// Sent successfully.
				this.failTimes = 0;
				if (this.WindowSize < MaxWindowSize)
				{
					++this.WindowSize;
				}
				this.Remove();
			}
			catch (Exception e)
			{
				// NOTE(review): the task stays in RunningTasks on this path, so with a full
				// window later tasks appear to be blocked; confirm the intended recovery.
				Log.Error(e.ToString());
			}
		}

		/// <summary>
		/// Queues a message that needs no response.
		/// </summary>
		/// <param name="message">Message to send.</param>
		public void Send(AMessage message)
		{
			ActorMessageTask task = new ActorMessageTask(this, message);
			this.Add(task);
		}

		/// <summary>
		/// Queues a request and returns a task for its reply.
		/// </summary>
		/// <typeparam name="Response">Expected reply type.</typeparam>
		/// <param name="request">Request to send.</param>
		/// <returns>The queued task's <c>Tcs.Task</c>.</returns>
		public Task<Response> Call<Response>(ARequest request)where Response : AResponse
		{
			ActorRpcTask<Response> task = new ActorRpcTask<Response>(this, request);
			this.Add(task);
			return task.Tcs.Task;
		}

		/// <summary>
		/// Performs the actual network call to the session at <see cref="Address"/>.
		/// </summary>
		/// <typeparam name="Response">Expected reply type.</typeparam>
		/// <param name="request">Request to send; its Id is overwritten with this proxy's Id.</param>
		/// <param name="cancellationToken">Token forwarded to the session call.</param>
		/// <returns>The reply from the session.</returns>
		/// <exception cref="RpcException">Logged together with the address, then rethrown.</exception>
		public async Task<Response> RealCall<Response>(ActorRequest request, CancellationToken cancellationToken) where Response: AResponse
		{
			try
			{
				//Log.Debug($"realcall {MongoHelper.ToJson(request)} {this.Address}");
				request.Id = this.Id;
				Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.Address);
				Response response = await session.Call<Response>(request, cancellationToken);
				return response;
			}
			catch (RpcException e)
			{
				Log.Error($"{this.Address} {e}");
				throw;
			}
		}

		/// <summary>
		/// Builds a debug string listing the message type name of every task in the queue,
		/// each preceded by a space.
		/// </summary>
		/// <param name="tasks">Queue to describe.</param>
		/// <returns>The concatenated type names, or an empty string for an empty queue.</returns>
		public string DebugQueue(Queue<ActorTask> tasks)
		{
			System.Text.StringBuilder names = new System.Text.StringBuilder();
			foreach (ActorTask task in tasks)
			{
				names.Append(' ').Append(task.message.GetType().Name);
			}
			return names.ToString();
		}

		/// <summary>
		/// Disposes the entity; does nothing when Id is 0.
		/// </summary>
		public override void Dispose()
		{
			// Presumably Id == 0 marks an already-disposed entity; confirm against Entity.Dispose.
			if (this.Id == 0)
			{
				return;
			}

			base.Dispose();
		}
	}
}