提交 7fae93d3 编写于 作者: T tanghai

增加协程锁组件,locationComponent跟actor都使用协程锁来实现队列机制,代码大大简化,并且非常好懂。

协程锁原理很简单,同一个key只有一个协程能执行,其它同一个key的协程将队列,这个协程执行完会唤醒下一个协程。
协程锁是个非常方便的组件,比如服务端在处理登录或者下线过程中,每个异步操作都可能同一个账号会再次登录上来,
逻辑十分复杂,我们会希望登录某部分异步操作是原子操作,账号再次登录要等这个原子操作完成才能执行,
这样登录或者下线过程逻辑复杂度将会简化十倍以上。协程锁是ET解决这类异步重入问题的大杀器。
上级 6e0df01c
......@@ -61,7 +61,7 @@
<rules>
<!--<logger name="*" minlevel="Trace" writeTo="all" />-->
<logger name="*" minlevel="Trace" maxlevel="Trace" writeTo="trace" />
<logger name="*" minlevel="Debug" maxlevel="Debug" writeTo="debug" />
<logger name="*" minlevel="Trace" maxlevel="Fatal" writeTo="debug" />
<logger name="*" minlevel="Info" maxlevel="Info" writeTo="info" />
<logger name="*" minlevel="Warn" maxlevel="Warn" writeTo="warn" />
<logger name="*" minlevel="Error" maxlevel="Error" writeTo="error" />
......
......@@ -69,10 +69,12 @@ namespace App
Game.Scene.AddComponent<ActorMessageSenderComponent>();
Game.Scene.AddComponent<ActorLocationSenderComponent>();
Game.Scene.AddComponent<GateSessionKeyComponent>();
Game.Scene.AddComponent<CoroutineLockComponent>();
break;
case AppType.Location:
Game.Scene.AddComponent<NetInnerComponent, string>(innerConfig.Address);
Game.Scene.AddComponent<LocationComponent>();
Game.Scene.AddComponent<CoroutineLockComponent>();
break;
case AppType.Map:
Game.Scene.AddComponent<NetInnerComponent, string>(innerConfig.Address);
......@@ -83,6 +85,7 @@ namespace App
Game.Scene.AddComponent<MailboxDispatcherComponent>();
Game.Scene.AddComponent<ActorMessageDispatcherComponent>();
Game.Scene.AddComponent<PathfindingComponent>();
Game.Scene.AddComponent<CoroutineLockComponent>();
break;
case AppType.AllServer:
// 发送普通actor消息
......@@ -125,6 +128,8 @@ namespace App
Game.Scene.AddComponent<UnitComponent>();
Game.Scene.AddComponent<ConsoleComponent>();
Game.Scene.AddComponent<CoroutineLockComponent>();
// Game.Scene.AddComponent<HttpComponent>();
break;
case AppType.Benchmark:
......
......@@ -9,7 +9,6 @@ namespace ETHotfix
public override void Awake(MailBoxComponent self)
{
self.MailboxType = MailboxType.MessageDispatcher;
self.Queue.Clear();
}
}
......@@ -19,16 +18,6 @@ namespace ETHotfix
public override void Awake(MailBoxComponent self, string mailboxType)
{
self.MailboxType = mailboxType;
self.Queue.Clear();
}
}
[ObjectSystem]
public class MailBoxComponentStartSystem : StartSystem<MailBoxComponent>
{
public override void Start(MailBoxComponent self)
{
self.HandleAsync().Coroutine();
}
}
......@@ -47,60 +36,10 @@ namespace ETHotfix
await Game.Scene.GetComponent<LocationProxyComponent>().Remove(self.Entity.Id);
}
public static void Add(this MailBoxComponent self, ActorMessageInfo info)
{
self.Queue.Enqueue(info);
if (self.Tcs == null)
{
return;
}
var t = self.Tcs;
self.Tcs = null;
t.SetResult(self.Queue.Dequeue());
}
private static ETTask<ActorMessageInfo> GetAsync(this MailBoxComponent self)
{
if (self.Queue.Count > 0)
{
return ETTask.FromResult(self.Queue.Dequeue());
}
self.Tcs = new ETTaskCompletionSource<ActorMessageInfo>();
return self.Tcs.Task;
}
public static async ETVoid HandleAsync(this MailBoxComponent self)
public static async ETTask Add(this MailBoxComponent self, Session session, object message)
{
MailboxDispatcherComponent mailboxDispatcherComponent = Game.Scene.GetComponent<MailboxDispatcherComponent>();
long instanceId = self.InstanceId;
while (true)
{
if (self.InstanceId != instanceId)
{
return;
}
try
{
ActorMessageInfo info = await self.GetAsync();
// 返回null表示actor已经删除,协程要终止
if (info.Message == null)
{
return;
}
// 根据这个mailbox类型分发给相应的处理
await mailboxDispatcherComponent.Handle(self, info);
}
catch (Exception e)
{
Log.Error(e);
}
}
await mailboxDispatcherComponent.Handle(self, session, message);
}
}
}
\ No newline at end of file
......@@ -67,12 +67,12 @@ namespace ETHotfix
/// 根据mailbox类型做不同的处理
/// </summary>
public static async ETTask Handle(
this MailboxDispatcherComponent self, MailBoxComponent mailBoxComponent, ActorMessageInfo actorMessageInfo)
this MailboxDispatcherComponent self, MailBoxComponent mailBoxComponent, Session session, object message)
{
IMailboxHandler iMailboxHandler;
if (self.MailboxHandlers.TryGetValue(mailBoxComponent.MailboxType, out iMailboxHandler))
{
await iMailboxHandler.Handle(actorMessageInfo.Session, mailBoxComponent.Entity, actorMessageInfo.Message);
await iMailboxHandler.Handle(session, mailBoxComponent.Entity, message);
}
}
}
......
......@@ -8,7 +8,7 @@ namespace ETHotfix
{
protected override async ETTask Run(Session session, ObjectAddRequest request, ObjectAddResponse response, Action reply)
{
Game.Scene.GetComponent<LocationComponent>().Add(request.Key, request.InstanceId);
await Game.Scene.GetComponent<LocationComponent>().Add(request.Key, request.InstanceId);
reply();
}
}
......
......@@ -8,7 +8,7 @@ namespace ETHotfix
{
protected override async ETTask Run(Session session, ObjectGetRequest request, ObjectGetResponse response, Action reply)
{
long instanceId = await Game.Scene.GetComponent<LocationComponent>().GetAsync(request.Key);
long instanceId = await Game.Scene.GetComponent<LocationComponent>().Get(request.Key);
if (instanceId == 0)
{
response.Error = ErrorCode.ERR_ActorLocationNotFound;
......
......@@ -8,7 +8,7 @@ namespace ETHotfix
{
protected override async ETTask Run(Session session, ObjectRemoveRequest request, ObjectRemoveResponse response, Action reply)
{
Game.Scene.GetComponent<LocationComponent>().Remove(request.Key);
await Game.Scene.GetComponent<LocationComponent>().Remove(request.Key);
reply();
await ETTask.CompletedTask;
}
......
......@@ -8,7 +8,7 @@ namespace ETHotfix
{
protected override async ETTask Run(Session session, ObjectUnLockRequest request, ObjectUnLockResponse response, Action reply)
{
Game.Scene.GetComponent<LocationComponent>().UnLockAndUpdate(request.Key, request.OldInstanceId, request.InstanceId);
Game.Scene.GetComponent<LocationComponent>().UnLock(request.Key, request.OldInstanceId, request.InstanceId);
reply();
await ETTask.CompletedTask;
}
......
......@@ -11,59 +11,74 @@ namespace ETHotfix
{
case IActorRequest iActorRequest:
{
Entity entity = (Entity)Game.EventSystem.Get(iActorRequest.ActorId);
if (entity == null)
{
Log.Warning($"not found actor: {message}");
ActorResponse response = new ActorResponse
{
Error = ErrorCode.ERR_NotFoundActor,
RpcId = iActorRequest.RpcId
};
session.Reply(response);
return;
}
MailBoxComponent mailBoxComponent = entity.GetComponent<MailBoxComponent>();
if (mailBoxComponent == null)
{
ActorResponse response = new ActorResponse
{
Error = ErrorCode.ERR_ActorNoMailBoxComponent,
RpcId = iActorRequest.RpcId
};
session.Reply(response);
Log.Error($"actor not add MailBoxComponent: {entity.GetType().Name} {message}");
return;
}
mailBoxComponent.Add(new ActorMessageInfo() { Session = session, Message = iActorRequest });
return;
HandleIActorRequest(session, iActorRequest).Coroutine();
break;
}
case IActorMessage iactorMessage:
{
Entity entity = (Entity)Game.EventSystem.Get(iactorMessage.ActorId);
if (entity == null)
HandleIActorMessage(session, iactorMessage).Coroutine();
break;
}
default:
{
Game.Scene.GetComponent<MessageDispatcherComponent>().Handle(session, new MessageInfo(opcode, message));
break;
}
}
}
private async ETVoid HandleIActorRequest(Session session, IActorRequest message)
{
using (await CoroutineLockComponent.Instance.Wait(message.ActorId))
{
Entity entity = (Entity)Game.EventSystem.Get(message.ActorId);
if (entity == null)
{
Log.Warning($"not found actor: {message}");
ActorResponse response = new ActorResponse
{
Log.Error($"not found actor: {message}");
return;
}
Error = ErrorCode.ERR_NotFoundActor,
RpcId = message.RpcId
};
session.Reply(response);
return;
}
MailBoxComponent mailBoxComponent = entity.GetComponent<MailBoxComponent>();
if (mailBoxComponent == null)
MailBoxComponent mailBoxComponent = entity.GetComponent<MailBoxComponent>();
if (mailBoxComponent == null)
{
ActorResponse response = new ActorResponse
{
Log.Error($"actor not add MailBoxComponent: {entity.GetType().Name} {message}");
return;
}
Error = ErrorCode.ERR_ActorNoMailBoxComponent,
RpcId = message.RpcId
};
session.Reply(response);
Log.Error($"actor not add MailBoxComponent: {entity.GetType().Name} {message}");
return;
}
mailBoxComponent.Add(new ActorMessageInfo() { Session = session, Message = iactorMessage });
await mailBoxComponent.Add(session, message);
}
}
private async ETVoid HandleIActorMessage(Session session, IActorMessage message)
{
using (await CoroutineLockComponent.Instance.Wait(message.ActorId))
{
Entity entity = (Entity)Game.EventSystem.Get(message.ActorId);
if (entity == null)
{
Log.Error($"not found actor: {message}");
return;
}
default:
MailBoxComponent mailBoxComponent = entity.GetComponent<MailBoxComponent>();
if (mailBoxComponent == null)
{
Game.Scene.GetComponent<MessageDispatcherComponent>().Handle(session, new MessageInfo(opcode, message));
break;
Log.Error($"actor not add MailBoxComponent: {entity.GetType().Name} {message}");
return;
}
await mailBoxComponent.Add(session, message);
}
}
}
......
......@@ -16,11 +16,6 @@ namespace ETModel
// Mailbox的类型
public string MailboxType;
// 队列处理消息
public Queue<ActorMessageInfo> Queue = new Queue<ActorMessageInfo>();
public ETTaskCompletionSource<ActorMessageInfo> Tcs;
public override void Dispose()
{
if (this.IsDisposed)
......@@ -29,10 +24,6 @@ namespace ETModel
}
base.Dispose();
var t = this.Tcs;
this.Tcs = null;
t?.SetResult(new ActorMessageInfo());
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Collections.Generic;
namespace ETModel
{
public abstract class LocationTask: Component
{
public abstract void Run();
}
[ObjectSystem]
public class LocationQueryTaskAwakeSystem : AwakeSystem<LocationQueryTask, long>
public class LockInfoAwakeSystem : AwakeSystem<LockInfo, long, CoroutineLock>
{
public override void Awake(LocationQueryTask self, long key)
public override void Awake(LockInfo self, long lockInstanceId, CoroutineLock coroutineLock)
{
self.Key = key;
self.Tcs = new ETTaskCompletionSource<long>();
self.LockInstanceId = lockInstanceId;
self.CoroutineLock = coroutineLock;
}
}
public sealed class LocationQueryTask : LocationTask
public class LockInfo: Component
{
public long Key;
public ETTaskCompletionSource<long> Tcs;
public ETTask<long> Task
{
get
{
return this.Tcs.Task;
}
}
public long LockInstanceId;
public CoroutineLock CoroutineLock;
public override void Run()
public override void Dispose()
{
try
{
LocationComponent locationComponent = this.GetParent<LocationComponent>();
long location = locationComponent.Get(this.Key);
this.Tcs.SetResult(location);
}
catch (Exception e)
{
this.Tcs.SetException(e);
}
this.LockInstanceId = 0;
this.CoroutineLock.Dispose();
}
}
public class LocationComponent : Component
{
private readonly Dictionary<long, long> locations = new Dictionary<long, long>();
private readonly Dictionary<long, long> lockDict = new Dictionary<long, long>();
private readonly Dictionary<long, Queue<LocationTask>> taskQueues = new Dictionary<long, Queue<LocationTask>>();
public void Add(long key, long instanceId)
private readonly Dictionary<long, LockInfo> lockInfos = new Dictionary<long, LockInfo>();
public override void Dispose()
{
this.locations[key] = instanceId;
Log.Info($"location add key: {key} instanceId: {instanceId}");
if (this.IsDisposed)
{
return;
}
base.Dispose();
this.locations.Clear();
// 更新db
//await Game.Scene.GetComponent<DBProxyComponent>().Save(new Location(key, address));
foreach (LockInfo lockInfo in this.lockInfos.Values)
{
lockInfo.Dispose();
}
this.lockInfos.Clear();
}
public void Remove(long key)
public async ETTask Add(long key, long instanceId)
{
Log.Info($"location remove key: {key}");
this.locations.Remove(key);
using (await CoroutineLockComponent.Instance.Wait(key + (int)AppType.Location))
{
this.locations[key] = instanceId;
Log.Info($"location add key: {key} instanceId: {instanceId}");
}
}
public long Get(long key)
public async ETTask Remove(long key)
{
this.locations.TryGetValue(key, out long instanceId);
return instanceId;
using (await CoroutineLockComponent.Instance.Wait(key + (int)AppType.Location))
{
this.locations.Remove(key);
Log.Info($"location remove key: {key}");
}
}
public async ETVoid Lock(long key, long instanceId, int time = 0)
public async ETTask<long> Get(long key)
{
if (this.lockDict.ContainsKey(key))
using (await CoroutineLockComponent.Instance.Wait(key + (int)AppType.Location))
{
Log.Error($"不可能同时存在两次lock, key: {key} InstanceId: {instanceId}");
return;
this.locations.TryGetValue(key, out long instanceId);
Log.Info($"location get key: {key} {instanceId}");
return instanceId;
}
Log.Info($"location lock key: {key} InstanceId: {instanceId}");
}
public async ETVoid Lock(long key, long instanceId, int time = 0)
{
if (!this.locations.TryGetValue(key, out long saveInstanceId))
{
Log.Error($"actor没有注册, key: {key} InstanceId: {instanceId}");
return;
}
if (saveInstanceId != instanceId)
{
Log.Error($"actor注册的instanceId与lock的不一致, key: {key} InstanceId: {instanceId} saveInstanceId: {saveInstanceId}");
return;
}
CoroutineLock coroutineLock = await CoroutineLockComponent.Instance.Wait(key + (int)AppType.Location);
this.lockDict.Add(key, instanceId);
LockInfo lockInfo = ComponentFactory.Create<LockInfo, long, CoroutineLock>(instanceId, coroutineLock);
this.lockInfos.Add(key, lockInfo);
Log.Info($"location lock key: {key} InstanceId: {instanceId}");
// 超时则解锁
if (time > 0)
{
await Game.Scene.GetComponent<TimerComponent>().WaitAsync(time);
if (!this.lockDict.ContainsKey(key))
{
return;
}
Log.Info($"location timeout unlock key: {key} time: {time}");
this.UnLock(key);
this.UnLock(key, instanceId, instanceId);
}
}
public void UnLockAndUpdate(long key, long oldInstanceId, long instanceId)
public void UnLock(long key, long oldInstanceId, long newInstanceId)
{
this.lockDict.TryGetValue(key, out long lockInstanceId);
if (lockInstanceId != oldInstanceId)
{
Log.Error($"unlock appid is different {lockInstanceId} {oldInstanceId}" );
}
Log.Info($"location unlock key: {key} oldInstanceId: {oldInstanceId} new: {instanceId}");
this.locations[key] = instanceId;
this.UnLock(key);
}
private void UnLock(long key)
{
this.lockDict.Remove(key);
if (!this.taskQueues.TryGetValue(key, out Queue<LocationTask> tasks))
if (!this.lockInfos.TryGetValue(key, out LockInfo lockInfo))
{
return;
}
while (true)
{
if (tasks.Count <= 0)
{
this.taskQueues.Remove(key);
return;
}
if (this.lockDict.ContainsKey(key))
{
return;
}
LocationTask task = tasks.Dequeue();
try
{
task.Run();
}
catch (Exception e)
{
Log.Error(e);
}
task.Dispose();
}
}
public ETTask<long> GetAsync(long key)
{
if (!this.lockDict.ContainsKey(key))
{
this.locations.TryGetValue(key, out long instanceId);
Log.Info($"location get key: {key} {instanceId}");
return ETTask.FromResult(instanceId);
}
LocationQueryTask task = ComponentFactory.CreateWithParent<LocationQueryTask, long>(this, key);
this.AddTask(key, task);
return task.Task;
}
public void AddTask(long key, LocationTask task)
{
if (!this.taskQueues.TryGetValue(key, out Queue<LocationTask> tasks))
{
tasks = new Queue<LocationTask>();
this.taskQueues[key] = tasks;
}
tasks.Enqueue(task);
}
public override void Dispose()
{
if (this.IsDisposed)
if (lockInfo.LockInstanceId != oldInstanceId)
{
Log.Error($"unlock appid is different {lockInfo.LockInstanceId} {oldInstanceId}" );
return;
}
base.Dispose();
Log.Info($"location unlock key: {key} oldInstanceId: {oldInstanceId} new: {newInstanceId}");
this.locations.Clear();
this.lockDict.Clear();
this.taskQueues.Clear();
this.locations[key] = newInstanceId;
lockInfo.Dispose();
}
}
}
\ No newline at end of file
......@@ -47,6 +47,9 @@
<Compile Include="..\..\Unity\Assets\Model\Base\Async\MoveNextRunner.cs">
<Link>Base\Async\MoveNextRunner.cs</Link>
</Compile>
<Compile Include="..\..\Unity\Assets\Model\Base\CoroutineLockComponent.cs">
<Link>Base\CoroutineLockComponent.cs</Link>
</Compile>
<Compile Include="..\..\Unity\Assets\Model\Base\Helper\MongoHelper.cs">
<Link>Base\Helper\MongoHelper.cs</Link>
</Compile>
......
using System.Collections.Generic;
namespace ETModel
{
[ObjectSystem]
public class CoroutineLockAwakeSystem: AwakeSystem<CoroutineLock, long>
{
public override void Awake(CoroutineLock self, long key)
{
self.Awake(key);
}
}
public class CoroutineLock: Component
{
private long key { get; set; }
public void Awake(long key)
{
this.key = key;
}
public override void Dispose()
{
if (this.IsDisposed)
{
return;
}
base.Dispose();
CoroutineLockComponent.Instance.Notify(this.key);
}
}
public class LockQueue: Component
{
private readonly Queue<ETTaskCompletionSource<CoroutineLock>> queue = new Queue<ETTaskCompletionSource<CoroutineLock>>();
public override void Dispose()
{
if (this.IsDisposed)
{
return;
}
base.Dispose();
this.queue.Clear();
}
public void Enqueue(ETTaskCompletionSource<CoroutineLock> tcs)
{
this.queue.Enqueue(tcs);
}
public ETTaskCompletionSource<CoroutineLock> Dequeue()
{
return this.queue.Dequeue();
}
public int Count
{
get
{
return this.queue.Count;
}
}
}
[ObjectSystem]
public class CoroutineLockComponentAwakeSystem: AwakeSystem<CoroutineLockComponent>
{
public override void Awake(CoroutineLockComponent self)
{
CoroutineLockComponent.Instance = self;
}
}
public class CoroutineLockComponent: Component
{
public static CoroutineLockComponent Instance { get; set; }
private readonly Dictionary<long, LockQueue> lockQueues = new Dictionary<long, LockQueue>();
public override void Dispose()
{
if (this.IsDisposed)
{
return;
}
base.Dispose();
foreach (var kv in this.lockQueues)
{
kv.Value.Dispose();
}
this.lockQueues.Clear();
Instance = null;
}
public async ETTask<CoroutineLock> Wait(long key)
{
if (!this.lockQueues.TryGetValue(key, out LockQueue lockQueue))
{
this.lockQueues.Add(key, ComponentFactory.Create<LockQueue>());
return ComponentFactory.Create<CoroutineLock, long>(key);
}
ETTaskCompletionSource<CoroutineLock> tcs = new ETTaskCompletionSource<CoroutineLock>();
lockQueue.Enqueue(tcs);
return await tcs.Task;
}
public void Notify(long key)
{
if (!this.lockQueues.TryGetValue(key, out LockQueue lockQueue))
{
Log.Error($"CoroutineLockComponent Notify not found queue: {key}");
return;
}
if (lockQueue.Count == 0)
{
this.lockQueues.Remove(key);
lockQueue.Dispose();
return;
}
ETTaskCompletionSource<CoroutineLock> tcs = lockQueue.Dequeue();
tcs.SetResult(ComponentFactory.Create<CoroutineLock, long>(key));
}
}
}
\ No newline at end of file
fileFormatVersion: 2
guid: 0ca564c89e8bb4d61b3720d67fc2edaf
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:
......@@ -70,6 +70,7 @@
<Compile Include="Assets\Model\Base\Async\ETVoid.cs" />
<Compile Include="Assets\Model\Base\Async\IAwaiter.cs" />
<Compile Include="Assets\Model\Base\Async\MoveNextRunner.cs" />
<Compile Include="Assets\Model\Base\CoroutineLockComponent.cs" />
<Compile Include="Assets\Model\Base\DoubleMap.cs" />
<Compile Include="Assets\Model\Base\Event\Env.cs" />
<Compile Include="Assets\Model\Base\Event\EnvKey.cs" />
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册