提交 947d558a 编写于 作者: C chengxiangwang

fulfill logic of pull messages from enode

上级 b1a0b485
...@@ -47,7 +47,7 @@ public interface EnodeService { ...@@ -47,7 +47,7 @@ public interface EnodeService {
final RemotingCommand request); final RemotingCommand request);
/** /**
* Pull message from enode server. * Pull message from enode server asynchronously.
* *
* @param enodeName Enode server name * @param enodeName Enode server name
* @param request {@link PullMessageRequestHeader} Pull message request header * @param request {@link PullMessageRequestHeader} Pull message request header
...@@ -56,6 +56,16 @@ public interface EnodeService { ...@@ -56,6 +56,16 @@ public interface EnodeService {
CompletableFuture<RemotingCommand> pullMessage(final RemotingChannel remotingChannel, final String enodeName, CompletableFuture<RemotingCommand> pullMessage(final RemotingChannel remotingChannel, final String enodeName,
final RemotingCommand request); final RemotingCommand request);
/**
* Pull message from enode server synchronously.
*
* @param enodeName Enode server name
* @param request {@link PullMessageRequestHeader} Pull message request header
* @return RemotingCommand
*/
RemotingCommand pullMessageSync(final RemotingChannel remotingChannel, final String enodeName,
final RemotingCommand request);
/** /**
* Create retry topic in enode server. * Create retry topic in enode server.
* *
......
...@@ -121,7 +121,7 @@ public class MqttConnectMessageHandler implements MessageHandler { ...@@ -121,7 +121,7 @@ public class MqttConnectMessageHandler implements MessageHandler {
{ {
add("IOT_GROUP"); add("IOT_GROUP");
} }
}, true, mqttConnectMessage.variableHeader().isCleanSession(), remotingChannel, System.currentTimeMillis()); }, true, mqttConnectMessage.variableHeader().isCleanSession(), remotingChannel, System.currentTimeMillis(), defaultMqttMessageProcessor);
//register remotingChannel<--->client //register remotingChannel<--->client
iotClientManager.register(IOTClientManagerImpl.IOT_GROUP, client); iotClientManager.register(IOTClientManagerImpl.IOT_GROUP, client);
......
...@@ -74,7 +74,7 @@ public class MqttMessageForwardHandler implements MessageHandler { ...@@ -74,7 +74,7 @@ public class MqttMessageForwardHandler implements MessageHandler {
mqttHeaderQos0.setRetain(false); //TODO set to false temporarily, need to be implemented later. mqttHeaderQos0.setRetain(false); //TODO set to false temporarily, need to be implemented later.
Set<Client> clientsTobePublish = findCurrentNodeClientsTobePublish(variableHeader.topicName(), (IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager()); Set<Client> clientsTobePublish = findCurrentNodeClientsTobePublish(variableHeader.topicName(), (IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager());
for (Client client : clientsTobePublish) { for (Client client : clientsTobePublish) {
((MQTTSession) client).pushMessageQos0(mqttHeaderQos0, body, this.defaultMqttMessageProcessor); ((MQTTSession) client).pushMessageQos0(mqttHeaderQos0, body);
} }
} else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) { } else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) {
TransferDataQos1 transferDataQos1 = TransferDataQos1.decode(body, TransferDataQos1.class); TransferDataQos1 transferDataQos1 = TransferDataQos1.decode(body, TransferDataQos1.class);
......
...@@ -111,7 +111,7 @@ public class MqttPublishMessageHandler implements MessageHandler { ...@@ -111,7 +111,7 @@ public class MqttPublishMessageHandler implements MessageHandler {
mqttHeaderQos0.setQosLevel(MqttQoS.AT_MOST_ONCE.value()); mqttHeaderQos0.setQosLevel(MqttQoS.AT_MOST_ONCE.value());
mqttHeaderQos0.setRetain(false); //TODO set to false temporarily, need to be implemented later. mqttHeaderQos0.setRetain(false); //TODO set to false temporarily, need to be implemented later.
for (Client client : clientsTobePublish) { for (Client client : clientsTobePublish) {
((MQTTSession) client).pushMessageQos0(mqttHeaderQos0, body, this.defaultMqttMessageProcessor); ((MQTTSession) client).pushMessageQos0(mqttHeaderQos0, body);
} }
//For clients that connected to other snodes, transfer the message to them //For clients that connected to other snodes, transfer the message to them
...@@ -219,6 +219,7 @@ public class MqttPublishMessageHandler implements MessageHandler { ...@@ -219,6 +219,7 @@ public class MqttPublishMessageHandler implements MessageHandler {
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setBatch(false); requestHeader.setBatch(false);
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, variableHeader.topicName()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, variableHeader.topicName());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TAGS, MqttUtil.getRootTopic(variableHeader.topicName()));
MessageAccessor.putProperty(msg, MqttConstant.PROPERTY_MQTT_QOS, fixedHeader.qosLevel().name()); MessageAccessor.putProperty(msg, MqttConstant.PROPERTY_MQTT_QOS, fixedHeader.qosLevel().name());
requestHeader.setEnodeName(enodeName); requestHeader.setEnodeName(enodeName);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
......
...@@ -17,23 +17,34 @@ ...@@ -17,23 +17,34 @@
package org.apache.rocketmq.mqtt.task; package org.apache.rocketmq.mqtt.task;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.common.client.Client; import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.exception.MQSnodeException;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.MqttSubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.MqttSubscriptionData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.service.EnodeService; import org.apache.rocketmq.common.service.EnodeService;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
...@@ -76,49 +87,97 @@ public class MqttPushTask implements Runnable { ...@@ -76,49 +87,97 @@ public class MqttPushTask implements Runnable {
long maxOffsetInQueue; long maxOffsetInQueue;
try { try {
maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic); maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic);
final long consumeOffset = enodeService.queryOffset(brokerData.getBrokerName(), client.getClientId(), rootTopic, 0); long nextOffset = enodeService.queryOffset(brokerData.getBrokerName(), client.getClientId(), rootTopic, 0);
long i = consumeOffset + 1; while (nextOffset <= maxOffsetInQueue) {
while (i <= maxOffsetInQueue) { boolean inflightFullFlag = false;
//TODO query messages(queueOffset=i) from enode above(brokerData.getBrokerName) //pull messages from enode above(brokerData.getBrokerName), 32 messages max.
RemotingCommand response = null; PullMessageRequestHeader requestHeader = buildPullMessageRequestHeader(this.client.getClientId(), mqttHeader.getTopicName(), nextOffset, brokerData.getBrokerName());
ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody()); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
MessageExt messageExt = MessageDecoder.clientDecode(byteBuffer, true); RemotingCommand response = this.defaultMqttMessageProcessor.getEnodeService().pullMessageSync(null, brokerData.getBrokerName(), request);
PullResult pullResult = processPullResponse(response, subscriptionTable);
for (MessageExt messageExt : pullResult.getMsgFoundList()) {
final String realTopic = messageExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC); final String realTopic = messageExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
boolean needSkip = needSkip(realTopic, subscriptionTable);
boolean alreadyInFlight = alreadyInFight(brokerData.getBrokerName(), realTopic, client.getClientId(), messageExt.getQueueOffset()); boolean alreadyInFlight = alreadyInFight(brokerData.getBrokerName(), realTopic, client.getClientId(), messageExt.getQueueOffset());
if (needSkip) {
log.info("Current client doesn't subscribe topic:{}, skip this message", realTopic);
maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic);
i += 1;
continue;
}
if (alreadyInFlight) { if (alreadyInFlight) {
log.info("The message is already inflight. MessageId={}", messageExt.getMsgId()); log.info("The message is already inflight. MessageId={}", messageExt.getMsgId());
break; continue;
} }
Integer pushQos = lowerQosToTheSubscriptionDesired(realTopic, Integer.valueOf(messageExt.getProperty(MqttConstant.PROPERTY_MQTT_QOS)), subscriptionTable); Integer pushQos = lowerQosToTheSubscriptionDesired(realTopic, Integer.valueOf(messageExt.getProperty(MqttConstant.PROPERTY_MQTT_QOS)), subscriptionTable);
mqttHeader.setQosLevel(pushQos); mqttHeader.setQosLevel(pushQos);
mqttHeader.setTopicName(realTopic); mqttHeader.setTopicName(realTopic);
if (client.getInflightSlots().get() == 0) { if (client.getInflightSlots().get() == 0) {
log.info("The in-flight window is full, stop pushing message to consumers and update consumeOffset. ClientId={}, rootTopic={}", client.getClientId(), rootTopic); log.info("The in-flight window is full, stop pushing message to consumers and update consumeOffset. ClientId={}, rootTopic={}", client.getClientId(), rootTopic);
inflightFullFlag = true;
break; break;
} }
//push message if in-flight window has slot(not full) //push message if in-flight window has slot(not full)
client.pushMessageQos1(mqttHeader, messageExt, brokerData); client.pushMessageQos1(mqttHeader, messageExt, brokerData);
}
if (inflightFullFlag == true) {
break;
}
maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic); maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic);
i += 1; nextOffset = pullResult.getNextBeginOffset();
} }
//TODO update consumeOffset of rootTopic@clientId in brokerData.getBrokerName()
enodeService.persistOffset(null, brokerData.getBrokerName(), client.getClientId(), rootTopic, 0, i - 1);
} catch (Exception ex) { } catch (Exception ex) {
log.error("Exception was thrown when pushing messages to consumer.{}", ex); log.error("Exception was thrown when pushing messages to consumer.{}", ex);
} }
} }
private PullMessageRequestHeader buildPullMessageRequestHeader(String clientId, String topic, long offset,
String enodeName) {
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(clientId);
requestHeader.setTopic(MqttUtil.getRootTopic(topic));
requestHeader.setQueueId(0);
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(32);
requestHeader.setSysFlag(PullSysFlag.buildSysFlag(false, false, true, false));
requestHeader.setCommitOffset(0L);
// requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(topic);
requestHeader.setSubVersion(0L);
requestHeader.setExpressionType(ExpressionType.TAG);
requestHeader.setEnodeName(enodeName);
return requestHeader;
}
private PullResult processPullResponse(final RemotingCommand response,
ConcurrentHashMap<String, SubscriptionData> subscriptionTable) throws MQSnodeException, RemotingCommandException {
PullStatus pullStatus;
switch (response.getCode()) {
case ResponseCode.SUCCESS:
pullStatus = PullStatus.FOUND;
break;
case ResponseCode.PULL_NOT_FOUND:
pullStatus = PullStatus.NO_NEW_MSG;
break;
case ResponseCode.PULL_RETRY_IMMEDIATELY:
pullStatus = PullStatus.NO_MATCHED_MSG;
break;
case ResponseCode.PULL_OFFSET_MOVED:
pullStatus = PullStatus.OFFSET_ILLEGAL;
break;
default:
throw new MQSnodeException(response.getCode(), response.getRemark());
}
PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
//filter messages again
List<MessageExt> msgListFilterAgain = new ArrayList<>(msgList.size());
for (MessageExt msg : msgList) {
if (msg.getTags() != null && !needSkip(msg.getTags(), subscriptionTable)) {
msgListFilterAgain.add(msg);
}
}
return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
responseHeader.getMaxOffset(), msgListFilterAgain, responseHeader.getSuggestWhichBrokerId(), response.getBody());
}
private boolean needSkip(final String realTopic, ConcurrentHashMap<String, SubscriptionData> subscriptionTable) { private boolean needSkip(final String realTopic, ConcurrentHashMap<String, SubscriptionData> subscriptionTable) {
Enumeration<String> topicFilters = subscriptionTable.keys(); Enumeration<String> topicFilters = subscriptionTable.keys();
while (topicFilters.hasMoreElements()) { while (topicFilters.hasMoreElements()) {
......
...@@ -22,9 +22,9 @@ import java.util.Set; ...@@ -22,9 +22,9 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.exception.MQBrokerException;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.exception.MQBrokerException;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.ClusterInfo;
...@@ -34,6 +34,7 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHead ...@@ -34,6 +34,7 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHead
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.service.EnodeService;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
...@@ -48,7 +49,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; ...@@ -48,7 +49,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable; import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.constant.SnodeConstant; import org.apache.rocketmq.snode.constant.SnodeConstant;
import org.apache.rocketmq.common.service.EnodeService;
public class RemoteEnodeServiceImpl implements EnodeService { public class RemoteEnodeServiceImpl implements EnodeService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
...@@ -101,12 +101,24 @@ public class RemoteEnodeServiceImpl implements EnodeService { ...@@ -101,12 +101,24 @@ public class RemoteEnodeServiceImpl implements EnodeService {
} }
}); });
} catch (Exception ex) { } catch (Exception ex) {
log.error("Pull message async error:", ex); log.error("Pull message async error: {}", ex);
future.completeExceptionally(ex); future.completeExceptionally(ex);
} }
return future; return future;
} }
@Override public RemotingCommand pullMessageSync(RemotingChannel remotingChannel, String enodeName,
RemotingCommand request) {
try {
String enodeAddress = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(enodeAddress, request, SnodeConstant.CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND);
return response;
} catch (Exception ex) {
log.error("Pull message sync error: {}", ex);
}
return null;
}
@Override @Override
public CompletableFuture<RemotingCommand> sendMessage(final RemotingChannel remotingChannel, String enodeName, public CompletableFuture<RemotingCommand> sendMessage(final RemotingChannel remotingChannel, String enodeName,
RemotingCommand request) { RemotingCommand request) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册