未验证 提交 6dcb1107 编写于 作者: C Christian 提交者: GitHub

Add InterceptingClientEnqueueAsync event (#1679)

上级 b599a832
......@@ -9,3 +9,4 @@
* [Server] Improved performance for retained message handling when subscribing using "DoNotSendOnSubscribe" or "SendAtSubscribeIfNewSubscriptionOnly" (#1661, thanks to @Int32Overflow).
* [Server] Added support for changing the used TLS certificate while the server is running (#1652, thanks to @YAJeff). The certificate provider will now be invoked for every new connection!
* [Server] Added a new API method which allows reading a single retained message without the need to processing the entire set of retained messages (#1659).
* [Server] Added a new event (InterceptingClientEnqueueAsync) which allows intercepting when an application message is enqueued for a client (#1648).
......@@ -51,10 +51,7 @@ namespace MQTTnet.Tests.Mockups
public void AssertReceivedCountEquals(int expectedCount)
{
lock (_receivedEventArgs)
{
Assert.AreEqual(expectedCount, _receivedEventArgs.Count);
}
Assert.AreEqual(expectedCount, Count);
}
public string GeneratePayloadSequence()
......
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
using MQTTnet.Formatter;
using MQTTnet.Protocol;
namespace MQTTnet.Tests.Server
{
[TestClass]
public sealed class Publish_Tests : BaseTestClass
{
[TestMethod]
public async Task Return_Success_When_Subscribed()
{
using (var testEnvironment = CreateTestEnvironment(MqttProtocolVersion.V500))
{
await testEnvironment.StartServer();
var sender = await testEnvironment.ConnectClient();
var receiver = await testEnvironment.ConnectClient();
await receiver.SubscribeAsync("A");
// AtLeastOnce is required to get an ACK packet from the server.
var publishResult = await sender.PublishStringAsync("A", "Payload", MqttQualityOfServiceLevel.AtLeastOnce);
Assert.AreEqual(MqttClientPublishReasonCode.Success, publishResult.ReasonCode);
Assert.AreEqual(true, publishResult.IsSuccess);
}
}
[TestMethod]
public async Task Return_NoMatchingSubscribers_When_Not_Subscribed()
{
using (var testEnvironment = CreateTestEnvironment(MqttProtocolVersion.V500))
{
await testEnvironment.StartServer();
var sender = await testEnvironment.ConnectClient();
var receiver = await testEnvironment.ConnectClient();
await receiver.SubscribeAsync("A");
// AtLeastOnce is required to get an ACK packet from the server.
var publishResult = await sender.PublishStringAsync("B", "Payload", MqttQualityOfServiceLevel.AtLeastOnce);
Assert.AreEqual(MqttClientPublishReasonCode.NoMatchingSubscribers, publishResult.ReasonCode);
Assert.AreEqual(true, publishResult.IsSuccess);
}
}
}
}
\ No newline at end of file
......@@ -5,6 +5,8 @@
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
using MQTTnet.Formatter;
using MQTTnet.Internal;
using MQTTnet.Protocol;
namespace MQTTnet.Tests.Server
......@@ -27,5 +29,79 @@ namespace MQTTnet.Tests.Server
await client.PublishStringAsync("test", qualityOfServiceLevel: MqttQualityOfServiceLevel.AtLeastOnce);
}
}
[TestMethod]
public async Task Return_NoMatchingSubscribers_When_Not_Subscribed()
{
using (var testEnvironment = CreateTestEnvironment(MqttProtocolVersion.V500))
{
await testEnvironment.StartServer();
var sender = await testEnvironment.ConnectClient();
var receiver = await testEnvironment.ConnectClient();
await receiver.SubscribeAsync("A");
// AtLeastOnce is required to get an ACK packet from the server.
var publishResult = await sender.PublishStringAsync("B", "Payload", MqttQualityOfServiceLevel.AtLeastOnce);
Assert.AreEqual(MqttClientPublishReasonCode.NoMatchingSubscribers, publishResult.ReasonCode);
Assert.AreEqual(true, publishResult.IsSuccess);
}
}
[TestMethod]
public async Task Return_Success_When_Subscribed()
{
using (var testEnvironment = CreateTestEnvironment(MqttProtocolVersion.V500))
{
await testEnvironment.StartServer();
var sender = await testEnvironment.ConnectClient();
var receiver = await testEnvironment.ConnectClient();
await receiver.SubscribeAsync("A");
// AtLeastOnce is required to get an ACK packet from the server.
var publishResult = await sender.PublishStringAsync("A", "Payload", MqttQualityOfServiceLevel.AtLeastOnce);
Assert.AreEqual(MqttClientPublishReasonCode.Success, publishResult.ReasonCode);
Assert.AreEqual(true, publishResult.IsSuccess);
}
}
[TestMethod]
public async Task Intercept_Client_Enqueue()
{
using (var testEnvironment = CreateTestEnvironment())
{
var server = await testEnvironment.StartServer();
var sender = await testEnvironment.ConnectClient();
var receiver = await testEnvironment.ConnectClient();
await receiver.SubscribeAsync("A");
var receivedMessages = testEnvironment.CreateApplicationMessageHandler(receiver);
await sender.PublishStringAsync("A", "Payload", MqttQualityOfServiceLevel.AtLeastOnce);
receivedMessages.AssertReceivedCountEquals(1);
server.InterceptingClientEnqueueAsync += e =>
{
e.AcceptEnqueue = false;
return CompletedTask.Instance;
};
await sender.PublishStringAsync("A", "Payload", MqttQualityOfServiceLevel.AtLeastOnce);
await LongTestDelay();
// Do not increase because the internal enqueue to the target client is not accepted!
receivedMessages.AssertReceivedCountEquals(1);
}
}
}
}
\ No newline at end of file
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System;
namespace MQTTnet.Server
{
public sealed class InterceptingClientApplicationMessageEnqueueEventArgs : EventArgs
{
public InterceptingClientApplicationMessageEnqueueEventArgs(string senderClientId, string receiverClientId, MqttApplicationMessage applicationMessage)
{
SenderClientId = senderClientId ?? throw new ArgumentNullException(nameof(senderClientId));
ReceiverClientId = receiverClientId ?? throw new ArgumentNullException(nameof(receiverClientId));
ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage));
}
/// <summary>
/// Gets or sets whether the enqueue of the application message should be performed or not.
/// If set to _False_ the client will not receive the application message.
/// </summary>
public bool AcceptEnqueue { get; set; } = true;
public MqttApplicationMessage ApplicationMessage { get; }
/// <summary>
/// Indicates if the connection with the sender should be closed.
/// </summary>
public bool CloseSenderConnection { get; set; }
public string ReceiverClientId { get; }
public string SenderClientId { get; }
}
}
\ No newline at end of file
......@@ -199,6 +199,19 @@ namespace MQTTnet.Server
{
continue;
}
if (_eventContainer.InterceptingClientEnqueueEvent.HasHandlers)
{
var eventArgs = new InterceptingClientApplicationMessageEnqueueEventArgs(senderId, session.Id, applicationMessage);
await _eventContainer.InterceptingClientEnqueueEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
if (!eventArgs.AcceptEnqueue)
{
// There will be no reason string and use properties because in this case the clients will
// not receive a packet at all.
return new DispatchApplicationMessageResult(reasonCode, eventArgs.CloseSenderConnection, null, null);
}
}
var publishPacketCopy = MqttPacketFactories.Publish.Create(applicationMessage);
publishPacketCopy.QualityOfServiceLevel = checkSubscriptionsResult.QualityOfServiceLevel;
......@@ -219,10 +232,10 @@ namespace MQTTnet.Server
publishPacketCopy.Retain = false;
}
session.EnqueueDataPacket(new MqttPacketBusItem(publishPacketCopy));
matchingSubscribersCount++;
_logger.Verbose("Client '{0}': Queued PUBLISH packet with topic '{1}'.", session.Id, applicationMessage.Topic);
session.EnqueueDataPacket(new MqttPacketBusItem(publishPacketCopy));
_logger.Verbose("Client '{0}': Queued PUBLISH packet with topic '{1}'", session.Id, applicationMessage.Topic);
}
if (matchingSubscribersCount == 0)
......@@ -233,7 +246,7 @@ namespace MQTTnet.Server
}
catch (Exception exception)
{
_logger.Error(exception, "Unhandled exception while processing next queued application message.");
_logger.Error(exception, "Error while processing next queued application message");
}
}
......@@ -592,13 +605,8 @@ namespace MQTTnet.Server
if (_eventContainer.ClientDisconnectedEvent.HasHandlers)
{
var eventArgs = new ClientDisconnectedEventArgs(
oldClient.Id,
null,
MqttClientDisconnectType.Takeover,
oldClient.Endpoint,
oldClient.Session.Items);
var eventArgs = new ClientDisconnectedEventArgs(oldClient.Id, null, MqttClientDisconnectType.Takeover, oldClient.Endpoint, oldClient.Session.Items);
await _eventContainer.ClientDisconnectedEvent.TryInvokeAsync(eventArgs, _logger).ConfigureAwait(false);
}
}
......
......@@ -342,7 +342,7 @@ namespace MQTTnet.Server
using (_subscriptionsLock.EnterAsync(CancellationToken.None).GetAwaiter().GetResult())
{
MqttSubscription.CalculateTopicHash(topicFilter.Topic, out var topicHash, out var topicHashMask, out var hasWildcard);
MqttSubscription.CalculateTopicHash(topicFilter.Topic, out var topicHash, out _, out var hasWildcard);
if (_subscriptions.TryGetValue(topicFilter.Topic, out var existingSubscription))
{
......
......@@ -21,12 +21,14 @@ namespace MQTTnet.Server
public AsyncEvent<ClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopicEvent { get; } = new AsyncEvent<ClientUnsubscribedTopicEventArgs>();
public AsyncEvent<InterceptingClientApplicationMessageEnqueueEventArgs> InterceptingClientEnqueueEvent { get; } = new AsyncEvent<InterceptingClientApplicationMessageEnqueueEventArgs>();
public AsyncEvent<InterceptingPacketEventArgs> InterceptingInboundPacketEvent { get; } = new AsyncEvent<InterceptingPacketEventArgs>();
public AsyncEvent<InterceptingPacketEventArgs> InterceptingOutboundPacketEvent { get; } = new AsyncEvent<InterceptingPacketEventArgs>();
public AsyncEvent<InterceptingPublishEventArgs> InterceptingPublishEvent { get; } = new AsyncEvent<InterceptingPublishEventArgs>();
public AsyncEvent<InterceptingSubscriptionEventArgs> InterceptingSubscriptionEvent { get; } = new AsyncEvent<InterceptingSubscriptionEventArgs>();
public AsyncEvent<InterceptingUnsubscriptionEventArgs> InterceptingUnsubscriptionEvent { get; } = new AsyncEvent<InterceptingUnsubscriptionEventArgs>();
......
......@@ -85,6 +85,12 @@ namespace MQTTnet.Server
remove => _eventContainer.ClientUnsubscribedTopicEvent.RemoveHandler(value);
}
public event Func<InterceptingClientApplicationMessageEnqueueEventArgs, Task> InterceptingClientEnqueueAsync
{
add => _eventContainer.InterceptingClientEnqueueEvent.AddHandler(value);
remove => _eventContainer.InterceptingClientEnqueueEvent.RemoveHandler(value);
}
public event Func<InterceptingPacketEventArgs, Task> InterceptingInboundPacketAsync
{
add => _eventContainer.InterceptingInboundPacketEvent.AddHandler(value);
......
......@@ -18,44 +18,48 @@ namespace MQTTnet.Server
{
_session = session ?? throw new ArgumentNullException(nameof(session));
}
public string Id => _session.Id;
public long PendingApplicationMessagesCount => _session.PendingDataPacketsCount;
public DateTime CreatedTimestamp => _session.CreatedTimestamp;
public string Id => _session.Id;
public IDictionary Items => _session.Items;
public Task EnqueueApplicationMessageAsync(MqttApplicationMessage applicationMessage)
public long PendingApplicationMessagesCount => _session.PendingDataPacketsCount;
public Task ClearApplicationMessagesQueueAsync()
{
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
var publishPacketFactory = new MqttPublishPacketFactory();
_session.EnqueueDataPacket(new MqttPacketBusItem(publishPacketFactory.Create(applicationMessage)));
throw new NotImplementedException();
}
return CompletedTask.Instance;
public Task DeleteAsync()
{
return _session.DeleteAsync();
}
public Task DeliverApplicationMessageAsync(MqttApplicationMessage applicationMessage)
{
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
var publishPacketFactory = new MqttPublishPacketFactory();
var packetBusItem = new MqttPacketBusItem(publishPacketFactory.Create(applicationMessage));
if (applicationMessage == null)
{
throw new ArgumentNullException(nameof(applicationMessage));
}
var packetBusItem = new MqttPacketBusItem(MqttPacketFactories.Publish.Create(applicationMessage));
_session.EnqueueDataPacket(packetBusItem);
return packetBusItem.WaitAsync();
}
public Task DeleteAsync()
{
return _session.DeleteAsync();
}
public Task ClearApplicationMessagesQueueAsync()
public Task EnqueueApplicationMessageAsync(MqttApplicationMessage applicationMessage)
{
throw new NotImplementedException();
if (applicationMessage == null)
{
throw new ArgumentNullException(nameof(applicationMessage));
}
_session.EnqueueDataPacket(new MqttPacketBusItem(MqttPacketFactories.Publish.Create(applicationMessage)));
return CompletedTask.Instance;
}
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册