diff --git a/common/src/main/java/org/apache/rocketmq/common/service/EnodeService.java b/common/src/main/java/org/apache/rocketmq/common/service/EnodeService.java index 05c9360a84db33e8737f1052d8d10e9efbf5bacf..259ca95fc4faf892c5947ed54482f8634db2bce3 100644 --- a/common/src/main/java/org/apache/rocketmq/common/service/EnodeService.java +++ b/common/src/main/java/org/apache/rocketmq/common/service/EnodeService.java @@ -47,7 +47,7 @@ public interface EnodeService { final RemotingCommand request); /** - * Pull message from enode server. + * Pull message from enode server asynchronously. * * @param enodeName Enode server name * @param request {@link PullMessageRequestHeader} Pull message request header @@ -56,6 +56,16 @@ public interface EnodeService { CompletableFuture pullMessage(final RemotingChannel remotingChannel, final String enodeName, final RemotingCommand request); + /** + * Pull message from enode server synchronously. + * + * @param enodeName Enode server name + * @param request {@link PullMessageRequestHeader} Pull message request header + * @return RemotingCommand + */ + RemotingCommand pullMessageSync(final RemotingChannel remotingChannel, final String enodeName, + final RemotingCommand request); + /** * Create retry topic in enode server. * diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttConnectMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttConnectMessageHandler.java index 57982907c9012a175116274e768c57afc437a245..064bb778ae17586aabd930731d17c143e524c479 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttConnectMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttConnectMessageHandler.java @@ -121,7 +121,7 @@ public class MqttConnectMessageHandler implements MessageHandler { { add("IOT_GROUP"); } - }, true, mqttConnectMessage.variableHeader().isCleanSession(), remotingChannel, System.currentTimeMillis()); + }, true, mqttConnectMessage.variableHeader().isCleanSession(), remotingChannel, System.currentTimeMillis(), defaultMqttMessageProcessor); //register remotingChannel<--->client iotClientManager.register(IOTClientManagerImpl.IOT_GROUP, client); diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwardHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwardHandler.java index 6fb715f1e96062b42c6c6855466ea50ad15cfca7..cb30709b3fd32fe349b53dd2d667c34d6c3ba14f 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwardHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwardHandler.java @@ -74,7 +74,7 @@ public class MqttMessageForwardHandler implements MessageHandler { mqttHeaderQos0.setRetain(false); //TODO set to false temporarily, need to be implemented later. Set clientsTobePublish = findCurrentNodeClientsTobePublish(variableHeader.topicName(), (IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager()); for (Client client : clientsTobePublish) { - ((MQTTSession) client).pushMessageQos0(mqttHeaderQos0, body, this.defaultMqttMessageProcessor); + ((MQTTSession) client).pushMessageQos0(mqttHeaderQos0, body); } } else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) { TransferDataQos1 transferDataQos1 = TransferDataQos1.decode(body, TransferDataQos1.class); diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java index be53e58a1235543fe64cd7d4c9d7f820c5c9511c..22c30a28f8c2eb68270160d4e30f227a97815ee1 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java @@ -111,7 +111,7 @@ public class MqttPublishMessageHandler implements MessageHandler { mqttHeaderQos0.setQosLevel(MqttQoS.AT_MOST_ONCE.value()); mqttHeaderQos0.setRetain(false); //TODO set to false temporarily, need to be implemented later. for (Client client : clientsTobePublish) { - ((MQTTSession) client).pushMessageQos0(mqttHeaderQos0, body, this.defaultMqttMessageProcessor); + ((MQTTSession) client).pushMessageQos0(mqttHeaderQos0, body); } //For clients that connected to other snodes, transfer the message to them @@ -219,6 +219,7 @@ public class MqttPublishMessageHandler implements MessageHandler { requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setBatch(false); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, variableHeader.topicName()); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TAGS, MqttUtil.getRootTopic(variableHeader.topicName())); MessageAccessor.putProperty(msg, MqttConstant.PROPERTY_MQTT_QOS, fixedHeader.qosLevel().name()); requestHeader.setEnodeName(enodeName); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java index 803648b2935585f23c0a4f4f7d5dcc5e3b307185..0ccb3545e2d1e5c5cea2af0385cbf54d6b64a18c 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java @@ -103,7 +103,7 @@ public class DefaultMqttMessageProcessor implements RequestProcessor { this.orderedExecutor = OrderedExecutor.newBuilder().name("PushMessageToConsumerThreads").numThreads(mqttConfig.getPushMqttMessageMaxPoolSize()).build(); this.mqttScheduledService = new MqttScheduledServiceImpl(this); mqttScheduledService.startScheduleTask(); - + registerMessageHandler(MqttMessageType.CONNECT, new MqttConnectMessageHandler(this)); registerMessageHandler(MqttMessageType.DISCONNECT, diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/task/MqttPushTask.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/task/MqttPushTask.java index d0870b61cb67bb685bf01c05c9a75a277277a827..40e74b59f7eba8043430fef2c05c27a052be60bb 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/task/MqttPushTask.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/task/MqttPushTask.java @@ -17,23 +17,34 @@ package org.apache.rocketmq.mqtt.task; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Enumeration; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.client.impl.consumer.PullResultExt; import org.apache.rocketmq.common.client.Client; import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.exception.MQSnodeException; +import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; import org.apache.rocketmq.common.protocol.heartbeat.MqttSubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.service.EnodeService; +import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; @@ -76,49 +87,97 @@ public class MqttPushTask implements Runnable { long maxOffsetInQueue; try { maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic); - final long consumeOffset = enodeService.queryOffset(brokerData.getBrokerName(), client.getClientId(), rootTopic, 0); - long i = consumeOffset + 1; - while (i <= maxOffsetInQueue) { - //TODO query messages(queueOffset=i) from enode above(brokerData.getBrokerName) - RemotingCommand response = null; - ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody()); - MessageExt messageExt = MessageDecoder.clientDecode(byteBuffer, true); - - final String realTopic = messageExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC); - - boolean needSkip = needSkip(realTopic, subscriptionTable); - boolean alreadyInFlight = alreadyInFight(brokerData.getBrokerName(), realTopic, client.getClientId(), messageExt.getQueueOffset()); - if (needSkip) { - log.info("Current client doesn't subscribe topic:{}, skip this message", realTopic); - maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic); - i += 1; - continue; - } - if (alreadyInFlight) { - log.info("The message is already inflight. MessageId={}", messageExt.getMsgId()); - break; + long nextOffset = enodeService.queryOffset(brokerData.getBrokerName(), client.getClientId(), rootTopic, 0); + while (nextOffset <= maxOffsetInQueue) { + boolean inflightFullFlag = false; + //pull messages from enode above(brokerData.getBrokerName), 32 messages max. + PullMessageRequestHeader requestHeader = buildPullMessageRequestHeader(this.client.getClientId(), mqttHeader.getTopicName(), nextOffset, brokerData.getBrokerName()); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); + RemotingCommand response = this.defaultMqttMessageProcessor.getEnodeService().pullMessageSync(null, brokerData.getBrokerName(), request); + PullResult pullResult = processPullResponse(response, subscriptionTable); + for (MessageExt messageExt : pullResult.getMsgFoundList()) { + final String realTopic = messageExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC); + boolean alreadyInFlight = alreadyInFight(brokerData.getBrokerName(), realTopic, client.getClientId(), messageExt.getQueueOffset()); + if (alreadyInFlight) { + log.info("The message is already inflight. MessageId={}", messageExt.getMsgId()); + continue; + } + Integer pushQos = lowerQosToTheSubscriptionDesired(realTopic, Integer.valueOf(messageExt.getProperty(MqttConstant.PROPERTY_MQTT_QOS)), subscriptionTable); + mqttHeader.setQosLevel(pushQos); + mqttHeader.setTopicName(realTopic); + if (client.getInflightSlots().get() == 0) { + log.info("The in-flight window is full, stop pushing message to consumers and update consumeOffset. ClientId={}, rootTopic={}", client.getClientId(), rootTopic); + inflightFullFlag = true; + break; + } + //push message if in-flight window has slot(not full) + client.pushMessageQos1(mqttHeader, messageExt, brokerData); } - Integer pushQos = lowerQosToTheSubscriptionDesired(realTopic, Integer.valueOf(messageExt.getProperty(MqttConstant.PROPERTY_MQTT_QOS)), subscriptionTable); - mqttHeader.setQosLevel(pushQos); - mqttHeader.setTopicName(realTopic); - if (client.getInflightSlots().get() == 0) { - log.info("The in-flight window is full, stop pushing message to consumers and update consumeOffset. ClientId={}, rootTopic={}", client.getClientId(), rootTopic); + if (inflightFullFlag == true) { break; } - //push message if in-flight window has slot(not full) - client.pushMessageQos1(mqttHeader, messageExt, brokerData); - maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic); - i += 1; + nextOffset = pullResult.getNextBeginOffset(); } - - //TODO update consumeOffset of rootTopic@clientId in brokerData.getBrokerName() - enodeService.persistOffset(null, brokerData.getBrokerName(), client.getClientId(), rootTopic, 0, i - 1); } catch (Exception ex) { log.error("Exception was thrown when pushing messages to consumer.{}", ex); } } + private PullMessageRequestHeader buildPullMessageRequestHeader(String clientId, String topic, long offset, + String enodeName) { + PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); + requestHeader.setConsumerGroup(clientId); + requestHeader.setTopic(MqttUtil.getRootTopic(topic)); + requestHeader.setQueueId(0); + requestHeader.setQueueOffset(offset); + requestHeader.setMaxMsgNums(32); + requestHeader.setSysFlag(PullSysFlag.buildSysFlag(false, false, true, false)); + requestHeader.setCommitOffset(0L); +// requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); + requestHeader.setSubscription(topic); + requestHeader.setSubVersion(0L); + requestHeader.setExpressionType(ExpressionType.TAG); + requestHeader.setEnodeName(enodeName); + return requestHeader; + } + + private PullResult processPullResponse(final RemotingCommand response, + ConcurrentHashMap subscriptionTable) throws MQSnodeException, RemotingCommandException { + PullStatus pullStatus; + switch (response.getCode()) { + case ResponseCode.SUCCESS: + pullStatus = PullStatus.FOUND; + break; + case ResponseCode.PULL_NOT_FOUND: + pullStatus = PullStatus.NO_NEW_MSG; + break; + case ResponseCode.PULL_RETRY_IMMEDIATELY: + pullStatus = PullStatus.NO_MATCHED_MSG; + break; + case ResponseCode.PULL_OFFSET_MOVED: + pullStatus = PullStatus.OFFSET_ILLEGAL; + break; + + default: + throw new MQSnodeException(response.getCode(), response.getRemark()); + } + + PullMessageResponseHeader responseHeader = + (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class); + ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody()); + List msgList = MessageDecoder.decodes(byteBuffer); + //filter messages again + List msgListFilterAgain = new ArrayList<>(msgList.size()); + for (MessageExt msg : msgList) { + if (msg.getTags() != null && !needSkip(msg.getTags(), subscriptionTable)) { + msgListFilterAgain.add(msg); + } + } + return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(), + responseHeader.getMaxOffset(), msgListFilterAgain, responseHeader.getSuggestWhichBrokerId(), response.getBody()); + } + private boolean needSkip(final String realTopic, ConcurrentHashMap subscriptionTable) { Enumeration topicFilters = subscriptionTable.keys(); while (topicFilters.hasMoreElements()) { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java index d171e4ec2612d7c415e6190319fa956def8ccf82..66ee7e25c860e94fa76884e96be19b76dde7391f 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java @@ -22,9 +22,9 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import org.apache.rocketmq.common.exception.MQBrokerException; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.exception.MQBrokerException; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.ClusterInfo; @@ -34,6 +34,7 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHead import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; +import org.apache.rocketmq.common.service.EnodeService; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; @@ -48,7 +49,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.serialize.RemotingSerializable; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.constant.SnodeConstant; -import org.apache.rocketmq.common.service.EnodeService; public class RemoteEnodeServiceImpl implements EnodeService { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); @@ -101,12 +101,24 @@ public class RemoteEnodeServiceImpl implements EnodeService { } }); } catch (Exception ex) { - log.error("Pull message async error:", ex); + log.error("Pull message async error: {}", ex); future.completeExceptionally(ex); } return future; } + @Override public RemotingCommand pullMessageSync(RemotingChannel remotingChannel, String enodeName, + RemotingCommand request) { + try { + String enodeAddress = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); + RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(enodeAddress, request, SnodeConstant.CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND); + return response; + } catch (Exception ex) { + log.error("Pull message sync error: {}", ex); + } + return null; + } + @Override public CompletableFuture sendMessage(final RemotingChannel remotingChannel, String enodeName, RemotingCommand request) {