提交 d6a2d5ea 编写于 作者: T tanghai

ActorTask改成struct,减少堆内存分配

上级 93b10eca
...@@ -3,78 +3,9 @@ using System.Collections.Generic; ...@@ -3,78 +3,9 @@ using System.Collections.Generic;
using System.Net; using System.Net;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MongoDB.Bson.Serialization.Attributes;
namespace Model namespace Model
{ {
public abstract class ActorTask
{
[BsonIgnore]
public ActorProxy proxy;
[BsonElement]
public MessageObject message;
public abstract Task<IResponse> Run();
public abstract void RunFail(int error);
}
/// <summary>
/// 普通消息,不需要response
/// </summary>
public class ActorMessageTask: ActorTask
{
public ActorMessageTask(ActorProxy proxy, IMessage message)
{
this.proxy = proxy;
this.message = (MessageObject)message;
}
public override async Task<IResponse> Run()
{
ActorRequest request = new ActorRequest() { Id = this.proxy.Id, AMessage = this.message };
ActorResponse response = (ActorResponse)await this.proxy.RealCall(request, this.proxy.CancellationTokenSource.Token);
return response;
}
public override void RunFail(int error)
{
}
}
/// <summary>
/// Rpc消息,需要等待返回
/// </summary>
public class ActorRpcTask : ActorTask
{
[BsonIgnore]
public readonly TaskCompletionSource<IResponse> Tcs = new TaskCompletionSource<IResponse>();
public ActorRpcTask(ActorProxy proxy, IMessage message)
{
this.proxy = proxy;
this.message = (MessageObject)message;
}
public override async Task<IResponse> Run()
{
ActorRequest request = new ActorRequest() { Id = this.proxy.Id, AMessage = this.message };
ActorResponse response = (ActorResponse)await this.proxy.RealCall(request, this.proxy.CancellationTokenSource.Token);
if (response.Error != ErrorCode.ERR_NotFoundActor)
{
this.Tcs.SetResult((IResponse)response.AMessage);
}
return response;
}
public override void RunFail(int error)
{
this.Tcs.SetException(new RpcException(error, ""));
}
}
[ObjectSystem] [ObjectSystem]
public class ActorProxySystem : ObjectSystem<ActorProxy>, IAwake, IStart public class ActorProxySystem : ObjectSystem<ActorProxy>, IAwake, IStart
{ {
...@@ -118,8 +49,6 @@ namespace Model ...@@ -118,8 +49,6 @@ namespace Model
public void Awake() public void Awake()
{ {
this.LastSendTime = TimeHelper.Now(); this.LastSendTime = TimeHelper.Now();
this.RunningTasks.Clear();
this.WaitingTasks.Clear();
this.WindowSize = 1; this.WindowSize = 1;
this.tcs = null; this.tcs = null;
this.CancellationTokenSource = new CancellationTokenSource(); this.CancellationTokenSource = new CancellationTokenSource();
...@@ -140,7 +69,7 @@ namespace Model ...@@ -140,7 +69,7 @@ namespace Model
this.failTimes = 0; this.failTimes = 0;
var t = this.tcs; var t = this.tcs;
this.tcs = null; this.tcs = null;
t?.SetResult(null); t?.SetResult(new ActorTask());
} }
public async void Start() public async void Start()
...@@ -164,13 +93,7 @@ namespace Model ...@@ -164,13 +93,7 @@ namespace Model
this.AllowGet(); this.AllowGet();
} }
} }
private void Remove()
{
this.RunningTasks.Dequeue();
this.AllowGet();
}
private void AllowGet() private void AllowGet()
{ {
if (this.tcs == null || this.WaitingTasks.Count <= 0 || this.RunningTasks.Count >= this.WindowSize) if (this.tcs == null || this.WaitingTasks.Count <= 0 || this.RunningTasks.Count >= this.WindowSize)
...@@ -208,10 +131,6 @@ namespace Model ...@@ -208,10 +131,6 @@ namespace Model
{ {
return; return;
} }
if (actorTask == null)
{
return;
}
try try
{ {
this.RunTask(actorTask); this.RunTask(actorTask);
...@@ -273,7 +192,9 @@ namespace Model ...@@ -273,7 +192,9 @@ namespace Model
{ {
++this.WindowSize; ++this.WindowSize;
} }
this.Remove();
this.RunningTasks.Dequeue();
this.AllowGet();
} }
catch (Exception e) catch (Exception e)
{ {
...@@ -283,13 +204,18 @@ namespace Model ...@@ -283,13 +204,18 @@ namespace Model
public void Send(IMessage message) public void Send(IMessage message)
{ {
ActorMessageTask task = new ActorMessageTask(this, message); ActorTask task = new ActorTask();
task.message = (MessageObject)message;
task.proxy = this;
this.Add(task); this.Add(task);
} }
public Task<IResponse> Call(IRequest request) public Task<IResponse> Call(IRequest request)
{ {
ActorRpcTask task = new ActorRpcTask(this, request); ActorTask task = new ActorTask();
task.message = (MessageObject)request;
task.proxy = this;
task.Tcs = new TaskCompletionSource<IResponse>();
this.Add(task); this.Add(task);
return task.Tcs.Task; return task.Tcs.Task;
} }
......
using System.Threading.Tasks;
namespace Model
{
public struct ActorTask
{
public ActorProxy proxy;
public MessageObject message;
public TaskCompletionSource<IResponse> Tcs;
public async Task<IResponse> Run()
{
ActorRequest request = new ActorRequest() { Id = this.proxy.Id, AMessage = this.message };
ActorResponse response = (ActorResponse)await this.proxy.RealCall(request, this.proxy.CancellationTokenSource.Token);
if (response.Error != ErrorCode.ERR_NotFoundActor)
{
this.Tcs?.SetResult((IResponse)response.AMessage);
}
return response;
}
public void RunFail(int error)
{
this.Tcs?.SetException(new RpcException(error, ""));
}
}
}
\ No newline at end of file
...@@ -4,15 +4,6 @@ namespace Model ...@@ -4,15 +4,6 @@ namespace Model
{ {
public abstract class DBTask : Component public abstract class DBTask : Component
{ {
protected DBTask()
{
}
protected DBTask(long id)
{
this.Id = id;
}
public abstract Task Run(); public abstract Task Run();
} }
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册