提交 6ab3eb8f 编写于 作者: T tanghai

简化了actor消息的实现,客户端发送actor消息,gate收到后会包装actorid变成ActorRequest发送到真正的actor上去

上级 b0cbf36a
using System;
using System.Threading.Tasks;
using Model;
namespace Hotfix
{
[ActorMessageHandler(AppType.Map)]
public class ActorRpc_TestRequestHandler : AMActorRpcHandler<Unit, ActorRpc_TestRequest, ActorRpc_TestResponse>
{
protected override async Task<bool> Run(Unit entity, ActorRpc_TestRequest message, Action<ActorRpc_TestResponse> reply)
{
Log.Info(message.request);
reply(new ActorRpc_TestResponse() {response = "response actor rpc"});
return true;
}
}
}
\ No newline at end of file
......@@ -8,7 +8,7 @@ namespace Hotfix
{
protected override async Task<bool> Run(Unit unit, Actor_Test message)
{
Log.Info(message.Info);
Log.Debug(message.Info);
unit.GetComponent<UnitGateComponent>().GetActorProxy().Send(message);
return true;
......
......@@ -36,6 +36,7 @@
</ItemGroup>
<ItemGroup>
<Compile Include="Component\RealmGateAddressComponentE.cs" />
<Compile Include="Handler\ActorRpc_TestHandler.cs" />
<Compile Include="Handler\Actor_TestHandler.cs" />
<Compile Include="Handler\G2M_CreateUnitHandler.cs" />
<Compile Include="Helper\MessageHelper.cs" />
......
......@@ -7,7 +7,7 @@ namespace Model
public struct ActorMessageInfo
{
public Session Session;
public IActorMessage Message;
public ActorRequest Message;
}
[ObjectEvent]
......@@ -64,14 +64,15 @@ namespace Model
public void Add(ActorMessageInfo info)
{
this.queue.Enqueue(info);
if (this.tcs == null)
{
return;
}
this.tcs?.SetResult(this.queue.Dequeue());
var t = this.tcs;
this.tcs = null;
t.SetResult(this.queue.Dequeue());
}
private Task<ActorMessageInfo> GetAsync()
......
......@@ -71,14 +71,23 @@ namespace Model
return actorHandler;
}
public async Task<bool> Handle(Session session, Entity entity, IActorMessage message)
public async Task<bool> Handle(Session session, Entity entity, ActorRequest message)
{
if (!this.handlers.TryGetValue(message.GetType(), out IMActorHandler handler))
ARequest request = message.AMessage as ARequest;
if (request == null)
{
Log.Error($"ActorRequest.AMessage as ARequest fail: {message.AMessage.GetType().FullName}");
return false;
}
request.RpcId = message.RpcId;
if (!this.handlers.TryGetValue(request.GetType(), out IMActorHandler handler))
{
Log.Error($"not found message handler: {message.GetType().FullName}");
return false;
}
return await handler.Handle(session, entity, message);
return await handler.Handle(session, entity, request);
}
public override void Dispose()
......
......@@ -68,7 +68,7 @@ namespace Model
}
}
public void Handle(Session session, object message)
public void Handle(Session session, AMessage message)
{
if (!this.handlers.TryGetValue(message.GetType(), out List<IMHandler> actions))
{
......
......@@ -18,9 +18,9 @@ namespace Model
public class ActorMessageTask: ActorTask
{
private readonly ActorProxy proxy;
private readonly ARequest message;
private readonly AMessage message;
public ActorMessageTask(ActorProxy proxy, ARequest message)
public ActorMessageTask(ActorProxy proxy, AMessage message)
{
this.proxy = proxy;
this.message = message;
......@@ -28,7 +28,8 @@ namespace Model
public override async Task<AResponse> Run()
{
AResponse response = await this.proxy.RealCall<ActorMessageResponse>(this.message, this.proxy.CancellationTokenSource.Token);
ActorRequest request = new ActorRequest() { Id = this.proxy.Id, AMessage = this.message };
ActorResponse response = await this.proxy.RealCall<ActorResponse>(request, this.proxy.CancellationTokenSource.Token);
return response;
}
......@@ -41,14 +42,14 @@ namespace Model
/// Rpc消息,需要等待返回
/// </summary>
/// <typeparam name="Response"></typeparam>
public class ActorRpcTask<Response> : ActorTask where Response: AActorResponse
public class ActorRpcTask<Response> : ActorTask where Response: AResponse
{
private readonly ActorProxy proxy;
private readonly AActorRequest message;
private readonly ARequest message;
public readonly TaskCompletionSource<Response> Tcs = new TaskCompletionSource<Response>();
public ActorRpcTask(ActorProxy proxy, AActorRequest message)
public ActorRpcTask(ActorProxy proxy, ARequest message)
{
this.proxy = proxy;
this.message = message;
......@@ -56,7 +57,8 @@ namespace Model
public override async Task<AResponse> Run()
{
Response response = await this.proxy.RealCall<Response>(this.message, this.proxy.CancellationTokenSource.Token);
ActorRequest request = new ActorRequest() { Id = this.proxy.Id, AMessage = this.message };
Response response = await this.proxy.RealCall<Response>(request, this.proxy.CancellationTokenSource.Token);
if (response.Error != ErrorCode.ERR_NotFoundActor)
{
this.Tcs.SetResult(response);
......@@ -221,32 +223,31 @@ namespace Model
++this.WindowSize;
}
this.Remove();
}
}
catch (Exception e)
{
Log.Error(e.ToString());
}
}
public void Send(AActorMessage message)
public void Send(AMessage message)
{
message.Id = this.Id;
ActorMessageTask task = new ActorMessageTask(this, message);
this.Add(task);
}
public Task<Response> Call<Response>(AActorRequest request)where Response : AActorResponse
public Task<Response> Call<Response>(ARequest request)where Response : AResponse
{
request.Id = this.Id;
ActorRpcTask<Response> task = new ActorRpcTask<Response>(this, request);
this.Add(task);
return task.Tcs.Task;
}
public async Task<Response> RealCall<Response>(ARequest request, CancellationToken cancellationToken) where Response: AResponse
public async Task<Response> RealCall<Response>(ActorRequest request, CancellationToken cancellationToken) where Response: AResponse
{
try
{
request.Id = this.Id;
Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.Address);
Response response = await session.Call<Response>(request, cancellationToken);
return response;
......
......@@ -2,7 +2,10 @@ using System.Collections.Generic; using MongoDB.Bson.Serialization.Attributes;
// 服务器内部消息 Opcode从10000开始
namespace Model{ [Message(Opcode.M2A_Reload)] [BsonIgnoreExtraElements] public class M2A_Reload : ARequest { }
namespace Model{
/// <summary> /// 用来包装actor消息 /// </summary> [Message(Opcode.ActorRequest)] [BsonIgnoreExtraElements] public class ActorRequest : ARequest { public long Id { get; set; } public AMessage AMessage { get; set; } }
/// <summary> /// actor RPC消息响应 /// </summary> [Message(Opcode.ActorResponse)] [BsonIgnoreExtraElements] public class ActorResponse : AResponse { } [Message(Opcode.M2A_Reload)] [BsonIgnoreExtraElements] public class M2A_Reload : ARequest { }
[Message(Opcode.A2M_Reload)] [BsonIgnoreExtraElements] public class A2M_Reload : AResponse { }
......
......@@ -2,6 +2,9 @@
{
public static partial class Opcode
{
public const ushort ActorRequest = 1;
public const ushort ActorResponse = 2;
public const ushort ActorResponseWithM = 3;
public const ushort G2G_LockRequest = 10;
public const ushort G2G_LockResponse = 11;
public const ushort G2G_LockReleaseRequest = 12;
......
......@@ -117,7 +117,7 @@ namespace Model
return;
}
this.network.MessageDispatcher.Dispatch(this, opcode, offset, messageBytes, message);
this.network.MessageDispatcher.Dispatch(this, opcode, offset, messageBytes, (AMessage)message);
}
/// <summary>
......@@ -127,6 +127,7 @@ namespace Model
where Response : AResponse
{
request.RpcId = ++RpcId;
//Log.Debug($"{request.GetType().FullName} {request.RpcId}");
this.SendMessage(request);
var tcs = new TaskCompletionSource<Response>();
......@@ -141,7 +142,7 @@ namespace Model
tcs.SetException(new RpcException(response.Error, response.Message));
return;
}
//Log.Debug($"recv: {response.ToJson()}");
//Log.Debug($"recv: {MongoHelper.ToJson(response)}");
tcs.SetResult(response);
}
catch (Exception e)
......
using MongoDB.Bson.Serialization.Attributes;
namespace Model
namespace Model
{
public interface IActorMessage
{
[BsonIgnoreIfDefault]
long Id { get; set; }
}
public abstract class AActorMessage: ARequest, IActorMessage
{
[BsonIgnoreIfDefault]
public long Id { get; set; }
}
public abstract class ActorMessageResponse : AResponse
public abstract class AActorMessage : AMessage
{
}
public abstract class AActorRequest : ARequest, IActorMessage
public abstract class AActorRequest : ARequest
{
[BsonIgnoreIfDefault]
public long Id { get; set; }
}
/// <summary>
/// 服务端回的RPC消息需要继承这个抽象类
/// </summary>
public abstract class AActorResponse: AResponse
public abstract class AActorResponse : AResponse
{
}
}
\ No newline at end of file
......@@ -3,11 +3,11 @@ using System.Threading.Tasks;
namespace Model
{
public abstract class AMActorHandler<E, Message>: IMActorHandler where E: Entity where Message : AActorMessage
public abstract class AMActorHandler<E, Message>: IMActorHandler where E: Entity where Message : AMessage
{
protected abstract Task<bool> Run(E entity, Message message);
public async Task<bool> Handle(Session session, Entity entity, object msg)
public async Task<bool> Handle(Session session, Entity entity, AMessage msg)
{
Message message = msg as Message;
if (message == null)
......@@ -42,7 +42,7 @@ namespace Model
protected abstract Task<bool> Run(E entity, Request message, Action<Response> reply);
public async Task<bool> Handle(Session session, Entity entity, object message)
public async Task<bool> Handle(Session session, Entity entity, AMessage message)
{
try
{
......
......@@ -6,7 +6,7 @@ namespace Model
{
protected abstract void Run(Session session, Message message);
public void Handle(Session session, object msg)
public void Handle(Session session, AMessage msg)
{
Message message = msg as Message;
if (message == null)
......@@ -34,7 +34,7 @@ namespace Model
protected abstract void Run(Session session, Request message, Action<Response> reply);
public void Handle(Session session, object message)
public void Handle(Session session, AMessage message)
{
try
{
......
......@@ -15,7 +15,7 @@ namespace Model
/// <summary>
/// 服务端回的RPC消息需要继承这个抽象类
/// </summary>
public abstract class AResponse
public abstract class AResponse: AMessage
{
public uint RpcId;
......
......@@ -4,7 +4,7 @@ namespace Model
{
public interface IEntityActorHandler
{
Task<bool> Handle(Session session, Entity entity, IActorMessage message);
Task<bool> Handle(Session session, Entity entity, ActorRequest message);
}
/// <summary>
......@@ -12,17 +12,16 @@ namespace Model
/// </summary>
public class GateSessionEntityActorHandler : IEntityActorHandler
{
public async Task<bool> Handle(Session session, Entity entity, IActorMessage message)
public async Task<bool> Handle(Session session, Entity entity, ActorRequest message)
{
message.Id = 0;
((Session)entity).Send((AMessage)message);
((Session)entity).Send(message.AMessage);
return true;
}
}
public class CommonEntityActorHandler : IEntityActorHandler
{
public async Task<bool> Handle(Session session, Entity entity, IActorMessage message)
public async Task<bool> Handle(Session session, Entity entity, ActorRequest message)
{
return await Game.Scene.GetComponent<ActorMessageDispatherComponent>().Handle(session, entity, message);
}
......
......@@ -5,7 +5,7 @@ namespace Model
{
public interface IMActorHandler
{
Task<bool> Handle(Session session, Entity entity, object message);
Task<bool> Handle(Session session, Entity entity, AMessage message);
Type GetMessageType();
}
}
\ No newline at end of file
......@@ -4,7 +4,7 @@ namespace Model
{
public interface IMHandler
{
void Handle(Session session, object message);
void Handle(Session session, AMessage message);
Type GetMessageType();
}
}
\ No newline at end of file
......@@ -2,6 +2,6 @@
{
public interface IMessageDispatcher
{
void Dispatch(Session session, ushort opcode, int offset, byte[] messageBytes, object message);
void Dispatch(Session session, ushort opcode, int offset, byte[] messageBytes, AMessage message);
}
}
......@@ -4,13 +4,14 @@ namespace Model
{
public class InnerMessageDispatcher: IMessageDispatcher
{
public void Dispatch(Session session, ushort opcode, int offset, byte[] messageBytes, object message)
public void Dispatch(Session session, ushort opcode, int offset, byte[] messageBytes, AMessage message)
{
// 收到actor消息分发给actor自己去处理
if (message is IActorMessage actorMessage)
if (message is ActorRequest actorRequest)
{
Entity entity = Game.Scene.GetComponent<ActorManagerComponent>().Get(actorMessage.Id);
entity.GetComponent<ActorComponent>().Add(new ActorMessageInfo() { Session = session, Message = actorMessage });
//Log.Info(MongoHelper.ToJson(actorRequest));
Entity entity = Game.Scene.GetComponent<ActorManagerComponent>().Get(actorRequest.Id);
entity.GetComponent<ActorComponent>().Add(new ActorMessageInfo() { Session = session, Message = actorRequest });
return;
}
......
......@@ -4,31 +4,30 @@ namespace Model
{
public class OuterMessageDispatcher: IMessageDispatcher
{
public async void Dispatch(Session session, ushort opcode, int offset, byte[] messageBytes, object message)
public async void Dispatch(Session session, ushort opcode, int offset, byte[] messageBytes, AMessage message)
{
// gate session收到actor消息直接转发给actor自己去处理
if (message is AActorMessage aActorMessage)
if (message is AActorMessage)
{
long unitId = session.GetComponent<SessionPlayerComponent>().Player.UnitId;
aActorMessage.Id = unitId;
ActorProxy actorProxy = Game.Scene.GetComponent<ActorProxyComponent>().Get(aActorMessage.Id);
actorProxy.Send(aActorMessage);
ActorProxy actorProxy = Game.Scene.GetComponent<ActorProxyComponent>().Get(unitId);
actorProxy.Send(message);
return;
}
// gate session收到actor rpc消息,先向actor 发送rpc请求,再将请求结果返回客户端
if (message is AActorRequest aActorRequest)
{
ActorProxy actorProxy = Game.Scene.GetComponent<ActorProxyComponent>().Get(aActorRequest.Id);
aActorRequest.Id = session.GetComponent<SessionPlayerComponent>().Player.Id;
long unitId = session.GetComponent<SessionPlayerComponent>().Player.UnitId;
ActorProxy actorProxy = Game.Scene.GetComponent<ActorProxyComponent>().Get(unitId);
uint rpcId = aActorRequest.RpcId;
AActorResponse aActorResponse = await actorProxy.Call<AActorResponse>(aActorRequest);
aActorResponse.RpcId = rpcId;
session.Reply(aActorResponse);
AResponse response = await actorProxy.Call<AResponse>(aActorRequest);
response.RpcId = rpcId;
session.Reply(response);
return;
}
if (message is AMessage)
if (message != null)
{
Game.Scene.GetComponent<MessageDispatherComponent>().Handle(session, message);
return;
......
using MongoDB.Bson.Serialization.Attributes;
namespace Model
namespace Model
{
public interface IActorMessage
{
long Id { get; set; }
}
public abstract class AActorMessage: ARequest, IActorMessage
{
public long Id { get; set; }
}
public abstract class ActorMessageResponse : AResponse
public abstract class AActorMessage : AMessage
{
}
public abstract class AActorRequest : ARequest, IActorMessage
public abstract class AActorRequest : ARequest
{
[BsonIgnoreIfDefault]
public long Id { get; set; }
}
/// <summary>
/// 服务端回的RPC消息需要继承这个抽象类
/// </summary>
public abstract class AActorResponse: AResponse
public abstract class AActorResponse : AResponse
{
}
}
\ No newline at end of file
......@@ -6,7 +6,7 @@ namespace Model
{
protected abstract void Run(Message message);
public void Handle(object msg)
public void Handle(AMessage msg)
{
Message message = msg as Message;
if (message == null)
......
......@@ -11,7 +11,7 @@ namespace Model
// 普通消息或者是Rpc请求消息
if (message is AMessage || message is ARequest)
{
MessageInfo messageInfo = new MessageInfo(opcode, message);
MessageInfo messageInfo = new MessageInfo(opcode, (AMessage)message);
if (opcode < 2000)
{
Game.Scene.GetComponent<CrossComponent>().Run(CrossIdType.MessageDeserializeFinish, messageInfo);
......
......@@ -4,7 +4,7 @@ namespace Model
{
public interface IMHandler
{
void Handle(object message);
void Handle(AMessage message);
Type GetMessageType();
}
}
\ No newline at end of file
......@@ -3,9 +3,9 @@
public class MessageInfo
{
public ushort Opcode { get; }
public object Message { get; set; }
public AMessage Message { get; set; }
public MessageInfo(ushort opcode, object message)
public MessageInfo(ushort opcode, AMessage message)
{
this.Opcode = opcode;
this.Message = message;
......
......@@ -210,6 +210,10 @@ namespace Model
catch (ObjectDisposedException)
{
}
catch (System.IO.IOException)
{
this.OnError(this, SocketError.NetworkReset);
}
catch (Exception e)
{
Log.Error(e.ToString());
......
......@@ -12,5 +12,7 @@
public const ushort C2M_Reload = 1008;
public const ushort Actor_Test = 2001;
public const ushort ActorRpc_TestRequest = 2002;
public const ushort ActorRpc_TestResponse = 2003;
}
}
......@@ -42,10 +42,18 @@ namespace Model
public string Info;
}
[Message(Opcode.ActorRpc_TestRequest)]
public class ActorRpc_TestRequest : AActorRequest
{
public string request;
}
[Message(Opcode.ActorRpc_TestResponse)]
public class ActorRpc_TestResponse : AActorResponse
{
public string response;
}
[Message(Opcode.C2M_Reload)]
public class C2M_Reload: ARequest
{
......
......@@ -32,7 +32,10 @@ namespace Hotfix
Log.Info("登陆gate成功!");
// 发送一个actor消息
gateSession.Send(new Actor_Test() { Info = "message client->gate->map->gate->client" });
//gateSession.Send(new Actor_Test() { Info = "message client->gate->map->gate->client" });
ActorRpc_TestResponse response = await gateSession.Call<ActorRpc_TestResponse>(new ActorRpc_TestRequest() { request = "request actor test rpc" });
Log.Info($"recv response: {JsonHelper.ToJson(response)}");
}
catch (Exception e)
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册