提交 8ec13716 编写于 作者: T tanghai

新的网络库, 用socket实现,减少gc

上级 4d7af243
{ "_t" : "StartConfig", "_id" : NumberLong("98547768819754"), "components" : [{ "_t" : "OuterConfig", "Host" : "127.0.0.1", "Port" : 10002, "Host2" : null }, { "_t" : "InnerConfig", "Host" : "127.0.0.1", "Port" : 20000 }, { "_t" : "HttpConfig", "Url" : "", "AppId" : 0, "AppKey" : "", "ManagerSystemUrl" : "" }, { "_t" : "DBConfig", "ConnectionString" : null, "DBName" : null }], "AppId" : 1, "AppType" : "AllServer", "ServerIP" : "*" }
{ "_t" : "StartConfig", "_id" : NumberLong("98547768819754"), "components" : [{ "_t" : "OuterConfig", "Host" : "127.0.0.1", "Port" : 10002, "Host2" : null }, { "_t" : "InnerConfig", "Host" : "127.0.0.1", "Port" : 20000 }, { "_t" : "HttpConfig", "Url" : "", "AppId" : 0, "AppKey" : "", "ManagerSystemUrl" : "" }, { "_t" : "DBConfig", "ConnectionString" : null, "DBName" : null }], "AppId" : 1, "AppType" : "Manager", "ServerIP" : "*" }
{ "_t" : "StartConfig", "_id" : NumberLong("98892711264291"), "components" : [{ "_t" : "ClientConfig", "Host" : "127.0.0.1", "Port" : 10002 }], "AppId" : 2, "AppType" : "Benchmark", "ServerIP" : "*" }
......@@ -65,4 +65,8 @@ message C2R_Ping // IRequest
message R2C_Ping // IResponse
{
}
message G2C_Test // IMessage
{
}
\ No newline at end of file
......@@ -2,7 +2,6 @@
using System.Net;
using System.Threading;
using ETModel;
using MongoDB.Bson.Serialization;
using NLog;
namespace App
......@@ -12,8 +11,7 @@ namespace App
private static void Main(string[] args)
{
// 异步方法全部会回掉到主线程
OneThreadSynchronizationContext contex = new OneThreadSynchronizationContext();
SynchronizationContext.SetSynchronizationContext(contex);
SynchronizationContext.SetSynchronizationContext(OneThreadSynchronizationContext.Instance);
try
{
......@@ -113,7 +111,7 @@ namespace App
try
{
Thread.Sleep(1);
contex.Update();
OneThreadSynchronizationContext.Instance.Update();
Game.EventSystem.Update();
}
catch (Exception e)
......
......@@ -3,13 +3,34 @@ using ETModel;
namespace ETHotfix
{
[MessageHandler(AppType.Realm)]
[MessageHandler(AppType.AllServer)]
public class C2R_PingHandler : AMRpcHandler<C2R_Ping, R2C_Ping>
{
private bool isStart = false;
protected override void Run(Session session, C2R_Ping message, Action<R2C_Ping> reply)
{
R2C_Ping r2CPing = new R2C_Ping();
reply(r2CPing);
if (!this.isStart)
{
isStart = true;
Start(session);
}
}
public async void Start(Session session)
{
TimerComponent timerComponent = Game.Scene.GetComponent<TimerComponent>();
G2C_Test g2CTest = new G2C_Test();
while (true)
{
await timerComponent.WaitAsync(1);
for (int i = 0; i < 20; ++i)
{
session.Send(g2CTest);
}
}
}
}
}
\ No newline at end of file
using ETModel;
namespace ETHotfix
{
[MessageHandler(AppType.Benchmark)]
public class G2C_TestHandler: AMHandler<G2C_Test>
{
public static int count = 0;
protected override void Run(Session session, G2C_Test message)
{
count++;
}
}
}
\ No newline at end of file
......@@ -11,7 +11,7 @@ namespace ETHotfix
try
{
Type messageType = Game.Scene.GetComponent<OpcodeTypeComponent>().GetType(packet.Opcode);
message = (IMessage)session.Network.MessagePacker.DeserializeFrom(messageType, packet.Bytes, Packet.Index, packet.Length - Packet.Index);
message = (IMessage)session.Network.MessagePacker.DeserializeFrom(messageType, packet.Bytes, packet.Offset, packet.Length);
}
catch (Exception e)
{
......
......@@ -11,6 +11,15 @@ namespace ETHotfix
self.Awake();
}
}
[ObjectSystem]
public class NetInnerComponentStartSystem : StartSystem<NetInnerComponent>
{
public override void Start(NetInnerComponent self)
{
self.Start();
}
}
[ObjectSystem]
public class NetInnerComponentAwake1System : AwakeSystem<NetInnerComponent, IPEndPoint>
......
......@@ -20,6 +20,15 @@ namespace ETHotfix
self.Awake(a);
}
}
[ObjectSystem]
public class NetOuterComponentStartSystem : StartSystem<NetOuterComponent>
{
public override void Start(NetOuterComponent self)
{
self.Start();
}
}
[ObjectSystem]
public class NetOuterComponentUpdateSystem : UpdateSystem<NetOuterComponent>
......@@ -34,14 +43,14 @@ namespace ETHotfix
{
public static void Awake(this NetOuterComponent self)
{
self.Awake(NetworkProtocol.TCP);
self.Awake(NetworkProtocol.KCP);
self.MessagePacker = new ProtobufPacker();
self.MessageDispatcher = new OuterMessageDispatcher();
}
public static void Awake(this NetOuterComponent self, IPEndPoint ipEndPoint)
{
self.Awake(NetworkProtocol.TCP, ipEndPoint);
self.Awake(NetworkProtocol.KCP, ipEndPoint);
self.MessagePacker = new ProtobufPacker();
self.MessageDispatcher = new OuterMessageDispatcher();
}
......
......@@ -210,5 +210,10 @@ namespace ETHotfix
public string Message { get; set; }
}
[Message(OuterOpcode.G2C_Test)]
[ProtoContract]
public partial class G2C_Test: IMessage
{
}
}
......@@ -11,7 +11,7 @@ namespace ETHotfix
try
{
Type messageType = session.Network.Entity.GetComponent<OpcodeTypeComponent>().GetType(packet.Opcode);
message = session.Network.MessagePacker.DeserializeFrom(messageType, packet.Bytes, Packet.Index, packet.Length - Packet.Index);
message = session.Network.MessagePacker.DeserializeFrom(messageType, packet.Bytes, packet.Offset, packet.Length);
}
catch (Exception e)
......
......@@ -16,5 +16,6 @@ namespace ETHotfix
public const ushort M2C_Reload = 112;
public const ushort C2R_Ping = 113;
public const ushort R2C_Ping = 114;
public const ushort G2C_Test = 115;
}
}
......@@ -6,8 +6,13 @@ namespace ETModel
{
public static Component CreateWithParent(Type type, Component parent)
{
Component component = (Component)Game.ObjectPool.Fetch(type);
Component component = Game.ObjectPool.Fetch(type);
component.Parent = parent;
ComponentWithId componentWithId = component as ComponentWithId;
if (componentWithId != null)
{
componentWithId.Id = component.InstanceId;
}
Game.EventSystem.Awake(component);
return component;
}
......@@ -16,6 +21,11 @@ namespace ETModel
{
T component = Game.ObjectPool.Fetch<T>();
component.Parent = parent;
ComponentWithId componentWithId = component as ComponentWithId;
if (componentWithId != null)
{
componentWithId.Id = component.InstanceId;
}
Game.EventSystem.Awake(component);
return component;
}
......@@ -24,6 +34,11 @@ namespace ETModel
{
T component = Game.ObjectPool.Fetch<T>();
component.Parent = parent;
ComponentWithId componentWithId = component as ComponentWithId;
if (componentWithId != null)
{
componentWithId.Id = component.InstanceId;
}
Game.EventSystem.Awake(component, a);
return component;
}
......@@ -32,6 +47,11 @@ namespace ETModel
{
T component = Game.ObjectPool.Fetch<T>();
component.Parent = parent;
ComponentWithId componentWithId = component as ComponentWithId;
if (componentWithId != null)
{
componentWithId.Id = component.InstanceId;
}
Game.EventSystem.Awake(component, a, b);
return component;
}
......@@ -40,6 +60,11 @@ namespace ETModel
{
T component = Game.ObjectPool.Fetch<T>();
component.Parent = parent;
ComponentWithId componentWithId = component as ComponentWithId;
if (componentWithId != null)
{
componentWithId.Id = component.InstanceId;
}
Game.EventSystem.Awake(component, a, b, c);
return component;
}
......@@ -47,6 +72,11 @@ namespace ETModel
public static T Create<T>() where T : Component
{
T component = Game.ObjectPool.Fetch<T>();
ComponentWithId componentWithId = component as ComponentWithId;
if (componentWithId != null)
{
componentWithId.Id = component.InstanceId;
}
Game.EventSystem.Awake(component);
return component;
}
......@@ -54,6 +84,11 @@ namespace ETModel
public static T Create<T, A>(A a) where T : Component
{
T component = Game.ObjectPool.Fetch<T>();
ComponentWithId componentWithId = component as ComponentWithId;
if (componentWithId != null)
{
componentWithId.Id = component.InstanceId;
}
Game.EventSystem.Awake(component, a);
return component;
}
......@@ -61,6 +96,11 @@ namespace ETModel
public static T Create<T, A, B>(A a, B b) where T : Component
{
T component = Game.ObjectPool.Fetch<T>();
ComponentWithId componentWithId = component as ComponentWithId;
if (componentWithId != null)
{
componentWithId.Id = component.InstanceId;
}
Game.EventSystem.Awake(component, a, b);
return component;
}
......@@ -68,6 +108,11 @@ namespace ETModel
public static T Create<T, A, B, C>(A a, B b, C c) where T : Component
{
T component = Game.ObjectPool.Fetch<T>();
ComponentWithId componentWithId = component as ComponentWithId;
if (componentWithId != null)
{
componentWithId.Id = component.InstanceId;
}
Game.EventSystem.Awake(component, a, b, c);
return component;
}
......
......@@ -6,9 +6,13 @@ namespace ETModel
{
public class OneThreadSynchronizationContext : SynchronizationContext
{
public static OneThreadSynchronizationContext Instance = new OneThreadSynchronizationContext();
// 线程同步队列,发送接收socket回调都放到该队列,由poll线程统一执行
private readonly ConcurrentQueue<Action> queue = new ConcurrentQueue<Action>();
private Action a;
private void Add(Action action)
{
this.queue.Enqueue(action);
......@@ -18,7 +22,6 @@ namespace ETModel
{
while (true)
{
Action a;
if (!this.queue.TryDequeue(out a))
{
return;
......
namespace ETModel
{
[MessageHandler]
public class G2C_TestHandler: AMHandler<G2C_Test>
{
public static int count = 0;
protected override void Run(Session session, G2C_Test message)
{
count++;
}
}
}
\ No newline at end of file
fileFormatVersion: 2
guid: 60b2c98954815df43a4fa750d6dd0b67
timeCreated: 1528279930
licenseType: Pro
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:
......@@ -6,8 +6,6 @@ namespace ETModel
{
public class Init : MonoBehaviour
{
private readonly OneThreadSynchronizationContext contex = new OneThreadSynchronizationContext();
private async void Start()
{
try
......@@ -17,7 +15,7 @@ namespace ETModel
Log.Warning($"请使用Unity2017.1.3p2, 下载地址:\n https://beta.unity3d.com/download/744dab055778/UnityDownloadAssistant-2017.1.3p2.exe?_ga=2.42497696.443074145.1521714954-1119432033.1499739574");
}
SynchronizationContext.SetSynchronizationContext(this.contex);
SynchronizationContext.SetSynchronizationContext(OneThreadSynchronizationContext.Instance);
DontDestroyOnLoad(gameObject);
Game.EventSystem.Add(DLLType.Model, typeof(Init).Assembly);
......@@ -55,7 +53,7 @@ namespace ETModel
private void Update()
{
this.contex.Update();
OneThreadSynchronizationContext.Instance.Update();
Game.Hotfix.Update?.Invoke();
Game.EventSystem.Update();
}
......
......@@ -17,7 +17,7 @@ namespace ETModel
OpcodeTypeComponent opcodeTypeComponent = session.Network.Entity.GetComponent<OpcodeTypeComponent>();
Type responseType = opcodeTypeComponent.GetType(packet.Opcode);
message = session.Network.MessagePacker.DeserializeFrom(responseType, packet.Bytes, Packet.Index, packet.Length - Packet.Index);
message = session.Network.MessagePacker.DeserializeFrom(responseType, packet.Bytes, packet.Offset, packet.Length);
}
catch (Exception e)
{
......
......@@ -13,11 +13,13 @@ namespace ETModel
public const int ERR_NotFoundUnit = 104;
public const int ERR_ConnectGateKeyError = 105;
public const int ERR_Exception = 1000;
public const int ERR_RpcFail = 2001;
public const int ERR_SocketDisconnected = 2002;
public const int ERR_ReloadFail = 2003;
public const int ERR_ActorLocationNotFound = 2004;
public const int ERR_Exception = 100000;
public const int ERR_SessionDispose = 100001;
}
}
\ No newline at end of file
......@@ -8,6 +8,15 @@
self.Awake();
}
}
[ObjectSystem]
public class NetOuterComponentStartSystem : StartSystem<NetOuterComponent>
{
public override void Start(NetOuterComponent self)
{
self.Start();
}
}
[ObjectSystem]
public class NetOuterComponentUpdateSystem : UpdateSystem<NetOuterComponent>
......@@ -22,7 +31,7 @@
{
public void Awake()
{
this.Awake(NetworkProtocol.TCP);
this.Awake(NetworkProtocol.KCP);
this.MessagePacker = new ProtobufPacker();
this.MessageDispatcher = new ClientDispatcher();
}
......
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
namespace ETModel
{
......@@ -21,6 +19,11 @@ namespace ETModel
Accept,
}
public class UserTokenInfo
{
public long InstanceId;
}
public abstract class AChannel: ComponentWithId
{
public ChannelType ChannelType { get; }
......@@ -56,39 +59,33 @@ namespace ETModel
this.readCallback -= value;
}
}
protected void OnRead(Packet packet)
{
if (this.IsDisposed)
{
return;
}
this.readCallback?.Invoke(packet);
this.readCallback.Invoke(packet);
}
protected void OnError(int error)
protected void OnError(int e)
{
if (this.IsDisposed)
{
return;
}
this.errorCallback?.Invoke(this, error);
this.errorCallback?.Invoke(this, e);
}
protected AChannel(AService service, ChannelType channelType)
{
this.Id = IdGenerater.GenerateId();
this.ChannelType = channelType;
this.service = service;
}
public abstract void Start();
/// <summary>
/// 发送消息
/// </summary>
public abstract void Send(byte[] buffer, int index, int length);
public abstract void Send(List<byte[]> buffers);
public override void Dispose()
{
if (this.IsDisposed)
......
using System;
using System.Net;
using System.Threading.Tasks;
namespace ETModel
{
public enum NetworkProtocol
{
TCP,
KCP,
TCP,
}
public abstract class AService: Component
{
public abstract AChannel GetChannel(long id);
public abstract Task<AChannel> AcceptChannel();
private Action<AChannel> acceptCallback;
public event Action<AChannel> AcceptCallback
{
add
{
this.acceptCallback += value;
}
remove
{
this.acceptCallback -= value;
}
}
protected void OnAccept(AChannel channel)
{
this.acceptCallback.Invoke(channel);
}
public abstract AChannel ConnectChannel(IPEndPoint ipEndPoint);
public abstract void Remove(long channelId);
public abstract void Update();
public abstract void Start();
}
}
\ No newline at end of file
......@@ -3,7 +3,6 @@ using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
namespace ETModel
{
......@@ -21,16 +20,14 @@ namespace ETModel
}
}
public class KChannel: AChannel
public class KChannel : AChannel
{
private UdpClient socket;
private Socket socket;
private Kcp kcp;
private KCP kcp;
private readonly CircularBuffer recvBuffer = new CircularBuffer();
private readonly Queue<WaitSendBuffer> sendBuffer = new Queue<WaitSendBuffer>();
private readonly PacketParser parser;
private bool isConnected;
private readonly IPEndPoint remoteEndPoint;
......@@ -42,29 +39,30 @@ namespace ETModel
public uint RemoteConn;
public uint lastConnectTime;
// accept
public KChannel(uint conn, uint remoteConn, UdpClient socket, IPEndPoint remoteEndPoint, KService kService): base(kService, ChannelType.Accept)
public KChannel(uint conn, uint remoteConn, Socket socket, IPEndPoint remoteEndPoint, KService kService) : base(kService, ChannelType.Accept)
{
this.Id = conn;
this.Conn = conn;
this.RemoteConn = remoteConn;
this.remoteEndPoint = remoteEndPoint;
this.socket = socket;
this.parser = new PacketParser(this.recvBuffer);
kcp = new Kcp(this.RemoteConn, this.Output);
kcp.SetMtu(512);
kcp = new KCP(this.RemoteConn, this);
kcp.SetOutput(this.Output);
kcp.NoDelay(1, 10, 2, 1); //fast
this.isConnected = true;
this.lastRecvTime = kService.TimeNow;
}
// connect
public KChannel(uint conn, UdpClient socket, IPEndPoint remoteEndPoint, KService kService): base(kService, ChannelType.Connect)
public KChannel(uint conn, Socket socket, IPEndPoint remoteEndPoint, KService kService) : base(kService, ChannelType.Connect)
{
this.Id = conn;
this.Conn = conn;
this.socket = socket;
this.parser = new PacketParser(this.recvBuffer);
this.remoteEndPoint = remoteEndPoint;
this.lastRecvTime = kService.TimeNow;
......@@ -100,10 +98,9 @@ namespace ETModel
return;
}
this.isConnected = true;
this.RemoteConn = responseConn;
this.kcp = new Kcp(responseConn, this.Output);
kcp.SetMtu(512);
this.kcp = new KCP(responseConn, this);
kcp.SetOutput(this.Output);
kcp.NoDelay(1, 10, 2, 1); //fast
HandleSend();
......@@ -114,7 +111,7 @@ namespace ETModel
cacheBytes.WriteTo(0, KcpProtocalType.ACK);
cacheBytes.WriteTo(4, requestConn);
cacheBytes.WriteTo(8, this.Conn);
this.socket.Send(cacheBytes, 12, remoteEndPoint);
this.socket.SendTo(cacheBytes, 0, 12, SocketFlags.None, remoteEndPoint);
}
/// <summary>
......@@ -124,8 +121,7 @@ namespace ETModel
{
cacheBytes.WriteTo(0, KcpProtocalType.SYN);
cacheBytes.WriteTo(4, this.Conn);
//Log.Debug($"client connect: {this.Conn}");
this.socket.Send(cacheBytes, 8, remoteEndPoint);
this.socket.SendTo(cacheBytes, 0, 8, SocketFlags.None, remoteEndPoint);
// 200毫秒后再次update发送connect请求
this.GetService().AddToNextTimeUpdate(timeNow + 200, this.Id);
......@@ -137,7 +133,7 @@ namespace ETModel
cacheBytes.WriteTo(4, this.Conn);
cacheBytes.WriteTo(8, this.RemoteConn);
//Log.Debug($"client disconnect: {this.Conn}");
this.socket.Send(cacheBytes, 12, remoteEndPoint);
this.socket.SendTo(cacheBytes, 0, 12, SocketFlags.None, remoteEndPoint);
}
public void Update(uint timeNow)
......@@ -148,7 +144,7 @@ namespace ETModel
Connect(timeNow);
return;
}
// 超时断开连接
if (timeNow - this.lastRecvTime > 20 * 1000)
{
......@@ -156,6 +152,7 @@ namespace ETModel
return;
}
this.kcp.Update(timeNow);
uint nextUpdateTime = this.kcp.Check(timeNow);
this.GetService().AddToNextTimeUpdate(nextUpdateTime, this.Id);
}
......@@ -173,10 +170,9 @@ namespace ETModel
}
}
public void HandleRecv(byte[] date, uint timeNow)
public void HandleRecv(byte[] date, int length, uint timeNow)
{
this.kcp.Input(date);
// 加入update队列
this.kcp.Input(date, 0, length);
this.GetService().AddToUpdate(this.Id);
while (true)
......@@ -187,7 +183,7 @@ namespace ETModel
this.OnError((int)SocketError.NetworkReset);
return;
}
int count = this.kcp.Recv(this.cacheBytes);
int count = this.kcp.Recv(this.cacheBytes, 0, this.cacheBytes.Length);
if (count <= 0)
{
return;
......@@ -195,27 +191,20 @@ namespace ETModel
lastRecvTime = timeNow;
// 收到的数据放入缓冲区
byte[] sizeBuffer = BitConverter.GetBytes((ushort)count);
this.recvBuffer.Write(sizeBuffer, 0, sizeBuffer.Length);
this.recvBuffer.Write(cacheBytes, 0, count);
while (true)
{
if (!this.parser.Parse())
{
break;
}
Packet packet = this.parser.GetPacket();
this.OnRead(packet);
}
this.packet.Flag = this.cacheBytes[0];
this.packet.Opcode = BitConverter.ToUInt16(this.cacheBytes, 1);
this.packet.Bytes = this.cacheBytes;
this.packet.Offset = Packet.Index;
this.packet.Length = (ushort) (count - Packet.Index);
this.OnRead(packet);
}
}
public Packet packet = new Packet();
public void Output(byte[] bytes, int count)
public void Output(byte[] bytes, int count, object user)
{
this.socket.Send(bytes, count, this.remoteEndPoint);
this.socket.SendTo(bytes, 0, count, SocketFlags.None, this.remoteEndPoint);
}
private void KcpSend(byte[] buffers, int index, int length)
......@@ -224,6 +213,10 @@ namespace ETModel
this.GetService().AddToUpdate(this.Id);
}
public override void Start()
{
}
public override void Send(byte[] buffer, int index, int length)
{
if (isConnected)
......
......@@ -3,7 +3,6 @@ using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
namespace ETModel
{
......@@ -20,11 +19,11 @@ namespace ETModel
public uint TimeNow { get; set; }
private UdpClient socket;
private Socket socket;
private readonly Dictionary<long, KChannel> idChannels = new Dictionary<long, KChannel>();
private TaskCompletionSource<AChannel> acceptTcs;
private readonly byte[] cache = new byte[8192];
private readonly Queue<long> removedChannels = new Queue<long>();
......@@ -35,163 +34,173 @@ namespace ETModel
private readonly MultiMap<long, long> timerId = new MultiMap<long, long>();
private readonly List<long> timeOutId = new List<long>();
private EndPoint ipEndPoint = new IPEndPoint(IPAddress.Any, 0);
public KService(IPEndPoint ipEndPoint)
{
this.TimeNow = (uint)TimeHelper.Now();
this.socket = new UdpClient(ipEndPoint);
this.TimeNow = (uint)TimeHelper.ClientNow();
this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
this.socket.Bind(ipEndPoint);
#if SERVER
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
const uint IOC_IN = 0x80000000;
const uint IOC_VENDOR = 0x18000000;
uint SIO_UDP_CONNRESET = IOC_IN | IOC_VENDOR | 12;
this.socket.Client.IOControl((int)SIO_UDP_CONNRESET, new[] { Convert.ToByte(false) }, null);
this.socket.IOControl((int)SIO_UDP_CONNRESET, new[] { Convert.ToByte(false) }, null);
}
#endif
this.StartRecv();
}
public KService()
{
this.TimeNow = (uint)TimeHelper.Now();
this.socket = new UdpClient(new IPEndPoint(IPAddress.Any, 0));
this.StartRecv();
this.TimeNow = (uint)TimeHelper.ClientNow();
this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
this.socket.Bind(new IPEndPoint(IPAddress.Any, 0));
#if SERVER
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
const uint IOC_IN = 0x80000000;
const uint IOC_VENDOR = 0x18000000;
uint SIO_UDP_CONNRESET = IOC_IN | IOC_VENDOR | 12;
this.socket.IOControl((int)SIO_UDP_CONNRESET, new[] { Convert.ToByte(false) }, null);
}
#endif
}
public override void Dispose()
{
if (this.socket == null)
if (this.IsDisposed)
{
return;
}
base.Dispose();
foreach (KeyValuePair<long,KChannel> keyValuePair in this.idChannels)
{
keyValuePair.Value.Dispose();
}
this.socket.Close();
this.socket = null;
}
public async void StartRecv()
public override void Start()
{
while (true)
}
public void Recv()
{
while (this.socket.Available > 0)
{
if (this.socket == null)
if (this.IsDisposed)
{
return;
}
UdpReceiveResult udpReceiveResult;
int messageLength = 0;
try
{
udpReceiveResult = await this.socket.ReceiveAsync();
messageLength = this.socket.ReceiveFrom(this.cache, ref this.ipEndPoint);
}
catch (Exception e)
catch (Exception)
{
return;
}
// 长度小于4,不是正常的消息
if (messageLength < 4)
{
Log.Error(e);
continue;
}
try
// accept
uint conn = BitConverter.ToUInt32(this.cache, 0);
// conn从1000开始,如果为1,2,3则是特殊包
switch (conn)
{
int messageLength = udpReceiveResult.Buffer.Length;
// 长度小于4,不是正常的消息
if (messageLength < 4)
{
continue;
}
// accept
uint conn = BitConverter.ToUInt32(udpReceiveResult.Buffer, 0);
// conn从1000开始,如果为1,2,3则是特殊包
switch (conn)
{
case KcpProtocalType.SYN:
// 长度!=8,不是accpet消息
if (messageLength != 8)
{
break;
}
this.HandleAccept(udpReceiveResult);
case KcpProtocalType.SYN:
// 长度!=8,不是accpet消息
if (messageLength != 8)
{
break;
case KcpProtocalType.ACK:
// 长度!=12,不是connect消息
if (messageLength != 12)
{
break;
}
this.HandleConnect(udpReceiveResult);
}
IPEndPoint acceptIpEndPoint = (IPEndPoint)this.ipEndPoint;
this.ipEndPoint = new IPEndPoint(0, 0);
this.HandleAccept(this.cache, acceptIpEndPoint);
break;
case KcpProtocalType.ACK:
// 长度!=12,不是connect消息
if (messageLength != 12)
{
break;
case KcpProtocalType.FIN:
// 长度!=12,不是DisConnect消息
if (messageLength != 12)
{
break;
}
this.HandleDisConnect(udpReceiveResult);
}
this.HandleConnect(this.cache);
break;
case KcpProtocalType.FIN:
// 长度!=12,不是DisConnect消息
if (messageLength != 12)
{
break;
default:
this.HandleRecv(udpReceiveResult, conn);
break;
}
}
catch (Exception e)
{
Log.Error(e);
continue;
}
this.HandleDisConnect(this.cache);
break;
default:
this.HandleRecv(this.cache, messageLength, conn);
break;
}
}
}
private void HandleConnect(UdpReceiveResult udpReceiveResult)
private void HandleConnect(byte[] bytes)
{
uint requestConn = BitConverter.ToUInt32(udpReceiveResult.Buffer, 4);
uint responseConn = BitConverter.ToUInt32(udpReceiveResult.Buffer, 8);
uint requestConn = BitConverter.ToUInt32(bytes, 4);
uint responseConn = BitConverter.ToUInt32(bytes, 8);
KChannel kChannel;
if (!this.idChannels.TryGetValue(requestConn, out kChannel))
{
return;
}
// 处理chanel
kChannel.HandleConnnect(responseConn);
}
private void HandleDisConnect(UdpReceiveResult udpReceiveResult)
private void HandleDisConnect(byte[] bytes)
{
uint requestConn = BitConverter.ToUInt32(udpReceiveResult.Buffer, 8);
uint requestConn = BitConverter.ToUInt32(bytes, 8);
KChannel kChannel;
if (!this.idChannels.TryGetValue(requestConn, out kChannel))
{
return;
}
// 处理chanel
this.idChannels.Remove(requestConn);
kChannel.Dispose();
}
private void HandleRecv(UdpReceiveResult udpReceiveResult, uint conn)
private void HandleRecv(byte[] bytes, int length, uint conn)
{
KChannel kChannel;
if (!this.idChannels.TryGetValue(conn, out kChannel))
{
return;
}
// 处理chanel
kChannel.HandleRecv(udpReceiveResult.Buffer, this.TimeNow);
kChannel.HandleRecv(bytes, length, this.TimeNow);
}
private void HandleAccept(UdpReceiveResult udpReceiveResult)
private void HandleAccept(byte[] bytes, IPEndPoint remoteEndPoint)
{
if (this.acceptTcs == null)
{
return;
}
uint requestConn = BitConverter.ToUInt32(udpReceiveResult.Buffer, 4);
uint requestConn = BitConverter.ToUInt32(bytes, 4);
// 如果已经连接上,则重新响应请求
KChannel kChannel;
......@@ -201,11 +210,9 @@ namespace ETModel
return;
}
TaskCompletionSource<AChannel> t = this.acceptTcs;
this.acceptTcs = null;
kChannel = this.CreateAcceptChannel(udpReceiveResult.RemoteEndPoint, requestConn);
kChannel = this.CreateAcceptChannel(remoteEndPoint, requestConn);
kChannel.HandleAccept(requestConn);
t.SetResult(kChannel);
this.OnAccept(kChannel);
}
private KChannel CreateAcceptChannel(IPEndPoint remoteEndPoint, uint remoteConn)
......@@ -221,7 +228,14 @@ namespace ETModel
return channel;
}
private KChannel CreateConnectChannel(IPEndPoint remoteEndPoint)
public override AChannel GetChannel(long id)
{
KChannel channel;
this.idChannels.TryGetValue(id, out channel);
return channel;
}
public override AChannel ConnectChannel(IPEndPoint remoteEndPoint)
{
uint conv = (uint)RandomHelper.RandomNumber(1000, int.MaxValue);
KChannel channel = new KChannel(conv, this.socket, remoteEndPoint, this);
......@@ -245,26 +259,6 @@ namespace ETModel
this.timerId.Add(time, id);
}
public override AChannel GetChannel(long id)
{
KChannel channel;
this.idChannels.TryGetValue(id, out channel);
return channel;
}
public override Task<AChannel> AcceptChannel()
{
acceptTcs = new TaskCompletionSource<AChannel>();
return this.acceptTcs.Task;
}
public override AChannel ConnectChannel(IPEndPoint ipEndPoint)
{
KChannel channel = this.CreateConnectChannel(ipEndPoint);
return channel;
}
public override void Remove(long id)
{
KChannel channel;
......@@ -279,9 +273,11 @@ namespace ETModel
this.removedChannels.Enqueue(id);
channel.Dispose();
}
public override void Update()
{
this.Recv();
this.TimerOut();
foreach (long id in updateChannels)
......@@ -319,7 +315,7 @@ namespace ETModel
}
this.TimeNow = (uint)TimeHelper.ClientNow();
timeOutId.Clear();
while (this.timerId.Count > 0)
......
......@@ -2,12 +2,12 @@
namespace ETModel
{
internal enum ParserState
public enum ParserState
{
PacketSize,
PacketBody
}
public class Packet
{
public const int MinSize = 3;
......@@ -15,15 +15,20 @@ namespace ETModel
public const int FlagIndex = 0;
public const int OpcodeIndex = 1;
public const int Index = 3;
/// <summary>
/// 只读,不允许修改
/// </summary>
public byte[] Bytes { get; }
public byte[] Bytes { get; set; }
public ushort Offset { get; set; }
public ushort Length { get; set; }
public byte Flag { get; set; }
public ushort Opcode { get; set; }
public Packet()
{
}
public Packet(int length)
{
this.Length = 0;
......@@ -37,12 +42,13 @@ namespace ETModel
}
}
internal class PacketParser
public class PacketParser
{
private readonly CircularBuffer buffer;
private ushort packetSize;
private ParserState state;
private readonly Packet packet = new Packet(ushort.MaxValue);
public readonly Packet packet = new Packet(ushort.MaxValue);
private readonly byte[] cache = new byte[2];
private bool isOK;
public PacketParser(CircularBuffer buffer)
......@@ -70,7 +76,7 @@ namespace ETModel
else
{
this.buffer.Read(this.packet.Bytes, 0, 2);
this.packetSize = BitConverter.ToUInt16(this.packet.Bytes, 0);
packetSize = BitConverter.ToUInt16(this.packet.Bytes, 0);
if (packetSize < Packet.MinSize || packetSize > Packet.MaxSize)
{
throw new Exception($"packet size error: {this.packetSize}");
......@@ -85,10 +91,14 @@ namespace ETModel
}
else
{
this.buffer.Read(this.packet.Bytes, 0, this.packetSize);
this.packet.Length = this.packetSize;
this.packet.Flag = this.packet.Bytes[0];
this.packet.Opcode = BitConverter.ToUInt16(this.packet.Bytes, Packet.OpcodeIndex);
this.buffer.Read(this.cache, 0, 1);
this.packet.Flag = this.cache[0];
this.buffer.Read(this.cache, 0, 2);
this.packet.Opcode = BitConverter.ToUInt16(this.cache, 0);
this.buffer.Read(this.packet.Bytes, 0, this.packetSize - Packet.Index);
this.packet.Length = (ushort) (this.packetSize - Packet.Index);
this.packet.Offset = 0;
this.isOK = true;
this.state = ParserState.PacketSize;
finish = true;
......
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
namespace ETModel
{
public class TChannel : AChannel
/// <summary>
/// 封装Socket,将回调push到主线程处理
/// </summary>
public sealed class TChannel: AChannel
{
private readonly TcpClient tcpClient;
private Socket socket;
private readonly SocketAsyncEventArgs innArgs = new SocketAsyncEventArgs();
private readonly SocketAsyncEventArgs outArgs = new SocketAsyncEventArgs();
private readonly CircularBuffer recvBuffer = new CircularBuffer();
private readonly CircularBuffer sendBuffer = new CircularBuffer();
private bool isSending;
private readonly PacketParser parser;
private bool isConnected;
/// <summary>
/// connect
/// </summary>
public TChannel(TcpClient tcpClient, IPEndPoint ipEndPoint, TService service) : base(service, ChannelType.Connect)
{
this.tcpClient = tcpClient;
this.parser = new PacketParser(this.recvBuffer);
this.RemoteAddress = ipEndPoint;
private bool isConnected;
this.ConnectAsync(ipEndPoint);
}
public readonly PacketParser parser;
/// <summary>
/// accept
/// </summary>
public TChannel(TcpClient tcpClient, TService service) : base(service, ChannelType.Accept)
public TChannel(IPEndPoint ipEndPoint, TService service): base(service, ChannelType.Connect)
{
this.tcpClient = tcpClient;
this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
this.socket.NoDelay = true;
this.parser = new PacketParser(this.recvBuffer);
this.innArgs.Completed += this.OnComplete;
this.outArgs.Completed += this.OnComplete;
this.innArgs.UserToken = new UserTokenInfo() { InstanceId = this.InstanceId };
this.outArgs.UserToken = new UserTokenInfo() { InstanceId = this.InstanceId };
IPEndPoint ipEndPoint = (IPEndPoint)this.tcpClient.Client.RemoteEndPoint;
this.RemoteAddress = ipEndPoint;
this.OnAccepted();
}
private async void ConnectAsync(IPEndPoint ipEndPoint)
public TChannel(Socket socket, TService service): base(service, ChannelType.Accept)
{
try
{
await this.tcpClient.ConnectAsync(ipEndPoint.Address, ipEndPoint.Port);
this.isConnected = true;
this.StartSend();
this.StartRecv();
}
catch (SocketException e)
{
Log.Error($"connect error: {e.SocketErrorCode}");
this.OnError((int)e.SocketErrorCode);
}
catch (Exception e)
{
Log.Error($"connect error: {ipEndPoint} {e}");
this.OnError((int)SocketError.SocketError);
}
}
this.socket = socket;
this.socket.NoDelay = true;
this.parser = new PacketParser(this.recvBuffer);
this.innArgs.Completed += this.OnComplete;
this.outArgs.Completed += this.OnComplete;
this.innArgs.UserToken = new UserTokenInfo() { InstanceId = this.InstanceId };
this.outArgs.UserToken = new UserTokenInfo() { InstanceId = this.InstanceId };
this.RemoteAddress = (IPEndPoint)socket.RemoteEndPoint;
this.isConnected = true;
}
public override void Dispose()
{
if (this.IsDisposed)
......@@ -74,12 +60,21 @@ namespace ETModel
}
base.Dispose();
this.tcpClient.Close();
this.socket.Close();
this.innArgs.Dispose();
this.outArgs.Dispose();
this.socket = null;
}
private void OnAccepted()
public override void Start()
{
this.isConnected = true;
if (!this.isConnected)
{
this.ConnectAsync(this.RemoteAddress);
return;
}
this.StartSend();
this.StartRecv();
}
......@@ -90,10 +85,10 @@ namespace ETModel
{
throw new Exception("TChannel已经被Dispose, 不能发送消息");
}
byte[] size = BitConverter.GetBytes((ushort)buffer.Length);
this.sendBuffer.Write(size, 0, size.Length);
byte[] sizeBuffer = BitConverter.GetBytes(length);
this.sendBuffer.Write(sizeBuffer, 0, sizeBuffer.Length);
this.sendBuffer.Write(buffer, index, length);
if (this.isConnected)
if (this.isConnected && !this.isSending)
{
this.StartSend();
}
......@@ -112,115 +107,206 @@ namespace ETModel
{
this.sendBuffer.Write(buffer, 0, buffer.Length);
}
if (this.isConnected)
if (this.isConnected && !this.isSending)
{
this.StartSend();
}
}
private async void StartSend()
private void OnComplete(object sender, SocketAsyncEventArgs e)
{
switch (e.LastOperation)
{
case SocketAsyncOperation.Connect:
OneThreadSynchronizationContext.Instance.Post(this.OnConnectComplete, e);
break;
case SocketAsyncOperation.Receive:
OneThreadSynchronizationContext.Instance.Post(this.OnRecvComplete, e);
break;
case SocketAsyncOperation.Send:
OneThreadSynchronizationContext.Instance.Post(this.OnSendComplete, e);
break;
case SocketAsyncOperation.Disconnect:
OneThreadSynchronizationContext.Instance.Post(this.OnDisconnectComplete, e);
break;
default:
throw new Exception($"socket error: {e.LastOperation}");
}
}
public void ConnectAsync(IPEndPoint ipEndPoint)
{
this.outArgs.RemoteEndPoint = ipEndPoint;
if (this.socket.ConnectAsync(this.outArgs))
{
return;
}
OnConnectComplete(this.outArgs);
}
private void OnConnectComplete(object o)
{
SocketAsyncEventArgs e = (SocketAsyncEventArgs) o;
UserTokenInfo userTokenInfo = (UserTokenInfo) e.UserToken;
if (userTokenInfo.InstanceId != this.InstanceId)
{
return;
}
if (e.SocketError != SocketError.Success)
{
this.OnError((int)e.SocketError);
return;
}
this.isConnected = true;
this.StartSend();
this.StartRecv();
}
private void OnDisconnectComplete(object o)
{
SocketAsyncEventArgs e = (SocketAsyncEventArgs)o;
this.OnError((int)e.SocketError);
}
private void StartRecv()
{
int size = this.recvBuffer.ChunkSize - this.recvBuffer.LastIndex;
this.RecvAsync(this.recvBuffer.Last, this.recvBuffer.LastIndex, size);
}
public void RecvAsync(byte[] buffer, int offset, int count)
{
long instanceId = this.InstanceId;
try
{
// 如果正在发送中,不需要再次发送
if (this.isSending)
this.innArgs.SetBuffer(buffer, offset, count);
}
catch (Exception e)
{
throw new Exception($"socket set buffer error: {buffer.Length}, {offset}, {count}", e);
}
if (this.socket.ReceiveAsync(this.innArgs))
{
return;
}
OnRecvComplete(this.innArgs);
}
private void OnRecvComplete(object o)
{
SocketAsyncEventArgs e = (SocketAsyncEventArgs)o;
UserTokenInfo userTokenInfo = (UserTokenInfo) e.UserToken;
if (userTokenInfo.InstanceId != this.InstanceId)
{
return;
}
if (e.SocketError != SocketError.Success)
{
this.OnError((int)e.SocketError);
return;
}
if (e.BytesTransferred == 0)
{
this.OnError((int)e.SocketError);
return;
}
this.recvBuffer.LastIndex += e.BytesTransferred;
if (this.recvBuffer.LastIndex == this.recvBuffer.ChunkSize)
{
this.recvBuffer.AddLast();
this.recvBuffer.LastIndex = 0;
}
// 收到消息回调
while (true)
{
if (!this.parser.Parse())
{
return;
break;
}
while (true)
Packet packet = this.parser.GetPacket();
try
{
this.OnRead(packet);
}
catch (Exception exception)
{
if (this.InstanceId != instanceId)
{
return;
}
// 没有数据需要发送
long buffLength = this.sendBuffer.Length;
if (buffLength == 0)
{
this.isSending = false;
return;
}
this.isSending = true;
NetworkStream stream = this.tcpClient.GetStream();
if (!stream.CanWrite)
{
return;
}
await this.sendBuffer.WriteToAsync(stream);
Log.Error(exception);
}
}
catch (IOException)
if (userTokenInfo.InstanceId != this.InstanceId)
{
this.OnError((int)SocketError.SocketError);
return;
}
catch (ObjectDisposedException)
this.StartRecv();
}
private void StartSend()
{
// 没有数据需要发送
if (this.sendBuffer.Length == 0)
{
this.OnError((int)SocketError.SocketError);
this.isSending = false;
return;
}
catch (Exception e)
this.isSending = true;
int sendSize = this.sendBuffer.ChunkSize - this.sendBuffer.FirstIndex;
if (sendSize > this.sendBuffer.Length)
{
Log.Error(e);
this.OnError((int)SocketError.SocketError);
sendSize = (int)this.sendBuffer.Length;
}
this.SendAsync(this.sendBuffer.First, this.sendBuffer.FirstIndex, sendSize);
}
private async void StartRecv()
public void SendAsync(byte[] buffer, int offset, int count)
{
long instanceId = this.InstanceId;
try
{
while (true)
{
if (this.InstanceId != instanceId)
{
return;
}
NetworkStream stream = this.tcpClient.GetStream();
if (!stream.CanRead)
{
return;
}
int n = await this.recvBuffer.ReadFromAsync(stream);
if (n == 0)
{
this.OnError((int)SocketError.NetworkReset);
return;
}
while (true)
{
if (!this.parser.Parse())
{
break;
}
Packet packet = this.parser.GetPacket();
this.OnRead(packet);
}
}
this.outArgs.SetBuffer(buffer, offset, count);
}
catch (IOException)
catch (Exception e)
{
this.OnError((int)SocketError.SocketError);
throw new Exception($"socket set buffer error: {buffer.Length}, {offset}, {count}", e);
}
catch (ObjectDisposedException)
if (this.socket.SendAsync(this.outArgs))
{
this.OnError((int)SocketError.SocketError);
return;
}
catch (Exception e)
OnSendComplete(this.outArgs);
}
private void OnSendComplete(object o)
{
SocketAsyncEventArgs e = (SocketAsyncEventArgs)o;
UserTokenInfo userTokenInfo = (UserTokenInfo) e.UserToken;
if (userTokenInfo.InstanceId != this.InstanceId)
{
return;
}
if (e.SocketError != SocketError.Success)
{
Log.Error(e);
this.OnError((int)SocketError.SocketError);
this.OnError((int)e.SocketError);
return;
}
this.sendBuffer.FirstIndex += e.BytesTransferred;
if (this.sendBuffer.FirstIndex == this.sendBuffer.ChunkSize)
{
this.sendBuffer.FirstIndex = 0;
this.sendBuffer.RemoveFirst();
}
this.StartSend();
}
}
}
\ No newline at end of file
......@@ -7,72 +7,117 @@ using System.Threading.Tasks;
namespace ETModel
{
public sealed class TService: AService
public sealed class TService : AService
{
private TcpListener acceptor;
private readonly Dictionary<long, TChannel> idChannels = new Dictionary<long, TChannel>();
private readonly SocketAsyncEventArgs innArgs = new SocketAsyncEventArgs();
private Socket acceptor;
/// <summary>
/// 即可做client也可做server
/// </summary>
public TService(IPEndPoint ipEndPoint)
{
this.acceptor = new TcpListener(ipEndPoint);
this.acceptor.Server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
this.acceptor.Server.NoDelay = true;
this.acceptor.Start();
this.acceptor = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
this.acceptor.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
this.innArgs.Completed += this.OnAcceptComplete;
this.innArgs.UserToken = new UserTokenInfo() { InstanceId = this.InstanceId };
this.acceptor.Bind(ipEndPoint);
this.acceptor.Listen(1000);
}
public TService()
{
}
public override void Dispose()
{
if (this.acceptor == null)
if (this.IsDisposed)
{
return;
}
base.Dispose();
foreach (long id in this.idChannels.Keys.ToArray())
{
TChannel channel = this.idChannels[id];
channel.Dispose();
}
this.acceptor.Stop();
this.acceptor?.Close();
this.acceptor = null;
this.innArgs.Dispose();
}
public override void Start()
{
if (this.acceptor != null)
{
this.AcceptAsync();
}
}
public override AChannel GetChannel(long id)
public void AcceptAsync()
{
TChannel channel = null;
this.idChannels.TryGetValue(id, out channel);
return channel;
this.innArgs.AcceptSocket = null;
if (this.acceptor.AcceptAsync(this.innArgs))
{
return;
}
OnAcceptComplete(this, this.innArgs);
}
public override async Task<AChannel> AcceptChannel()
private void OnAcceptComplete(object sender, SocketAsyncEventArgs o)
{
if (this.acceptor == null)
SocketAsyncEventArgs e = o;
UserTokenInfo userTokenInfo = (UserTokenInfo) e.UserToken;
if (userTokenInfo.InstanceId != this.InstanceId)
{
return;
}
if (e.SocketError != SocketError.Success)
{
throw new Exception("service construct must use host and port param");
Log.Error($"accept error {e.SocketError}");
return;
}
TcpClient tcpClient = await this.acceptor.AcceptTcpClientAsync();
TChannel channel = new TChannel(tcpClient, this);
TChannel channel = new TChannel(e.AcceptSocket, this);
this.idChannels[channel.Id] = channel;
try
{
this.OnAccept(channel);
}
catch (Exception exception)
{
Log.Error(exception);
}
if (userTokenInfo.InstanceId != this.InstanceId)
{
return;
}
this.AcceptAsync();
}
public override AChannel GetChannel(long id)
{
TChannel channel = null;
this.idChannels.TryGetValue(id, out channel);
return channel;
}
public override AChannel ConnectChannel(IPEndPoint ipEndPoint)
{
TcpClient tcpClient = new TcpClient();
TChannel channel = new TChannel(tcpClient, ipEndPoint, this);
TChannel channel = new TChannel(ipEndPoint, this);
this.idChannels[channel.Id] = channel;
return channel;
}
public override void Remove(long id)
{
TChannel channel;
......
......@@ -2,15 +2,14 @@
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
namespace ETModel
{
public abstract class NetworkComponent : Component
{
private AService Service;
public AppType AppType;
private AService Service;
private readonly Dictionary<long, Session> sessions = new Dictionary<long, Session>();
......@@ -20,16 +19,25 @@ namespace ETModel
public void Awake(NetworkProtocol protocol)
{
switch (protocol)
try
{
switch (protocol)
{
case NetworkProtocol.KCP:
this.Service = new KService();
break;
case NetworkProtocol.TCP:
this.Service = new TService();
break;
default:
throw new ArgumentOutOfRangeException();
}
this.Service.AcceptCallback += this.OnAccept;
}
catch (Exception e)
{
case NetworkProtocol.TCP:
this.Service = new TService();
break;
case NetworkProtocol.KCP:
this.Service = new KService();
break;
default:
throw new ArgumentOutOfRangeException();
throw new Exception($"{e}");
}
}
......@@ -39,17 +47,17 @@ namespace ETModel
{
switch (protocol)
{
case NetworkProtocol.TCP:
this.Service = new TService(ipEndPoint);
break;
case NetworkProtocol.KCP:
this.Service = new KService(ipEndPoint);
break;
case NetworkProtocol.TCP:
this.Service = new TService(ipEndPoint);
break;
default:
throw new ArgumentOutOfRangeException();
}
this.StartAccept();
this.Service.AcceptCallback += this.OnAccept;
}
catch (Exception e)
{
......@@ -57,41 +65,15 @@ namespace ETModel
}
}
private async void StartAccept()
public void Start()
{
while (true)
{
if (this.IsDisposed)
{
return;
}
try
{
await this.Accept();
}
catch (Exception e)
{
Log.Error(e);
}
}
this.Service.Start();
}
public virtual async Task<Session> Accept()
public void OnAccept(AChannel channel)
{
AChannel channel = await this.Service.AcceptChannel();
Session session = ComponentFactory.CreateWithId<Session, NetworkComponent, AChannel>(IdGenerater.GenerateId(), this, channel);
session.Parent = this;
channel.ErrorCallback += (c, e) =>
{
session.Error = e;
this.Remove(session.Id);
};
channel.ReadCallback += (packet) => { session.OnRead(packet); };
Session session = ComponentFactory.CreateWithParent<Session, NetworkComponent, AChannel>(this, this, channel);
this.sessions.Add(session.Id, session);
return session;
}
public virtual void Remove(long id)
......@@ -115,29 +97,12 @@ namespace ETModel
/// <summary>
/// 创建一个新Session
/// </summary>
public virtual Session Create(IPEndPoint ipEndPoint)
public Session Create(IPEndPoint ipEndPoint)
{
try
{
AChannel channel = this.Service.ConnectChannel(ipEndPoint);
Session session = ComponentFactory.CreateWithId<Session, NetworkComponent, AChannel>(IdGenerater.GenerateId(), this, channel);
session.Parent = this;
channel.ErrorCallback += (c, e) =>
{
session.Error = e;
this.Remove(session.Id);
};
channel.ReadCallback += (packet) => { session.OnRead(packet); };
this.sessions.Add(session.Id, session);
return session;
}
catch (Exception e)
{
Log.Error(e);
return null;
}
AChannel channel = this.Service.ConnectChannel(ipEndPoint);
Session session = ComponentFactory.CreateWithParent<Session, NetworkComponent, AChannel>(this, this, channel);
this.sessions.Add(session.Id, session);
return session;
}
public void Update()
......
......@@ -211,4 +211,10 @@ namespace ETModel
}
[Message(OuterOpcode.G2C_Test)]
[ProtoContract]
public partial class G2C_Test: IMessage
{
}
}
......@@ -16,5 +16,6 @@ namespace ETModel
public const ushort M2C_Reload = 112;
public const ushort C2R_Ping = 113;
public const ushort R2C_Ping = 114;
public const ushort G2C_Test = 115;
}
}
......@@ -17,6 +17,15 @@ namespace ETModel
}
}
[ObjectSystem]
public class SessionStartSystem : StartSystem<Session>
{
public override void Start(Session self)
{
self.Start();
}
}
public sealed class Session : Entity
{
private static int RpcId { get; set; }
......@@ -34,11 +43,23 @@ namespace ETModel
}
}
public void Awake(NetworkComponent net, AChannel c)
public void Awake(NetworkComponent net, AChannel aChannel)
{
this.Error = 0;
this.channel = c;
this.channel = aChannel;
this.requestCallback.Clear();
channel.ErrorCallback += (c, e) =>
{
this.Error = e;
this.Network.Remove(this.Id);
};
channel.ReadCallback += this.OnRead;
}
public void Start()
{
this.channel?.Start();
}
public override void Dispose()
......@@ -51,10 +72,10 @@ namespace ETModel
long id = this.Id;
base.Dispose();
foreach (Action<IResponse> action in this.requestCallback.Values.ToArray())
{
action.Invoke(new ResponseMessage { Error = this.Error });
action.Invoke(new ResponseMessage { Error = ErrorCode.ERR_SessionDispose });
}
this.Error = 0;
......@@ -95,7 +116,7 @@ namespace ETModel
{
byte flag = packet.Flag;
ushort opcode = packet.Opcode;
#if !SERVER
if (OpcodeHelper.IsClientHotfixMessage(opcode))
{
......@@ -116,7 +137,7 @@ namespace ETModel
{
OpcodeTypeComponent opcodeTypeComponent = this.Network.Entity.GetComponent<OpcodeTypeComponent>();
Type responseType = opcodeTypeComponent.GetType(opcode);
message = this.Network.MessagePacker.DeserializeFrom(responseType, packet.Bytes, Packet.Index, packet.Length - Packet.Index);
message = this.Network.MessagePacker.DeserializeFrom(responseType, packet.Bytes, packet.Offset, packet.Length);
//Log.Debug($"recv: {JsonHelper.ToJson(message)}");
}
catch (Exception e)
......@@ -238,27 +259,21 @@ namespace ETModel
if (this.Network.AppType == AppType.AllServer)
{
Session session = this.Network.Entity.GetComponent<NetInnerComponent>().Get(this.RemoteAddress);
this.pkt.Length = 0;
ushort index = 0;
foreach (var byts in byteses)
{
Array.Copy(byts, 0, this.pkt.Bytes, index, byts.Length);
index += (ushort)byts.Length;
}
this.pkt.Length = index;
this.pkt.Flag = flag;
this.pkt.Opcode = opcode;
session.Run(this.pkt);
Packet packet = ((TChannel)this.channel).parser.packet;
Array.Copy(bytes, 0, packet.Bytes, 0, bytes.Length);
packet.Offset = 0;
packet.Length = (ushort)bytes.Length;
packet.Flag = flag;
packet.Opcode = opcode;
session.Run(packet);
return;
}
#endif
channel.Send(this.byteses);
}
#if SERVER
private readonly Packet pkt = new Packet(ushort.MaxValue);
#endif
}
}
\ No newline at end of file
......@@ -8,6 +8,11 @@ namespace ETHotfix
{
Component component = Game.ObjectPool.Fetch(type);
component.Parent = parent;
ComponentWithId componentWithId = component as ComponentWithId;
if (componentWithId != null)
{
componentWithId.Id = component.InstanceId;
}
Game.EventSystem.Awake(component);
return component;
}
......@@ -16,6 +21,11 @@ namespace ETHotfix
{
T component = Game.ObjectPool.Fetch<T>();
component.Parent = parent;
ComponentWithId componentWithId = component as ComponentWithId;
if (componentWithId != null)
{
componentWithId.Id = component.InstanceId;
}
Game.EventSystem.Awake(component);
return component;
}
......@@ -24,6 +34,11 @@ namespace ETHotfix
{
T component = Game.ObjectPool.Fetch<T>();
component.Parent = parent;
ComponentWithId componentWithId = component as ComponentWithId;
if (componentWithId != null)
{
componentWithId.Id = component.InstanceId;
}
Game.EventSystem.Awake(component, a);
return component;
}
......@@ -32,6 +47,11 @@ namespace ETHotfix
{
T component = Game.ObjectPool.Fetch<T>();
component.Parent = parent;
ComponentWithId componentWithId = component as ComponentWithId;
if (componentWithId != null)
{
componentWithId.Id = component.InstanceId;
}
Game.EventSystem.Awake(component, a, b);
return component;
}
......@@ -40,6 +60,11 @@ namespace ETHotfix
{
T component = Game.ObjectPool.Fetch<T>();
component.Parent = parent;
ComponentWithId componentWithId = component as ComponentWithId;
if (componentWithId != null)
{
componentWithId.Id = component.InstanceId;
}
Game.EventSystem.Awake(component, a, b, c);
return component;
}
......@@ -47,6 +72,11 @@ namespace ETHotfix
public static T Create<T>() where T : Component
{
T component = Game.ObjectPool.Fetch<T>();
ComponentWithId componentWithId = component as ComponentWithId;
if (componentWithId != null)
{
componentWithId.Id = component.InstanceId;
}
Game.EventSystem.Awake(component);
return component;
}
......@@ -54,6 +84,11 @@ namespace ETHotfix
public static T Create<T, A>(A a) where T : Component
{
T component = Game.ObjectPool.Fetch<T>();
ComponentWithId componentWithId = component as ComponentWithId;
if (componentWithId != null)
{
componentWithId.Id = component.InstanceId;
}
Game.EventSystem.Awake(component, a);
return component;
}
......@@ -61,6 +96,11 @@ namespace ETHotfix
public static T Create<T, A, B>(A a, B b) where T : Component
{
T component = Game.ObjectPool.Fetch<T>();
ComponentWithId componentWithId = component as ComponentWithId;
if (componentWithId != null)
{
componentWithId.Id = component.InstanceId;
}
Game.EventSystem.Awake(component, a, b);
return component;
}
......@@ -68,6 +108,11 @@ namespace ETHotfix
public static T Create<T, A, B, C>(A a, B b, C c) where T : Component
{
T component = Game.ObjectPool.Fetch<T>();
ComponentWithId componentWithId = component as ComponentWithId;
if (componentWithId != null)
{
componentWithId.Id = component.InstanceId;
}
Game.EventSystem.Awake(component, a, b, c);
return component;
}
......
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ETModel;
......@@ -17,7 +18,7 @@ namespace ETHotfix
sessionComponent.DisposeCallback = s => { self.Dispose(); };
}
}
/// <summary>
/// 用来收发热更层的消息
/// </summary>
......@@ -36,18 +37,25 @@ namespace ETHotfix
}
base.Dispose();
foreach (Action<IResponse> action in this.requestCallback.Values.ToArray())
{
action.Invoke(new ResponseMessage { Error = ErrorCode.ERR_SessionDispose });
}
this.requestCallback.Clear();
this.session.Dispose();
}
public void Run(ETModel.Session s, Packet p)
public void Run(ETModel.Session s, Packet packet)
{
ushort opcode = p.Opcode;
byte flag = p.Flag;
ushort opcode = packet.Opcode;
byte flag = packet.Flag;
OpcodeTypeComponent opcodeTypeComponent = Game.Scene.GetComponent<OpcodeTypeComponent>();
Type responseType = opcodeTypeComponent.GetType(opcode);
object message = ProtobufHelper.FromBytes(responseType, p.Bytes, Packet.Index, p.Length - Packet.Index);
object message = ProtobufHelper.FromBytes(responseType, packet.Bytes, packet.Offset, packet.Length);
if ((flag & 0x01) > 0)
{
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册