未验证 提交 b08d11c0 编写于 作者: H henry 提交者: GitHub

replace lock/semaphore by readerwriterlockslim to avoid lock contenti… (#1716)

* replace lock/semaphore by readerwriterlockslim to avoid lock contention while dispatch messages

* Update MqttClientSubscriptionsManager.cs

removed comment out codes

* Update ReleaseNotes.md

---------
Co-authored-by: NZeheng Feng <hf944813@broadcom.com>
Co-authored-by: NChristian <6939810+chkr1011@users.noreply.github.com>
上级 ac41e838
* [Core] Add validation of maximum string lengths (#1718).
\ No newline at end of file
* [Core] Add validation of maximum string lengths (#1718).
* [Server] Improved performance by changing internal locking strategy for subscriptions (#1716, thanks to @zeheng).
......@@ -36,7 +36,7 @@ namespace MQTTnet.Server
// See the MqttSubscription object for a detailed explanation.
readonly Dictionary<string, MqttSession> _sessions = new Dictionary<string, MqttSession>(4096);
readonly object _sessionsManagementLock = new object();
readonly ReaderWriterLockSlim _sessionsManagementLock = new ReaderWriterLockSlim();
readonly HashSet<MqttSession> _subscriberSessions = new HashSet<MqttSession>();
public MqttClientSessionsManager(
......@@ -86,7 +86,8 @@ namespace MQTTnet.Server
MqttSession session;
lock (_sessionsManagementLock)
_sessionsManagementLock.EnterWriteLock();
try
{
_sessions.TryGetValue(clientId, out session);
_sessions.Remove(clientId);
......@@ -96,6 +97,10 @@ namespace MQTTnet.Server
_subscriberSessions.Remove(session);
}
}
finally
{
_sessionsManagementLock.ExitWriteLock();
}
try
{
......@@ -173,10 +178,15 @@ namespace MQTTnet.Server
}
List<MqttSession> subscriberSessions;
lock (_sessionsManagementLock)
_sessionsManagementLock.EnterReadLock();
try
{
subscriberSessions = _subscriberSessions.ToList();
}
finally
{
_sessionsManagementLock.ExitReadLock();
}
// Calculate application message topic hash once for subscription checks
MqttTopicHash.Calculate(applicationMessage.Topic, out var topicHash, out _, out _);
......@@ -256,13 +266,23 @@ namespace MQTTnet.Server
{
_createConnectionSyncRoot.Dispose();
lock (_sessionsManagementLock)
_sessionsManagementLock.EnterWriteLock();
try
{
foreach (var sessionItem in _sessions)
{
sessionItem.Value.Dispose();
}
}
finally
{
_sessionsManagementLock.ExitWriteLock();
}
if (_sessionsManagementLock != null)
{
_sessionsManagementLock.Dispose();
}
}
public MqttClient GetClient(string id)
......@@ -310,7 +330,8 @@ namespace MQTTnet.Server
{
var result = new List<MqttSessionStatus>();
lock (_sessionsManagementLock)
_sessionsManagementLock.EnterReadLock();
try
{
foreach (var sessionItem in _sessions)
{
......@@ -318,6 +339,10 @@ namespace MQTTnet.Server
result.Add(sessionStatus);
}
}
finally
{
_sessionsManagementLock.ExitReadLock();
}
return Task.FromResult((IList<MqttSessionStatus>)result);
}
......@@ -415,7 +440,8 @@ namespace MQTTnet.Server
public void OnSubscriptionsAdded(MqttSession clientSession, List<string> topics)
{
lock (_sessionsManagementLock)
_sessionsManagementLock.EnterWriteLock();
try
{
if (!clientSession.HasSubscribedTopics)
{
......@@ -428,11 +454,16 @@ namespace MQTTnet.Server
clientSession.AddSubscribedTopic(topic);
}
}
finally
{
_sessionsManagementLock.ExitWriteLock();
}
}
public void OnSubscriptionsRemoved(MqttSession clientSession, List<string> subscriptionTopics)
{
lock (_sessionsManagementLock)
_sessionsManagementLock.EnterWriteLock();
try
{
foreach (var subscriptionTopic in subscriptionTopics)
{
......@@ -445,6 +476,10 @@ namespace MQTTnet.Server
_subscriberSessions.Remove(clientSession);
}
}
finally
{
_sessionsManagementLock.ExitWriteLock();
}
}
public void Start()
......@@ -545,7 +580,8 @@ namespace MQTTnet.Server
MqttSession oldSession;
MqttClient oldClient;
lock (_sessionsManagementLock)
_sessionsManagementLock.EnterWriteLock();
try
{
MqttSession session;
......@@ -590,6 +626,10 @@ namespace MQTTnet.Server
_clients[connectPacket.ClientId] = client;
}
finally
{
_sessionsManagementLock.ExitWriteLock();
}
if (!connAckPacket.IsSessionPresent)
{
......@@ -636,7 +676,8 @@ namespace MQTTnet.Server
MqttSession GetClientSession(string clientId)
{
lock (_sessionsManagementLock)
_sessionsManagementLock.EnterReadLock();
try
{
if (!_sessions.TryGetValue(clientId, out var session))
{
......@@ -645,6 +686,10 @@ namespace MQTTnet.Server
return session;
}
finally
{
_sessionsManagementLock.ExitReadLock();
}
}
async Task<MqttConnectPacket> ReceiveConnectPacket(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken)
......@@ -695,4 +740,4 @@ namespace MQTTnet.Server
return eventArgs;
}
}
}
\ No newline at end of file
}
......@@ -31,7 +31,7 @@ namespace MQTTnet.Server
readonly Dictionary<string, MqttSubscription> _subscriptions = new Dictionary<string, MqttSubscription>();
// Use subscription lock to maintain consistency across subscriptions and topic hash dictionaries
readonly AsyncLock _subscriptionsLock = new AsyncLock();
readonly ReaderWriterLockSlim _subscriptionsLock = new ReaderWriterLockSlim();
readonly Dictionary<ulong, TopicHashMaskSubscriptions> _wildcardSubscriptionsByTopicHash = new Dictionary<ulong, TopicHashMaskSubscriptions>();
public MqttClientSubscriptionsManager(
......@@ -51,7 +51,8 @@ namespace MQTTnet.Server
var possibleSubscriptions = new List<MqttSubscription>();
// Check for possible subscriptions. They might have collisions but this is fine.
using (_subscriptionsLock.EnterAsync(CancellationToken.None).GetAwaiter().GetResult())
_subscriptionsLock.EnterReadLock();
try
{
if (_noWildcardSubscriptionsByTopicHash.TryGetValue(topicHash, out var noWildcardSubscriptions))
{
......@@ -73,6 +74,10 @@ namespace MQTTnet.Server
}
}
}
finally
{
_subscriptionsLock.ExitReadLock();
}
// The pre check has evaluated that nothing is subscribed.
// If there were some possible candidates they get checked below
......@@ -157,7 +162,10 @@ namespace MQTTnet.Server
public void Dispose()
{
_subscriptionsLock.Dispose();
if (_subscriptionsLock != null)
{
_subscriptionsLock.Dispose();
}
}
public async Task<SubscribeResult> Subscribe(MqttSubscribePacket subscribePacket, CancellationToken cancellationToken)
......@@ -232,7 +240,8 @@ namespace MQTTnet.Server
var removedSubscriptions = new List<string>();
using (await _subscriptionsLock.EnterAsync(cancellationToken).ConfigureAwait(false))
_subscriptionsLock.EnterWriteLock();
try
{
foreach (var topicFilter in unsubscribePacket.TopicFilters)
{
......@@ -293,8 +302,11 @@ namespace MQTTnet.Server
}
}
}
_subscriptionChangedNotification?.OnSubscriptionsRemoved(_session, removedSubscriptions);
finally
{
_subscriptionsLock.ExitWriteLock();
_subscriptionChangedNotification?.OnSubscriptionsRemoved(_session, removedSubscriptions);
}
if (_eventContainer.ClientUnsubscribedTopicEvent.HasHandlers)
{
......@@ -341,7 +353,8 @@ namespace MQTTnet.Server
// Add to subscriptions and maintain topic hash dictionaries
using (_subscriptionsLock.EnterAsync(CancellationToken.None).GetAwaiter().GetResult())
_subscriptionsLock.EnterWriteLock();
try
{
MqttTopicHash.Calculate(topicFilter.Topic, out var topicHash, out _, out var hasWildcard);
......@@ -391,6 +404,10 @@ namespace MQTTnet.Server
subscriptions.Add(subscription);
}
}
finally
{
_subscriptionsLock.ExitWriteLock();
}
return new CreateSubscriptionResult
{
......@@ -511,4 +528,4 @@ namespace MQTTnet.Server
public MqttSubscription Subscription { get; set; }
}
}
}
\ No newline at end of file
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册