提交 09385b1b 编写于 作者: C Christian

Pass session item to several more event handers.

上级 e3349501
......@@ -8,14 +8,20 @@ namespace MQTTnet.Server
{
public sealed class ApplicationMessageNotConsumedEventArgs : EventArgs
{
public ApplicationMessageNotConsumedEventArgs(MqttApplicationMessage applicationMessage, string senderId)
{
ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage));
SenderId = senderId;
}
/// <summary>
/// Gets the application message which was not consumed by any client.
/// </summary>
public MqttApplicationMessage ApplicationMessage { get; internal set; }
public MqttApplicationMessage ApplicationMessage { get; }
/// <summary>
/// Gets the ID of the client which has sent the affected application message.
/// </summary>
public string SenderId { get; internal set; }
public string SenderId { get; }
}
}
\ No newline at end of file
......@@ -3,31 +3,46 @@
// See the LICENSE file in the project root for more information.
using System;
using System.Collections;
using MQTTnet.Formatter;
namespace MQTTnet.Server
{
public sealed class ClientConnectedEventArgs : EventArgs
{
public ClientConnectedEventArgs(string clientId, string userName, MqttProtocolVersion protocolVersion, string endpoint, IDictionary sessionItems)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
UserName = userName;
ProtocolVersion = protocolVersion;
Endpoint = endpoint;
SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));
}
/// <summary>
/// Gets the client identifier of the connected client.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary>
public string ClientId { get; }
/// <summary>
/// Gets the client identifier of the connected client.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// Gets the endpoint of the connected client.
/// </summary>
public string ClientId { get; internal set; }
public string Endpoint { get; }
/// <summary>
/// Gets the user name of the connected client.
/// Gets the protocol version which is used by the connected client.
/// </summary>
public string UserName { get; internal set; }
public MqttProtocolVersion ProtocolVersion { get; }
/// <summary>
/// Gets the protocol version which is used by the connected client.
/// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary>
public MqttProtocolVersion ProtocolVersion { get; internal set; }
public IDictionary SessionItems { get; }
/// <summary>
/// Gets the endpoint of the connected client.
/// Gets the user name of the connected client.
/// </summary>
public string Endpoint { get; internal set; }
public string UserName { get; }
}
}
}
\ No newline at end of file
......@@ -3,19 +3,33 @@
// See the LICENSE file in the project root for more information.
using System;
using System.Collections;
namespace MQTTnet.Server
{
public sealed class ClientDisconnectedEventArgs : EventArgs
{
public ClientDisconnectedEventArgs(string clientId, MqttClientDisconnectType disconnectType, string endpoint, IDictionary sessionItems)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
DisconnectType = disconnectType;
Endpoint = endpoint;
SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));
}
/// <summary>
/// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary>
public string ClientId { get; internal set; }
public string ClientId { get; }
public MqttClientDisconnectType DisconnectType { get; }
public MqttClientDisconnectType DisconnectType { get; internal set; }
public string Endpoint { get; }
public string Endpoint { get; internal set; }
/// <summary>
/// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary>
public IDictionary SessionItems { get; }
}
}
}
\ No newline at end of file
......@@ -3,22 +3,35 @@
// See the LICENSE file in the project root for more information.
using System;
using System.Collections;
using MQTTnet.Packets;
namespace MQTTnet.Server
{
public sealed class ClientSubscribedTopicEventArgs : EventArgs
{
public ClientSubscribedTopicEventArgs(string clientId, MqttTopicFilter topicFilter, IDictionary sessionItems)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter));
SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));
}
/// <summary>
/// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary>
public string ClientId { get; }
/// <summary>
/// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary>
public string ClientId { get; internal set; }
public IDictionary SessionItems { get; }
/// <summary>
/// Gets the topic filter.
/// The topic filter can contain topics and wildcards.
/// Gets the topic filter.
/// The topic filter can contain topics and wildcards.
/// </summary>
public MqttTopicFilter TopicFilter { get; internal set; }
public MqttTopicFilter TopicFilter { get; }
}
}
}
\ No newline at end of file
......@@ -3,21 +3,34 @@
// See the LICENSE file in the project root for more information.
using System;
using System.Collections;
namespace MQTTnet.Server
{
public sealed class ClientUnsubscribedTopicEventArgs : EventArgs
{
public ClientUnsubscribedTopicEventArgs(string clientId, string topicFilter, IDictionary sessionItems)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter));
SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));
}
/// <summary>
/// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary>
public string ClientId { get; }
/// <summary>
/// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary>
public string ClientId { get; internal set; }
public IDictionary SessionItems { get; }
/// <summary>
/// Gets or sets the topic filter.
/// The topic filter can contain topics and wildcards.
/// Gets or sets the topic filter.
/// The topic filter can contain topics and wildcards.
/// </summary>
public string TopicFilter { get; internal set; }
public string TopicFilter { get; }
}
}
}
\ No newline at end of file
......@@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.
using System;
using System.Collections;
using System.Threading;
using MQTTnet.Packets;
......@@ -10,30 +11,44 @@ namespace MQTTnet.Server
{
public sealed class InterceptingPacketEventArgs : EventArgs
{
public InterceptingPacketEventArgs(CancellationToken cancellationToken, string clientId, string endpoint, MqttPacket packet, IDictionary sessionItems)
{
CancellationToken = cancellationToken;
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
Endpoint = endpoint;
Packet = packet ?? throw new ArgumentNullException(nameof(packet));
SessionItems = sessionItems;
}
/// <summary>
/// Gets the client ID which has sent the packet or will receive the packet.
/// Gets the cancellation token from the connection managing thread.
/// Use this in further event processing.
/// </summary>
public string ClientId { get; internal set; }
public CancellationToken CancellationToken { get; }
/// <summary>
/// Gets the endpoint of the sending or receiving client.
/// Gets the client ID which has sent the packet or will receive the packet.
/// </summary>
public string Endpoint { get; internal set; }
public string ClientId { get; }
/// <summary>
/// Gets or sets the MQTT packet which was received or will be sent.
/// Gets the endpoint of the sending or receiving client.
/// </summary>
public string Endpoint { get; }
/// <summary>
/// Gets or sets the MQTT packet which was received or will be sent.
/// </summary>
public MqttPacket Packet { get; set; }
/// <summary>
/// Gets or sets whether the packet should be processed or not.
/// Gets or sets whether the packet should be processed or not.
/// </summary>
public bool ProcessPacket { get; set; } = true;
/// <summary>
/// Gets the cancellation token from the connection managing thread.
/// Use this in further event processing.
/// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary>
public CancellationToken CancellationToken { get; internal set; }
public IDictionary SessionItems { get; }
}
}
\ No newline at end of file
......@@ -10,18 +10,26 @@ namespace MQTTnet.Server
{
public sealed class InterceptingPublishEventArgs : EventArgs
{
public InterceptingPublishEventArgs(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken, string clientId, IDictionary sessionItems)
{
ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage));
CancellationToken = cancellationToken;
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));
}
public MqttApplicationMessage ApplicationMessage { get; set; }
/// <summary>
/// Gets the cancellation token which can indicate that the client connection gets down.
/// </summary>
public CancellationToken CancellationToken { get; internal set; }
public CancellationToken CancellationToken { get; }
/// <summary>
/// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary>
public string ClientId { get; internal set; }
public string ClientId { get; }
public bool CloseConnection { get; set; }
......@@ -38,6 +46,6 @@ namespace MQTTnet.Server
/// <summary>
/// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary>
public IDictionary SessionItems { get; internal set; }
public IDictionary SessionItems { get; }
}
}
\ No newline at end of file
......@@ -12,16 +12,28 @@ namespace MQTTnet.Server
{
public sealed class InterceptingSubscriptionEventArgs : EventArgs
{
public InterceptingSubscriptionEventArgs(
CancellationToken cancellationToken,
string clientId,
MqttSessionStatus session,
MqttTopicFilter topicFilter)
{
CancellationToken = cancellationToken;
ClientId = clientId;
Session = session;
TopicFilter = topicFilter;
}
/// <summary>
/// Gets the cancellation token which can indicate that the client connection gets down.
/// </summary>
public CancellationToken CancellationToken { get; internal set; }
public CancellationToken CancellationToken { get; }
/// <summary>
/// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary>
public string ClientId { get; internal set; }
public string ClientId { get; }
/// <summary>
/// Gets or sets whether the broker should close the client connection.
......@@ -48,12 +60,12 @@ namespace MQTTnet.Server
/// <summary>
/// Gets the current client session.
/// </summary>
public MqttSessionStatus Session { get; internal set; }
public MqttSessionStatus Session { get; }
/// <summary>
/// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary>
public IDictionary SessionItems { get; internal set; }
public IDictionary SessionItems => Session.Items;
/// <summary>
/// Gets or sets the topic filter.
......
......@@ -10,16 +10,24 @@ namespace MQTTnet.Server
{
public sealed class InterceptingUnsubscriptionEventArgs : EventArgs
{
public InterceptingUnsubscriptionEventArgs(CancellationToken cancellationToken, string clientId, IDictionary sessionItems, string topic)
{
CancellationToken = cancellationToken;
ClientId = clientId;
SessionItems = sessionItems;
Topic = topic;
}
/// <summary>
/// Gets the cancellation token which can indicate that the client connection gets down.
/// </summary>
public CancellationToken CancellationToken { get; internal set; }
public CancellationToken CancellationToken { get; }
/// <summary>
/// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary>
public string ClientId { get; internal set; }
public string ClientId { get; }
/// <summary>
/// Gets or sets whether the broker should close the client connection.
......@@ -41,7 +49,7 @@ namespace MQTTnet.Server
/// <summary>
/// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary>
public IDictionary SessionItems { get; internal set; }
public IDictionary SessionItems { get; }
/// <summary>
/// Gets or sets the MQTT topic.
......@@ -50,6 +58,6 @@ namespace MQTTnet.Server
/// The topic consists of one or more topic levels. Each topic level is separated by a forward slash (topic level
/// separator).
/// </summary>
public string Topic { get; internal set; }
public string Topic { get; }
}
}
\ No newline at end of file
......@@ -9,10 +9,17 @@ namespace MQTTnet.Server
{
public sealed class RetainedMessageChangedEventArgs : EventArgs
{
public string ClientId { get; internal set; }
public MqttApplicationMessage ChangedRetainedMessage { get; internal set; }
public List<MqttApplicationMessage> StoredRetainedMessages { get; internal set; }
public RetainedMessageChangedEventArgs(string clientId, MqttApplicationMessage changedRetainedMessage, List<MqttApplicationMessage> storedRetainedMessages)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
ChangedRetainedMessage = changedRetainedMessage ?? throw new ArgumentNullException(nameof(changedRetainedMessage));
StoredRetainedMessages = storedRetainedMessages ?? throw new ArgumentNullException(nameof(storedRetainedMessages));
}
public MqttApplicationMessage ChangedRetainedMessage { get; }
public string ClientId { get; }
public List<MqttApplicationMessage> StoredRetainedMessages { get; }
}
}
\ No newline at end of file
......@@ -3,14 +3,26 @@
// See the LICENSE file in the project root for more information.
using System;
using System.Collections;
namespace MQTTnet.Server
{
public sealed class SessionDeletedEventArgs : EventArgs
{
public SessionDeletedEventArgs(string id, IDictionary sessionItems)
{
Id = id ?? throw new ArgumentNullException(nameof(id));
SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));
}
/// <summary>
/// Gets the ID of the session.
/// </summary>
public string Id { get; }
/// <summary>
/// Gets the ID of the session.
/// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary>
public string Id { get; internal set; }
public IDictionary SessionItems { get; }
}
}
\ No newline at end of file
......@@ -160,15 +160,8 @@ namespace MQTTnet.Server
{
return packet;
}
var interceptingPacketEventArgs = new InterceptingPacketEventArgs
{
ClientId = Id,
Endpoint = Endpoint,
Packet = packet,
CancellationToken = cancellationToken
};
var interceptingPacketEventArgs = new InterceptingPacketEventArgs(cancellationToken, Id, Endpoint, packet, Session.Items);
await _eventContainer.InterceptingOutboundPacketEvent.InvokeAsync(interceptingPacketEventArgs).ConfigureAwait(false);
if (!interceptingPacketEventArgs.ProcessPacket || packet == null)
......@@ -200,14 +193,7 @@ namespace MQTTnet.Server
if (_eventContainer.InterceptingInboundPacketEvent.HasHandlers)
{
var interceptingPacketEventArgs = new InterceptingPacketEventArgs
{
ClientId = Id,
Endpoint = Endpoint,
Packet = packet,
CancellationToken = cancellationToken
};
var interceptingPacketEventArgs = new InterceptingPacketEventArgs(cancellationToken, Id, Endpoint, packet, Session.Items);
await _eventContainer.InterceptingInboundPacketEvent.InvokeAsync(interceptingPacketEventArgs).ConfigureAwait(false);
packet = interceptingPacketEventArgs.Packet;
processPacket = interceptingPacketEventArgs.ProcessPacket;
......@@ -428,16 +414,7 @@ namespace MQTTnet.Server
if (_eventContainer.InterceptingPublishEvent.HasHandlers)
{
interceptingPublishEventArgs = new InterceptingPublishEventArgs
{
ClientId = Id,
ApplicationMessage = applicationMessage,
SessionItems = Session.Items,
ProcessPublish = true,
CloseConnection = false,
CancellationToken = cancellationToken
};
interceptingPublishEventArgs = new InterceptingPublishEventArgs(applicationMessage, cancellationToken, Id, Session.Items);
if (string.IsNullOrEmpty(interceptingPublishEventArgs.ApplicationMessage.Topic))
{
// This can happen if a topic alias us used but the topic is
......
......@@ -109,13 +109,9 @@ namespace MQTTnet.Server
try
{
if (_eventContainer.SessionDeletedEvent.HasHandlers)
if (_eventContainer.SessionDeletedEvent.HasHandlers && session != null)
{
var eventArgs = new SessionDeletedEventArgs
{
Id = session?.Id
};
var eventArgs = new SessionDeletedEventArgs(clientId, session.Items);
await _eventContainer.SessionDeletedEvent.TryInvokeAsync(eventArgs, _logger).ConfigureAwait(false);
}
}
......@@ -295,13 +291,12 @@ namespace MQTTnet.Server
if (_eventContainer.ClientConnectedEvent.HasHandlers)
{
var eventArgs = new ClientConnectedEventArgs
{
ClientId = connectPacket.ClientId,
UserName = connectPacket.Username,
ProtocolVersion = channelAdapter.PacketFormatterAdapter.ProtocolVersion,
Endpoint = channelAdapter.Endpoint
};
var eventArgs = new ClientConnectedEventArgs(
connectPacket.ClientId,
connectPacket.Username,
channelAdapter.PacketFormatterAdapter.ProtocolVersion,
channelAdapter.Endpoint,
client.Session.Items);
await _eventContainer.ClientConnectedEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
}
......@@ -340,12 +335,8 @@ namespace MQTTnet.Server
if (client.Id != null && !client.IsTakenOver && _eventContainer.ClientDisconnectedEvent.HasHandlers)
{
var eventArgs = new ClientDisconnectedEventArgs
{
ClientId = client.Id,
DisconnectType = client.IsCleanDisconnect ? MqttClientDisconnectType.Clean : MqttClientDisconnectType.NotClean,
Endpoint = endpoint
};
var disconnectType = client.IsCleanDisconnect ? MqttClientDisconnectType.Clean : MqttClientDisconnectType.NotClean;
var eventArgs = new ClientDisconnectedEventArgs(client.Id, disconnectType, endpoint, client.Session.Items);
await _eventContainer.ClientDisconnectedEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
}
......@@ -533,13 +524,7 @@ namespace MQTTnet.Server
if (_eventContainer.ClientConnectedEvent.HasHandlers)
{
var eventArgs = new ClientDisconnectedEventArgs
{
ClientId = existing.Id,
DisconnectType = MqttClientDisconnectType.Takeover,
Endpoint = existing.Endpoint
};
var eventArgs = new ClientDisconnectedEventArgs(existing.Id, MqttClientDisconnectType.Takeover, existing.Endpoint, existing.Session.Items);
await _eventContainer.ClientDisconnectedEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
}
}
......@@ -572,12 +557,7 @@ namespace MQTTnet.Server
return;
}
var eventArgs = new ApplicationMessageNotConsumedEventArgs
{
ApplicationMessage = applicationMessage,
SenderId = senderId
};
var eventArgs = new ApplicationMessageNotConsumedEventArgs(applicationMessage, senderId);
await _eventContainer.ApplicationMessageNotConsumedEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
}
......
......@@ -217,12 +217,7 @@ namespace MQTTnet.Server
{
foreach (var finalTopicFilter in finalTopicFilters)
{
var eventArgs = new ClientSubscribedTopicEventArgs
{
ClientId = _session.Id,
TopicFilter = finalTopicFilter
};
var eventArgs = new ClientSubscribedTopicEventArgs(_session.Id, finalTopicFilter, _session.Items);
await _eventContainer.ClientSubscribedTopicEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
}
}
......@@ -312,12 +307,7 @@ namespace MQTTnet.Server
{
foreach (var topicFilter in unsubscribePacket.TopicFilters)
{
var eventArgs = new ClientUnsubscribedTopicEventArgs
{
ClientId = _session.Id,
TopicFilter = topicFilter
};
var eventArgs = new ClientUnsubscribedTopicEventArgs(_session.Id, topicFilter, _session.Items);
await _eventContainer.ClientUnsubscribedTopicEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
}
}
......@@ -472,14 +462,7 @@ namespace MQTTnet.Server
async Task<InterceptingSubscriptionEventArgs> InterceptSubscribe(MqttTopicFilter topicFilter, CancellationToken cancellationToken)
{
var eventArgs = new InterceptingSubscriptionEventArgs
{
ClientId = _session.Id,
TopicFilter = topicFilter,
SessionItems = _session.Items,
Session = new MqttSessionStatus(_session),
CancellationToken = cancellationToken
};
var eventArgs = new InterceptingSubscriptionEventArgs(cancellationToken, _session.Id, new MqttSessionStatus(_session), topicFilter);
if (topicFilter.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{
......@@ -508,13 +491,7 @@ namespace MQTTnet.Server
async Task<InterceptingUnsubscriptionEventArgs> InterceptUnsubscribe(string topicFilter, MqttSubscription mqttSubscription, CancellationToken cancellationToken)
{
var clientUnsubscribingTopicEventArgs = new InterceptingUnsubscriptionEventArgs
{
ClientId = _session.Id,
Topic = topicFilter,
SessionItems = _session.Items,
CancellationToken = cancellationToken
};
var clientUnsubscribingTopicEventArgs = new InterceptingUnsubscriptionEventArgs(cancellationToken, topicFilter, _session.Items, topicFilter);
if (mqttSubscription == null)
{
......
......@@ -105,13 +105,7 @@ namespace MQTTnet.Server
{
using (await _storageAccessLock.WaitAsync(CancellationToken.None).ConfigureAwait(false))
{
var eventArgs = new RetainedMessageChangedEventArgs
{
ClientId = clientId,
ChangedRetainedMessage = applicationMessage,
StoredRetainedMessages = messagesForSave
};
var eventArgs = new RetainedMessageChangedEventArgs(clientId, applicationMessage, messagesForSave);
await _eventContainer.RetainedMessageChangedEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册