Skip to main content

心跳设计

一、说明

1.1 为什么要设置心跳?

心跳机制一般是客户端服务器定时发送一个特定的数据包,让服务器知道自己还在线,以确保连接的有效性的机制。 网络中的接收和发送数据都是使用操作系统中的 SOCKET 进行实现。 但是如果此 套接字 已经断开,那发送数据和接收数据的时候就一定会有问题。 可是如何判断这个套接字是否还可以使用呢? 这个就需要在系统中创建心跳机制。

其实TCP中已经为我们实现了一个内置心跳机制(SetKeepAliveValue)。但是该机制受限于操作系统,而且很容易误报。所以很少被大家使用。

大家使用最多的,就是自己设计数据包,然后预留心跳格式,当对方收到心跳包时,直接返回响应包即可。

那么,按这个思路,让我们使用优雅的实现吧。

二、设计数据格式

使用心跳之前,必须要明确数据格式,绝对不能混淆业务数据。一般在适配Plc等现成模块时,他们是有固定的数据格式,这时候你可以参阅数据处理适配器,快速的解析数据。

但是在本文中,并没有规定的格式,所以我们需要先设计一种简单高效的数据格式。

如下:

数据长度数据类型载荷数据
2字节(Ushort)1字节(Byte)n字节(<65535)

2.1 解析数据格式

下列代码主要实现对上述数据格式的解析

internal class MyFixedHeaderDataHandlingAdapter : CustomFixedHeaderDataHandlingAdapter<MyRequestInfo>
{
public override int HeaderLength => 3;

public override bool CanSendRequestInfo => false;

protected override MyRequestInfo GetInstance()
{
return new MyRequestInfo();
}

protected override void PreviewSend(IRequestInfo requestInfo)
{
throw new NotImplementedException();
}
}

internal class MyRequestInfo : IFixedHeaderRequestInfo
{
public DataType DataType { get; set; }
public byte[] Data { get; set; }

public int BodyLength { get; private set; }

public bool OnParsingBody(byte[] body)
{
if (body.Length == this.BodyLength)
{
this.Data = body;
return true;
}
return false;
}

public bool OnParsingHeader(byte[] header)
{
if (header.Length == 3)
{
this.BodyLength = TouchSocketBitConverter.Default.ToUInt16(header, 0) - 1;
this.DataType = (DataType)header[2];
return true;
}
return false;
}

public void Package(ByteBlock byteBlock)
{
byteBlock.Write((ushort)((this.Data == null ? 0 : this.Data.Length) + 1));
byteBlock.Write((byte)this.DataType);
if (Data != null)
{
byteBlock.Write(Data);
}
}

public byte[] PackageAsBytes()
{
using ByteBlock byteBlock = new ByteBlock();
this.Package(byteBlock);
return byteBlock.ToArray();
}

public override string ToString()
{
return $"数据类型={this.DataType},数据={(this.Data == null ? "null" : Encoding.UTF8.GetString(this.Data))}";
}
}

internal enum DataType : byte
{
Ping,
Pong,
Data
}

三、创建扩展类

下列代码可选,主要实现对Client增加Ping的扩展方法。方便调用。

/// <summary>
/// 一个心跳计数器扩展。
/// </summary>
internal static class DependencyExtensions
{
public static readonly DependencyProperty<Timer> HeartbeatTimerProperty =
DependencyProperty<Timer>.Register("HeartbeatTimer", typeof(DependencyExtensions), null);

public static bool Ping<TClient>(this TClient client) where TClient : ITcpClientBase
{
try
{
client.Send(new MyRequestInfo() { DataType = DataType.Ping }.PackageAsBytes());
return true;
}
catch (Exception ex)
{
client.Logger.Exception(ex);
}

return false;
}

public static bool Pong<TClient>(this TClient client) where TClient : ITcpClientBase
{
try
{
client.Send(new MyRequestInfo() { DataType = DataType.Pong }.PackageAsBytes());
return true;
}
catch (Exception ex)
{
client.Logger.Exception(ex);
}

return false;
}
}

四、创建心跳插件类

下列代码主要实现心跳插件的功能。默认每五秒自动触发一次。且接收方收到Ping后,直接会回复Pong。

internal class HeartbeatAndReceivePlugin : TcpPluginBase
{
private readonly int m_timeTick;
private readonly ILog logger;

[DependencyInject(1000 * 5)]
public HeartbeatAndReceivePlugin(int timeTick, ILog logger)
{
this.m_timeTick = timeTick;
this.logger = logger;
}

protected override void OnConnected(ITcpClientBase client, TouchSocketEventArgs e)
{
if (client is ISocketClient)
{
return;//此处可判断,如果为服务器,则不用使用心跳。
}

if (client.GetValue(DependencyExtensions.HeartbeatTimerProperty) is Timer timer)
{
timer.Dispose();
}

client.SetValue(DependencyExtensions.HeartbeatTimerProperty, new Timer((o) =>
{
client.Ping();
}, null, 0, m_timeTick));

base.OnConnected(client, e);
}

protected override void OnDisconnected(ITcpClientBase client, ClientDisconnectedEventArgs e)
{
base.OnDisconnected(client, e);
if (client.GetValue(DependencyExtensions.HeartbeatTimerProperty) is Timer timer)
{
timer.Dispose();
client.SetValue(DependencyExtensions.HeartbeatTimerProperty, null);
}
}

protected override void OnReceivedData(ITcpClientBase client, ReceivedDataEventArgs e)
{
if (e.RequestInfo is MyRequestInfo myRequest)
{
this.logger.Info(myRequest.ToString());
if (myRequest.DataType == DataType.Ping)
{
client.Pong();
}
}
base.OnReceivedData(client, e);
}
}

五、测试、启动

/// <summary>
/// 示例心跳。
/// 博客地址<see href="https://blog.csdn.net/qq_40374647/article/details/125598921"/>
/// </summary>
/// <param name="args"></param>
private static void Main(string[] args)
{
ConsoleAction consoleAction = new ConsoleAction();

//服务器
TcpService service = new TcpService();
service.Setup(new TouchSocketConfig()//载入配置
.SetListenIPHosts(new IPHost[] { new IPHost("127.0.0.1:7789"), new IPHost(7790) })//同时监听两个地址
.UsePlugin()
.SetDataHandlingAdapter(()=>new MyFixedHeaderDataHandlingAdapter())
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
})
.ConfigurePlugins(a =>
{
a.Add<HeartbeatAndReceivePlugin>();
}))
.Start();//启动
service.Logger.Info("服务器成功启动");

//客户端
TcpClient tcpClient = new TcpClient();
tcpClient.Setup(new TouchSocketConfig()
.SetRemoteIPHost(new IPHost("127.0.0.1:7789"))
.UsePlugin()
.SetDataHandlingAdapter(() => new MyFixedHeaderDataHandlingAdapter())
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
})
.ConfigurePlugins(a =>
{
a.Add<HeartbeatAndReceivePlugin>();
}));
tcpClient.Connect();
tcpClient.Logger.Info("客户端成功连接");

consoleAction.OnException += ConsoleAction_OnException;
consoleAction.Add("1", "发送心跳", () =>
{
tcpClient.Ping();
});
consoleAction.Add("2", "发送数据", () =>
{
tcpClient.Send(new MyRequestInfo()
{
DataType = DataType.Data,
Data = Encoding.UTF8.GetBytes(Console.ReadLine())
}
.PackageAsBytes());
});
consoleAction.ShowAll();
while (true)
{
consoleAction.Run(Console.ReadLine());
}
}

private static void ConsoleAction_OnException(Exception obj)
{
Console.WriteLine(obj);
}