提交 45d8d04c 编写于 作者: C chengxiangwang

add logic of persist consumeOffset;polish logic of PUBLISH messages to clients(consumers)

上级 25ba6fab
......@@ -52,6 +52,8 @@ public class MqttConfig {
private long houseKeepingInterval = 10 * 1000;
private long persistOffsetInterval = 2 * 1000;
public int getListenPort() {
return listenPort;
}
......@@ -139,4 +141,12 @@ public class MqttConfig {
public void setHouseKeepingInterval(long houseKeepingInterval) {
this.houseKeepingInterval = houseKeepingInterval;
}
public long getPersistOffsetInterval() {
return persistOffsetInterval;
}
public void setPersistOffsetInterval(long persistOffsetInterval) {
this.persistOffsetInterval = persistOffsetInterval;
}
}
......@@ -103,5 +103,14 @@
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
<version>2.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
</project>
......@@ -21,11 +21,13 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.ClientManagerImpl;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
......@@ -41,6 +43,8 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
1024);
private final ConcurrentHashMap<String/*clientId*/, Subscription> clientId2Subscription = new ConcurrentHashMap<>(1024);
private final Map<String/*snode ip*/, MqttClient> snode2MqttClient = new HashMap<>();
private final ConcurrentHashMap<String /*broker*/, ConcurrentHashMap<String /*topic@clientId*/, TreeMap<Long/*queueOffset*/, MessageExt>>> processTable = new ConcurrentHashMap<>();
public IOTClientManagerImpl() {
}
......@@ -79,6 +83,9 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
}
public void cleanSessionState(String clientId) {
if (clientId2Subscription.remove(clientId) == null) {
return;
}
Map<String, Set<Client>> toBeRemoveFromPersistentStore = new HashMap<>();
for (Iterator<Map.Entry<String, Set<Client>>> iterator = topic2Clients.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, Set<Client>> next = iterator.next();
......@@ -94,7 +101,7 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
}
}
//TODO update persistent store base on toBeRemoveFromPersistentStore
clientId2Subscription.remove(clientId);
//TODO update persistent store
//TODO remove offline messages
}
......@@ -118,4 +125,8 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
public Map<String, MqttClient> getSnode2MqttClient() {
return snode2MqttClient;
}
public ConcurrentHashMap<String, ConcurrentHashMap<String, TreeMap<Long, MessageExt>>> getProcessTable() {
return processTable;
}
}
......@@ -16,16 +16,47 @@
*/
package org.apache.rocketmq.mqtt.client;
import io.netty.buffer.ByteBuf;
import org.apache.rocketmq.common.protocol.route.BrokerData;
public class InFlightMessage {
final String topic;
final Integer pushQos;
final ByteBuf payload;
private final String topic;
private final Integer pushQos;
private final BrokerData brokerData;
private final byte[] body;
private final String messageId;
private final long queueOffset;
InFlightMessage(String topic, Integer pushQos, ByteBuf payload) {
InFlightMessage(String topic, Integer pushQos, byte[] body, BrokerData brokerData, String messageId,
long queueOffset) {
this.topic = topic;
this.pushQos = pushQos;
this.payload = payload;
this.body = body;
this.brokerData = brokerData;
this.messageId = messageId;
this.queueOffset = queueOffset;
}
public String getTopic() {
return topic;
}
public BrokerData getBrokerData() {
return brokerData;
}
public String getMessageId() {
return messageId;
}
public long getQueueOffset() {
return queueOffset;
}
public Integer getPushQos() {
return pushQos;
}
public byte[] getBody() {
return body;
}
}
......@@ -16,32 +16,47 @@
*/
package org.apache.rocketmq.mqtt.client;
import io.netty.buffer.ByteBuf;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.ClientRole;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.constant.MqttConstant;
import org.apache.rocketmq.mqtt.exception.MqttRuntimeException;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.mqtt.util.MqttUtil;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
public class MQTTSession extends Client {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private boolean cleanSession;
private boolean isConnected;
private boolean willFlag;
private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
private final AtomicInteger inflightSlots = new AtomicInteger(10);
private final Map<Integer, InFlightMessage> inflightWindow = new HashMap<>();
private final DelayQueue<InFlightPacket> inflightTimeouts = new DelayQueue<>();
private static final int FLIGHT_BEFORE_RESEND_MS = 5_000;
private final AtomicInteger lastPacketId = new AtomicInteger(0);
private Hashtable inUsePacketIds = new Hashtable();
private int nextPacketId = 0;
......@@ -74,11 +89,13 @@ public class MQTTSession extends Client {
}
}
public MQTTSession(String clientId, ClientRole clientRole, Set<String> groups, boolean isConnected, boolean cleanSession,
RemotingChannel remotingChannel, long lastUpdateTimestamp) {
public MQTTSession(String clientId, ClientRole clientRole, Set<String> groups, boolean isConnected,
boolean cleanSession, RemotingChannel remotingChannel, long lastUpdateTimestamp,
DefaultMqttMessageProcessor defaultMqttMessageProcessor) {
super(clientId, clientRole, groups, remotingChannel, lastUpdateTimestamp);
this.isConnected = isConnected;
this.cleanSession = cleanSession;
this.defaultMqttMessageProcessor = defaultMqttMessageProcessor;
}
@Override
......@@ -117,21 +134,88 @@ public class MQTTSession extends Client {
this.willFlag = willFlag;
}
public void pushMessageAtQos(MqttHeader mqttHeader, ByteBuf payload,
DefaultMqttMessageProcessor defaultMqttMessageProcessor) {
public void pushMessageQos0(MqttHeader mqttHeader, byte[] body) {
pushMessage2Client(mqttHeader, body);
}
if (mqttHeader.getQosLevel() > 0) {
public void pushMessageQos1(MqttHeader mqttHeader, MessageExt messageExt, BrokerData brokerData) {
if (inflightSlots.get() > 0) {
inflightSlots.decrementAndGet();
mqttHeader.setPacketId(getNextPacketId());
inflightWindow.put(mqttHeader.getPacketId(), new InFlightMessage(mqttHeader.getTopicName(), mqttHeader.getQosLevel(), payload));
inflightWindow.put(mqttHeader.getPacketId(), new InFlightMessage(mqttHeader.getTopicName(), mqttHeader.getQosLevel(), messageExt.getBody(), brokerData, messageExt.getMsgId(), messageExt.getQueueOffset()));
inflightTimeouts.add(new InFlightPacket(mqttHeader.getPacketId(), FLIGHT_BEFORE_RESEND_MS));
put2processTable(((IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager()).getProcessTable(), brokerData.getBrokerName(), MqttUtil.getRootTopic(mqttHeader.getTopicName()), messageExt);
pushMessage2Client(mqttHeader, messageExt.getBody());
}
defaultMqttMessageProcessor.getMqttPushService().pushMessageQos(mqttHeader, payload, this);
}
public void pubAckReceived(int ackPacketId) {
inflightWindow.remove(ackPacketId);
public InFlightMessage pubAckReceived(int ackPacketId) {
InFlightMessage remove = inflightWindow.remove(ackPacketId);
String rootTopic = MqttUtil.getRootTopic(remove.getTopic());
ConcurrentHashMap<String, ConcurrentHashMap<String, TreeMap<Long, MessageExt>>> processTable = ((IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager()).getProcessTable();
ConcurrentHashMap<String, TreeMap<Long, MessageExt>> map = processTable.get(remove.getBrokerData().getBrokerName());
if (map != null) {
TreeMap<Long, MessageExt> treeMap = map.get(rootTopic + "@" + this.getClientId());
if (treeMap != null) {
treeMap.remove(remove.getQueueOffset());
}
}
inflightSlots.incrementAndGet();
releasePacketId(ackPacketId);
return remove;
}
private void pushMessage2Client(MqttHeader mqttHeader, byte[] body) {
try {
//set remaining length
int remainingLength = mqttHeader.getTopicName().getBytes().length + body.length;
if (mqttHeader.getQosLevel() > 0) {
remainingLength += 2; //add packetId length
}
mqttHeader.setRemainingLength(remainingLength);
RemotingCommand requestCommand = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader);
RemotingChannel remotingChannel = this.getRemotingChannel();
if (this.getRemotingChannel() instanceof NettyChannelHandlerContextImpl) {
remotingChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl) this.getRemotingChannel()).getChannelHandlerContext().channel());
}
requestCommand.setBody(body);
this.defaultMqttMessageProcessor.getMqttRemotingServer().push(remotingChannel, requestCommand, MqttConstant.DEFAULT_TIMEOUT_MILLS);
} catch (Exception ex) {
log.warn("Exception was thrown when pushing MQTT message. Topic: {}, clientId:{}, exception={}", mqttHeader.getTopicName(), this.getClientId(), ex.getMessage());
}
}
private void put2processTable(
ConcurrentHashMap<String, ConcurrentHashMap<String, TreeMap<Long, MessageExt>>> processTable,
String brokerName,
String rootTopic,
MessageExt messageExt) {
ConcurrentHashMap<String, TreeMap<Long, MessageExt>> map;
TreeMap<Long, MessageExt> treeMap;
String offsetKey = rootTopic + "@" + this.getClientId();
if (processTable.contains(brokerName)) {
map = processTable.get(brokerName);
if (map.contains(offsetKey)) {
treeMap = map.get(offsetKey);
treeMap.putIfAbsent(messageExt.getQueueOffset(), messageExt);
} else {
treeMap = new TreeMap<>();
treeMap.put(messageExt.getQueueOffset(), messageExt);
map.putIfAbsent(offsetKey, treeMap);
}
} else {
map = new ConcurrentHashMap<>();
treeMap = new TreeMap<>();
treeMap.put(messageExt.getQueueOffset(), messageExt);
map.put(offsetKey, treeMap);
ConcurrentHashMap<String, TreeMap<Long, MessageExt>> old = processTable.putIfAbsent(brokerName, map);
if (old != null) {
old.putIfAbsent(offsetKey, treeMap);
}
}
}
private synchronized void releasePacketId(int msgId) {
this.inUsePacketIds.remove(new Integer(msgId));
}
......@@ -159,4 +243,13 @@ public class MQTTSession extends Client {
this.inUsePacketIds.put(id, id);
return this.nextPacketId;
}
public AtomicInteger getInflightSlots() {
return inflightSlots;
}
public Map<Integer, InFlightMessage> getInflightWindow() {
return inflightWindow;
}
}
......@@ -20,46 +20,79 @@ package org.apache.rocketmq.mqtt.mqtthandler.impl;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.client.MQTTSession;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.mqtt.processor.InnerMqttMessageProcessor;
import org.apache.rocketmq.mqtt.task.MqttPushTask;
import org.apache.rocketmq.mqtt.transfer.TransferDataQos1;
import org.apache.rocketmq.mqtt.util.orderedexecutor.SafeRunnable;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
public class MqttMessageForwarder implements MessageHandler {
public class MqttMessageForwardHandler implements MessageHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private final InnerMqttMessageProcessor innerMqttMessageProcessor;
private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
public MqttMessageForwarder(InnerMqttMessageProcessor processor) {
public MqttMessageForwardHandler(InnerMqttMessageProcessor processor) {
this.innerMqttMessageProcessor = processor;
this.defaultMqttMessageProcessor = innerMqttMessageProcessor.getDefaultMqttMessageProcessor();
}
/**
* handle PUBLISH message from client
* handle messages transferred from other nodes
*
* @param message
* @return whether the message is handled successfully
* @param message the message that transferred from other node
*/
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) message;
MqttFixedHeader fixedHeader = mqttPublishMessage.fixedHeader();
MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader();
ByteBuf payload = mqttPublishMessage.payload();
byte[] body = new byte[payload.readableBytes()];
payload.readBytes(body);
if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) {
ByteBuf payload = mqttPublishMessage.payload();
//Publish message to clients
Set<Client> clientsTobePublish = findCurrentNodeClientsTobePublish(variableHeader.topicName(), (IOTClientManagerImpl) this.innerMqttMessageProcessor.getIotClientManager());
innerMqttMessageProcessor.getDefaultMqttMessageProcessor().getMqttPushService().pushMessageQos0(variableHeader.topicName(), payload, clientsTobePublish);
}else if(fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)){
//TODO
MqttHeader mqttHeaderQos0 = new MqttHeader();
mqttHeaderQos0.setTopicName(variableHeader.topicName());
mqttHeaderQos0.setMessageType(MqttMessageType.PUBLISH.value());
mqttHeaderQos0.setQosLevel(MqttQoS.AT_MOST_ONCE.value());
mqttHeaderQos0.setRetain(false); //TODO set to false temporarily, need to be implemented later.
Set<Client> clientsTobePublish = findCurrentNodeClientsTobePublish(variableHeader.topicName(), (IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager());
for (Client client : clientsTobePublish) {
((MQTTSession) client).pushMessageQos0(mqttHeaderQos0, body, this.defaultMqttMessageProcessor);
}
} else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) {
TransferDataQos1 transferDataQos1 = TransferDataQos1.decode(body, TransferDataQos1.class);
//TODO : find clients that subscribed this topic from current node
List<Client> clientsTobePublished = new ArrayList<>();
for (Client client : clientsTobePublished) {
//For each client, wrap a task:
//Pull message one by one, and push them if current client match.
MqttHeader mqttHeaderQos1 = new MqttHeader();
mqttHeaderQos1.setTopicName(variableHeader.topicName());
mqttHeaderQos1.setMessageType(MqttMessageType.PUBLISH.value());
mqttHeaderQos1.setRetain(false); //TODO set to false temporarily, need to be implemented later.
MqttPushTask mqttPushTask = new MqttPushTask(this.defaultMqttMessageProcessor, mqttHeaderQos1, client, transferDataQos1.getBrokerData());
//add task to orderedExecutor
this.defaultMqttMessageProcessor.getOrderedExecutor().executeOrdered(client.getClientId(), SafeRunnable.safeRun(mqttPushTask));
}
return doResponse(fixedHeader);
}
return doResponse(fixedHeader);
return null;
}
}
......@@ -18,13 +18,23 @@
package org.apache.rocketmq.mqtt.mqtthandler.impl;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.client.InFlightMessage;
import org.apache.rocketmq.mqtt.client.MQTTSession;
import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.mqtt.task.MqttPushTask;
import org.apache.rocketmq.mqtt.util.orderedexecutor.SafeRunnable;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
public class MqttPubackMessageHandler implements MessageHandler {
......@@ -43,6 +53,24 @@ public class MqttPubackMessageHandler implements MessageHandler {
* @return
*/
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
if (!(message instanceof MqttPubAckMessage)) {
log.error("Wrong message type! Expected type: PUBACK but {} was received. MqttMessage={}", message.fixedHeader().messageType(), message.toString());
throw new WrongMessageTypeException("Wrong message type exception.");
}
MqttPubAckMessage mqttPubAckMessage = (MqttPubAckMessage) message;
MqttMessageIdVariableHeader variableHeader = mqttPubAckMessage.variableHeader();
IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager();
MQTTSession client = (MQTTSession) iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
InFlightMessage removedMessage = client.pubAckReceived(variableHeader.messageId());
MqttHeader mqttHeader = new MqttHeader();
mqttHeader.setTopicName(removedMessage.getTopic());
mqttHeader.setMessageType(MqttMessageType.PUBLISH.value());
mqttHeader.setDup(false);
mqttHeader.setRetain(false); //TODO set to false temporarily, need to be implemented.
MqttPushTask task = new MqttPushTask(defaultMqttMessageProcessor, mqttHeader, client, removedMessage.getBrokerData());
//add task to orderedExecutor
this.defaultMqttMessageProcessor.getOrderedExecutor().executeOrdered(client.getClientId(), SafeRunnable.safeRun(task));
return null;
}
}
......@@ -23,36 +23,26 @@ import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.ReferenceCountUtil;
import java.nio.ByteBuffer;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.MqttSubscriptionData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.service.EnodeService;
......@@ -61,12 +51,14 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.client.MQTTSession;
import org.apache.rocketmq.mqtt.constant.MqttConstant;
import org.apache.rocketmq.mqtt.exception.MqttRuntimeException;
import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.mqtt.task.MqttPushTask;
import org.apache.rocketmq.mqtt.util.MqttUtil;
import org.apache.rocketmq.mqtt.util.orderedexecutor.SafeRunnable;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
......@@ -106,19 +98,20 @@ public class MqttPublishMessageHandler implements MessageHandler {
}
ByteBuf payload = mqttPublishMessage.payload();
MqttHeader mqttHeader = new MqttHeader();
mqttHeader.setTopicName(variableHeader.topicName());
mqttHeader.setMessageType(MqttMessageType.PUBLISH.value());
mqttHeader.setDup(false);
mqttHeader.setQosLevel(fixedHeader.qosLevel().value());
mqttHeader.setRetain(false); //set to false tempararily, need to be implemented.
switch (fixedHeader.qosLevel()) {
case AT_MOST_ONCE:
//For clients connected to the current snode and isConnected is true
Set<Client> clientsTobePublish = findCurrentNodeClientsTobePublish(variableHeader.topicName(), this.iotClientManager);
byte[] body = new byte[payload.readableBytes()];
payload.readBytes(body);
MqttHeader mqttHeaderQos0 = new MqttHeader();
mqttHeaderQos0.setTopicName(variableHeader.topicName());
mqttHeaderQos0.setMessageType(MqttMessageType.PUBLISH.value());
mqttHeaderQos0.setQosLevel(MqttQoS.AT_MOST_ONCE.value());
mqttHeaderQos0.setRetain(false); //TODO set to false temporarily, need to be implemented later.
for (Client client : clientsTobePublish) {
((MQTTSession) client).pushMessageAtQos(mqttHeader, payload, this.defaultMqttMessageProcessor);
((MQTTSession) client).pushMessageQos0(mqttHeaderQos0, body, this.defaultMqttMessageProcessor);
}
//For clients that connected to other snodes, transfer the message to them
......@@ -134,6 +127,7 @@ public class MqttPublishMessageHandler implements MessageHandler {
} finally {
ReferenceCountUtil.release(message);
}
case AT_LEAST_ONCE:
// Store msg and invoke callback to publish msg to subscribers
// 1. Check if the root topic has been created
......@@ -156,95 +150,30 @@ public class MqttPublishMessageHandler implements MessageHandler {
responseFuture.whenComplete((data, ex) -> {
if (ex == null) {
//publish msg to subscribers
try {
SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) data.decodeCommandCustomHeader(SendMessageResponseHeader.class);
//find clients that subscribed this topic from all snodes and put it to map.
Map<String, List<Client>> snodeAddr2Clients = new HashMap<>();
//for clientIds connected to current snode, trigger the logic of push message
List<Client> clients = snodeAddr2Clients.get(this.defaultMqttMessageProcessor.getSnodeConfig().getSnodeIP1());
for (Client client : clients) {
Subscription subscription = this.iotClientManager.getSubscriptionByClientId(client.getClientId());
ConcurrentHashMap<String, SubscriptionData> subscriptionTable = subscription.getSubscriptionTable();
//for each client, wrap a task: pull messages from commitlog one by one, and push them if current client subscribe it.
Runnable task = new Runnable() {
@Override
public void run() {
//compare current consumeOffset of rootTopic@clientId with maxOffset, pull message if consumeOffset < maxOffset
long maxOffsetInQueue;
try {
maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic);
long consumeOffset = enodeService.queryOffset(brokerData.getBrokerName(), client.getClientId(), rootTopic, 0);
long i = consumeOffset;
while (i < maxOffsetInQueue) {
//TODO query messages from enode
RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody());
MessageExt messageExt = MessageDecoder.clientDecode(byteBuffer, true);
final String realTopic = messageExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
boolean needSkip = needSkip(realTopic);
if (needSkip) {
log.info("Current client doesn't subscribe topic:{}, skip this message", realTopic);
maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic);
i += 1;
continue;
}
Integer pushQos = lowerQosToTheSubscriptionDesired(realTopic, Integer.valueOf(messageExt.getProperty(MqttConstant.PROPERTY_MQTT_QOS)));
mqttHeader.setQosLevel(pushQos);
//push message
MQTTSession mqttSession = (MQTTSession) client;
mqttSession.pushMessageAtQos(mqttHeader, payload, defaultMqttMessageProcessor);
maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic);
i += 1;
}
} catch (Exception ex) {
log.error("Get max offset error, remoting: {} error: {} ", remotingChannel.remoteAddress(), ex);
}
}
private boolean needSkip(final String realTopic) {
Enumeration<String> topicFilters = subscriptionTable.keys();
while (topicFilters.hasMoreElements()) {
if (MqttUtil.isMatch(topicFilters.nextElement(), realTopic)) {
return false;
}
}
return true;
}
private Integer lowerQosToTheSubscriptionDesired(String publishTopic,
Integer publishingQos) {
Integer pushQos = Integer.valueOf(publishingQos);
Iterator<Map.Entry<String, SubscriptionData>> iterator = subscriptionTable.entrySet().iterator();
Integer maxRequestedQos = 0;
while (iterator.hasNext()) {
final String topicFilter = iterator.next().getKey();
if (MqttUtil.isMatch(topicFilter, publishTopic)) {
MqttSubscriptionData mqttSubscriptionData = (MqttSubscriptionData) iterator.next().getValue();
maxRequestedQos = mqttSubscriptionData.getQos() > maxRequestedQos ? mqttSubscriptionData.getQos() : maxRequestedQos;
}
}
if (publishingQos > maxRequestedQos) {
pushQos = maxRequestedQos;
}
return pushQos;
}
};
}
//for clientIds connected to other snodes, forward msg
} catch (RemotingCommandException e) {
e.printStackTrace();
//TODO find clients that subscribed this topic from all snodes and put it to map.
Map<String, List<Client>> snodeAddr2Clients = new HashMap<>();
//for clientIds connected to current snode, trigger the logic of push message
List<Client> clients = snodeAddr2Clients.get(this.defaultMqttMessageProcessor.getSnodeConfig().getSnodeIP1());
for (Client client : clients) {
//For each client, wrap a task:
//Pull message one by one, and push them if current client match.
MqttHeader mqttHeaderQos1 = new MqttHeader();
mqttHeaderQos1.setTopicName(variableHeader.topicName());
mqttHeaderQos1.setMessageType(MqttMessageType.PUBLISH.value());
mqttHeaderQos1.setRetain(false); //TODO set to false temporarily, need to be implemented later.
MqttPushTask mqttPushTask = new MqttPushTask(defaultMqttMessageProcessor, mqttHeaderQos1, client, brokerData);
//add task to orderedExecutor
this.defaultMqttMessageProcessor.getOrderedExecutor().executeOrdered(client.getClientId(), SafeRunnable.safeRun(mqttPushTask));
}
//TODO for clientIds connected to other snodes, forward msg
} else {
log.error("Store Qos=1 Message error: {}", ex);
}
});
case EXACTLY_ONCE:
throw new MqttRuntimeException("Qos = 2 messages are not supported yet.");
}
return doResponse(fixedHeader);
}
......@@ -297,13 +226,4 @@ public class MqttPublishMessageHandler implements MessageHandler {
return request;
}
private long getMaxOffset(String enodeName,
String topic) throws InterruptedException, RemotingTimeoutException, RemotingCommandException, RemotingSendRequestException, RemotingConnectException {
GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(0);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);
return this.defaultMqttMessageProcessor.getEnodeService().getMaxOffsetInQueue(enodeName, topic, 0, request);
}
}
......@@ -39,6 +39,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.exception.MQClientException;
import org.apache.rocketmq.common.service.EnodeService;
import org.apache.rocketmq.common.service.NnodeService;
import org.apache.rocketmq.common.service.ScheduledService;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
......@@ -55,8 +56,9 @@ import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubrelMessageHandler;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttSubscribeMessageHandler;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttUnsubscribeMessagHandler;
import org.apache.rocketmq.mqtt.service.WillMessageService;
import org.apache.rocketmq.mqtt.service.impl.MqttPushServiceImpl;
import org.apache.rocketmq.mqtt.service.impl.MqttScheduledServiceImpl;
import org.apache.rocketmq.mqtt.service.impl.WillMessageServiceImpl;
import org.apache.rocketmq.mqtt.util.orderedexecutor.OrderedExecutor;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.RequestProcessor;
......@@ -74,7 +76,6 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
private static final int MIN_AVAILABLE_VERSION = 3;
private static final int MAX_AVAILABLE_VERSION = 4;
private WillMessageService willMessageService;
private MqttPushServiceImpl mqttPushService;
private ClientManager iotClientManager;
private RemotingServer mqttRemotingServer;
private MqttClientHousekeepingService mqttClientHousekeepingService;
......@@ -82,6 +83,9 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
private SnodeConfig snodeConfig;
private EnodeService enodeService;
private NnodeService nnodeService;
private ScheduledService mqttScheduledService;
private final OrderedExecutor orderedExecutor;
public DefaultMqttMessageProcessor(MqttConfig mqttConfig, SnodeConfig snodeConfig,
RemotingServer mqttRemotingServer,
......@@ -89,7 +93,6 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
this.mqttConfig = mqttConfig;
this.snodeConfig = snodeConfig;
this.willMessageService = new WillMessageServiceImpl();
this.mqttPushService = new MqttPushServiceImpl(this, mqttConfig);
this.iotClientManager = new IOTClientManagerImpl();
this.mqttRemotingServer = mqttRemotingServer;
this.enodeService = enodeService;
......@@ -97,6 +100,10 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
this.mqttClientHousekeepingService = new MqttClientHousekeepingService(iotClientManager);
this.mqttClientHousekeepingService.start(mqttConfig.getHouseKeepingInterval());
this.orderedExecutor = OrderedExecutor.newBuilder().name("PushMessageToConsumerThreads").numThreads(mqttConfig.getPushMqttMessageMaxPoolSize()).build();
this.mqttScheduledService = new MqttScheduledServiceImpl(this);
mqttScheduledService.startScheduleTask();
registerMessageHandler(MqttMessageType.CONNECT,
new MqttConnectMessageHandler(this));
registerMessageHandler(MqttMessageType.DISCONNECT,
......@@ -164,10 +171,6 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
return willMessageService;
}
public MqttPushServiceImpl getMqttPushService() {
return mqttPushService;
}
public ClientManager getIotClientManager() {
return iotClientManager;
}
......@@ -207,4 +210,8 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
public void setNnodeService(NnodeService nnodeService) {
this.nnodeService = nnodeService;
}
public OrderedExecutor getOrderedExecutor() {
return orderedExecutor;
}
}
......@@ -33,9 +33,8 @@ import org.apache.rocketmq.common.service.EnodeService;
import org.apache.rocketmq.common.service.NnodeService;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttMessageForwarder;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttMessageForwardHandler;
import org.apache.rocketmq.mqtt.service.WillMessageService;
import org.apache.rocketmq.mqtt.service.impl.MqttPushServiceImpl;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.RequestProcessor;
......@@ -52,24 +51,22 @@ public class InnerMqttMessageProcessor implements RequestProcessor {
private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
private WillMessageService willMessageService;
private MqttPushServiceImpl mqttPushService;
private ClientManager iotClientManager;
private RemotingServer innerMqttRemotingServer;
private MqttConfig mqttConfig;
private SnodeConfig snodeConfig;
private EnodeService enodeService;
private NnodeService nnodeService;
private MqttMessageForwarder mqttMessageForwarder;
private MqttMessageForwardHandler mqttMessageForwardHandler;
public InnerMqttMessageProcessor(DefaultMqttMessageProcessor defaultMqttMessageProcessor, RemotingServer innerMqttRemotingServer) {
this.defaultMqttMessageProcessor = defaultMqttMessageProcessor;
this.willMessageService = this.defaultMqttMessageProcessor.getWillMessageService();
this.mqttPushService = this.defaultMqttMessageProcessor.getMqttPushService();
this.iotClientManager = this.defaultMqttMessageProcessor.getIotClientManager();
this.innerMqttRemotingServer = innerMqttRemotingServer;
this.enodeService = this.defaultMqttMessageProcessor.getEnodeService();
this.nnodeService = this.defaultMqttMessageProcessor.getNnodeService();
this.mqttMessageForwarder = new MqttMessageForwarder(this);
this.mqttMessageForwardHandler = new MqttMessageForwardHandler(this);
}
@Override
......@@ -82,7 +79,7 @@ public class InnerMqttMessageProcessor implements RequestProcessor {
mqttHeader.getRemainingLength());
MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId());
MqttMessage mqttMessage = new MqttPublishMessage(fixedHeader, mqttPublishVariableHeader, Unpooled.copiedBuffer(message.getBody()));
return mqttMessageForwarder.handleMessage(mqttMessage, remotingChannel);
return mqttMessageForwardHandler.handleMessage(mqttMessage, remotingChannel);
}else{
return defaultMqttMessageProcessor.processRequest(remotingChannel, message);
}
......@@ -98,10 +95,6 @@ public class InnerMqttMessageProcessor implements RequestProcessor {
return willMessageService;
}
public MqttPushServiceImpl getMqttPushService() {
return mqttPushService;
}
public ClientManager getIotClientManager() {
return iotClientManager;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt.service.impl;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.constant.MqttConstant;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.mqtt.service.MqttPushService;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
public class MqttPushServiceImpl implements MqttPushService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private ExecutorService pushMqttMessageExecutorService;
private static DefaultMqttMessageProcessor defaultMqttMessageProcessor;
public MqttPushServiceImpl(DefaultMqttMessageProcessor defaultMqttMessageProcessor, MqttConfig mqttConfig) {
this.defaultMqttMessageProcessor = defaultMqttMessageProcessor;
pushMqttMessageExecutorService = ThreadUtils.newThreadPoolExecutor(
mqttConfig.getPushMqttMessageMinPoolSize(),
mqttConfig.getPushMqttMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(mqttConfig.getPushMqttMessageThreadPoolQueueCapacity()),
"pushMqttMessageThread",
false);
}
public static class MqttPushTask implements Runnable {
private AtomicBoolean canceled = new AtomicBoolean(false);
private final ByteBuf message;
private final MqttHeader mqttHeader;
private Client client;
// private final String topic;
// private final Integer qos;
// private boolean retain;
// private Integer packetId;
public MqttPushTask(final MqttHeader mqttHeader, final ByteBuf message, Client client) {
this.message = message;
this.mqttHeader = mqttHeader;
// this.topic = topic;
// this.qos = qos;
// this.retain = retain;
// this.packetId = packetId;
this.client = client;
}
@Override
public void run() {
if (!canceled.get()) {
try {
RemotingCommand requestCommand = buildRequestCommand(this.mqttHeader);
RemotingChannel remotingChannel = client.getRemotingChannel();
if (client.getRemotingChannel() instanceof NettyChannelHandlerContextImpl) {
remotingChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl) client.getRemotingChannel()).getChannelHandlerContext().channel());
}
byte[] body = new byte[message.readableBytes()];
message.readBytes(body);
requestCommand.setBody(body);
defaultMqttMessageProcessor.getMqttRemotingServer().push(remotingChannel, requestCommand, MqttConstant.DEFAULT_TIMEOUT_MILLS);
} catch (Exception ex) {
log.warn("Exception was thrown when pushing MQTT message to topic: {}, clientId:{}, exception={}", mqttHeader.getTopicName(), client.getClientId(), ex.getMessage());
} finally {
ReferenceCountUtil.release(message);
}
} else {
log.info("Push message to topic: {}, clientId:{}, canceled!", mqttHeader.getTopicName(), client.getClientId());
}
}
private RemotingCommand buildRequestCommand(MqttHeader mqttHeader) {
// if (qos == 0) {
// mqttHeader.setDup(false);//DUP is always 0 for qos=0 messages
// } else {
// mqttHeader.setDup(false);//DUP is depending on whether it is a re-delivery of an earlier attempt.
// }
// mqttHeader.setRemainingLength(4 + topic.getBytes().length + message.readableBytes());
RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader);
return pushMessage;
}
public void setCanceled(AtomicBoolean canceled) {
this.canceled = canceled;
}
}
public void pushMessageQos(MqttHeader mqttHeader, final ByteBuf message, Client client) {
MqttPushTask pushTask = new MqttPushTask(mqttHeader, message, client);
pushMqttMessageExecutorService.submit(pushTask);
}
public void shutdown() {
this.pushMqttMessageExecutorService.shutdown();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt.service.impl;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.service.ScheduledService;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
public class MqttScheduledServiceImpl implements ScheduledService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
public MqttScheduledServiceImpl(DefaultMqttMessageProcessor defaultMqttMessageProcessor) {
this.defaultMqttMessageProcessor = defaultMqttMessageProcessor;
}
private final ScheduledExecutorService mqttScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "MqttScheduledThread");
}
});
@Override
public void startScheduleTask() {
this.mqttScheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager();
ConcurrentHashMap<String, ConcurrentHashMap<String, TreeMap<Long, MessageExt>>> processTable = iotClientManager.getProcessTable();
for (Map.Entry<String, ConcurrentHashMap<String, TreeMap<Long, MessageExt>>> entry : processTable.entrySet()) {
String brokerName = entry.getKey();
ConcurrentHashMap<String, TreeMap<Long, MessageExt>> map = entry.getValue();
for (Map.Entry<String, TreeMap<Long, MessageExt>> innerEntry : map.entrySet()) {
String topicClient = innerEntry.getKey();
TreeMap<Long, MessageExt> inflightMessages = innerEntry.getValue();
Long offset = inflightMessages.firstKey();
defaultMqttMessageProcessor.getEnodeService().persistOffset(null, brokerName, topicClient.split("@")[1], topicClient.split("@")[0], 0, offset);
}
}
}
}, 0, defaultMqttMessageProcessor.getMqttConfig().getPersistOffsetInterval(), TimeUnit.MILLISECONDS);
}
@Override
public void shutdown() {
if (this.mqttScheduledExecutorService != null) {
this.mqttScheduledExecutorService.shutdown();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt.task;
import java.nio.ByteBuffer;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.MqttSubscriptionData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.service.EnodeService;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.client.MQTTSession;
import org.apache.rocketmq.mqtt.constant.MqttConstant;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.mqtt.util.MqttUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
public class MqttPushTask implements Runnable {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
private MqttHeader mqttHeader;
private MQTTSession client;
private BrokerData brokerData;
public MqttPushTask(DefaultMqttMessageProcessor processor, final MqttHeader mqttHeader, Client client,
BrokerData brokerData) {
this.defaultMqttMessageProcessor = processor;
this.mqttHeader = mqttHeader;
this.client = (MQTTSession) client;
this.brokerData = brokerData;
}
@Override
public void run() {
String rootTopic = MqttUtil.getRootTopic(mqttHeader.getTopicName());
EnodeService enodeService = this.defaultMqttMessageProcessor.getEnodeService();
IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager();
Subscription subscription = iotClientManager.getSubscriptionByClientId(client.getClientId());
ConcurrentHashMap<String, SubscriptionData> subscriptionTable = subscription.getSubscriptionTable();
//compare current consumeOffset of rootTopic@clientId with maxOffset, pull message if consumeOffset < maxOffset
long maxOffsetInQueue;
try {
maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic);
final long consumeOffset = enodeService.queryOffset(brokerData.getBrokerName(), client.getClientId(), rootTopic, 0);
long i = consumeOffset + 1;
while (i <= maxOffsetInQueue) {
//TODO query messages(queueOffset=i) from enode above(brokerData.getBrokerName)
RemotingCommand response = null;
ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody());
MessageExt messageExt = MessageDecoder.clientDecode(byteBuffer, true);
final String realTopic = messageExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
boolean needSkip = needSkip(realTopic, subscriptionTable);
boolean alreadyInFlight = alreadyInFight(brokerData.getBrokerName(), realTopic, client.getClientId(), messageExt.getQueueOffset());
if (needSkip) {
log.info("Current client doesn't subscribe topic:{}, skip this message", realTopic);
maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic);
i += 1;
continue;
}
if (alreadyInFlight) {
log.info("The message is already inflight. MessageId={}", messageExt.getMsgId());
break;
}
Integer pushQos = lowerQosToTheSubscriptionDesired(realTopic, Integer.valueOf(messageExt.getProperty(MqttConstant.PROPERTY_MQTT_QOS)), subscriptionTable);
mqttHeader.setQosLevel(pushQos);
mqttHeader.setTopicName(realTopic);
if (client.getInflightSlots().get() == 0) {
log.info("The in-flight window is full, stop pushing message to consumers and update consumeOffset. ClientId={}, rootTopic={}", client.getClientId(), rootTopic);
break;
}
//push message if in-flight window has slot(not full)
client.pushMessageQos1(mqttHeader, messageExt, brokerData);
maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic);
i += 1;
}
//TODO update consumeOffset of rootTopic@clientId in brokerData.getBrokerName()
enodeService.persistOffset(null, brokerData.getBrokerName(), client.getClientId(), rootTopic, 0, i - 1);
} catch (Exception ex) {
log.error("Exception was thrown when pushing messages to consumer.{}", ex);
}
}
private boolean needSkip(final String realTopic, ConcurrentHashMap<String, SubscriptionData> subscriptionTable) {
Enumeration<String> topicFilters = subscriptionTable.keys();
while (topicFilters.hasMoreElements()) {
if (MqttUtil.isMatch(topicFilters.nextElement(), realTopic)) {
return false;
}
}
return true;
}
private boolean alreadyInFight(String brokerName, String topic, String clientId, Long queueOffset) {
ConcurrentHashMap<String, ConcurrentHashMap<String, TreeMap<Long, MessageExt>>> processTable = ((IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager()).getProcessTable();
ConcurrentHashMap<String, TreeMap<Long, MessageExt>> map = processTable.get(brokerName);
if (map != null) {
TreeMap<Long, MessageExt> treeMap = map.get(MqttUtil.getRootTopic(topic) + "@" + clientId);
if (treeMap != null && treeMap.get(queueOffset) != null) {
return true;
}
}
return false;
}
private Integer lowerQosToTheSubscriptionDesired(String publishTopic, Integer publishingQos,
ConcurrentHashMap<String, SubscriptionData> subscriptionTable) {
Integer pushQos = Integer.valueOf(publishingQos);
Iterator<Map.Entry<String, SubscriptionData>> iterator = subscriptionTable.entrySet().iterator();
Integer maxRequestedQos = 0;
while (iterator.hasNext()) {
final String topicFilter = iterator.next().getKey();
if (MqttUtil.isMatch(topicFilter, publishTopic)) {
MqttSubscriptionData mqttSubscriptionData = (MqttSubscriptionData) iterator.next().getValue();
maxRequestedQos = mqttSubscriptionData.getQos() > maxRequestedQos ? mqttSubscriptionData.getQos() : maxRequestedQos;
}
}
if (publishingQos > maxRequestedQos) {
pushQos = maxRequestedQos;
}
return pushQos;
}
private long getMaxOffset(String enodeName,
String topic) throws InterruptedException, RemotingTimeoutException, RemotingCommandException, RemotingSendRequestException, RemotingConnectException {
GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(0);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);
return this.defaultMqttMessageProcessor.getEnodeService().getMaxOffsetInQueue(enodeName, topic, 0, request);
}
}
......@@ -14,12 +14,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt.service;
import io.netty.buffer.ByteBuf;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
public interface MqttPushService {
void pushMessageQos(MqttHeader mqttHeader, final ByteBuf message, Client client);
package org.apache.rocketmq.mqtt.transfer;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class TransferDataQos1 extends RemotingSerializable {
private BrokerData brokerData;
private String topic;
public BrokerData getBrokerData() {
return brokerData;
}
public void setBrokerData(BrokerData brokerData) {
this.brokerData = brokerData;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt.util.orderedexecutor;
import com.google.common.util.concurrent.ForwardingExecutorService;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Implements {@link ExecutorService} and allows limiting the number of tasks to
* be scheduled in the thread's queue.
*/
public class BoundedExecutorService extends ForwardingExecutorService {
private final BlockingQueue<Runnable> queue;
private final ThreadPoolExecutor thread;
private final int maxTasksInQueue;
public BoundedExecutorService(ThreadPoolExecutor thread, int maxTasksInQueue) {
this.queue = thread.getQueue();
this.thread = thread;
this.maxTasksInQueue = maxTasksInQueue;
}
@Override
protected ExecutorService delegate() {
return this.thread;
}
private void checkQueue(int numberOfTasks) {
if (maxTasksInQueue > 0 && (queue.size() + numberOfTasks) > maxTasksInQueue) {
throw new RejectedExecutionException("Queue at limit of " + maxTasksInQueue + " items");
}
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
checkQueue(tasks.size());
return super.invokeAll(tasks);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
checkQueue(tasks.size());
return super.invokeAll(tasks, timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
checkQueue(tasks.size());
return super.invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
checkQueue(tasks.size());
return super.invokeAny(tasks, timeout, unit);
}
@Override
public void execute(Runnable command) {
checkQueue(1);
super.execute(command);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
checkQueue(1);
return super.submit(task);
}
@Override
public Future<?> submit(Runnable task) {
checkQueue(1);
return super.submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
checkQueue(1);
return super.submit(task, result);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.rocketmq.mqtt.util.orderedexecutor;
/**
* Simple stats that require only increment and decrement
* functions on a Long. Metrics like the number of topics, persist queue size
* etc. should use this.
*/
public interface Counter {
/**
* Clear this stat.
*/
void clear();
/**
* Increment the value associated with this stat.
*/
void inc();
/**
* Decrement the value associated with this stat.
*/
void dec();
/**
* Add delta to the value associated with this stat.
* @param delta
*/
void add(long delta);
/**
* Get the value associated with this stat.
*/
Long get();
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt.util.orderedexecutor;
/**
* A guage is a value that has only one value at a specific point in time.
* An example is the number of elements in a queue. The value of T must be
* some numeric type.
*/
public interface Gauge<T extends Number> {
T getDefaultValue();
T getSample();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt.util.orderedexecutor;
import java.util.concurrent.TimeUnit;
/**
* Provides misc math functions that don't come standard.
*/
public class MathUtils {
private static final long NANOSECONDS_PER_MILLISECOND = 1000000;
public static int signSafeMod(long dividend, int divisor) {
int mod = (int) (dividend % divisor);
if (mod < 0) {
mod += divisor;
}
return mod;
}
public static int findNextPositivePowerOfTwo(final int value) {
return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
}
/**
* Current time from some arbitrary time base in the past, counting in
* nanoseconds, and not affected by settimeofday or similar system clock
* changes. This is appropriate to use when computing how much longer to
* wait for an interval to expire.
*
* <p>NOTE: only use it for measuring.
* http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29
*
* @return current time in nanoseconds.
*/
public static long nowInNano() {
return System.nanoTime();
}
/**
* Milliseconds elapsed since the time specified, the input is nanoTime
* the only conversion happens when computing the elapsed time.
*
* @param startNanoTime the start of the interval that we are measuring
* @return elapsed time in milliseconds.
*/
public static long elapsedMSec(long startNanoTime) {
return (System.nanoTime() - startNanoTime) / NANOSECONDS_PER_MILLISECOND;
}
/**
* Microseconds elapsed since the time specified, the input is nanoTime
* the only conversion happens when computing the elapsed time.
*
* @param startNanoTime the start of the interval that we are measuring
* @return elapsed time in milliseconds.
*/
public static long elapsedMicroSec(long startNanoTime) {
return TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanoTime);
}
/**
* Nanoseconds elapsed since the time specified, the input is nanoTime
* the only conversion happens when computing the elapsed time.
*
* @param startNanoTime the start of the interval that we are measuring
* @return elapsed time in milliseconds.
*/
public static long elapsedNanos(long startNanoTime) {
return System.nanoTime() - startNanoTime;
}
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.rocketmq.mqtt.util.orderedexecutor;
import java.util.Map;
import org.slf4j.MDC;
/**
* Utils for work with Slf4j MDC.
*/
public class MdcUtils {
public static void restoreContext(Map<String, String> mdcContextMap) {
if (mdcContextMap == null || mdcContextMap.isEmpty()) {
MDC.clear();
} else {
MDC.setContextMap(mdcContextMap);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.rocketmq.mqtt.util.orderedexecutor;
import java.util.concurrent.TimeUnit;
/**
* A <i>no-op</i> {@code StatsLogger}.
*
* <p>Metrics are not recorded, making this receiver useful in unit tests and as defaults in
* situations where metrics are not strictly required.
*/
public class NullStatsLogger implements StatsLogger {
public static final NullStatsLogger INSTANCE = new NullStatsLogger();
/**
* A <i>no-op</i> {@code OpStatsLogger}.
*/
static class NullOpStatsLogger implements OpStatsLogger {
final OpStatsData nullOpStats = new OpStatsData(0, 0, 0, new long[6]);
@Override
public void registerFailedEvent(long eventLatency, TimeUnit unit) {
// nop
}
@Override
public void registerSuccessfulEvent(long eventLatency, TimeUnit unit) {
// nop
}
@Override
public void registerSuccessfulValue(long value) {
// nop
}
@Override
public void registerFailedValue(long value) {
// nop
}
@Override
public OpStatsData toOpStatsData() {
return nullOpStats;
}
@Override
public void clear() {
// nop
}
}
static NullOpStatsLogger nullOpStatsLogger = new NullOpStatsLogger();
/**
* A <i>no-op</i> {@code Counter}.
*/
static class NullCounter implements Counter {
@Override
public void clear() {
// nop
}
@Override
public void inc() {
// nop
}
@Override
public void dec() {
// nop
}
@Override
public void add(long delta) {
// nop
}
@Override
public Long get() {
return 0L;
}
}
static NullCounter nullCounter = new NullCounter();
@Override
public OpStatsLogger getOpStatsLogger(String name) {
return nullOpStatsLogger;
}
@Override
public Counter getCounter(String name) {
return nullCounter;
}
@Override
public <T extends Number> void registerGauge(String name, Gauge<T> gauge) {
// nop
}
@Override
public <T extends Number> void unregisterGauge(String name, Gauge<T> gauge) {
// nop
}
@Override
public StatsLogger scope(String name) {
return this;
}
@Override
public void removeScope(String name, StatsLogger statsLogger) {
// nop
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.rocketmq.mqtt.util.orderedexecutor;
import java.util.Arrays;
/**
* This class provides a read view of operation specific stats.
* We expose this to JMX.
* We use primitives because the class has to conform to CompositeViewData.
*/
public class OpStatsData {
private final long numSuccessfulEvents, numFailedEvents;
// All latency values are in Milliseconds.
private final double avgLatencyMillis;
// 10.0 50.0, 90.0, 99.0, 99.9, 99.99 in that order.
// TODO: Figure out if we can use a Map
private final long[] percentileLatenciesMillis;
public OpStatsData(long numSuccessfulEvents, long numFailedEvents,
double avgLatencyMillis, long[] percentileLatenciesMillis) {
this.numSuccessfulEvents = numSuccessfulEvents;
this.numFailedEvents = numFailedEvents;
this.avgLatencyMillis = avgLatencyMillis;
this.percentileLatenciesMillis =
Arrays.copyOf(percentileLatenciesMillis, percentileLatenciesMillis.length);
}
public long getP10Latency() {
return this.percentileLatenciesMillis[0];
}
public long getP50Latency() {
return this.percentileLatenciesMillis[1];
}
public long getP90Latency() {
return this.percentileLatenciesMillis[2];
}
public long getP99Latency() {
return this.percentileLatenciesMillis[3];
}
public long getP999Latency() {
return this.percentileLatenciesMillis[4];
}
public long getP9999Latency() {
return this.percentileLatenciesMillis[5];
}
public long getNumSuccessfulEvents() {
return this.numSuccessfulEvents;
}
public long getNumFailedEvents() {
return this.numFailedEvents;
}
public double getAvgLatencyMillis() {
return this.avgLatencyMillis;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.rocketmq.mqtt.util.orderedexecutor;
import java.util.concurrent.TimeUnit;
/**
* This interface handles logging of statistics related to each operation. (PUBLISH, CONSUME etc.)
*/
public interface OpStatsLogger {
/**
* Increment the failed op counter with the given eventLatency.
* @param eventLatencyMillis The event latency
* @param unit
*/
void registerFailedEvent(long eventLatencyMillis, TimeUnit unit);
/**
* An operation succeeded with the given eventLatency. Update
* stats to reflect the same
* @param eventLatencyMillis The event latency
* @param unit
*/
void registerSuccessfulEvent(long eventLatencyMillis, TimeUnit unit);
/**
* An operation with the given value succeeded.
* @param value
*/
void registerSuccessfulValue(long value);
/**
* An operation with the given value failed.
*/
void registerFailedValue(long value);
/**
* @return Returns an OpStatsData object with necessary values. We need this function
* to support JMX exports. This should be deprecated sometime in the near future.
* populated.
*/
OpStatsData toOpStatsData();
/**
* Clear stats for this operation.
*/
void clear();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt.util.orderedexecutor;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A runnable that catches runtime exceptions.
*/
@FunctionalInterface
public interface SafeRunnable extends Runnable {
Logger LOGGER = LoggerFactory.getLogger(SafeRunnable.class);
@Override
default void run() {
try {
safeRun();
} catch (Throwable t) {
LOGGER.error("Unexpected throwable caught ", t);
}
}
void safeRun();
/**
* Utility method to use SafeRunnable from lambdas.
*
* <p>Eg:
* <pre>
* <code>
* executor.submit(SafeRunnable.safeRun(() -> {
* // My not-safe code
* });
* </code>
* </pre>
*/
static SafeRunnable safeRun(Runnable runnable) {
return new SafeRunnable() {
@Override
public void safeRun() {
runnable.run();
}
};
}
/**
* Utility method to use SafeRunnable from lambdas with
* a custom exception handler.
*
* <p>Eg:
* <pre>
* <code>
* executor.submit(SafeRunnable.safeRun(() -> {
* // My not-safe code
* }, exception -> {
* // Handle exception
* );
* </code>
* </pre>
*
* @param runnable
* @param exceptionHandler
* handler that will be called when there are any exception
* @return
*/
static SafeRunnable safeRun(Runnable runnable, Consumer<Throwable> exceptionHandler) {
return () -> {
try {
runnable.run();
} catch (Throwable t) {
exceptionHandler.accept(t);
throw t;
}
};
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.rocketmq.mqtt.util.orderedexecutor;
/**
* A simple interface that exposes just 2 useful methods. One to get the logger for an Op stat
* and another to get the logger for a simple stat
*/
public interface StatsLogger {
/**
* @param name
* Stats Name
* @return Get the logger for an OpStat described by the <i>name</i>.
*/
OpStatsLogger getOpStatsLogger(String name);
/**
* @param name
* Stats Name
* @return Get the logger for a simple stat described by the <i>name</i>
*/
Counter getCounter(String name);
/**
* Register given <i>gauge</i> as name <i>name</i>.
*
* @param name
* gauge name
* @param gauge
* gauge function
*/
<T extends Number> void registerGauge(String name, Gauge<T> gauge);
/**
* Unregister given <i>gauge</i> from name <i>name</i>.
*
* @param name
* name of the gauge
* @param gauge
* gauge function
*/
<T extends Number> void unregisterGauge(String name, Gauge<T> gauge);
/**
* Provide the stats logger under scope <i>name</i>.
*
* @param name
* scope name.
* @return stats logger under scope <i>name</i>.
*/
StatsLogger scope(String name);
/**
* Remove the given <i>statsLogger</i> for scope <i>name</i>.
* It can be no-op if the underlying stats provider doesn't have the ability to remove scope.
*
* @param name name of the scope
* @param statsLogger the stats logger of this scope.
*/
void removeScope(String name, StatsLogger statsLogger);
}
......@@ -30,11 +30,11 @@ public class MqttHeader implements CommandCustomHeader {
@CFNotNull
private Integer messageType;
@CFNotNull
private boolean isDup;
private boolean isDup = false;
@CFNotNull
private Integer qosLevel;
@CFNotNull
private boolean isRetain;
private boolean isRetain = false;
@CFNotNull
private int remainingLength;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册