未验证 提交 b98417ab 编写于 作者: G Günther Foidl 提交者: GitHub

Avoid some full memory barriers (#1590)

* Avoid some full memory barriers

* Reset in stats itself

* Updated MqttClientStatistics

* Fixed order of members as artifact from merge-conflict
上级 ead58dd6
......@@ -33,8 +33,7 @@ namespace MQTTnet.Adapter
readonly AsyncLock _syncRoot = new AsyncLock();
long _bytesReceived;
long _bytesSent;
Statistics _statistics; // mutable struct, don't make readonly!
public MqttChannelAdapter(IMqttChannel channel, MqttPacketFormatterAdapter packetFormatterAdapter, MqttPacketInspector packetInspector, IMqttNetLogger logger)
{
......@@ -51,9 +50,9 @@ namespace MQTTnet.Adapter
_logger = logger.WithSource(nameof(MqttChannelAdapter));
}
public long BytesReceived => Interlocked.Read(ref _bytesReceived);
public long BytesReceived => Volatile.Read(ref _statistics._bytesReceived);
public long BytesSent => Interlocked.Read(ref _bytesSent);
public long BytesSent => Volatile.Read(ref _statistics._bytesSent);
public X509Certificate2 ClientCertificate => _channel.ClientCertificate;
......@@ -147,7 +146,7 @@ namespace MQTTnet.Adapter
_packetInspector?.EndReceivePacket();
Interlocked.Add(ref _bytesSent, receivedPacket.TotalLength);
Interlocked.Add(ref _statistics._bytesSent, receivedPacket.TotalLength);
if (PacketFormatterAdapter.ProtocolVersion == MqttProtocolVersion.Unknown)
{
......@@ -181,11 +180,7 @@ namespace MQTTnet.Adapter
return null;
}
public void ResetStatistics()
{
Interlocked.Exchange(ref _bytesReceived, 0L);
Interlocked.Exchange(ref _bytesSent, 0L);
}
public void ResetStatistics() => _statistics.Reset();
public async Task SendPacketAsync(MqttPacket packet, CancellationToken cancellationToken)
{
......@@ -216,7 +211,7 @@ namespace MQTTnet.Adapter
await _channel.WriteAsync(packetBuffer.Packet, true, cancellationToken).ConfigureAwait(false);
}
Interlocked.Add(ref _bytesReceived, packetBuffer.Length);
Interlocked.Add(ref _statistics._bytesReceived, packetBuffer.Length);
}
catch (Exception exception)
{
......@@ -459,5 +454,17 @@ namespace MQTTnet.Adapter
throw new MqttCommunicationException(exception);
}
private struct Statistics
{
public long _bytesReceived;
public long _bytesSent;
public void Reset()
{
Volatile.Write(ref _bytesReceived, 0);
Volatile.Write(ref _bytesSent, 0);
}
}
}
}
\ No newline at end of file
......@@ -22,7 +22,11 @@ namespace MQTTnet.Internal
public void Clear()
{
#if NETCOREAPP3_1_OR_GREATER
_queue.Clear();
#else
Interlocked.Exchange(ref _queue, new ConcurrentQueue<TItem>());
#endif
}
public void Dispose()
......@@ -32,6 +36,16 @@ namespace MQTTnet.Internal
_signal.Dispose();
_isDisposed = true;
#if !NETSTANDARD1_3
if (typeof(IDisposable).IsAssignableFrom(typeof(TItem)))
{
while (_queue.TryDequeue(out TItem item))
{
(item as IDisposable).Dispose();
}
}
#endif
}
}
......
......@@ -89,4 +89,4 @@
<PackageReference Include="Microsoft.NETCore.UniversalWindowsPlatform" Version="6.2.10" />
</ItemGroup>
</Project>
</Project>
\ No newline at end of file
......@@ -10,14 +10,14 @@ namespace MQTTnet.Server
{
public sealed class MqttClientStatistics
{
long _receivedApplicationMessagesCount;
// Start with 1 because the CONNACK packet is not counted here.
long _receivedPacketsCount = 1;
long _sentApplicationMessagesCount;
Statistics _statistics = new Statistics // mutable struct, don't make readonly!
{
// Start with 1 because the CONNACK packet is not counted here.
_receivedPacketsCount = 1,
// Start with 1 because the CONNECT packet is not counted here.
long _sentPacketsCount = 1;
// Start with 1 because the CONNECT packet is not counted here.
_sentPacketsCount = 1
};
public MqttClientStatistics()
{
......@@ -30,7 +30,6 @@ namespace MQTTnet.Server
}
public DateTime ConnectedTimestamp { get; }
public DateTime LastNonKeepAlivePacketReceivedTimestamp { get; private set; }
/// <summary>
......@@ -43,13 +42,13 @@ namespace MQTTnet.Server
/// </summary>
public DateTime LastPacketSentTimestamp { get; private set; }
public long ReceivedApplicationMessagesCount => Interlocked.Read(ref _receivedApplicationMessagesCount);
public long SentApplicationMessagesCount => Volatile.Read(ref _statistics._sentApplicationMessagesCount);
public long ReceivedPacketsCount => Interlocked.Read(ref _receivedPacketsCount);
public long ReceivedApplicationMessagesCount => Volatile.Read(ref _statistics._receivedApplicationMessagesCount);
public long SentApplicationMessagesCount => Interlocked.Read(ref _sentApplicationMessagesCount);
public long SentPacketsCount => Volatile.Read(ref _statistics._sentPacketsCount);
public long SentPacketsCount => Interlocked.Read(ref _sentPacketsCount);
public long ReceivedPacketsCount => Volatile.Read(ref _statistics._receivedPacketsCount);
public void HandleReceivedPacket(MqttPacket packet)
{
......@@ -61,11 +60,11 @@ namespace MQTTnet.Server
// This class is tracking all values from Clients perspective!
LastPacketSentTimestamp = DateTime.UtcNow;
Interlocked.Increment(ref _sentPacketsCount);
Interlocked.Increment(ref _statistics._sentPacketsCount);
if (packet is MqttPublishPacket)
{
Interlocked.Increment(ref _sentApplicationMessagesCount);
Interlocked.Increment(ref _statistics._sentApplicationMessagesCount);
}
if (!(packet is MqttPingReqPacket || packet is MqttPingRespPacket))
......@@ -73,6 +72,7 @@ namespace MQTTnet.Server
LastNonKeepAlivePacketReceivedTimestamp = LastPacketReceivedTimestamp;
}
}
public void ResetStatistics() => _statistics.Reset();
public void HandleSentPacket(MqttPacket packet)
{
......@@ -84,20 +84,29 @@ namespace MQTTnet.Server
// This class is tracking all values from Clients perspective!
LastPacketReceivedTimestamp = DateTime.UtcNow;
Interlocked.Increment(ref _receivedPacketsCount);
Interlocked.Increment(ref _statistics._receivedPacketsCount);
if (packet is MqttPublishPacket)
{
Interlocked.Increment(ref _receivedApplicationMessagesCount);
Interlocked.Increment(ref _statistics._receivedApplicationMessagesCount);
}
}
public void ResetStatistics()
private struct Statistics
{
Interlocked.Exchange(ref _sentApplicationMessagesCount, 0);
Interlocked.Exchange(ref _receivedApplicationMessagesCount, 0);
Interlocked.Exchange(ref _sentPacketsCount, 0);
Interlocked.Exchange(ref _receivedPacketsCount, 0);
public long _receivedPacketsCount;
public long _sentPacketsCount;
public long _receivedApplicationMessagesCount;
public long _sentApplicationMessagesCount;
public void Reset()
{
Volatile.Write(ref _receivedPacketsCount, 0);
Volatile.Write(ref _sentPacketsCount, 0);
Volatile.Write(ref _receivedApplicationMessagesCount, 0);
Volatile.Write(ref _sentApplicationMessagesCount, 0);
}
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册