From d6a2d5ea6038531ad5ead68923c639bcdfa51e40 Mon Sep 17 00:00:00 2001 From: tanghai Date: Mon, 12 Feb 2018 13:10:27 +0800 Subject: [PATCH] =?UTF-8?q?ActorTask=E6=94=B9=E6=88=90struct,=E5=87=8F?= =?UTF-8?q?=E5=B0=91=E5=A0=86=E5=86=85=E5=AD=98=E5=88=86=E9=85=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Server/Model/Module/Actor/ActorProxy.cs | 98 +++---------------------- Server/Model/Module/Actor/ActorTask.cs | 29 ++++++++ Server/Model/Module/DB/DBTask.cs | 9 --- 3 files changed, 41 insertions(+), 95 deletions(-) create mode 100644 Server/Model/Module/Actor/ActorTask.cs diff --git a/Server/Model/Module/Actor/ActorProxy.cs b/Server/Model/Module/Actor/ActorProxy.cs index 3a74ce52..19a86776 100644 --- a/Server/Model/Module/Actor/ActorProxy.cs +++ b/Server/Model/Module/Actor/ActorProxy.cs @@ -3,78 +3,9 @@ using System.Collections.Generic; using System.Net; using System.Threading; using System.Threading.Tasks; -using MongoDB.Bson.Serialization.Attributes; namespace Model { - public abstract class ActorTask - { - [BsonIgnore] - public ActorProxy proxy; - - [BsonElement] - public MessageObject message; - - public abstract Task Run(); - - public abstract void RunFail(int error); - } - - /// - /// 普通消息,不需要response - /// - public class ActorMessageTask: ActorTask - { - public ActorMessageTask(ActorProxy proxy, IMessage message) - { - this.proxy = proxy; - this.message = (MessageObject)message; - } - - public override async Task Run() - { - ActorRequest request = new ActorRequest() { Id = this.proxy.Id, AMessage = this.message }; - ActorResponse response = (ActorResponse)await this.proxy.RealCall(request, this.proxy.CancellationTokenSource.Token); - return response; - } - - public override void RunFail(int error) - { - } - } - - /// - /// Rpc消息,需要等待返回 - /// - public class ActorRpcTask : ActorTask - { - [BsonIgnore] - public readonly TaskCompletionSource Tcs = new TaskCompletionSource(); - - public ActorRpcTask(ActorProxy proxy, IMessage message) - { - this.proxy = proxy; - this.message = (MessageObject)message; - } - - public override async Task Run() - { - ActorRequest request = new ActorRequest() { Id = this.proxy.Id, AMessage = this.message }; - ActorResponse response = (ActorResponse)await this.proxy.RealCall(request, this.proxy.CancellationTokenSource.Token); - if (response.Error != ErrorCode.ERR_NotFoundActor) - { - this.Tcs.SetResult((IResponse)response.AMessage); - } - return response; - } - - public override void RunFail(int error) - { - this.Tcs.SetException(new RpcException(error, "")); - } - } - - [ObjectSystem] public class ActorProxySystem : ObjectSystem, IAwake, IStart { @@ -118,8 +49,6 @@ namespace Model public void Awake() { this.LastSendTime = TimeHelper.Now(); - this.RunningTasks.Clear(); - this.WaitingTasks.Clear(); this.WindowSize = 1; this.tcs = null; this.CancellationTokenSource = new CancellationTokenSource(); @@ -140,7 +69,7 @@ namespace Model this.failTimes = 0; var t = this.tcs; this.tcs = null; - t?.SetResult(null); + t?.SetResult(new ActorTask()); } public async void Start() @@ -164,13 +93,7 @@ namespace Model this.AllowGet(); } } - - private void Remove() - { - this.RunningTasks.Dequeue(); - this.AllowGet(); - } - + private void AllowGet() { if (this.tcs == null || this.WaitingTasks.Count <= 0 || this.RunningTasks.Count >= this.WindowSize) @@ -208,10 +131,6 @@ namespace Model { return; } - if (actorTask == null) - { - return; - } try { this.RunTask(actorTask); @@ -273,7 +192,9 @@ namespace Model { ++this.WindowSize; } - this.Remove(); + + this.RunningTasks.Dequeue(); + this.AllowGet(); } catch (Exception e) { @@ -283,13 +204,18 @@ namespace Model public void Send(IMessage message) { - ActorMessageTask task = new ActorMessageTask(this, message); + ActorTask task = new ActorTask(); + task.message = (MessageObject)message; + task.proxy = this; this.Add(task); } public Task Call(IRequest request) { - ActorRpcTask task = new ActorRpcTask(this, request); + ActorTask task = new ActorTask(); + task.message = (MessageObject)request; + task.proxy = this; + task.Tcs = new TaskCompletionSource(); this.Add(task); return task.Tcs.Task; } diff --git a/Server/Model/Module/Actor/ActorTask.cs b/Server/Model/Module/Actor/ActorTask.cs new file mode 100644 index 00000000..5c1a5fe1 --- /dev/null +++ b/Server/Model/Module/Actor/ActorTask.cs @@ -0,0 +1,29 @@ +using System.Threading.Tasks; + +namespace Model +{ + public struct ActorTask + { + public ActorProxy proxy; + + public MessageObject message; + + public TaskCompletionSource Tcs; + + public async Task Run() + { + ActorRequest request = new ActorRequest() { Id = this.proxy.Id, AMessage = this.message }; + ActorResponse response = (ActorResponse)await this.proxy.RealCall(request, this.proxy.CancellationTokenSource.Token); + if (response.Error != ErrorCode.ERR_NotFoundActor) + { + this.Tcs?.SetResult((IResponse)response.AMessage); + } + return response; + } + + public void RunFail(int error) + { + this.Tcs?.SetException(new RpcException(error, "")); + } + } +} \ No newline at end of file diff --git a/Server/Model/Module/DB/DBTask.cs b/Server/Model/Module/DB/DBTask.cs index f6186a96..3a6daa2a 100644 --- a/Server/Model/Module/DB/DBTask.cs +++ b/Server/Model/Module/DB/DBTask.cs @@ -4,15 +4,6 @@ namespace Model { public abstract class DBTask : Component { - protected DBTask() - { - } - - protected DBTask(long id) - { - this.Id = id; - } - public abstract Task Run(); } } \ No newline at end of file -- GitLab