提交 5481c2bc 编写于 作者: C chengxiangwang

add logic of push messages when reconnected;add logic of transfer qos1 messages

上级 947d558a
......@@ -23,22 +23,31 @@ import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.ClientManager;
import org.apache.rocketmq.common.client.ClientRole;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.mqtt.WillMessage;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.client.MQTTSession;
import org.apache.rocketmq.mqtt.exception.MqttConnectException;
import org.apache.rocketmq.mqtt.exception.MqttRuntimeException;
import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.mqtt.task.MqttPushTask;
import org.apache.rocketmq.mqtt.util.MqttUtil;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
......@@ -138,7 +147,8 @@ public class MqttConnectMessageHandler implements MessageHandler {
willMessage.setBody(payload.willMessageInBytes());
defaultMqttMessageProcessor.getWillMessageService().saveWillMessage(client.getClientId(), willMessage);
}
//trigger to push offline messages to this client
pushOfflineMessages(iotClientManager.getSubscriptionByClientId(client.getClientId()), client);
mqttHeader.setConnectReturnCode(MqttConnectReturnCode.CONNECTION_ACCEPTED.name());
command.setCode(ResponseCode.SUCCESS);
command.setRemark(null);
......@@ -157,6 +167,37 @@ public class MqttConnectMessageHandler implements MessageHandler {
return true;
}
private void pushOfflineMessages(Subscription subscription, Client client) {
Enumeration<String> keys = subscription.getSubscriptionTable().keys();
Set<String> topicFilters = new HashSet<>();
while (keys.hasMoreElements()) {
String topicFilter = keys.nextElement();
String rootTopic = MqttUtil.getRootTopic(topicFilter);
topicFilters.add(rootTopic);
}
for (String rootTopic : topicFilters) {
TopicRouteData topicRouteData;
try {
topicRouteData = this.defaultMqttMessageProcessor.getNnodeService().getTopicRouteDataByTopic(rootTopic, false);
} catch (Exception e) {
log.error("Exception was thrown when get topicRouteData. topic={}", rootTopic);
throw new MqttRuntimeException("Exception was thrown when get topicRouteData.");
}
MqttHeader mqttHeader = new MqttHeader();
mqttHeader.setMessageType(MqttMessageType.PUBLISH.value());
mqttHeader.setRetain(false); //TODO set to false temporarily, need to be implemented later.
List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
for (BrokerData brokerData : brokerDatas) {
MqttPushTask mqttPushTask = new MqttPushTask(this.defaultMqttMessageProcessor, mqttHeader, rootTopic, client, brokerData);
//add task to orderedExecutor
this.defaultMqttMessageProcessor.getOrderedExecutor().submit(mqttPushTask);
}
}
}
private boolean authorized(String username, String password) {
return true;
}
......
......@@ -38,6 +38,7 @@ import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.mqtt.processor.InnerMqttMessageProcessor;
import org.apache.rocketmq.mqtt.task.MqttPushTask;
import org.apache.rocketmq.mqtt.transfer.TransferDataQos1;
import org.apache.rocketmq.mqtt.util.MqttUtil;
import org.apache.rocketmq.mqtt.util.orderedexecutor.SafeRunnable;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......@@ -84,10 +85,9 @@ public class MqttMessageForwardHandler implements MessageHandler {
//For each client, wrap a task:
//Pull message one by one, and push them if current client match.
MqttHeader mqttHeaderQos1 = new MqttHeader();
mqttHeaderQos1.setTopicName(variableHeader.topicName());
mqttHeaderQos1.setMessageType(MqttMessageType.PUBLISH.value());
mqttHeaderQos1.setRetain(false); //TODO set to false temporarily, need to be implemented later.
MqttPushTask mqttPushTask = new MqttPushTask(this.defaultMqttMessageProcessor, mqttHeaderQos1, client, transferDataQos1.getBrokerData());
MqttPushTask mqttPushTask = new MqttPushTask(this.defaultMqttMessageProcessor, mqttHeaderQos1, MqttUtil.getRootTopic(variableHeader.topicName()), client, transferDataQos1.getBrokerData());
//add task to orderedExecutor
this.defaultMqttMessageProcessor.getOrderedExecutor().executeOrdered(client.getClientId(), SafeRunnable.safeRun(mqttPushTask));
}
......
......@@ -31,6 +31,7 @@ import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.mqtt.task.MqttPushTask;
import org.apache.rocketmq.mqtt.util.MqttUtil;
import org.apache.rocketmq.mqtt.util.orderedexecutor.SafeRunnable;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......@@ -64,11 +65,10 @@ public class MqttPubackMessageHandler implements MessageHandler {
InFlightMessage removedMessage = client.pubAckReceived(variableHeader.messageId());
MqttHeader mqttHeader = new MqttHeader();
mqttHeader.setTopicName(removedMessage.getTopic());
mqttHeader.setMessageType(MqttMessageType.PUBLISH.value());
mqttHeader.setDup(false);
mqttHeader.setRetain(false); //TODO set to false temporarily, need to be implemented.
MqttPushTask task = new MqttPushTask(defaultMqttMessageProcessor, mqttHeader, client, removedMessage.getBrokerData());
MqttPushTask task = new MqttPushTask(defaultMqttMessageProcessor, mqttHeader, MqttUtil.getRootTopic(removedMessage.getTopic()), client, removedMessage.getBrokerData());
//add task to orderedExecutor
this.defaultMqttMessageProcessor.getOrderedExecutor().executeOrdered(client.getClientId(), SafeRunnable.safeRun(task));
return null;
......
......@@ -56,6 +56,7 @@ import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.mqtt.task.MqttPushTask;
import org.apache.rocketmq.mqtt.transfer.TransferDataQos1;
import org.apache.rocketmq.mqtt.util.MqttUtil;
import org.apache.rocketmq.mqtt.util.orderedexecutor.SafeRunnable;
import org.apache.rocketmq.remoting.RemotingChannel;
......@@ -121,7 +122,7 @@ public class MqttPublishMessageHandler implements MessageHandler {
//step2. get snode ips by clients in step2.
Set<String> snodeIpsTobeTransfer = new HashSet<>();
try {
transferMessage(snodeIpsTobeTransfer, variableHeader.topicName(), payload);
transferMessage(snodeIpsTobeTransfer, variableHeader.topicName(), body);
} catch (MqttException e) {
log.error("Transfer message failed: {}", e.getMessage());
} finally {
......@@ -159,14 +160,25 @@ public class MqttPublishMessageHandler implements MessageHandler {
//For each client, wrap a task:
//Pull message one by one, and push them if current client match.
MqttHeader mqttHeaderQos1 = new MqttHeader();
mqttHeaderQos1.setTopicName(variableHeader.topicName());
mqttHeaderQos1.setMessageType(MqttMessageType.PUBLISH.value());
mqttHeaderQos1.setRetain(false); //TODO set to false temporarily, need to be implemented later.
MqttPushTask mqttPushTask = new MqttPushTask(defaultMqttMessageProcessor, mqttHeaderQos1, client, brokerData);
MqttPushTask mqttPushTask = new MqttPushTask(defaultMqttMessageProcessor, mqttHeaderQos1, MqttUtil.getRootTopic(variableHeader.topicName()), client, brokerData);
//add task to orderedExecutor
this.defaultMqttMessageProcessor.getOrderedExecutor().executeOrdered(client.getClientId(), SafeRunnable.safeRun(mqttPushTask));
}
//TODO for clientIds connected to other snodes, forward msg
//for clients connected to other snodes, forward msg
Set<String> snodesTobeTransfered = new HashSet<>();
TransferDataQos1 transferDataQos1 = new TransferDataQos1();
transferDataQos1.setBrokerData(brokerData);
transferDataQos1.setTopic(variableHeader.topicName());
byte[] encode = TransferDataQos1.encode(transferDataQos1);
try {
transferMessage(snodesTobeTransfered, variableHeader.topicName(), encode);
} catch (MqttException e) {
log.error("Transfer message failed: {}", e.getMessage());
} finally {
ReferenceCountUtil.release(message);
}
} else {
log.error("Store Qos=1 Message error: {}", ex);
}
......@@ -178,7 +190,7 @@ public class MqttPublishMessageHandler implements MessageHandler {
return doResponse(fixedHeader);
}
private void transferMessage(Set<String> snodeAddresses, String topic, ByteBuf payload) throws MqttException {
private void transferMessage(Set<String> snodeAddresses, String topic, byte[] body) throws MqttException {
SnodeConfig snodeConfig = defaultMqttMessageProcessor.getSnodeConfig();
MqttConfig mqttConfig = defaultMqttMessageProcessor.getMqttConfig();
String url = "tcp://" + snodeConfig.getSnodeIP1() + ":" + (mqttConfig.getListenPort() - 1);
......@@ -196,9 +208,6 @@ public class MqttPublishMessageHandler implements MessageHandler {
client.connect(connOpts);
snode2MqttClient.put(snodeAddress, client);
}
byte[] body = new byte[payload.readableBytes()];
payload.readBytes(body);
org.eclipse.paho.client.mqttv3.MqttMessage message = new org.eclipse.paho.client.mqttv3.MqttMessage(body);
message.setQos(0);
client.publish(topic, message);
......
......@@ -66,46 +66,46 @@ public class MqttPushTask implements Runnable {
private MqttHeader mqttHeader;
private MQTTSession client;
private BrokerData brokerData;
private String rootTopic;
public MqttPushTask(DefaultMqttMessageProcessor processor, final MqttHeader mqttHeader, Client client,
public MqttPushTask(DefaultMqttMessageProcessor processor, final MqttHeader mqttHeader, String rootTopic, Client client,
BrokerData brokerData) {
this.defaultMqttMessageProcessor = processor;
this.mqttHeader = mqttHeader;
this.rootTopic = rootTopic;
this.client = (MQTTSession) client;
this.brokerData = brokerData;
}
@Override
public void run() {
String rootTopic = MqttUtil.getRootTopic(mqttHeader.getTopicName());
EnodeService enodeService = this.defaultMqttMessageProcessor.getEnodeService();
IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager();
Subscription subscription = iotClientManager.getSubscriptionByClientId(client.getClientId());
Subscription subscription = iotClientManager.getSubscriptionByClientId(this.client.getClientId());
ConcurrentHashMap<String, SubscriptionData> subscriptionTable = subscription.getSubscriptionTable();
//compare current consumeOffset of rootTopic@clientId with maxOffset, pull message if consumeOffset < maxOffset
long maxOffsetInQueue;
try {
maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic);
long nextOffset = enodeService.queryOffset(brokerData.getBrokerName(), client.getClientId(), rootTopic, 0);
maxOffsetInQueue = getMaxOffset(this.brokerData.getBrokerName(), this.rootTopic);
long nextOffset = enodeService.queryOffset(this.brokerData.getBrokerName(), this.client.getClientId(), this.rootTopic, 0);
while (nextOffset <= maxOffsetInQueue) {
boolean inflightFullFlag = false;
//pull messages from enode above(brokerData.getBrokerName), 32 messages max.
PullMessageRequestHeader requestHeader = buildPullMessageRequestHeader(this.client.getClientId(), mqttHeader.getTopicName(), nextOffset, brokerData.getBrokerName());
PullMessageRequestHeader requestHeader = buildPullMessageRequestHeader(this.client.getClientId(), this.mqttHeader.getTopicName(), nextOffset, this.brokerData.getBrokerName());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
RemotingCommand response = this.defaultMqttMessageProcessor.getEnodeService().pullMessageSync(null, brokerData.getBrokerName(), request);
RemotingCommand response = this.defaultMqttMessageProcessor.getEnodeService().pullMessageSync(null, this.brokerData.getBrokerName(), request);
PullResult pullResult = processPullResponse(response, subscriptionTable);
for (MessageExt messageExt : pullResult.getMsgFoundList()) {
final String realTopic = messageExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
boolean alreadyInFlight = alreadyInFight(brokerData.getBrokerName(), realTopic, client.getClientId(), messageExt.getQueueOffset());
boolean alreadyInFlight = alreadyInFight(this.brokerData.getBrokerName(), realTopic, this.client.getClientId(), messageExt.getQueueOffset());
if (alreadyInFlight) {
log.info("The message is already inflight. MessageId={}", messageExt.getMsgId());
continue;
}
Integer pushQos = lowerQosToTheSubscriptionDesired(realTopic, Integer.valueOf(messageExt.getProperty(MqttConstant.PROPERTY_MQTT_QOS)), subscriptionTable);
mqttHeader.setQosLevel(pushQos);
mqttHeader.setTopicName(realTopic);
if (client.getInflightSlots().get() == 0) {
this.mqttHeader.setQosLevel(pushQos);
this.mqttHeader.setTopicName(realTopic);
if (this.client.getInflightSlots().get() == 0) {
log.info("The in-flight window is full, stop pushing message to consumers and update consumeOffset. ClientId={}, rootTopic={}", client.getClientId(), rootTopic);
inflightFullFlag = true;
break;
......@@ -124,18 +124,18 @@ public class MqttPushTask implements Runnable {
}
}
private PullMessageRequestHeader buildPullMessageRequestHeader(String clientId, String topic, long offset,
private PullMessageRequestHeader buildPullMessageRequestHeader(String clientId, String rootTopic, long offset,
String enodeName) {
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(clientId);
requestHeader.setTopic(MqttUtil.getRootTopic(topic));
requestHeader.setTopic(rootTopic);
requestHeader.setQueueId(0);
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(32);
requestHeader.setSysFlag(PullSysFlag.buildSysFlag(false, false, true, false));
requestHeader.setCommitOffset(0L);
// requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(topic);
requestHeader.setSubscription(rootTopic);
requestHeader.setSubVersion(0L);
requestHeader.setExpressionType(ExpressionType.TAG);
requestHeader.setEnodeName(enodeName);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册