提交 1a59719b 编写于 作者: T tanghai

优化网络反序列化,每个消息不再需要new一个MemoryStream,大大减少GC

上级 5c942aa6
......@@ -11,7 +11,7 @@ namespace ETHotfix
try
{
Type messageType = Game.Scene.GetComponent<OpcodeTypeComponent>().GetType(packet.Opcode);
message = (IMessage)session.Network.MessagePacker.DeserializeFrom(messageType, packet.Bytes, packet.Offset, packet.Length);
message = (IMessage)session.Network.MessagePacker.DeserializeFrom(messageType, packet.Stream);
}
catch (Exception e)
{
......
......@@ -8,7 +8,7 @@ namespace ETHotfix
{
public override void Awake(NetOuterComponent self)
{
self.Awake(NetworkProtocol.TCP);
self.Awake(NetworkProtocol.KCP);
self.MessagePacker = new ProtobufPacker();
self.MessageDispatcher = new OuterMessageDispatcher();
}
......@@ -19,7 +19,7 @@ namespace ETHotfix
{
public override void Awake(NetOuterComponent self, IPEndPoint ipEndPoint)
{
self.Awake(NetworkProtocol.TCP, ipEndPoint);
self.Awake(NetworkProtocol.KCP, ipEndPoint);
self.MessagePacker = new ProtobufPacker();
self.MessageDispatcher = new OuterMessageDispatcher();
}
......
......@@ -11,7 +11,7 @@ namespace ETHotfix
try
{
Type messageType = session.Network.Entity.GetComponent<OpcodeTypeComponent>().GetType(packet.Opcode);
message = session.Network.MessagePacker.DeserializeFrom(messageType, packet.Bytes, packet.Offset, packet.Length);
message = session.Network.MessagePacker.DeserializeFrom(messageType, packet.Stream);
}
catch (Exception e)
......
......@@ -65,6 +65,11 @@ namespace ETModel
return BsonSerializer.Deserialize(memoryStream, type);
}
}
public static object FromStream(Type type, Stream stream)
{
return BsonSerializer.Deserialize(stream, type);
}
public static T FromBson<T>(byte[] bytes)
{
......
using System;
using System.IO;
namespace ETModel
{
......@@ -19,6 +20,11 @@ namespace ETModel
return MongoHelper.FromBson(type, bytes);
}
public object DeserializeFrom(Type type, Stream stream)
{
return MongoHelper.FromStream(type, stream);
}
public object DeserializeFrom(Type type, byte[] bytes, int index, int count)
{
return MongoHelper.FromBson(type, bytes, index, count);
......
......@@ -82,5 +82,17 @@ namespace ETModel
iSupportInitialize.EndInit();
return t;
}
public static object FromStream(Type type, Stream stream)
{
object t = Serializer.NonGeneric.Deserialize(type, stream);
ISupportInitialize iSupportInitialize = t as ISupportInitialize;
if (iSupportInitialize == null)
{
return t;
}
iSupportInitialize.EndInit();
return t;
}
}
}
\ No newline at end of file
......@@ -17,7 +17,7 @@ namespace ETModel
OpcodeTypeComponent opcodeTypeComponent = session.Network.Entity.GetComponent<OpcodeTypeComponent>();
Type responseType = opcodeTypeComponent.GetType(packet.Opcode);
message = session.Network.MessagePacker.DeserializeFrom(responseType, packet.Bytes, packet.Offset, packet.Length);
message = session.Network.MessagePacker.DeserializeFrom(responseType, packet.Stream);
}
catch (Exception e)
{
......
......@@ -25,6 +25,8 @@ namespace ETModel
public const int ERR_SocketDisconnected = 202002;
public const int ERR_ReloadFail = 202003;
public const int ERR_ActorLocationNotFound = 202004;
public const int ERR_KcpConnectFail = 202005;
public const int ERR_KcpTimeout = 202006;
public static bool IsRpcNeedThrowException(int error)
{
......
using System;
using System.IO;
namespace ETModel
{
......@@ -8,6 +9,7 @@ namespace ETModel
string SerializeToText(object obj);
object DeserializeFrom(Type type, byte[] bytes);
object DeserializeFrom(Type type, Stream stream);
object DeserializeFrom(Type type, byte[] bytes, int index, int count);
T DeserializeFrom<T>(byte[] bytes);
T DeserializeFrom<T>(byte[] bytes, int index, int count);
......
......@@ -22,7 +22,7 @@
{
public void Awake()
{
this.Awake(NetworkProtocol.TCP);
this.Awake(NetworkProtocol.KCP);
this.MessagePacker = new ProtobufPacker();
this.MessageDispatcher = new ClientDispatcher();
}
......
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
......@@ -33,11 +34,11 @@ namespace ETModel
private uint lastRecvTime;
private readonly byte[] cacheBytes = new byte[ushort.MaxValue];
public uint Conn;
public uint RemoteConn;
public Packet packet = new Packet(ushort.MaxValue);
// accept
public KChannel(uint conn, uint remoteConn, Socket socket, IPEndPoint remoteEndPoint, KService kService) : base(kService, ChannelType.Accept)
......@@ -50,6 +51,8 @@ namespace ETModel
kcp = new KCP(this.RemoteConn, this);
kcp.SetOutput(this.Output);
kcp.NoDelay(1, 10, 2, 1); //fast
kcp.SetMTU(470);
kcp.WndSize(256, 256);
this.isConnected = true;
this.lastRecvTime = kService.TimeNow;
......@@ -100,7 +103,8 @@ namespace ETModel
this.kcp = new KCP(responseConn, this);
kcp.SetOutput(this.Output);
kcp.NoDelay(1, 10, 2, 1); //fast
kcp.SetMTU(470);
kcp.WndSize(256, 256);
this.lastRecvTime = this.GetService().TimeNow;
HandleSend();
......@@ -108,10 +112,10 @@ namespace ETModel
public void HandleAccept(uint requestConn)
{
cacheBytes.WriteTo(0, KcpProtocalType.ACK);
cacheBytes.WriteTo(4, requestConn);
cacheBytes.WriteTo(8, this.Conn);
this.socket.SendTo(cacheBytes, 0, 12, SocketFlags.None, remoteEndPoint);
packet.Bytes.WriteTo(0, KcpProtocalType.ACK);
packet.Bytes.WriteTo(4, requestConn);
packet.Bytes.WriteTo(8, this.Conn);
this.socket.SendTo(packet.Bytes, 0, 12, SocketFlags.None, remoteEndPoint);
}
/// <summary>
......@@ -119,9 +123,9 @@ namespace ETModel
/// </summary>
private void Connect(uint timeNow)
{
cacheBytes.WriteTo(0, KcpProtocalType.SYN);
cacheBytes.WriteTo(4, this.Conn);
this.socket.SendTo(cacheBytes, 0, 8, SocketFlags.None, remoteEndPoint);
packet.Bytes.WriteTo(0, KcpProtocalType.SYN);
packet.Bytes.WriteTo(4, this.Conn);
this.socket.SendTo(packet.Bytes, 0, 8, SocketFlags.None, remoteEndPoint);
// 200毫秒后再次update发送connect请求
this.GetService().AddToNextTimeUpdate(timeNow + 200, this.Id);
......@@ -129,11 +133,11 @@ namespace ETModel
private void DisConnect()
{
cacheBytes.WriteTo(0, KcpProtocalType.FIN);
cacheBytes.WriteTo(4, this.Conn);
cacheBytes.WriteTo(8, this.RemoteConn);
packet.Bytes.WriteTo(0, KcpProtocalType.FIN);
packet.Bytes.WriteTo(4, this.Conn);
packet.Bytes.WriteTo(8, this.RemoteConn);
//Log.Debug($"client disconnect: {this.Conn}");
this.socket.SendTo(cacheBytes, 0, 12, SocketFlags.None, remoteEndPoint);
this.socket.SendTo(packet.Bytes, 0, 12, SocketFlags.None, remoteEndPoint);
}
public void Update(uint timeNow)
......@@ -144,7 +148,7 @@ namespace ETModel
// 5秒连接不上,报错
if (timeNow - this.lastRecvTime > 5 * 1000)
{
this.OnError((int)SocketError.ConnectionRefused);
this.OnError(ErrorCode.ERR_KcpConnectFail);
return;
}
Connect(timeNow);
......@@ -152,9 +156,9 @@ namespace ETModel
}
// 超时断开连接
if (timeNow - this.lastRecvTime > 20 * 1000)
if (timeNow - this.lastRecvTime > 40 * 1000)
{
this.OnError((int)SocketError.Disconnecting);
this.OnError(ErrorCode.ERR_KcpTimeout);
return;
}
this.kcp.Update(timeNow);
......@@ -189,7 +193,8 @@ namespace ETModel
this.OnError((int)SocketError.NetworkReset);
return;
}
int count = this.kcp.Recv(this.cacheBytes, 0, this.cacheBytes.Length);
this.packet.Stream.SetLength(ushort.MaxValue);
int count = this.kcp.Recv(this.packet.Bytes, 0, ushort.MaxValue);
if (count <= 0)
{
return;
......@@ -197,17 +202,14 @@ namespace ETModel
lastRecvTime = timeNow;
this.packet.Flag = this.cacheBytes[0];
this.packet.Opcode = BitConverter.ToUInt16(this.cacheBytes, 1);
this.packet.Bytes = this.cacheBytes;
this.packet.Offset = Packet.Index;
this.packet.Length = (ushort) (count - Packet.Index);
this.packet.Flag = this.packet.Bytes[0];
this.packet.Opcode = BitConverter.ToUInt16(this.packet.Bytes, 1);
this.packet.Stream.SetLength(count);
this.packet.Stream.Seek(Packet.Index, SeekOrigin.Begin);
this.OnRead(packet);
}
}
public Packet packet = new Packet();
public void Output(byte[] bytes, int count, object user)
{
this.socket.SendTo(bytes, 0, count, SocketFlags.None, this.remoteEndPoint);
......@@ -240,7 +242,7 @@ namespace ETModel
byte[] bytes;
if (this.isConnected)
{
bytes = this.cacheBytes;
bytes = this.packet.Bytes;
}
else
{
......
using System;
using System.IO;
namespace ETModel
{
......@@ -15,30 +16,30 @@ namespace ETModel
public const int FlagIndex = 0;
public const int OpcodeIndex = 1;
public const int Index = 3;
/// <summary>
/// 只读,不允许修改
/// </summary>
public byte[] Bytes { get; set; }
public ushort Offset { get; set; }
public ushort Length { get; set; }
public byte Flag { get; set; }
public ushort Opcode { get; set; }
public Packet()
public byte[] Bytes
{
get
{
return this.Stream.GetBuffer();
}
}
public byte Flag { get; set; }
public ushort Opcode { get; set; }
public MemoryStream Stream { get; }
public Packet(int length)
{
this.Length = 0;
this.Bytes = new byte[length];
this.Stream = new MemoryStream(length);
}
public Packet(byte[] bytes)
{
this.Bytes = bytes;
this.Length = (ushort)bytes.Length;
this.Stream = new MemoryStream(bytes);
}
}
......@@ -95,9 +96,10 @@ namespace ETModel
this.packet.Flag = this.cache[0];
this.buffer.Read(this.cache, 0, 2);
this.packet.Opcode = BitConverter.ToUInt16(this.cache, 0);
this.buffer.Read(this.packet.Bytes, 0, this.packetSize - Packet.Index);
this.packet.Length = (ushort) (this.packetSize - Packet.Index);
this.packet.Offset = 0;
this.packet.Stream.Seek(0, SeekOrigin.Begin);
this.packet.Stream.SetLength(this.packetSize - Packet.Index);
this.buffer.Read(this.packet.Stream.GetBuffer(), 0, this.packetSize - Packet.Index);
this.isOK = true;
this.state = ParserState.PacketSize;
......
using System;
using System.IO;
namespace ETModel
{
......@@ -19,6 +20,11 @@ namespace ETModel
return ProtobufHelper.FromBytes(type, bytes);
}
public object DeserializeFrom(Type type, Stream stream)
{
return ProtobufHelper.FromStream(type, stream);
}
public object DeserializeFrom(Type type, byte[] bytes, int index, int count)
{
return ProtobufHelper.FromBytes(type, bytes, index, count);
......
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
......@@ -64,7 +65,12 @@ namespace ETModel
{
action.Invoke(new ResponseMessage { Error = this.Error });
}
if (this.Error != 0)
{
Log.Error($"session dispose: {this.Id} {this.Error}");
}
this.Error = 0;
this.channel.Dispose();
this.Network.Remove(id);
......@@ -124,7 +130,7 @@ namespace ETModel
{
OpcodeTypeComponent opcodeTypeComponent = this.Network.Entity.GetComponent<OpcodeTypeComponent>();
Type responseType = opcodeTypeComponent.GetType(opcode);
message = this.Network.MessagePacker.DeserializeFrom(responseType, packet.Bytes, packet.Offset, packet.Length);
message = this.Network.MessagePacker.DeserializeFrom(responseType, packet.Stream);
//Log.Debug($"recv: {JsonHelper.ToJson(message)}");
}
catch (Exception e)
......@@ -249,12 +255,11 @@ namespace ETModel
Packet packet = ((TChannel)this.channel).parser.packet;
Array.Copy(bytes, 0, packet.Bytes, 0, bytes.Length);
packet.Offset = 0;
packet.Length = (ushort)bytes.Length;
packet.Flag = flag;
packet.Opcode = opcode;
packet.Stream.Seek(0, SeekOrigin.Begin);
packet.Stream.SetLength(bytes.Length);
Array.Copy(bytes, 0, packet.Bytes, 0, bytes.Length);
session.Run(packet);
return;
}
......
......@@ -78,7 +78,6 @@ namespace ILRuntime.Runtime.Generated
ETModel_SessionCallbackComponent_Binding.Register(app);
System_Collections_Generic_Dictionary_2_Int32_Action_1_ILTypeInstance_Binding.Register(app);
ETModel_Component_Binding.Register(app);
ETModel_Packet_Binding.Register(app);
ETModel_ProtobufHelper_Binding.Register(app);
ETModel_MessageInfo_Binding.Register(app);
System_Threading_Tasks_TaskCompletionSource_1_ILTypeInstance_Binding.Register(app);
......
......@@ -15,116 +15,6 @@ namespace ILRuntime.Runtime.Generated
{
unsafe class ETModel_Packet_Binding
{
public static void Register(ILRuntime.Runtime.Enviorment.AppDomain app)
{
BindingFlags flag = BindingFlags.Public | BindingFlags.Instance | BindingFlags.Static | BindingFlags.DeclaredOnly;
MethodBase method;
Type[] args;
Type type = typeof(ETModel.Packet);
args = new Type[]{};
method = type.GetMethod("get_Opcode", flag, null, args, null);
app.RegisterCLRMethodRedirection(method, get_Opcode_0);
args = new Type[]{};
method = type.GetMethod("get_Flag", flag, null, args, null);
app.RegisterCLRMethodRedirection(method, get_Flag_1);
args = new Type[]{};
method = type.GetMethod("get_Bytes", flag, null, args, null);
app.RegisterCLRMethodRedirection(method, get_Bytes_2);
args = new Type[]{};
method = type.GetMethod("get_Offset", flag, null, args, null);
app.RegisterCLRMethodRedirection(method, get_Offset_3);
args = new Type[]{};
method = type.GetMethod("get_Length", flag, null, args, null);
app.RegisterCLRMethodRedirection(method, get_Length_4);
}
static StackObject* get_Opcode_0(ILIntepreter __intp, StackObject* __esp, IList<object> __mStack, CLRMethod __method, bool isNewObj)
{
ILRuntime.Runtime.Enviorment.AppDomain __domain = __intp.AppDomain;
StackObject* ptr_of_this_method;
StackObject* __ret = ILIntepreter.Minus(__esp, 1);
ptr_of_this_method = ILIntepreter.Minus(__esp, 1);
ETModel.Packet instance_of_this_method = (ETModel.Packet)typeof(ETModel.Packet).CheckCLRTypes(StackObject.ToObject(ptr_of_this_method, __domain, __mStack));
__intp.Free(ptr_of_this_method);
var result_of_this_method = instance_of_this_method.Opcode;
__ret->ObjectType = ObjectTypes.Integer;
__ret->Value = result_of_this_method;
return __ret + 1;
}
static StackObject* get_Flag_1(ILIntepreter __intp, StackObject* __esp, IList<object> __mStack, CLRMethod __method, bool isNewObj)
{
ILRuntime.Runtime.Enviorment.AppDomain __domain = __intp.AppDomain;
StackObject* ptr_of_this_method;
StackObject* __ret = ILIntepreter.Minus(__esp, 1);
ptr_of_this_method = ILIntepreter.Minus(__esp, 1);
ETModel.Packet instance_of_this_method = (ETModel.Packet)typeof(ETModel.Packet).CheckCLRTypes(StackObject.ToObject(ptr_of_this_method, __domain, __mStack));
__intp.Free(ptr_of_this_method);
var result_of_this_method = instance_of_this_method.Flag;
__ret->ObjectType = ObjectTypes.Integer;
__ret->Value = result_of_this_method;
return __ret + 1;
}
static StackObject* get_Bytes_2(ILIntepreter __intp, StackObject* __esp, IList<object> __mStack, CLRMethod __method, bool isNewObj)
{
ILRuntime.Runtime.Enviorment.AppDomain __domain = __intp.AppDomain;
StackObject* ptr_of_this_method;
StackObject* __ret = ILIntepreter.Minus(__esp, 1);
ptr_of_this_method = ILIntepreter.Minus(__esp, 1);
ETModel.Packet instance_of_this_method = (ETModel.Packet)typeof(ETModel.Packet).CheckCLRTypes(StackObject.ToObject(ptr_of_this_method, __domain, __mStack));
__intp.Free(ptr_of_this_method);
var result_of_this_method = instance_of_this_method.Bytes;
return ILIntepreter.PushObject(__ret, __mStack, result_of_this_method);
}
static StackObject* get_Offset_3(ILIntepreter __intp, StackObject* __esp, IList<object> __mStack, CLRMethod __method, bool isNewObj)
{
ILRuntime.Runtime.Enviorment.AppDomain __domain = __intp.AppDomain;
StackObject* ptr_of_this_method;
StackObject* __ret = ILIntepreter.Minus(__esp, 1);
ptr_of_this_method = ILIntepreter.Minus(__esp, 1);
ETModel.Packet instance_of_this_method = (ETModel.Packet)typeof(ETModel.Packet).CheckCLRTypes(StackObject.ToObject(ptr_of_this_method, __domain, __mStack));
__intp.Free(ptr_of_this_method);
var result_of_this_method = instance_of_this_method.Offset;
__ret->ObjectType = ObjectTypes.Integer;
__ret->Value = result_of_this_method;
return __ret + 1;
}
static StackObject* get_Length_4(ILIntepreter __intp, StackObject* __esp, IList<object> __mStack, CLRMethod __method, bool isNewObj)
{
ILRuntime.Runtime.Enviorment.AppDomain __domain = __intp.AppDomain;
StackObject* ptr_of_this_method;
StackObject* __ret = ILIntepreter.Minus(__esp, 1);
ptr_of_this_method = ILIntepreter.Minus(__esp, 1);
ETModel.Packet instance_of_this_method = (ETModel.Packet)typeof(ETModel.Packet).CheckCLRTypes(StackObject.ToObject(ptr_of_this_method, __domain, __mStack));
__intp.Free(ptr_of_this_method);
var result_of_this_method = instance_of_this_method.Length;
__ret->ObjectType = ObjectTypes.Integer;
__ret->Value = result_of_this_method;
return __ret + 1;
}
}
}
......@@ -55,7 +55,7 @@ namespace ETHotfix
OpcodeTypeComponent opcodeTypeComponent = Game.Scene.GetComponent<OpcodeTypeComponent>();
Type responseType = opcodeTypeComponent.GetType(opcode);
object message = ProtobufHelper.FromBytes(responseType, packet.Bytes, packet.Offset, packet.Length);
object message = this.session.Network.MessagePacker.DeserializeFrom(responseType, packet.Stream);
if ((flag & 0x01) > 0)
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册