提交 262a4681 编写于 作者: C Christian

Merge branch '1291-access-client-status-in-client-disconnected-handler'

...@@ -4,3 +4,4 @@ ...@@ -4,3 +4,4 @@
* [Server] Added a new event (_ClientAcknowledgedPublishPacketAsync_) which is fired whenever a client acknowledges a PUBLISH packet (QoS 1 and 2, #487). * [Server] Added a new event (_ClientAcknowledgedPublishPacketAsync_) which is fired whenever a client acknowledges a PUBLISH packet (QoS 1 and 2, #487).
* [Server] Exposed channel adapter (HTTP context etc.) in connection validation (#1125). * [Server] Exposed channel adapter (HTTP context etc.) in connection validation (#1125).
* [Server] The event _InterceptingPublishAsync_ is now also called for injected application messages (#Jeanot-Zubler). * [Server] The event _InterceptingPublishAsync_ is now also called for injected application messages (#Jeanot-Zubler).
* [Server] Exposed session items in multipe events (#1291).
...@@ -8,14 +8,20 @@ namespace MQTTnet.Server ...@@ -8,14 +8,20 @@ namespace MQTTnet.Server
{ {
public sealed class ApplicationMessageNotConsumedEventArgs : EventArgs public sealed class ApplicationMessageNotConsumedEventArgs : EventArgs
{ {
public ApplicationMessageNotConsumedEventArgs(MqttApplicationMessage applicationMessage, string senderId)
{
ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage));
SenderId = senderId;
}
/// <summary> /// <summary>
/// Gets the application message which was not consumed by any client. /// Gets the application message which was not consumed by any client.
/// </summary> /// </summary>
public MqttApplicationMessage ApplicationMessage { get; internal set; } public MqttApplicationMessage ApplicationMessage { get; }
/// <summary> /// <summary>
/// Gets the ID of the client which has sent the affected application message. /// Gets the ID of the client which has sent the affected application message.
/// </summary> /// </summary>
public string SenderId { get; internal set; } public string SenderId { get; }
} }
} }
\ No newline at end of file
...@@ -3,31 +3,46 @@ ...@@ -3,31 +3,46 @@
// See the LICENSE file in the project root for more information. // See the LICENSE file in the project root for more information.
using System; using System;
using System.Collections;
using MQTTnet.Formatter; using MQTTnet.Formatter;
namespace MQTTnet.Server namespace MQTTnet.Server
{ {
public sealed class ClientConnectedEventArgs : EventArgs public sealed class ClientConnectedEventArgs : EventArgs
{ {
public ClientConnectedEventArgs(string clientId, string userName, MqttProtocolVersion protocolVersion, string endpoint, IDictionary sessionItems)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
UserName = userName;
ProtocolVersion = protocolVersion;
Endpoint = endpoint;
SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));
}
/// <summary>
/// Gets the client identifier of the connected client.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary>
public string ClientId { get; }
/// <summary> /// <summary>
/// Gets the client identifier of the connected client. /// Gets the endpoint of the connected client.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary> /// </summary>
public string ClientId { get; internal set; } public string Endpoint { get; }
/// <summary> /// <summary>
/// Gets the user name of the connected client. /// Gets the protocol version which is used by the connected client.
/// </summary> /// </summary>
public string UserName { get; internal set; } public MqttProtocolVersion ProtocolVersion { get; }
/// <summary> /// <summary>
/// Gets the protocol version which is used by the connected client. /// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary> /// </summary>
public MqttProtocolVersion ProtocolVersion { get; internal set; } public IDictionary SessionItems { get; }
/// <summary> /// <summary>
/// Gets the endpoint of the connected client. /// Gets the user name of the connected client.
/// </summary> /// </summary>
public string Endpoint { get; internal set; } public string UserName { get; }
} }
} }
\ No newline at end of file
...@@ -3,19 +3,33 @@ ...@@ -3,19 +3,33 @@
// See the LICENSE file in the project root for more information. // See the LICENSE file in the project root for more information.
using System; using System;
using System.Collections;
namespace MQTTnet.Server namespace MQTTnet.Server
{ {
public sealed class ClientDisconnectedEventArgs : EventArgs public sealed class ClientDisconnectedEventArgs : EventArgs
{ {
public ClientDisconnectedEventArgs(string clientId, MqttClientDisconnectType disconnectType, string endpoint, IDictionary sessionItems)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
DisconnectType = disconnectType;
Endpoint = endpoint;
SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));
}
/// <summary> /// <summary>
/// Gets the client identifier. /// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues. /// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary> /// </summary>
public string ClientId { get; internal set; } public string ClientId { get; }
public MqttClientDisconnectType DisconnectType { get; }
public MqttClientDisconnectType DisconnectType { get; internal set; } public string Endpoint { get; }
public string Endpoint { get; internal set; } /// <summary>
/// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary>
public IDictionary SessionItems { get; }
} }
} }
\ No newline at end of file
...@@ -3,22 +3,35 @@ ...@@ -3,22 +3,35 @@
// See the LICENSE file in the project root for more information. // See the LICENSE file in the project root for more information.
using System; using System;
using System.Collections;
using MQTTnet.Packets; using MQTTnet.Packets;
namespace MQTTnet.Server namespace MQTTnet.Server
{ {
public sealed class ClientSubscribedTopicEventArgs : EventArgs public sealed class ClientSubscribedTopicEventArgs : EventArgs
{ {
public ClientSubscribedTopicEventArgs(string clientId, MqttTopicFilter topicFilter, IDictionary sessionItems)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter));
SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));
}
/// <summary>
/// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary>
public string ClientId { get; }
/// <summary> /// <summary>
/// Gets the client identifier. /// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary> /// </summary>
public string ClientId { get; internal set; } public IDictionary SessionItems { get; }
/// <summary> /// <summary>
/// Gets the topic filter. /// Gets the topic filter.
/// The topic filter can contain topics and wildcards. /// The topic filter can contain topics and wildcards.
/// </summary> /// </summary>
public MqttTopicFilter TopicFilter { get; internal set; } public MqttTopicFilter TopicFilter { get; }
} }
} }
\ No newline at end of file
...@@ -3,21 +3,34 @@ ...@@ -3,21 +3,34 @@
// See the LICENSE file in the project root for more information. // See the LICENSE file in the project root for more information.
using System; using System;
using System.Collections;
namespace MQTTnet.Server namespace MQTTnet.Server
{ {
public sealed class ClientUnsubscribedTopicEventArgs : EventArgs public sealed class ClientUnsubscribedTopicEventArgs : EventArgs
{ {
public ClientUnsubscribedTopicEventArgs(string clientId, string topicFilter, IDictionary sessionItems)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter));
SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));
}
/// <summary>
/// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary>
public string ClientId { get; }
/// <summary> /// <summary>
/// Gets the client identifier. /// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary> /// </summary>
public string ClientId { get; internal set; } public IDictionary SessionItems { get; }
/// <summary> /// <summary>
/// Gets or sets the topic filter. /// Gets or sets the topic filter.
/// The topic filter can contain topics and wildcards. /// The topic filter can contain topics and wildcards.
/// </summary> /// </summary>
public string TopicFilter { get; internal set; } public string TopicFilter { get; }
} }
} }
\ No newline at end of file
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information. // See the LICENSE file in the project root for more information.
using System; using System;
using System.Collections;
using System.Threading; using System.Threading;
using MQTTnet.Packets; using MQTTnet.Packets;
...@@ -10,30 +11,44 @@ namespace MQTTnet.Server ...@@ -10,30 +11,44 @@ namespace MQTTnet.Server
{ {
public sealed class InterceptingPacketEventArgs : EventArgs public sealed class InterceptingPacketEventArgs : EventArgs
{ {
public InterceptingPacketEventArgs(CancellationToken cancellationToken, string clientId, string endpoint, MqttPacket packet, IDictionary sessionItems)
{
CancellationToken = cancellationToken;
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
Endpoint = endpoint;
Packet = packet ?? throw new ArgumentNullException(nameof(packet));
SessionItems = sessionItems;
}
/// <summary> /// <summary>
/// Gets the client ID which has sent the packet or will receive the packet. /// Gets the cancellation token from the connection managing thread.
/// Use this in further event processing.
/// </summary> /// </summary>
public string ClientId { get; internal set; } public CancellationToken CancellationToken { get; }
/// <summary> /// <summary>
/// Gets the endpoint of the sending or receiving client. /// Gets the client ID which has sent the packet or will receive the packet.
/// </summary> /// </summary>
public string Endpoint { get; internal set; } public string ClientId { get; }
/// <summary> /// <summary>
/// Gets or sets the MQTT packet which was received or will be sent. /// Gets the endpoint of the sending or receiving client.
/// </summary>
public string Endpoint { get; }
/// <summary>
/// Gets or sets the MQTT packet which was received or will be sent.
/// </summary> /// </summary>
public MqttPacket Packet { get; set; } public MqttPacket Packet { get; set; }
/// <summary> /// <summary>
/// Gets or sets whether the packet should be processed or not. /// Gets or sets whether the packet should be processed or not.
/// </summary> /// </summary>
public bool ProcessPacket { get; set; } = true; public bool ProcessPacket { get; set; } = true;
/// <summary> /// <summary>
/// Gets the cancellation token from the connection managing thread. /// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// Use this in further event processing.
/// </summary> /// </summary>
public CancellationToken CancellationToken { get; internal set; } public IDictionary SessionItems { get; }
} }
} }
\ No newline at end of file
...@@ -10,18 +10,26 @@ namespace MQTTnet.Server ...@@ -10,18 +10,26 @@ namespace MQTTnet.Server
{ {
public sealed class InterceptingPublishEventArgs : EventArgs public sealed class InterceptingPublishEventArgs : EventArgs
{ {
public InterceptingPublishEventArgs(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken, string clientId, IDictionary sessionItems)
{
ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage));
CancellationToken = cancellationToken;
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));
}
public MqttApplicationMessage ApplicationMessage { get; set; } public MqttApplicationMessage ApplicationMessage { get; set; }
/// <summary> /// <summary>
/// Gets the cancellation token which can indicate that the client connection gets down. /// Gets the cancellation token which can indicate that the client connection gets down.
/// </summary> /// </summary>
public CancellationToken CancellationToken { get; internal set; } public CancellationToken CancellationToken { get; }
/// <summary> /// <summary>
/// Gets the client identifier. /// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues. /// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary> /// </summary>
public string ClientId { get; internal set; } public string ClientId { get; }
public bool CloseConnection { get; set; } public bool CloseConnection { get; set; }
...@@ -38,6 +46,6 @@ namespace MQTTnet.Server ...@@ -38,6 +46,6 @@ namespace MQTTnet.Server
/// <summary> /// <summary>
/// Gets or sets a key/value collection that can be used to share data within the scope of this session. /// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary> /// </summary>
public IDictionary SessionItems { get; internal set; } public IDictionary SessionItems { get; }
} }
} }
\ No newline at end of file
...@@ -12,16 +12,28 @@ namespace MQTTnet.Server ...@@ -12,16 +12,28 @@ namespace MQTTnet.Server
{ {
public sealed class InterceptingSubscriptionEventArgs : EventArgs public sealed class InterceptingSubscriptionEventArgs : EventArgs
{ {
public InterceptingSubscriptionEventArgs(
CancellationToken cancellationToken,
string clientId,
MqttSessionStatus session,
MqttTopicFilter topicFilter)
{
CancellationToken = cancellationToken;
ClientId = clientId;
Session = session;
TopicFilter = topicFilter;
}
/// <summary> /// <summary>
/// Gets the cancellation token which can indicate that the client connection gets down. /// Gets the cancellation token which can indicate that the client connection gets down.
/// </summary> /// </summary>
public CancellationToken CancellationToken { get; internal set; } public CancellationToken CancellationToken { get; }
/// <summary> /// <summary>
/// Gets the client identifier. /// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues. /// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary> /// </summary>
public string ClientId { get; internal set; } public string ClientId { get; }
/// <summary> /// <summary>
/// Gets or sets whether the broker should close the client connection. /// Gets or sets whether the broker should close the client connection.
...@@ -48,12 +60,12 @@ namespace MQTTnet.Server ...@@ -48,12 +60,12 @@ namespace MQTTnet.Server
/// <summary> /// <summary>
/// Gets the current client session. /// Gets the current client session.
/// </summary> /// </summary>
public MqttSessionStatus Session { get; internal set; } public MqttSessionStatus Session { get; }
/// <summary> /// <summary>
/// Gets or sets a key/value collection that can be used to share data within the scope of this session. /// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary> /// </summary>
public IDictionary SessionItems { get; internal set; } public IDictionary SessionItems => Session.Items;
/// <summary> /// <summary>
/// Gets or sets the topic filter. /// Gets or sets the topic filter.
......
...@@ -10,16 +10,24 @@ namespace MQTTnet.Server ...@@ -10,16 +10,24 @@ namespace MQTTnet.Server
{ {
public sealed class InterceptingUnsubscriptionEventArgs : EventArgs public sealed class InterceptingUnsubscriptionEventArgs : EventArgs
{ {
public InterceptingUnsubscriptionEventArgs(CancellationToken cancellationToken, string clientId, IDictionary sessionItems, string topic)
{
CancellationToken = cancellationToken;
ClientId = clientId;
SessionItems = sessionItems;
Topic = topic;
}
/// <summary> /// <summary>
/// Gets the cancellation token which can indicate that the client connection gets down. /// Gets the cancellation token which can indicate that the client connection gets down.
/// </summary> /// </summary>
public CancellationToken CancellationToken { get; internal set; } public CancellationToken CancellationToken { get; }
/// <summary> /// <summary>
/// Gets the client identifier. /// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues. /// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary> /// </summary>
public string ClientId { get; internal set; } public string ClientId { get; }
/// <summary> /// <summary>
/// Gets or sets whether the broker should close the client connection. /// Gets or sets whether the broker should close the client connection.
...@@ -41,7 +49,7 @@ namespace MQTTnet.Server ...@@ -41,7 +49,7 @@ namespace MQTTnet.Server
/// <summary> /// <summary>
/// Gets or sets a key/value collection that can be used to share data within the scope of this session. /// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary> /// </summary>
public IDictionary SessionItems { get; internal set; } public IDictionary SessionItems { get; }
/// <summary> /// <summary>
/// Gets or sets the MQTT topic. /// Gets or sets the MQTT topic.
...@@ -50,6 +58,6 @@ namespace MQTTnet.Server ...@@ -50,6 +58,6 @@ namespace MQTTnet.Server
/// The topic consists of one or more topic levels. Each topic level is separated by a forward slash (topic level /// The topic consists of one or more topic levels. Each topic level is separated by a forward slash (topic level
/// separator). /// separator).
/// </summary> /// </summary>
public string Topic { get; internal set; } public string Topic { get; }
} }
} }
\ No newline at end of file
...@@ -9,10 +9,17 @@ namespace MQTTnet.Server ...@@ -9,10 +9,17 @@ namespace MQTTnet.Server
{ {
public sealed class RetainedMessageChangedEventArgs : EventArgs public sealed class RetainedMessageChangedEventArgs : EventArgs
{ {
public string ClientId { get; internal set; } public RetainedMessageChangedEventArgs(string clientId, MqttApplicationMessage changedRetainedMessage, List<MqttApplicationMessage> storedRetainedMessages)
{
public MqttApplicationMessage ChangedRetainedMessage { get; internal set; } ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
ChangedRetainedMessage = changedRetainedMessage ?? throw new ArgumentNullException(nameof(changedRetainedMessage));
public List<MqttApplicationMessage> StoredRetainedMessages { get; internal set; } StoredRetainedMessages = storedRetainedMessages ?? throw new ArgumentNullException(nameof(storedRetainedMessages));
}
public MqttApplicationMessage ChangedRetainedMessage { get; }
public string ClientId { get; }
public List<MqttApplicationMessage> StoredRetainedMessages { get; }
} }
} }
\ No newline at end of file
...@@ -3,14 +3,26 @@ ...@@ -3,14 +3,26 @@
// See the LICENSE file in the project root for more information. // See the LICENSE file in the project root for more information.
using System; using System;
using System.Collections;
namespace MQTTnet.Server namespace MQTTnet.Server
{ {
public sealed class SessionDeletedEventArgs : EventArgs public sealed class SessionDeletedEventArgs : EventArgs
{ {
public SessionDeletedEventArgs(string id, IDictionary sessionItems)
{
Id = id ?? throw new ArgumentNullException(nameof(id));
SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));
}
/// <summary>
/// Gets the ID of the session.
/// </summary>
public string Id { get; }
/// <summary> /// <summary>
/// Gets the ID of the session. /// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary> /// </summary>
public string Id { get; internal set; } public IDictionary SessionItems { get; }
} }
} }
\ No newline at end of file
...@@ -170,7 +170,7 @@ namespace MQTTnet.Server ...@@ -170,7 +170,7 @@ namespace MQTTnet.Server
ClientId = Id, ClientId = Id,
SessionItems = Session.Items SessionItems = Session.Items
}; };
return _eventContainer.ClientAcknowledgedPublishPacketEvent.TryInvokeAsync(eventArgs, _logger); return _eventContainer.ClientAcknowledgedPublishPacketEvent.TryInvokeAsync(eventArgs, _logger);
} }
...@@ -180,7 +180,7 @@ namespace MQTTnet.Server ...@@ -180,7 +180,7 @@ namespace MQTTnet.Server
Task HandleIncomingPubAckPacket(MqttPubAckPacket pubAckPacket) Task HandleIncomingPubAckPacket(MqttPubAckPacket pubAckPacket)
{ {
var acknowledgedPublishPacket = Session.AcknowledgePublishPacket(pubAckPacket.PacketIdentifier); var acknowledgedPublishPacket = Session.AcknowledgePublishPacket(pubAckPacket.PacketIdentifier);
if (acknowledgedPublishPacket != null) if (acknowledgedPublishPacket != null)
{ {
return ClientAcknowledgedPublishPacket(acknowledgedPublishPacket, pubAckPacket); return ClientAcknowledgedPublishPacket(acknowledgedPublishPacket, pubAckPacket);
...@@ -192,7 +192,7 @@ namespace MQTTnet.Server ...@@ -192,7 +192,7 @@ namespace MQTTnet.Server
Task HandleIncomingPubCompPacket(MqttPubCompPacket pubCompPacket) Task HandleIncomingPubCompPacket(MqttPubCompPacket pubCompPacket)
{ {
var acknowledgedPublishPacket = Session.AcknowledgePublishPacket(pubCompPacket.PacketIdentifier); var acknowledgedPublishPacket = Session.AcknowledgePublishPacket(pubCompPacket.PacketIdentifier);
if (acknowledgedPublishPacket != null) if (acknowledgedPublishPacket != null)
{ {
return ClientAcknowledgedPublishPacket(acknowledgedPublishPacket, pubCompPacket); return ClientAcknowledgedPublishPacket(acknowledgedPublishPacket, pubCompPacket);
...@@ -212,16 +212,7 @@ namespace MQTTnet.Server ...@@ -212,16 +212,7 @@ namespace MQTTnet.Server
if (_eventContainer.InterceptingPublishEvent.HasHandlers) if (_eventContainer.InterceptingPublishEvent.HasHandlers)
{ {
interceptingPublishEventArgs = new InterceptingPublishEventArgs interceptingPublishEventArgs = new InterceptingPublishEventArgs(applicationMessage, cancellationToken, Id, Session.Items);
{
ClientId = Id,
ApplicationMessage = applicationMessage,
SessionItems = Session.Items,
ProcessPublish = true,
CloseConnection = false,
CancellationToken = cancellationToken
};
if (string.IsNullOrEmpty(interceptingPublishEventArgs.ApplicationMessage.Topic)) if (string.IsNullOrEmpty(interceptingPublishEventArgs.ApplicationMessage.Topic))
{ {
// This can happen if a topic alias us used but the topic is // This can happen if a topic alias us used but the topic is
...@@ -277,7 +268,7 @@ namespace MQTTnet.Server ...@@ -277,7 +268,7 @@ namespace MQTTnet.Server
async Task HandleIncomingPubRecPacket(MqttPubRecPacket pubRecPacket) async Task HandleIncomingPubRecPacket(MqttPubRecPacket pubRecPacket)
{ {
var acknowledgedPublishPacket = Session.PeekAcknowledgePublishPacket(pubRecPacket.PacketIdentifier); var acknowledgedPublishPacket = Session.PeekAcknowledgePublishPacket(pubRecPacket.PacketIdentifier);
if (acknowledgedPublishPacket != null) if (acknowledgedPublishPacket != null)
{ {
await ClientAcknowledgedPublishPacket(acknowledgedPublishPacket, pubRecPacket).ConfigureAwait(false); await ClientAcknowledgedPublishPacket(acknowledgedPublishPacket, pubRecPacket).ConfigureAwait(false);
...@@ -317,6 +308,7 @@ namespace MQTTnet.Server ...@@ -317,6 +308,7 @@ namespace MQTTnet.Server
} }
} }
async Task HandleIncomingUnsubscribePacket(MqttUnsubscribePacket unsubscribePacket, CancellationToken cancellationToken) async Task HandleIncomingUnsubscribePacket(MqttUnsubscribePacket unsubscribePacket, CancellationToken cancellationToken)
{ {
var unsubscribeResult = await Session.SubscriptionsManager.Unsubscribe(unsubscribePacket, cancellationToken).ConfigureAwait(false); var unsubscribeResult = await Session.SubscriptionsManager.Unsubscribe(unsubscribePacket, cancellationToken).ConfigureAwait(false);
...@@ -365,14 +357,7 @@ namespace MQTTnet.Server ...@@ -365,14 +357,7 @@ namespace MQTTnet.Server
return packet; return packet;
} }
var interceptingPacketEventArgs = new InterceptingPacketEventArgs var interceptingPacketEventArgs = new InterceptingPacketEventArgs(cancellationToken, Id, Endpoint, packet, Session.Items);
{
ClientId = Id,
Endpoint = Endpoint,
Packet = packet,
CancellationToken = cancellationToken
};
await _eventContainer.InterceptingOutboundPacketEvent.InvokeAsync(interceptingPacketEventArgs).ConfigureAwait(false); await _eventContainer.InterceptingOutboundPacketEvent.InvokeAsync(interceptingPacketEventArgs).ConfigureAwait(false);
if (!interceptingPacketEventArgs.ProcessPacket || packet == null) if (!interceptingPacketEventArgs.ProcessPacket || packet == null)
...@@ -393,7 +378,7 @@ namespace MQTTnet.Server ...@@ -393,7 +378,7 @@ namespace MQTTnet.Server
while (!cancellationToken.IsCancellationRequested) while (!cancellationToken.IsCancellationRequested)
{ {
await Task.Yield(); await Task.Yield();
var packet = await ChannelAdapter.ReceivePacketAsync(cancellationToken).ConfigureAwait(false); var packet = await ChannelAdapter.ReceivePacketAsync(cancellationToken).ConfigureAwait(false);
if (packet == null) if (packet == null)
{ {
...@@ -404,14 +389,7 @@ namespace MQTTnet.Server ...@@ -404,14 +389,7 @@ namespace MQTTnet.Server
if (_eventContainer.InterceptingInboundPacketEvent.HasHandlers) if (_eventContainer.InterceptingInboundPacketEvent.HasHandlers)
{ {
var interceptingPacketEventArgs = new InterceptingPacketEventArgs var interceptingPacketEventArgs = new InterceptingPacketEventArgs(cancellationToken, Id, Endpoint, packet, Session.Items);
{
ClientId = Id,
Endpoint = Endpoint,
Packet = packet,
CancellationToken = cancellationToken
};
await _eventContainer.InterceptingInboundPacketEvent.InvokeAsync(interceptingPacketEventArgs).ConfigureAwait(false); await _eventContainer.InterceptingInboundPacketEvent.InvokeAsync(interceptingPacketEventArgs).ConfigureAwait(false);
packet = interceptingPacketEventArgs.Packet; packet = interceptingPacketEventArgs.Packet;
processPacket = interceptingPacketEventArgs.ProcessPacket; processPacket = interceptingPacketEventArgs.ProcessPacket;
......
...@@ -109,13 +109,9 @@ namespace MQTTnet.Server ...@@ -109,13 +109,9 @@ namespace MQTTnet.Server
try try
{ {
if (_eventContainer.SessionDeletedEvent.HasHandlers) if (_eventContainer.SessionDeletedEvent.HasHandlers && session != null)
{ {
var eventArgs = new SessionDeletedEventArgs var eventArgs = new SessionDeletedEventArgs(clientId, session.Items);
{
Id = session?.Id
};
await _eventContainer.SessionDeletedEvent.TryInvokeAsync(eventArgs, _logger).ConfigureAwait(false); await _eventContainer.SessionDeletedEvent.TryInvokeAsync(eventArgs, _logger).ConfigureAwait(false);
} }
} }
...@@ -295,13 +291,12 @@ namespace MQTTnet.Server ...@@ -295,13 +291,12 @@ namespace MQTTnet.Server
if (_eventContainer.ClientConnectedEvent.HasHandlers) if (_eventContainer.ClientConnectedEvent.HasHandlers)
{ {
var eventArgs = new ClientConnectedEventArgs var eventArgs = new ClientConnectedEventArgs(
{ connectPacket.ClientId,
ClientId = connectPacket.ClientId, connectPacket.Username,
UserName = connectPacket.Username, channelAdapter.PacketFormatterAdapter.ProtocolVersion,
ProtocolVersion = channelAdapter.PacketFormatterAdapter.ProtocolVersion, channelAdapter.Endpoint,
Endpoint = channelAdapter.Endpoint client.Session.Items);
};
await _eventContainer.ClientConnectedEvent.InvokeAsync(eventArgs).ConfigureAwait(false); await _eventContainer.ClientConnectedEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
} }
...@@ -340,12 +335,8 @@ namespace MQTTnet.Server ...@@ -340,12 +335,8 @@ namespace MQTTnet.Server
if (client.Id != null && !client.IsTakenOver && _eventContainer.ClientDisconnectedEvent.HasHandlers) if (client.Id != null && !client.IsTakenOver && _eventContainer.ClientDisconnectedEvent.HasHandlers)
{ {
var eventArgs = new ClientDisconnectedEventArgs var disconnectType = client.IsCleanDisconnect ? MqttClientDisconnectType.Clean : MqttClientDisconnectType.NotClean;
{ var eventArgs = new ClientDisconnectedEventArgs(client.Id, disconnectType, endpoint, client.Session.Items);
ClientId = client.Id,
DisconnectType = client.IsCleanDisconnect ? MqttClientDisconnectType.Clean : MqttClientDisconnectType.NotClean,
Endpoint = endpoint
};
await _eventContainer.ClientDisconnectedEvent.InvokeAsync(eventArgs).ConfigureAwait(false); await _eventContainer.ClientDisconnectedEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
} }
...@@ -533,13 +524,7 @@ namespace MQTTnet.Server ...@@ -533,13 +524,7 @@ namespace MQTTnet.Server
if (_eventContainer.ClientConnectedEvent.HasHandlers) if (_eventContainer.ClientConnectedEvent.HasHandlers)
{ {
var eventArgs = new ClientDisconnectedEventArgs var eventArgs = new ClientDisconnectedEventArgs(existing.Id, MqttClientDisconnectType.Takeover, existing.Endpoint, existing.Session.Items);
{
ClientId = existing.Id,
DisconnectType = MqttClientDisconnectType.Takeover,
Endpoint = existing.Endpoint
};
await _eventContainer.ClientDisconnectedEvent.InvokeAsync(eventArgs).ConfigureAwait(false); await _eventContainer.ClientDisconnectedEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
} }
} }
...@@ -572,12 +557,7 @@ namespace MQTTnet.Server ...@@ -572,12 +557,7 @@ namespace MQTTnet.Server
return; return;
} }
var eventArgs = new ApplicationMessageNotConsumedEventArgs var eventArgs = new ApplicationMessageNotConsumedEventArgs(applicationMessage, senderId);
{
ApplicationMessage = applicationMessage,
SenderId = senderId
};
await _eventContainer.ApplicationMessageNotConsumedEvent.InvokeAsync(eventArgs).ConfigureAwait(false); await _eventContainer.ApplicationMessageNotConsumedEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
} }
......
...@@ -217,12 +217,7 @@ namespace MQTTnet.Server ...@@ -217,12 +217,7 @@ namespace MQTTnet.Server
{ {
foreach (var finalTopicFilter in finalTopicFilters) foreach (var finalTopicFilter in finalTopicFilters)
{ {
var eventArgs = new ClientSubscribedTopicEventArgs var eventArgs = new ClientSubscribedTopicEventArgs(_session.Id, finalTopicFilter, _session.Items);
{
ClientId = _session.Id,
TopicFilter = finalTopicFilter
};
await _eventContainer.ClientSubscribedTopicEvent.InvokeAsync(eventArgs).ConfigureAwait(false); await _eventContainer.ClientSubscribedTopicEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
} }
} }
...@@ -312,12 +307,7 @@ namespace MQTTnet.Server ...@@ -312,12 +307,7 @@ namespace MQTTnet.Server
{ {
foreach (var topicFilter in unsubscribePacket.TopicFilters) foreach (var topicFilter in unsubscribePacket.TopicFilters)
{ {
var eventArgs = new ClientUnsubscribedTopicEventArgs var eventArgs = new ClientUnsubscribedTopicEventArgs(_session.Id, topicFilter, _session.Items);
{
ClientId = _session.Id,
TopicFilter = topicFilter
};
await _eventContainer.ClientUnsubscribedTopicEvent.InvokeAsync(eventArgs).ConfigureAwait(false); await _eventContainer.ClientUnsubscribedTopicEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
} }
} }
...@@ -472,14 +462,7 @@ namespace MQTTnet.Server ...@@ -472,14 +462,7 @@ namespace MQTTnet.Server
async Task<InterceptingSubscriptionEventArgs> InterceptSubscribe(MqttTopicFilter topicFilter, CancellationToken cancellationToken) async Task<InterceptingSubscriptionEventArgs> InterceptSubscribe(MqttTopicFilter topicFilter, CancellationToken cancellationToken)
{ {
var eventArgs = new InterceptingSubscriptionEventArgs var eventArgs = new InterceptingSubscriptionEventArgs(cancellationToken, _session.Id, new MqttSessionStatus(_session), topicFilter);
{
ClientId = _session.Id,
TopicFilter = topicFilter,
SessionItems = _session.Items,
Session = new MqttSessionStatus(_session),
CancellationToken = cancellationToken
};
if (topicFilter.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) if (topicFilter.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{ {
...@@ -508,13 +491,7 @@ namespace MQTTnet.Server ...@@ -508,13 +491,7 @@ namespace MQTTnet.Server
async Task<InterceptingUnsubscriptionEventArgs> InterceptUnsubscribe(string topicFilter, MqttSubscription mqttSubscription, CancellationToken cancellationToken) async Task<InterceptingUnsubscriptionEventArgs> InterceptUnsubscribe(string topicFilter, MqttSubscription mqttSubscription, CancellationToken cancellationToken)
{ {
var clientUnsubscribingTopicEventArgs = new InterceptingUnsubscriptionEventArgs var clientUnsubscribingTopicEventArgs = new InterceptingUnsubscriptionEventArgs(cancellationToken, topicFilter, _session.Items, topicFilter);
{
ClientId = _session.Id,
Topic = topicFilter,
SessionItems = _session.Items,
CancellationToken = cancellationToken
};
if (mqttSubscription == null) if (mqttSubscription == null)
{ {
......
...@@ -105,13 +105,7 @@ namespace MQTTnet.Server ...@@ -105,13 +105,7 @@ namespace MQTTnet.Server
{ {
using (await _storageAccessLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) using (await _storageAccessLock.WaitAsync(CancellationToken.None).ConfigureAwait(false))
{ {
var eventArgs = new RetainedMessageChangedEventArgs var eventArgs = new RetainedMessageChangedEventArgs(clientId, applicationMessage, messagesForSave);
{
ClientId = clientId,
ChangedRetainedMessage = applicationMessage,
StoredRetainedMessages = messagesForSave
};
await _eventContainer.RetainedMessageChangedEvent.InvokeAsync(eventArgs).ConfigureAwait(false); await _eventContainer.RetainedMessageChangedEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
} }
} }
......
...@@ -24,15 +24,18 @@ namespace MQTTnet.Server ...@@ -24,15 +24,18 @@ namespace MQTTnet.Server
readonly IDictionary _sessionItems = new ConcurrentDictionary<object, object>(); readonly IDictionary _sessionItems = new ConcurrentDictionary<object, object>();
readonly ICollection<IMqttServerAdapter> _adapters; readonly ICollection<IMqttServerAdapter> _adapters;
readonly MqttClientSessionsManager _clientSessionsManager;
readonly MqttServerEventContainer _eventContainer = new MqttServerEventContainer();
readonly MqttServerKeepAliveMonitor _keepAliveMonitor;
readonly MqttNetSourceLogger _logger; readonly MqttNetSourceLogger _logger;
readonly MqttServerOptions _options; readonly MqttServerOptions _options;
readonly IMqttNetLogger _rootLogger;
readonly MqttRetainedMessagesManager _retainedMessagesManager; readonly MqttRetainedMessagesManager _retainedMessagesManager;
readonly MqttServerKeepAliveMonitor _keepAliveMonitor; readonly IMqttNetLogger _rootLogger;
readonly MqttClientSessionsManager _clientSessionsManager;
readonly IDictionary _sessionItems = new ConcurrentDictionary<object, object>();
CancellationTokenSource _cancellationTokenSource; CancellationTokenSource _cancellationTokenSource;
public MqttServer(MqttServerOptions options, IEnumerable<IMqttServerAdapter> adapters, IMqttNetLogger logger) public MqttServer(MqttServerOptions options, IEnumerable<IMqttServerAdapter> adapters, IMqttNetLogger logger)
{ {
_options = options ?? throw new ArgumentNullException(nameof(options)); _options = options ?? throw new ArgumentNullException(nameof(options));
...@@ -58,18 +61,18 @@ namespace MQTTnet.Server ...@@ -58,18 +61,18 @@ namespace MQTTnet.Server
remove => _eventContainer.ApplicationMessageNotConsumedEvent.RemoveHandler(value); remove => _eventContainer.ApplicationMessageNotConsumedEvent.RemoveHandler(value);
} }
public event Func<ClientConnectedEventArgs, Task> ClientConnectedAsync
{
add => _eventContainer.ClientConnectedEvent.AddHandler(value);
remove => _eventContainer.ClientConnectedEvent.RemoveHandler(value);
}
public event Func<ClientAcknowledgedPublishPacketEventArgs, Task> ClientAcknowledgedPublishPacketAsync public event Func<ClientAcknowledgedPublishPacketEventArgs, Task> ClientAcknowledgedPublishPacketAsync
{ {
add => _eventContainer.ClientAcknowledgedPublishPacketEvent.AddHandler(value); add => _eventContainer.ClientAcknowledgedPublishPacketEvent.AddHandler(value);
remove => _eventContainer.ClientAcknowledgedPublishPacketEvent.RemoveHandler(value); remove => _eventContainer.ClientAcknowledgedPublishPacketEvent.RemoveHandler(value);
} }
public event Func<ClientConnectedEventArgs, Task> ClientConnectedAsync
{
add => _eventContainer.ClientConnectedEvent.AddHandler(value);
remove => _eventContainer.ClientConnectedEvent.RemoveHandler(value);
}
public event Func<ClientDisconnectedEventArgs, Task> ClientDisconnectedAsync public event Func<ClientDisconnectedEventArgs, Task> ClientDisconnectedAsync
{ {
add => _eventContainer.ClientDisconnectedEvent.AddHandler(value); add => _eventContainer.ClientDisconnectedEvent.AddHandler(value);
...@@ -167,7 +170,7 @@ namespace MQTTnet.Server ...@@ -167,7 +170,7 @@ namespace MQTTnet.Server
} }
public bool IsStarted => _cancellationTokenSource != null; public bool IsStarted => _cancellationTokenSource != null;
public Task DeleteRetainedMessagesAsync() public Task DeleteRetainedMessagesAsync()
{ {
ThrowIfNotStarted(); ThrowIfNotStarted();
...@@ -183,10 +186,10 @@ namespace MQTTnet.Server ...@@ -183,10 +186,10 @@ namespace MQTTnet.Server
} }
ThrowIfNotStarted(); ThrowIfNotStarted();
return _clientSessionsManager.GetClient(id).StopAsync(reasonCode); return _clientSessionsManager.GetClient(id).StopAsync(reasonCode);
} }
public Task<IList<MqttClientStatus>> GetClientsAsync() public Task<IList<MqttClientStatus>> GetClientsAsync()
{ {
ThrowIfNotStarted(); ThrowIfNotStarted();
...@@ -226,17 +229,15 @@ namespace MQTTnet.Server ...@@ -226,17 +229,15 @@ namespace MQTTnet.Server
var processPublish = true; var processPublish = true;
var applicationMessage = injectedApplicationMessage.ApplicationMessage; var applicationMessage = injectedApplicationMessage.ApplicationMessage;
if (_eventContainer.InterceptingPublishEvent.HasHandlers) if (_eventContainer.InterceptingPublishEvent.HasHandlers)
{ {
var interceptingPublishEventArgs = new InterceptingPublishEventArgs var interceptingPublishEventArgs = new InterceptingPublishEventArgs(
{ applicationMessage,
ApplicationMessage = applicationMessage, _cancellationTokenSource.Token,
CancellationToken = _cancellationTokenSource.Token, injectedApplicationMessage.SenderClientId,
ClientId = injectedApplicationMessage.SenderClientId, _sessionItems);
SessionItems = _sessionItems
};
await _eventContainer.InterceptingPublishEvent.InvokeAsync(interceptingPublishEventArgs).ConfigureAwait(false); await _eventContainer.InterceptingPublishEvent.InvokeAsync(interceptingPublishEventArgs).ConfigureAwait(false);
applicationMessage = interceptingPublishEventArgs.ApplicationMessage; applicationMessage = interceptingPublishEventArgs.ApplicationMessage;
...@@ -274,7 +275,7 @@ namespace MQTTnet.Server ...@@ -274,7 +275,7 @@ namespace MQTTnet.Server
} }
await _eventContainer.StartedEvent.InvokeAsync(EventArgs.Empty).ConfigureAwait(false); await _eventContainer.StartedEvent.InvokeAsync(EventArgs.Empty).ConfigureAwait(false);
_logger.Info("Started."); _logger.Info("Started.");
} }
...@@ -304,7 +305,7 @@ namespace MQTTnet.Server ...@@ -304,7 +305,7 @@ namespace MQTTnet.Server
} }
await _eventContainer.StoppedEvent.InvokeAsync(EventArgs.Empty).ConfigureAwait(false); await _eventContainer.StoppedEvent.InvokeAsync(EventArgs.Empty).ConfigureAwait(false);
_logger.Info("Stopped."); _logger.Info("Stopped.");
} }
...@@ -348,7 +349,7 @@ namespace MQTTnet.Server ...@@ -348,7 +349,7 @@ namespace MQTTnet.Server
return _clientSessionsManager.UnsubscribeAsync(clientId, topicFilters); return _clientSessionsManager.UnsubscribeAsync(clientId, topicFilters);
} }
public Task UpdateRetainedMessageAsync(MqttApplicationMessage retainedMessage) public Task UpdateRetainedMessageAsync(MqttApplicationMessage retainedMessage)
{ {
if (retainedMessage == null) if (retainedMessage == null)
...@@ -385,7 +386,7 @@ namespace MQTTnet.Server ...@@ -385,7 +386,7 @@ namespace MQTTnet.Server
void ThrowIfNotStarted() void ThrowIfNotStarted()
{ {
ThrowIfDisposed(); ThrowIfDisposed();
if (_cancellationTokenSource == null) if (_cancellationTokenSource == null)
{ {
throw new InvalidOperationException("The MQTT server is not started."); throw new InvalidOperationException("The MQTT server is not started.");
...@@ -395,7 +396,7 @@ namespace MQTTnet.Server ...@@ -395,7 +396,7 @@ namespace MQTTnet.Server
void ThrowIfStarted() void ThrowIfStarted()
{ {
ThrowIfDisposed(); ThrowIfDisposed();
if (_cancellationTokenSource != null) if (_cancellationTokenSource != null)
{ {
throw new InvalidOperationException("The MQTT server is already started."); throw new InvalidOperationException("The MQTT server is already started.");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册