提交 601cdbe7 编写于 作者: T tanghai

1.Unit增加ActorComponent,用于处理Unit消息,Unit处理一个消息返回后接着处理下一个消息,相当于一个协程

2.增加异步事件分发机制
上级 574fc958
......@@ -4,8 +4,8 @@ using MongoDB.Bson;
namespace Controller
{
[Callback(CallbackType.BuffTimeoutCallback)]
public class BuffTimeoutCallback: IEvent
[Action(ActionType.BuffTimeoutAction)]
public class BuffTimeoutAction: IEventSync
{
public void Run(Env env)
{
......
using Common.Event;
using Model;
using MongoDB.Bson;
namespace Controller
{
[Action(ActionType.MessageAction)]
public class MessageAction : IEventSync
{
public void Run(Env env)
{
Unit unit = World.Instance.GetComponent<UnitComponent>().Get(ObjectId.Empty);
if (unit == null)
{
return;
}
unit.GetComponent<ActorComponent>().Add(env);
}
}
}
\ No newline at end of file
......@@ -48,12 +48,14 @@
<Compile Include="BehaviorTreeNode\Not.cs" />
<Compile Include="BehaviorTreeNode\Selector.cs" />
<Compile Include="BehaviorTreeNode\Sequence.cs" />
<Compile Include="Action\MessageAction.cs" />
<Compile Include="ConfigCategory\BuffCategory.cs" />
<Compile Include="ConfigCategory\GlobalCategory.cs" />
<Compile Include="ConfigCategory\NodeCategory.cs" />
<Compile Include="ConfigCategory\UnitCategory.cs" />
<Compile Include="Callback\BuffTimeoutCallback.cs" />
<Compile Include="Action\BuffTimeoutAction.cs" />
<Compile Include="Event\AfterAddBuff.cs" />
<Compile Include="Message\CMsgLogin.cs" />
<Compile Include="NodeType.cs" />
<Compile Include="Factory\UnitFactory.cs" />
<Compile Include="UnitType.cs" />
......
......@@ -4,7 +4,7 @@ using Model;
namespace Controller
{
[Event(EventType.AfterAddBuff)]
public class AddBuffToTimer: IEvent
public class AddBuffToTimer: IEventSync
{
public void Run(Env env)
{
......
using System;
using System.Threading.Tasks;
using Common.Event;
using Model;
namespace Controller.Message
{
[Message(1)]
internal class CMsgLogin: IEventAsync
{
public Task RunAsync(Env env)
{
throw new NotImplementedException();
}
}
}
......@@ -63,7 +63,7 @@ namespace Model
env[EnvKey.OwnerId] = this.OwnerId;
env[EnvKey.BuffId] = this.Id;
this.TimerId = World.Instance.GetComponent<TimerComponent>()
.Add(this.Expiration, CallbackType.BuffTimeoutCallback, env);
.Add(this.Expiration, ActionType.BuffTimeoutAction, env);
}
}
......@@ -101,7 +101,7 @@ namespace Model
env[EnvKey.OwnerId] = this.OwnerId;
env[EnvKey.BuffId] = this.Id;
this.TimerId = World.Instance.GetComponent<TimerComponent>()
.Add(this.Expiration, CallbackType.BuffTimeoutCallback, env);
.Add(this.Expiration, ActionType.BuffTimeoutAction, env);
}
}
......
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Common.Base;
using Common.Event;
namespace Model
{
public class ActorComponent : Component<Unit>
{
private readonly Queue<Env> msgEnvQueue = new Queue<Env>();
public Action msgAction = () => {};
public async void Run()
{
while (true)
{
Env env = await this.Get();
var message = env.Get<byte[]>(EnvKey.Message);
int opcode = BitConverter.ToUInt16(message, 0);
await World.Instance.GetComponent<EventComponent<MessageAttribute>>().Run(opcode, env);
}
}
public void Add(Env msgEnv)
{
this.msgEnvQueue.Enqueue(msgEnv);
msgAction();
}
private Task<Env> Get()
{
var tcs = new TaskCompletionSource<Env>();
if (this.msgEnvQueue.Count > 0)
{
Env env = this.msgEnvQueue.Dequeue();
tcs.SetResult(env);
}
else
{
msgAction = () =>
{
msgAction = () => { };
Env msg = this.msgEnvQueue.Dequeue();
tcs.SetResult(msg);
};
}
return tcs.Task;
}
}
}
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Threading.Tasks;
using Common.Base;
using Common.Event;
......@@ -9,11 +10,14 @@ namespace Model
public class EventComponent<AttributeType>: Component<World>, IAssemblyLoader
where AttributeType : AEventAttribute
{
private Dictionary<int, List<IEvent>> events;
private Dictionary<int, List<IEventSync>> eventSyncs;
private Dictionary<int, List<IEventAsync>> eventAsyncs;
public void Load(Assembly assembly)
{
this.events = new Dictionary<int, List<IEvent>>();
this.eventSyncs = new Dictionary<int, List<IEventSync>>();
this.eventAsyncs = new Dictionary<int, List<IEventAsync>>();
Type[] types = assembly.GetTypes();
foreach (Type t in types)
......@@ -24,35 +28,63 @@ namespace Model
continue;
}
object obj = Activator.CreateInstance(t);
IEvent iEvent = obj as IEvent;
if (iEvent == null)
IEventSync iEventSync = obj as IEventSync;
if (iEventSync != null)
{
throw new Exception(string.Format("event not inherit IEvent interface: {0}",
obj.GetType().FullName));
}
AEventAttribute iEventAttribute = (AEventAttribute)attrs[0];
AEventAttribute iEventAttribute = (AEventAttribute) attrs[0];
if (!this.eventSyncs.ContainsKey(iEventAttribute.Type))
{
this.eventSyncs.Add(iEventAttribute.Type, new List<IEventSync>());
}
this.eventSyncs[iEventAttribute.Type].Add(iEventSync);
continue;
}
if (!this.events.ContainsKey(iEventAttribute.Type))
IEventAsync iEventAsync = obj as IEventAsync;
// ReSharper disable once InvertIf
if (iEventAsync != null)
{
this.events.Add(iEventAttribute.Type, new List<IEvent>());
AEventAttribute iEventAttribute = (AEventAttribute)attrs[0];
if (!this.eventAsyncs.ContainsKey(iEventAttribute.Type))
{
this.eventAsyncs.Add(iEventAttribute.Type, new List<IEventAsync>());
}
this.eventAsyncs[iEventAttribute.Type].Add(iEventAsync);
continue;
}
this.events[iEventAttribute.Type].Add(iEvent);
throw new Exception(
string.Format("event not inherit IEventSync or IEventAsync interface: {0}",
obj.GetType().FullName));
}
}
public void Run(int type, Env env)
public async Task Run(int type, Env env)
{
List<IEvent> iEventDict = null;
if (!this.events.TryGetValue(type, out iEventDict))
List<IEventSync> iEventSyncs = null;
if (this.eventSyncs.TryGetValue(type, out iEventSyncs))
{
return;
foreach (IEventSync iEventSync in iEventSyncs)
{
iEventSync.Run(env);
}
}
foreach (var iEvent in iEventDict)
List<IEventAsync> iEventAsyncs = null;
// ReSharper disable once InvertIf
if (this.eventAsyncs.TryGetValue(type, out iEventAsyncs))
{
iEvent.Run(env);
foreach (IEventAsync iEventAsync in iEventAsyncs)
{
await iEventAsync.RunAsync(env);
}
}
throw new Exception(
string.Format("no event handler, AttributeType: {0} type: {1}",
typeof(AttributeType).Name, type));
}
}
}
\ No newline at end of file
......@@ -16,10 +16,10 @@ namespace Model
switch (protocol)
{
case NetworkProtocol.TCP:
this.service = new TService("127.0.0.1", 8888);
this.service = new TService(host, port);
break;
case NetworkProtocol.UDP:
this.service = new UService("127.0.0.1", 8888);
this.service = new UService(host, port);
break;
default:
throw new ArgumentOutOfRangeException("protocol");
......@@ -37,7 +37,7 @@ namespace Model
{
while (true)
{
IChannel channel = await this.service.GetChannel();
AChannel channel = await this.service.GetChannel();
ProcessChannel(channel);
}
}
......@@ -46,15 +46,16 @@ namespace Model
/// 接收分发封包
/// </summary>
/// <param name="channel"></param>
private static async void ProcessChannel(IChannel channel)
private static async void ProcessChannel(AChannel channel)
{
while (true)
{
byte[] message = await channel.RecvAsync();
Env env = new Env();
env[EnvKey.Channel] = channel;
env[EnvKey.Message] = message;
int opcode = BitConverter.ToUInt16(message, 0);
World.Instance.GetComponent<EventComponent<MessageAttribute>>().Run(opcode, env);
await World.Instance.GetComponent<EventComponent<ActionAttribute>>()
.Run(ActionType.MessageAction, env);
}
}
}
......
......@@ -71,7 +71,7 @@ namespace Model
continue;
}
this.Remove(id);
World.Instance.GetComponent<EventComponent<CallbackAttribute>>()
World.Instance.GetComponent<EventComponent<ActionAttribute>>()
.Run(timer.CallbackId, timer.Env);
}
}
......
......@@ -7,5 +7,6 @@
public const string Buff = "Buff";
public const string BuffId = "BuffId";
public const string Message = "Message";
public const string Channel = "Channel";
}
}
\ No newline at end of file
......@@ -9,9 +9,9 @@ namespace Model
}
}
public class CallbackAttribute: AEventAttribute
public class ActionAttribute: AEventAttribute
{
public CallbackAttribute(int type): base(type)
public ActionAttribute(int type): base(type)
{
}
}
......
......@@ -8,8 +8,9 @@
public const int AfterRemoveBuff = 3;
}
public static class CallbackType
public static class ActionType
{
public const int BuffTimeoutCallback = 0;
public const int BuffTimeoutAction = 0;
public const int MessageAction = 1;
}
}
\ No newline at end of file
......@@ -45,6 +45,7 @@
<Reference Include="System.Core" />
</ItemGroup>
<ItemGroup>
<Compile Include="Component\ActorComponent.cs" />
<Compile Include="Component\BehaviorTreeComponent.cs" />
<Compile Include="BehaviorTree\BehaviorTree.cs" />
<Compile Include="BehaviorTree\BehaviorTreeFactory.cs" />
......
......@@ -23,7 +23,7 @@ namespace MongoDBTest
// 加载配置
world.AddComponent<ConfigComponent>();
world.AddComponent<EventComponent<CallbackAttribute>>();
world.AddComponent<EventComponent<ActionAttribute>>();
world.AddComponent<EventComponent<EventAttribute>>();
world.AddComponent<TimerComponent>();
world.AddComponent<UnitComponent>();
......
......@@ -66,7 +66,8 @@
<Compile Include="Config\ICategory.cs" />
<Compile Include="Event\Env.cs" />
<Compile Include="Event\AEventAttribute.cs" />
<Compile Include="Event\IEvent.cs" />
<Compile Include="Event\IEventAsync.cs" />
<Compile Include="Event\IEventSync.cs" />
<Compile Include="Base\Object.cs" />
<Compile Include="Config\ConfigAttribute.cs" />
<Compile Include="Config\ACategory.cs" />
......
using System.Threading.Tasks;
namespace Common.Event
{
public interface IEventAsync
{
Task RunAsync(Env env);
}
}
\ No newline at end of file
namespace Common.Event
{
public interface IEvent
public interface IEventSync
{
void Run(Env env);
}
......
using System;
using System.Threading.Tasks;
using Common.Base;
namespace Network
{
......@@ -12,20 +13,22 @@ namespace Network
NoAllocate = 1 << 2
}
public interface IChannel: IDisposable
public abstract class AChannel: Entity<AChannel>, IDisposable
{
/// <summary>
/// 发送消息
/// </summary>
void SendAsync(byte[] buffer, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable);
public abstract void SendAsync(byte[] buffer, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable);
/// <summary>
/// 接收消息
/// </summary>
Task<byte[]> RecvAsync();
public abstract Task<byte[]> RecvAsync();
Task<bool> DisconnnectAsync();
public abstract Task<bool> DisconnnectAsync();
string RemoteAddress { get; }
public abstract string RemoteAddress { get; }
public abstract void Dispose();
}
}
\ No newline at end of file
......@@ -17,11 +17,11 @@ namespace Network
/// <param name="action"></param>
void Add(Action action);
Task<IChannel> GetChannel(string host, int port);
Task<AChannel> GetChannel(string host, int port);
Task<IChannel> GetChannel();
Task<AChannel> GetChannel();
void Remove(IChannel channel);
void Remove(AChannel channel);
void RunOnce(int timeout);
......
......@@ -41,7 +41,13 @@
<ItemGroup>
<Compile Include="IService.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="IChannel.cs" />
<Compile Include="AChannel.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Common\Common.csproj">
<Project>{19f8f043-1f99-4550-99df-dea5c7d77e55}</Project>
<Name>Common</Name>
</ProjectReference>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
......
......@@ -7,7 +7,7 @@ using Network;
namespace TNet
{
internal class TChannel: IChannel
internal class TChannel: AChannel
{
private const int SendInterval = 50;
......@@ -55,13 +55,13 @@ namespace TNet
this.Dispose(false);
}
public void Dispose()
public override void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
public void SendAsync(byte[] buffer, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
public override void SendAsync(byte[] buffer, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
{
byte[] size = BitConverter.GetBytes(buffer.Length);
this.sendBuffer.SendTo(size);
......@@ -80,7 +80,7 @@ namespace TNet
}
}
public Task<byte[]> RecvAsync()
public override Task<byte[]> RecvAsync()
{
var tcs = new TaskCompletionSource<byte[]>();
......@@ -95,12 +95,12 @@ namespace TNet
return tcs.Task;
}
public async Task<bool> DisconnnectAsync()
public override async Task<bool> DisconnnectAsync()
{
return await this.socket.DisconnectAsync();
}
public string RemoteAddress
public override string RemoteAddress
{
get
{
......
......@@ -65,7 +65,7 @@ namespace TNet
this.poller.Add(action);
}
private async Task<IChannel> ConnectAsync(string host, int port)
private async Task<AChannel> ConnectAsync(string host, int port)
{
TSocket newSocket = new TSocket(this.poller);
await newSocket.ConnectAsync(host, port);
......@@ -74,7 +74,7 @@ namespace TNet
return channel;
}
public async Task<IChannel> GetChannel()
public async Task<AChannel> GetChannel()
{
if (this.acceptor == null)
{
......@@ -87,7 +87,7 @@ namespace TNet
return channel;
}
public void Remove(IChannel channel)
public void Remove(AChannel channel)
{
TChannel tChannel = channel as TChannel;
if (tChannel == null)
......@@ -98,7 +98,7 @@ namespace TNet
this.timerManager.Remove(tChannel.SendTimer);
}
public async Task<IChannel> GetChannel(string host, int port)
public async Task<AChannel> GetChannel(string host, int port)
{
TChannel channel = null;
if (this.channels.TryGetValue(host + ":" + port, out channel))
......
......@@ -15,7 +15,7 @@ namespace TNetTest
private async void ClientEvent(IService service, string hostName, ushort port)
{
IChannel channel = await service.GetChannel(hostName, port);
AChannel channel = await service.GetChannel(hostName, port);
channel.SendAsync("0123456789".ToByteArray());
byte[] bytes = await channel.RecvAsync();
......@@ -26,7 +26,7 @@ namespace TNetTest
private async void ServerEvent(IService service)
{
IChannel channel = await service.GetChannel();
AChannel channel = await service.GetChannel();
byte[] bytes = await channel.RecvAsync();
CollectionAssert.AreEqual("0123456789".ToByteArray(), bytes);
Array.Reverse(bytes);
......
......@@ -4,7 +4,7 @@ using Network;
namespace UNet
{
internal class UChannel: IChannel
internal class UChannel: AChannel
{
private readonly UService service;
private USocket socket;
......@@ -39,23 +39,23 @@ namespace UNet
this.Dispose(false);
}
public void Dispose()
public override void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
public void SendAsync(byte[] buffer, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
public override void SendAsync(byte[] buffer, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
{
this.socket.SendAsync(buffer, channelID, flags);
}
public async Task<byte[]> RecvAsync()
public override async Task<byte[]> RecvAsync()
{
return await this.socket.RecvAsync();
}
public string RemoteAddress
public override string RemoteAddress
{
get
{
......@@ -63,7 +63,7 @@ namespace UNet
}
}
public async Task<bool> DisconnnectAsync()
public override async Task<bool> DisconnnectAsync()
{
return await this.socket.DisconnectAsync();
}
......
......@@ -59,7 +59,7 @@ namespace UNet
this.poller.Add(action);
}
private async Task<IChannel> ConnectAsync(string host, int port)
private async Task<AChannel> ConnectAsync(string host, int port)
{
USocket newSocket = await this.poller.ConnectAsync(host, (ushort) port);
UChannel channel = new UChannel(newSocket, this);
......@@ -67,7 +67,7 @@ namespace UNet
return channel;
}
public async Task<IChannel> GetChannel(string host, int port)
public async Task<AChannel> GetChannel(string host, int port)
{
UChannel channel = null;
if (this.channels.TryGetValue(host + ":" + port, out channel))
......@@ -77,7 +77,7 @@ namespace UNet
return await this.ConnectAsync(host, port);
}
public async Task<IChannel> GetChannel()
public async Task<AChannel> GetChannel()
{
USocket socket = await this.poller.AcceptAsync();
UChannel channel = new UChannel(socket, this);
......@@ -85,7 +85,7 @@ namespace UNet
return channel;
}
public void Remove(IChannel channel)
public void Remove(AChannel channel)
{
UChannel tChannel = channel as UChannel;
if (tChannel == null)
......
......@@ -15,7 +15,7 @@ namespace UNetTest
private async void ClientEvent(IService service, string hostName, ushort port)
{
IChannel channel = await service.GetChannel(hostName, port);
AChannel channel = await service.GetChannel(hostName, port);
channel.SendAsync("0123456789".ToByteArray());
byte[] bytes = await channel.RecvAsync();
......@@ -26,7 +26,7 @@ namespace UNetTest
private async void ServerEvent(IService service)
{
IChannel channel = await service.GetChannel();
AChannel channel = await service.GetChannel();
byte[] bytes = await channel.RecvAsync();
CollectionAssert.AreEqual("0123456789".ToByteArray(), bytes);
Array.Reverse(bytes);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册