未验证 提交 8b3b9d33 编写于 作者: C Christian 提交者: GitHub

Add overloads for options builder (#1711)

* Add new overload for options builder

* Refactor code

* Update ReleaseNotes.md
上级 59d99509
......@@ -3,8 +3,9 @@
* [Client] Added support for passing MQTT v5 options (User properties etc.) for disconnects.
* [Client] An internal exception is no longer caught silently when calling _DisconnectAsync_ to indicate that the disconnect is not clean (BREAKING CHANGE).
* [Client] MQTTv5 features are now checked and an exception is thrown if they are used when using protocol version 3.1.1 and lower. These checks can be disabled in client options. (BREAKING CHANGE!).
* [ManagedClient] Exposed added and removed topics in _ManagedProcessFailedEventArgs_ (#1678, thanks to @scottbrogden-iheartmedia).
* [Client] Added support for disabling packet fragmentation (required for i.e. AWS, #1690, thanks to @logicaloud).
* [Client] Added new overloads for options builder.
* [ManagedClient] Exposed added and removed topics in _ManagedProcessFailedEventArgs_ (#1678, thanks to @scottbrogden-iheartmedia).
* [Server] Exposed MQTT v5 sent properties from the affected client in _ClientDisconnectedAsync_ event.
* [Server] Fixed wrong client ID passed to _InterceptingUnsubscriptionEventArgs_ (#1631, thanks to @ghord).
* [Server] Exposed socket settings for TCP keep alive in TCP options (#1544).
......
......@@ -2,38 +2,34 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System;
using System.Net.WebSockets;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Formatter;
using MQTTnet.Implementations;
using MQTTnet.Server;
using System;
using System.Net.WebSockets;
using System.Threading.Tasks;
using MQTTnet.Diagnostics;
namespace MQTTnet.AspNetCore
{
public sealed class MqttWebSocketServerAdapter : IMqttServerAdapter
{
IMqttNetLogger _logger = new MqttNetNullLogger();
public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; }
IMqttNetLogger _logger = MqttNetNullLogger.Instance;
public Task StartAsync(MqttServerOptions options, IMqttNetLogger logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
return Task.CompletedTask;
}
public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; }
public Task StopAsync()
public void Dispose()
{
return Task.CompletedTask;
}
public async Task RunWebSocketConnectionAsync(WebSocket webSocket, HttpContext httpContext)
{
if (webSocket == null) throw new ArgumentNullException(nameof(webSocket));
if (webSocket == null)
{
throw new ArgumentNullException(nameof(webSocket));
}
var endpoint = $"{httpContext.Connection.RemoteIpAddress}:{httpContext.Connection.RemotePort}";
......@@ -60,8 +56,15 @@ namespace MQTTnet.AspNetCore
}
}
public void Dispose()
public Task StartAsync(MqttServerOptions options, IMqttNetLogger logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
return Task.CompletedTask;
}
public Task StopAsync()
{
return Task.CompletedTask;
}
}
}
\ No newline at end of file
......@@ -228,7 +228,7 @@ namespace MQTTnet.Tests.Server
CheckSubscriptionsResult CheckSubscriptions(string topic, MqttQualityOfServiceLevel applicationMessageQoSLevel, string senderClientId)
{
MqttSubscription.CalculateTopicHash(topic, out var topicHash, out _, out _);
MqttTopicHash.Calculate(topic, out var topicHash, out _, out _);
return _subscriptionsManager.CheckSubscriptions(topic, topicHash, applicationMessageQoSLevel, senderClientId);
}
}
......
......@@ -47,7 +47,7 @@ namespace MQTTnet.Tests.Server
var topics = t.Value;
foreach (var topic in topics)
{
MqttSubscription.CalculateTopicHash(topic, out var topicHash, out var hashMask, out var hasWildcard);
MqttTopicHash.Calculate(topic, out var topicHash, out var hashMask, out var hasWildcard);
bucketDepths.TryGetValue(topicHash, out var currentValue);
++currentValue;
......@@ -96,7 +96,7 @@ namespace MQTTnet.Tests.Server
{
var topic = "asdfasdf/asdfasdf/asdfasdf/asdfasdf/asdfas/dfaf/assfdgsdfgdf/#";
MqttSubscription.CalculateTopicHash(topic, out var topicHash, out var topicHashMask, out var topicHasWildcard);
MqttTopicHash.Calculate(topic, out var topicHash, out var topicHashMask, out var topicHasWildcard);
Assert.IsTrue(topicHasWildcard, "Wildcard not detected");
}
......@@ -110,7 +110,7 @@ namespace MQTTnet.Tests.Server
{
var topic = "asdfasdf/asdfasdf/asdfasdf/asdfasdf/asdfas/dfaf/assfdgsdfgdf/+";
MqttSubscription.CalculateTopicHash(topic, out var topicHash, out var topicHashMask, out var topicHasWildcard);
MqttTopicHash.Calculate(topic, out var topicHash, out var topicHashMask, out var topicHasWildcard);
Assert.IsTrue(topicHasWildcard, "Wildcard not detected");
}
......@@ -150,7 +150,7 @@ namespace MQTTnet.Tests.Server
// UInt64 is limited to 8 levels
MqttSubscription.CalculateTopicHash(topic, out var topicHash, out var topicHashMask, out var topicHasWildcard);
MqttTopicHash.Calculate(topic, out var topicHash, out var topicHashMask, out var topicHasWildcard);
Assert.IsTrue(topicHasWildcard, "Wildcard not detected");
......@@ -212,7 +212,7 @@ namespace MQTTnet.Tests.Server
var topic = sb.ToString();
MqttSubscription.CalculateTopicHash(topic, out var topicHash, out var topicHashMask, out var topicHasWildcard);
MqttTopicHash.Calculate(topic, out var topicHash, out var topicHashMask, out var topicHasWildcard);
Assert.IsFalse(topicHasWildcard, "Wildcard detected when not present");
......@@ -269,7 +269,7 @@ namespace MQTTnet.Tests.Server
var topic = sb.ToString();
MqttSubscription.CalculateTopicHash(topic, out var topicHash, out var topicHashMask, out var topicHasWildcard);
MqttTopicHash.Calculate(topic, out var topicHash, out var topicHashMask, out var topicHasWildcard);
Assert.IsTrue(topicHasWildcard, "Wildcard not detected");
......@@ -319,7 +319,7 @@ namespace MQTTnet.Tests.Server
var l3 = "#";
var topic = $"{l0}/{l1}/{l2}/{l3}";
MqttSubscription.CalculateTopicHash(topic, out var topicHash, out var topicHashMask, out var topicHasWildcard);
MqttTopicHash.Calculate(topic, out var topicHash, out var topicHashMask, out var topicHasWildcard);
Assert.IsTrue(topicHasWildcard, "Wildcard not detected");
......@@ -354,7 +354,7 @@ namespace MQTTnet.Tests.Server
var l3 = "prop1";
var topic = $"{l0}/{l1}/{l2}/{l3}";
MqttSubscription.CalculateTopicHash(topic, out var topicHash, out var topicHashMask, out var topicHasWildcard);
MqttTopicHash.Calculate(topic, out var topicHash, out var topicHashMask, out var topicHasWildcard);
Assert.IsFalse(topicHasWildcard, "Wildcard detected when not wildcard present");
......@@ -447,7 +447,7 @@ namespace MQTTnet.Tests.Server
var l3 = "prop1";
var topic = $"{l0}/{l1}/{l2}/{l3}";
MqttSubscription.CalculateTopicHash(topic, out var topicHash, out var topicHashMask, out var topicHasWildcard);
MqttTopicHash.Calculate(topic, out var topicHash, out var topicHashMask, out var topicHasWildcard);
Assert.IsTrue(topicHasWildcard, "Wildcard not detected");
......@@ -475,7 +475,7 @@ namespace MQTTnet.Tests.Server
void CheckTopicHash(string topic, ulong expectedHash, ulong expectedHashMask)
{
MqttSubscription.CalculateTopicHash(topic, out var topicHash, out var hashMask, out var hasWildcard);
MqttTopicHash.Calculate(topic, out var topicHash, out var hashMask, out var hasWildcard);
Console.WriteLine();
Console.WriteLine("Topic: " + topic);
......@@ -501,7 +501,7 @@ namespace MQTTnet.Tests.Server
{
var topicToFind = topicsToFind[countDown];
MqttSubscription.CalculateTopicHash(topicToFind, out var topicHash, out _, out _);
MqttTopicHash.Calculate(topicToFind, out var topicHash, out _, out _);
clientSession.TryCheckSubscriptions(topicToFind, topicHash, MqttQualityOfServiceLevel.AtMostOnce, "OtherClient", out var result);
if (result.IsSubscribed)
......
......@@ -150,18 +150,8 @@ namespace MQTTnet.Tests
{
Assert.AreEqual(expectedResult, MqttTopicFilterComparer.Compare(topic, filter));
ulong topicHash;
ulong topicHashMask;
bool topicHasWildcard;
MqttSubscription.CalculateTopicHash(topic, out topicHash, out topicHashMask, out topicHasWildcard);
ulong filterTopicHash;
ulong filterTopicHashMask;
bool filterTopicHasWildcard;
MqttSubscription.CalculateTopicHash(filter, out filterTopicHash, out filterTopicHashMask, out filterTopicHasWildcard);
MqttTopicHash.Calculate(topic, out var topicHash, out _, out _);
MqttTopicHash.Calculate(filter, out var filterTopicHash, out var filterTopicHashMask, out _);
if (expectedResult == MqttTopicFilterCompareResult.IsMatch)
{
......
......@@ -488,9 +488,9 @@ namespace MQTTnet.Client
using (var effectiveCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(backgroundCancellationToken, cancellationToken))
{
_logger.Verbose("Trying to connect with server '{0}'.", Options.ChannelOptions);
_logger.Verbose("Trying to connect with server '{0}'", Options.ChannelOptions);
await _adapter.ConnectAsync(effectiveCancellationToken.Token).ConfigureAwait(false);
_logger.Verbose("Connection with server established.");
_logger.Verbose("Connection with server established");
_publishPacketReceiverQueue?.Dispose();
_publishPacketReceiverQueue = new AsyncQueue<MqttPublishPacket>();
......
......@@ -79,18 +79,6 @@ namespace MQTTnet.Client
return _options;
}
/// <summary>
/// Usually the MQTT packets can be send partially. This is done by using multiple TCP packets
/// or WebSocket frames etc. Unfortunately not all brokers (like Amazon Web Services (AWS)) do support this feature and
/// will close the connection when receiving such packets. If such a service is used this flag must
/// be set to _true_.
/// </summary>
public MqttClientOptionsBuilder WithoutPacketFragmentation()
{
_options.AllowPacketFragmentation = false;
return this;
}
public MqttClientOptionsBuilder WithAuthentication(string method, byte[] data)
{
_options.AuthenticationMethod = method;
......@@ -98,12 +86,24 @@ namespace MQTTnet.Client
return this;
}
/// <summary>
/// Clean session is used in MQTT versions below 5.0.0. It is the same as setting "CleanStart".
/// </summary>
public MqttClientOptionsBuilder WithCleanSession(bool value = true)
{
_options.CleanSession = value;
return this;
}
/// <summary>
/// Clean start is used in MQTT versions 5.0.0 and higher. It is the same as setting "CleanSession".
/// </summary>
public MqttClientOptionsBuilder WithCleanStart(bool value = true)
{
_options.CleanSession = value;
return this;
}
public MqttClientOptionsBuilder WithClientId(string value)
{
_options.ClientId = value;
......@@ -200,6 +200,18 @@ namespace MQTTnet.Client
return WithKeepAlivePeriod(TimeSpan.Zero);
}
/// <summary>
/// Usually the MQTT packets can be send partially. This is done by using multiple TCP packets
/// or WebSocket frames etc. Unfortunately not all brokers (like Amazon Web Services (AWS)) do support this feature and
/// will close the connection when receiving such packets. If such a service is used this flag must
/// be set to _true_.
/// </summary>
public MqttClientOptionsBuilder WithoutPacketFragmentation()
{
_options.AllowPacketFragmentation = false;
return this;
}
public MqttClientOptionsBuilder WithProtocolVersion(MqttProtocolVersion value)
{
if (value == MqttProtocolVersion.Unknown)
......
......@@ -63,7 +63,7 @@ namespace MQTTnet.Implementations
await clientWebSocket.ConnectAsync(new Uri(uri), cancellationToken).ConfigureAwait(false);
}
catch (Exception)
catch
{
// Prevent a memory leak when always creating new instance which will fail while connecting.
clientWebSocket.Dispose();
......@@ -103,7 +103,7 @@ namespace MQTTnet.Implementations
public async Task WriteAsync(ArraySegment<byte> buffer, bool isEndOfPacket, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
#if NET5_0_OR_GREATER
// MQTT Control Packets MUST be sent in WebSocket binary data frames. If any other type of data frame is received the recipient MUST close the Network Connection [MQTT-6.0.0-1].
// A single WebSocket data frame can contain multiple or partial MQTT Control Packets. The receiver MUST NOT assume that MQTT Control Packets are aligned on WebSocket frame boundaries [MQTT-6.0.0-2].
......
......@@ -179,7 +179,7 @@ namespace MQTTnet.Server
}
// Calculate application message topic hash once for subscription checks
MqttSubscription.CalculateTopicHash(applicationMessage.Topic, out var topicHash, out _, out _);
MqttTopicHash.Calculate(applicationMessage.Topic, out var topicHash, out _, out _);
foreach (var session in subscriberSessions)
{
......
......@@ -343,7 +343,7 @@ namespace MQTTnet.Server
using (_subscriptionsLock.EnterAsync(CancellationToken.None).GetAwaiter().GetResult())
{
MqttSubscription.CalculateTopicHash(topicFilter.Topic, out var topicHash, out _, out var hasWildcard);
MqttTopicHash.Calculate(topicFilter.Topic, out var topicHash, out _, out var hasWildcard);
if (_subscriptions.TryGetValue(topicFilter.Topic, out var existingSubscription))
{
......
......@@ -6,163 +6,6 @@ using MQTTnet.Protocol;
namespace MQTTnet.Server
{
/*
* The MqttSubscription object stores subscription parameters and calculates
* topic hashes.
*
* Use of Topic Hashes to improve message processing performance
* =============================================================
*
* Motivation
* -----------
* In a typical use case for MQTT there may be many publishers (sensors or
* other devices in the field) and few subscribers (monitoring all or many topics).
* Each publisher may have one or more topic(s) to publish and therefore both, the
* number of publishers and the number of topics may be large.
*
* Maintaining subscribers in a separate container
* -----------------------------------------------
* Instead of placing all sessions into a single _sessions container, subscribers
* are added into another _subscriberSessions container (if a client is a
* subscriber and a publisher then the client is present in both containers). The
* cost is some additional bookkeeping work upon subscription where each client
* session needs to maintain a list of subscribed topics.
*
* When the client subscribes to the first topic, then the session manager adds
* the client to the _subscriberSessions container, and when the client
* unsubscribes from the last topic then the session manager removes the client
* from the container. Now, when an application message arrives, only the list of
* subscribers need processing instead of looping through potentially thousands of
* publishers.
*
* Improving subscriber topic lookup
* ---------------------------------
* For each subscriber, it needs to be determined whether an application message
* matches any topic the subscriber has subscribed to. There may only be few
* subscribers but there may be many subscribed topics, including wildcard topics.
*
* The implemented approach uses a topic hash and a hash mask calculated on the
* subscribed topic and the published topic (the application message topic) to
* find candidates for a match, with the existing match logic evaluating a reduced
* number of candidates.
*
* For each subscription, the topic hash and a hash mask is stored with the
* subscription, and for each application message received, the hash is calculated
* for the published topic before attempting to find matching subscriptions. The
* hash calculation itself is simple and does not have a large performance impact.
*
* We'll first explain how topic hashes and hash masks are constructed and then how
* they are used.
*
* Topic hash
* ----------
* Topic hashes are stored as 64-bit numbers. Each byte within the 64-bit number
* relates to one MQTT topic level. A checksum is calculated for each topic level
* by iterating over the characters within the topic level (cast to byte) and the
* result is stored into the corresponding byte of the 64-bit number. If a topic
* level contains a wildcard character, then 0x00 is stored instead of the
* checksum.
*
* If there are less than 8 levels then the rest of the 64-bit number is filled
* with 0xff. If there are more than 8 levels then topics where the first 8 MQTT
* topic levels are identical will have the same hash value.
*
* This is the topic hash for the MQTT topic below: 0x655D4AF1FFFFFF
*
* client1/building1/level1/sensor1 (empty) (empty) (empty) (empty)
* \_____/ \_______/ \____/ \_____/ \_____/ \_____/ \_____/ \_____/
* | | | | | | | |
* 0x65 0x5D 0x4A 0xF1 0xFF 0xFF 0xFF 0xFF
*
* This is the topic hash for an MQTT topic containing a wildcard: 0x655D00F1FFFFFF
*
* client1/building1/ + /sensor1 (empty) (empty) (empty) (empty)
* \_____/ \_______/ \_/ \_____/ \_____/ \_____/ \_____/ \_____/
* | | | | | | | |
* 0x65 0x5D 0 0xF1 0xFF 0xFF 0xFF 0xFF
*
* For topics that contain the multi level wildcard # at the end, the topic hash
* is filled with 0x00: 0x65004A00000000
*
* client1/ + /level1/ # (empty) (empty) (empty) (empty)
* \_____/ \_/ \____/ \_/ \_____/ \_____/ \_____/ \_____/
* | | | | | | | |
* 0x65 0 0x4A 0 0 0 0 0
*
*
* Topic hash mask
* ---------------
* The hash mask simply contains 0xFF for non-wildcard topic levels and 0x00 for
* wildcard topic levels. Here are the topic hash masks for the examples above.
*
* client1/building1/level1/sensor1 (empty) (empty) (empty) (empty)
* \_____/ \_______/ \____/ \_____/ \_____/ \_____/ \_____/ \_____/
* | | | | | | | |
* 0xFF 0xFF 0xFF 0xFF 0xFF 0xFF 0xFF 0xFF
*
* client1/building1/ + /sensor1 (empty) (empty) (empty) (empty)
* \_____/ \_______/ \_/ \_____/ \_____/ \_____/ \_____/ \_____/
* | | | | | | | |
* 0xFF 0xFF 0 0xFF 0xFF 0xFF 0xFF 0xFF
*
* client1/ + /level1/ # (empty) (empty) (empty) (empty)
* \_____/ \_/ \____/ \_/ \_____/ \_____/ \_____/ \_____/
* | | | | | | | |
* 0xFF 0 0xFF 0 0 0 0 0
*
*
* Topic hash and hash mask properties
* -----------------------------------
* The following properties of topic hashes and hash masks can be exploited to
* find potentially matching subscribed topics for a given published topic.
*
* (1) If a subscribed topic does not contain wildcards then the topic hash of the
* subscribed topic must be equal to the topic hash of the published topic,
* otherwise the subscribed topic cannot be a candidate for a match.
*
* (2) If a subscribed topic contains wildcards then the hash of the published
* topic masked with the subscribed topic's hash mask must be equal to the hash of
* the subscribed topic. I.e. a subscribed topic is a candidate for a match if:
* (publishedTopicHash & subscribedTopicHashMask) == subscribedTopicHash
*
* (3) If a subscribed topic contains wildcards then any potentially matching
* published topic must have a hash value that is greater than or equal to the
* hash value of the subscribed topic (because the subscribed topic contains
* zeroes in wildcard positions).
*
* Match finding
* -------------
* The subscription manager maintains two separate dictionaries to assist finding
* matches using topic hashes: a _noWildcardSubscriptionsByTopicHash dictionary
* containing all subscriptions that do not have wildcards, and a
* _wildcardSubscriptionsByTopicHash dictionary containing subscriptions with
* wildcards.
*
* For subscriptions without wildcards, all potential candidates for a match are
* obtained by a single look-up (exploiting point 1 above).
*
* For subscriptions with wildcards, the subscription manager loops through the
* wildcard subscriptions and selects candidates that satisfy condition
* (publishedTopicHash & subscribedTopicMask) == subscribedTopicHash (point 2).
* The loop could exit early if wildcard subscriptions were stored into a sorted
* dictionary (utilizing point 3), but, after testing, there does not seem to be
* any real benefit doing so.
*
* Other considerations
* --------------------
* Characters in the topic string are cast to byte and any additional bytes in a
* multi-byte character are disregarded. Best guess is that this does not impact
* performance in practice.
*
* Instead of one-byte checksums per topic level, one-word checksums per topic
* level could be used. If most topics contained four levels or less then hash
* buckets would be shallower.
*
* For very large numbers of topics, performing a parallel search may help further.
*
* To also handle a larger number of subscribers, it may be beneficial to maintain
* a subscribers-by-subscription-topic dictionary.
*/
public sealed class MqttSubscription
{
public MqttSubscription(
......@@ -180,7 +23,7 @@ namespace MQTTnet.Server
GrantedQualityOfServiceLevel = qualityOfServiceLevel;
Identifier = identifier;
CalculateTopicHash(Topic, out var hash, out var hashMask, out var hasWildcard);
MqttTopicHash.Calculate(Topic, out var hash, out var hashMask, out var hasWildcard);
TopicHash = hash;
TopicHashMask = hashMask;
TopicHasWildcard = hasWildcard;
......@@ -203,107 +46,5 @@ namespace MQTTnet.Server
public ulong TopicHashMask { get; }
public bool TopicHasWildcard { get; }
public static void CalculateTopicHash(string topic, out ulong resultHash, out ulong resultHashMask, out bool resultHasWildcard)
{
// calculate topic hash
ulong hash = 0;
ulong hashMaskInverted = 0;
ulong levelBitMask = 0;
ulong fillLevelBitMask = 0;
var hasWildcard = false;
byte checkSum = 0;
var level = 0;
var i = 0;
while (i < topic.Length)
{
var c = topic[i];
if (c == MqttTopicFilterComparer.LevelSeparator)
{
// done with this level
hash <<= 8;
hash |= checkSum;
hashMaskInverted <<= 8;
hashMaskInverted |= levelBitMask;
checkSum = 0;
levelBitMask = 0;
++level;
if (level >= 8)
{
break;
}
}
else if (c == MqttTopicFilterComparer.SingleLevelWildcard)
{
levelBitMask = 0xff;
hasWildcard = true;
}
else if (c == MqttTopicFilterComparer.MultiLevelWildcard)
{
// checksum is zero for a valid topic
levelBitMask = 0xff;
// fill rest with this fillLevelBitMask
fillLevelBitMask = 0xff;
hasWildcard = true;
break;
}
else
{
// The checksum should be designed to reduce the hash bucket depth for the expected
// fairly regularly named MQTT topics that don't differ much,
// i.e. "room1/sensor1"
// "room1/sensor2"
// "room1/sensor3"
// etc.
if ((c & 1) == 0)
{
checkSum += (byte)c;
}
else
{
checkSum ^= (byte)(c >> 1);
}
}
++i;
}
// Shift hash left and leave zeroes to fill ulong
if (level < 8)
{
hash <<= 8;
hash |= checkSum;
hashMaskInverted <<= 8;
hashMaskInverted |= levelBitMask;
++level;
while (level < 8)
{
hash <<= 8;
hashMaskInverted <<= 8;
hashMaskInverted |= fillLevelBitMask;
++level;
}
}
if (!hasWildcard)
{
while (i < topic.Length)
{
var c = topic[i];
if (c == MqttTopicFilterComparer.SingleLevelWildcard || c == MqttTopicFilterComparer.MultiLevelWildcard)
{
hasWildcard = true;
break;
}
++i;
}
}
resultHash = hash;
resultHashMask = ~hashMaskInverted;
resultHasWildcard = hasWildcard;
}
}
}
\ No newline at end of file
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
namespace MQTTnet.Server
{
/*
* The MqttSubscription object stores subscription parameters and calculates
* topic hashes.
*
* Use of Topic Hashes to improve message processing performance
* =============================================================
*
* Motivation
* -----------
* In a typical use case for MQTT there may be many publishers (sensors or
* other devices in the field) and few subscribers (monitoring all or many topics).
* Each publisher may have one or more topic(s) to publish and therefore both, the
* number of publishers and the number of topics may be large.
*
* Maintaining subscribers in a separate container
* -----------------------------------------------
* Instead of placing all sessions into a single _sessions container, subscribers
* are added into another _subscriberSessions container (if a client is a
* subscriber and a publisher then the client is present in both containers). The
* cost is some additional bookkeeping work upon subscription where each client
* session needs to maintain a list of subscribed topics.
*
* When the client subscribes to the first topic, then the session manager adds
* the client to the _subscriberSessions container, and when the client
* unsubscribes from the last topic then the session manager removes the client
* from the container. Now, when an application message arrives, only the list of
* subscribers need processing instead of looping through potentially thousands of
* publishers.
*
* Improving subscriber topic lookup
* ---------------------------------
* For each subscriber, it needs to be determined whether an application message
* matches any topic the subscriber has subscribed to. There may only be few
* subscribers but there may be many subscribed topics, including wildcard topics.
*
* The implemented approach uses a topic hash and a hash mask calculated on the
* subscribed topic and the published topic (the application message topic) to
* find candidates for a match, with the existing match logic evaluating a reduced
* number of candidates.
*
* For each subscription, the topic hash and a hash mask is stored with the
* subscription, and for each application message received, the hash is calculated
* for the published topic before attempting to find matching subscriptions. The
* hash calculation itself is simple and does not have a large performance impact.
*
* We'll first explain how topic hashes and hash masks are constructed and then how
* they are used.
*
* Topic hash
* ----------
* Topic hashes are stored as 64-bit numbers. Each byte within the 64-bit number
* relates to one MQTT topic level. A checksum is calculated for each topic level
* by iterating over the characters within the topic level (cast to byte) and the
* result is stored into the corresponding byte of the 64-bit number. If a topic
* level contains a wildcard character, then 0x00 is stored instead of the
* checksum.
*
* If there are less than 8 levels then the rest of the 64-bit number is filled
* with 0xff. If there are more than 8 levels then topics where the first 8 MQTT
* topic levels are identical will have the same hash value.
*
* This is the topic hash for the MQTT topic below: 0x655D4AF1FFFFFF
*
* client1/building1/level1/sensor1 (empty) (empty) (empty) (empty)
* \_____/ \_______/ \____/ \_____/ \_____/ \_____/ \_____/ \_____/
* | | | | | | | |
* 0x65 0x5D 0x4A 0xF1 0xFF 0xFF 0xFF 0xFF
*
* This is the topic hash for an MQTT topic containing a wildcard: 0x655D00F1FFFFFF
*
* client1/building1/ + /sensor1 (empty) (empty) (empty) (empty)
* \_____/ \_______/ \_/ \_____/ \_____/ \_____/ \_____/ \_____/
* | | | | | | | |
* 0x65 0x5D 0 0xF1 0xFF 0xFF 0xFF 0xFF
*
* For topics that contain the multi level wildcard # at the end, the topic hash
* is filled with 0x00: 0x65004A00000000
*
* client1/ + /level1/ # (empty) (empty) (empty) (empty)
* \_____/ \_/ \____/ \_/ \_____/ \_____/ \_____/ \_____/
* | | | | | | | |
* 0x65 0 0x4A 0 0 0 0 0
*
*
* Topic hash mask
* ---------------
* The hash mask simply contains 0xFF for non-wildcard topic levels and 0x00 for
* wildcard topic levels. Here are the topic hash masks for the examples above.
*
* client1/building1/level1/sensor1 (empty) (empty) (empty) (empty)
* \_____/ \_______/ \____/ \_____/ \_____/ \_____/ \_____/ \_____/
* | | | | | | | |
* 0xFF 0xFF 0xFF 0xFF 0xFF 0xFF 0xFF 0xFF
*
* client1/building1/ + /sensor1 (empty) (empty) (empty) (empty)
* \_____/ \_______/ \_/ \_____/ \_____/ \_____/ \_____/ \_____/
* | | | | | | | |
* 0xFF 0xFF 0 0xFF 0xFF 0xFF 0xFF 0xFF
*
* client1/ + /level1/ # (empty) (empty) (empty) (empty)
* \_____/ \_/ \____/ \_/ \_____/ \_____/ \_____/ \_____/
* | | | | | | | |
* 0xFF 0 0xFF 0 0 0 0 0
*
*
* Topic hash and hash mask properties
* -----------------------------------
* The following properties of topic hashes and hash masks can be exploited to
* find potentially matching subscribed topics for a given published topic.
*
* (1) If a subscribed topic does not contain wildcards then the topic hash of the
* subscribed topic must be equal to the topic hash of the published topic,
* otherwise the subscribed topic cannot be a candidate for a match.
*
* (2) If a subscribed topic contains wildcards then the hash of the published
* topic masked with the subscribed topic's hash mask must be equal to the hash of
* the subscribed topic. I.e. a subscribed topic is a candidate for a match if:
* (publishedTopicHash & subscribedTopicHashMask) == subscribedTopicHash
*
* (3) If a subscribed topic contains wildcards then any potentially matching
* published topic must have a hash value that is greater than or equal to the
* hash value of the subscribed topic (because the subscribed topic contains
* zeroes in wildcard positions).
*
* Match finding
* -------------
* The subscription manager maintains two separate dictionaries to assist finding
* matches using topic hashes: a _noWildcardSubscriptionsByTopicHash dictionary
* containing all subscriptions that do not have wildcards, and a
* _wildcardSubscriptionsByTopicHash dictionary containing subscriptions with
* wildcards.
*
* For subscriptions without wildcards, all potential candidates for a match are
* obtained by a single look-up (exploiting point 1 above).
*
* For subscriptions with wildcards, the subscription manager loops through the
* wildcard subscriptions and selects candidates that satisfy condition
* (publishedTopicHash & subscribedTopicMask) == subscribedTopicHash (point 2).
* The loop could exit early if wildcard subscriptions were stored into a sorted
* dictionary (utilizing point 3), but, after testing, there does not seem to be
* any real benefit doing so.
*
* Other considerations
* --------------------
* Characters in the topic string are cast to byte and any additional bytes in a
* multi-byte character are disregarded. Best guess is that this does not impact
* performance in practice.
*
* Instead of one-byte checksums per topic level, one-word checksums per topic
* level could be used. If most topics contained four levels or less then hash
* buckets would be shallower.
*
* For very large numbers of topics, performing a parallel search may help further.
*
* To also handle a larger number of subscribers, it may be beneficial to maintain
* a subscribers-by-subscription-topic dictionary.
*/
public static class MqttTopicHash
{
public static void Calculate(string topic, out ulong resultHash, out ulong resultHashMask, out bool resultHasWildcard)
{
// calculate topic hash
ulong hash = 0;
ulong hashMaskInverted = 0;
ulong levelBitMask = 0;
ulong fillLevelBitMask = 0;
var hasWildcard = false;
byte checkSum = 0;
var level = 0;
var i = 0;
while (i < topic.Length)
{
var c = topic[i];
if (c == MqttTopicFilterComparer.LevelSeparator)
{
// done with this level
hash <<= 8;
hash |= checkSum;
hashMaskInverted <<= 8;
hashMaskInverted |= levelBitMask;
checkSum = 0;
levelBitMask = 0;
++level;
if (level >= 8)
{
break;
}
}
else if (c == MqttTopicFilterComparer.SingleLevelWildcard)
{
levelBitMask = 0xff;
hasWildcard = true;
}
else if (c == MqttTopicFilterComparer.MultiLevelWildcard)
{
// checksum is zero for a valid topic
levelBitMask = 0xff;
// fill rest with this fillLevelBitMask
fillLevelBitMask = 0xff;
hasWildcard = true;
break;
}
else
{
// The checksum should be designed to reduce the hash bucket depth for the expected
// fairly regularly named MQTT topics that don't differ much,
// i.e. "room1/sensor1"
// "room1/sensor2"
// "room1/sensor3"
// etc.
if ((c & 1) == 0)
{
checkSum += (byte)c;
}
else
{
checkSum ^= (byte)(c >> 1);
}
}
++i;
}
// Shift hash left and leave zeroes to fill ulong
if (level < 8)
{
hash <<= 8;
hash |= checkSum;
hashMaskInverted <<= 8;
hashMaskInverted |= levelBitMask;
++level;
while (level < 8)
{
hash <<= 8;
hashMaskInverted <<= 8;
hashMaskInverted |= fillLevelBitMask;
++level;
}
}
if (!hasWildcard)
{
while (i < topic.Length)
{
var c = topic[i];
if (c == MqttTopicFilterComparer.SingleLevelWildcard || c == MqttTopicFilterComparer.MultiLevelWildcard)
{
hasWildcard = true;
break;
}
++i;
}
}
resultHash = hash;
resultHashMask = ~hashMaskInverted;
resultHasWildcard = hasWildcard;
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册