提交 686b5c79 编写于 作者: C chengxiangwang

implement part of logic of mqtt CONNECT request

上级 359e4432
......@@ -210,8 +210,12 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
e1);
}
startUpHouseKeepingService();
registerMessageHandler();
}
private void registerMessageHandler() {
}
@Override
public void shutdown() {
try {
......
......@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.snode;
import io.netty.handler.codec.mqtt.MqttMessageType;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
......@@ -50,6 +51,7 @@ import org.apache.rocketmq.snode.client.SlowConsumerService;
import org.apache.rocketmq.snode.client.SubscriptionGroupManager;
import org.apache.rocketmq.snode.client.SubscriptionManager;
import org.apache.rocketmq.snode.client.impl.ConsumerManagerImpl;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
import org.apache.rocketmq.snode.client.impl.ProducerManagerImpl;
import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl;
import org.apache.rocketmq.snode.client.impl.SubscriptionManagerImpl;
......@@ -60,6 +62,16 @@ import org.apache.rocketmq.snode.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.snode.processor.HeartbeatProcessor;
import org.apache.rocketmq.snode.processor.PullMessageProcessor;
import org.apache.rocketmq.snode.processor.SendMessageProcessor;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttDisconnectMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPingreqMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubackMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubcompMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPublishMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubrecMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubrelMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttSubscribeMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttUnsubscribeMessagHandler;
import org.apache.rocketmq.snode.service.ClientService;
import org.apache.rocketmq.snode.service.EnodeService;
import org.apache.rocketmq.snode.service.NnodeService;
......@@ -92,6 +104,7 @@ public class SnodeController {
private ScheduledService scheduledService;
private ClientManager producerManager;
private ClientManager consumerManager;
private ClientManager iotClientManager;
private SubscriptionManager subscriptionManager;
private ClientHousekeepingService clientHousekeepingService;
private SubscriptionGroupManager subscriptionGroupManager;
......@@ -194,7 +207,8 @@ public class SnodeController {
this.subscriptionManager = new SubscriptionManagerImpl();
this.producerManager = new ProducerManagerImpl();
this.consumerManager = new ConsumerManagerImpl(this);
this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager, this.consumerManager);
this.iotClientManager = new IOTClientManagerImpl(this);
this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager, this.consumerManager, this.iotClientManager);
this.slowConsumerService = new SlowConsumerServiceImpl(this);
}
......@@ -300,6 +314,16 @@ public class SnodeController {
this.snodeServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor);
this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE,
defaultMqttMessageProcessor, handleMqttMessageExecutor);
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.CONNECT, new MqttConnectMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.DISCONNECT, new MqttDisconnectMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PINGREQ, new MqttPingreqMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBLISH, new MqttPublishMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBCOMP, new MqttPubcompMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.SUBSCRIBE, new MqttSubscribeMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.UNSUBSCRIBE, new MqttUnsubscribeMessagHandler(this));
}
public void start() {
......@@ -444,6 +468,14 @@ public class SnodeController {
this.consumerManager = consumerManager;
}
public ClientManager getIotClientManager() {
return iotClientManager;
}
public void setIotClientManager(ClientManager iotClientManager) {
this.iotClientManager = iotClientManager;
}
public SubscriptionManager getSubscriptionManager() {
return subscriptionManager;
}
......
......@@ -39,6 +39,8 @@ public class Client {
private LanguageCode language;
private boolean isConnected;
public ClientRole getClientRole() {
return clientRole;
}
......@@ -59,11 +61,12 @@ public class Client {
Objects.equals(clientId, client.clientId) &&
Objects.equals(groups, client.groups) &&
Objects.equals(remotingChannel, client.remotingChannel) &&
language == client.language;
language == client.language &&
isConnected == client.isConnected();
}
@Override public int hashCode() {
return Objects.hash(clientRole, clientId, groups, remotingChannel, heartbeatInterval, lastUpdateTimestamp, version, language);
return Objects.hash(clientRole, clientId, groups, remotingChannel, heartbeatInterval, lastUpdateTimestamp, version, language, isConnected);
}
public RemotingChannel getRemotingChannel() {
......@@ -114,6 +117,14 @@ public class Client {
this.language = language;
}
public boolean isConnected() {
return isConnected;
}
public void setConnected(boolean connected) {
isConnected = connected;
}
public Set<String> getGroups() {
return groups;
}
......@@ -132,6 +143,7 @@ public class Client {
", lastUpdateTimestamp=" + lastUpdateTimestamp +
", version=" + version +
", language=" + language +
", isConnected=" + isConnected +
'}';
}
}
......
......@@ -32,21 +32,25 @@ public class ClientHousekeepingService implements ChannelEventListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ClientManager producerManager;
private final ClientManager consumerManager;
private final ClientManager iotClientManager;
public ClientHousekeepingService(final ClientManager producerManager,
final ClientManager consumerManager) {
final ClientManager consumerManager, final ClientManager iotClientManager) {
this.producerManager = producerManager;
this.consumerManager = consumerManager;
this.iotClientManager = iotClientManager;
}
public void start(long interval) {
this.producerManager.startScan(interval);
this.consumerManager.startScan(interval);
this.iotClientManager.startScan(interval);
}
public void shutdown() {
this.producerManager.shutdown();
this.consumerManager.shutdown();
this.iotClientManager.shutdown();
}
private ClientRole clientRole(RemotingChannel remotingChannel) {
......@@ -74,6 +78,9 @@ public class ClientHousekeepingService implements ChannelEventListener {
case Producer:
this.producerManager.onClose(remoteAddress, remotingChannel);
return;
case IOTCLIENT:
this.iotClientManager.onClose(remoteAddress, remotingChannel);
return;
default:
}
}
......
......@@ -30,6 +30,8 @@ public interface ClientManager {
List<String> getAllClientId(String groupId);
Client getClient(String groupId, RemotingChannel remotingChannel);
void startScan(long interval);
void shutdown();
......
......@@ -35,12 +35,15 @@ import org.apache.rocketmq.snode.client.ClientManager;
public abstract class ClientManagerImpl implements ClientManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory
.getLogger(LoggerName.SNODE_LOGGER_NAME);
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ClientHousekeepingScheduledThread"));
.newSingleThreadScheduledExecutor(
new ThreadFactoryImpl("ClientHousekeepingScheduledThread"));
private final ConcurrentHashMap<String/*Producer or Consumer Group*/, ConcurrentHashMap<RemotingChannel, Client>> groupClientTable = new ConcurrentHashMap<>(1024);
private final ConcurrentHashMap<String/*Producer or Consumer Group*/, ConcurrentHashMap<RemotingChannel, Client>> groupClientTable = new ConcurrentHashMap<>(
1024);
public abstract void onClosed(String group, RemotingChannel remotingChannel);
......@@ -72,7 +75,8 @@ public abstract class ClientManagerImpl implements ClientManager {
while (iterator.hasNext()) {
Map.Entry entry = (Map.Entry) iterator.next();
String group = (String) entry.getKey();
ConcurrentHashMap<RemotingChannel, Client> channelTable = (ConcurrentHashMap<RemotingChannel, Client>) entry.getValue();
ConcurrentHashMap<RemotingChannel, Client> channelTable = (ConcurrentHashMap<RemotingChannel, Client>) entry
.getValue();
Iterator iter = channelTable.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry channelTableEntry = (Map.Entry) iter.next();
......@@ -81,11 +85,15 @@ public abstract class ClientManagerImpl implements ClientManager {
if (interval > CHANNEL_EXPIRED_TIMEOUT) {
iter.remove();
client.getRemotingChannel().close();
log.warn("SCAN: Remove expired channel from {}ClientTable. channel={}, group={}", client.getClientRole(),
RemotingHelper.parseChannelRemoteAddr(client.getRemotingChannel().remoteAddress()), group);
log.warn(
"SCAN: Remove expired channel from {}ClientTable. channel={}, group={}",
client.getClientRole(),
RemotingHelper.parseChannelRemoteAddr(
client.getRemotingChannel().remoteAddress()), group);
if (channelTable.isEmpty()) {
iterator.remove();
log.warn("SCAN: Remove group={} channel from {}ClientTable.", group, client.getClientRole());
log.warn("SCAN: Remove group={} channel from {}ClientTable.", group,
client.getClientRole());
}
}
}
......@@ -107,22 +115,25 @@ public abstract class ClientManagerImpl implements ClientManager {
if (oldClient == null) {
Client prev = channelTable.put(client.getRemotingChannel(), client);
if (prev != null) {
log.info("New client connected, group: {} {} {} channel: {}", groupId, client.toString());
log.info("New client connected, group: {} {} {} channel: {}", groupId,
client.toString());
updated = true;
}
oldClient = client;
} else {
if (!oldClient.getClientId().equals(client.getClientId())) {
log.error("[BUG] client channel exist in snode, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",
groupId,
oldClient.toString(),
channelTable.toString());
log.error(
"[BUG] client channel exist in snode, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",
groupId,
oldClient.toString(),
channelTable.toString());
channelTable.put(client.getRemotingChannel(), client);
}
}
oldClient.setLastUpdateTimestamp(System.currentTimeMillis());
}
log.debug("Register client role: {}, group: {}, last: {}", client.getClientRole(), groupId, client.getLastUpdateTimestamp());
log.debug("Register client role: {}, group: {}, last: {}", client.getClientRole(), groupId,
client.getLastUpdateTimestamp());
onRegister(groupId, client.getRemotingChannel());
return updated;
}
......@@ -136,7 +147,8 @@ public abstract class ClientManagerImpl implements ClientManager {
}
if (channelTable.isEmpty()) {
groupClientTable.remove(groupId);
log.info("Unregister client ok, no any connection, and remove consumer group, {}", groupId);
log.info("Unregister client ok, no any connection, and remove consumer group, {}",
groupId);
}
}
}
......@@ -171,7 +183,8 @@ public abstract class ClientManagerImpl implements ClientManager {
List<String> result = new ArrayList<>();
Map<RemotingChannel, Client> channelClientMap = this.groupClientTable.get(groupId);
if (channelClientMap != null) {
Iterator<Map.Entry<RemotingChannel, Client>> it = channelClientMap.entrySet().iterator();
Iterator<Map.Entry<RemotingChannel, Client>> it = channelClientMap.entrySet()
.iterator();
while (it.hasNext()) {
Map.Entry<RemotingChannel, Client> entry = it.next();
Client client = entry.getValue();
......@@ -180,4 +193,15 @@ public abstract class ClientManagerImpl implements ClientManager {
}
return result;
}
@Override
public Client getClient(String groupId, RemotingChannel remotingChannel) {
assert groupId != null && remotingChannel != null;
if (!groupClientTable.containsKey(groupId)) {
return null;
}
ConcurrentHashMap<RemotingChannel, Client> channelClientMap = groupClientTable
.get(groupId);
return channelClientMap.get(remotingChannel);
}
}
......@@ -17,9 +17,17 @@
package org.apache.rocketmq.snode.client.impl;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.snode.SnodeController;
public class IOTClientManagerImpl extends ClientManagerImpl {
public static final String IOTGROUP = "IOTGROUP";
private final SnodeController snodeController;
public IOTClientManagerImpl(SnodeController snodeController) {
this.snodeController = snodeController;
}
@Override
public void onClosed(String group, RemotingChannel remotingChannel) {
......@@ -33,4 +41,8 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
@Override public void onRegister(String group, RemotingChannel remotingChannel) {
}
public SnodeController getSnodeController() {
return snodeController;
}
}
......@@ -17,32 +17,66 @@
package org.apache.rocketmq.snode.processor;
import com.alibaba.fastjson.JSON;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler;
public class DefaultMqttMessageProcessor implements RequestProcessor {
private Map<MqttMessageType, MessageHandler> type2handler = new HashMap<>();
private static final int MIN_AVAILABLE_VERSION = 3;
private static final int MAX_AVAILABLE_VERSION = 4;
@Override public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message)
@Override
public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message)
throws RemotingCommandException {
//解析RemotingCommand,根据MqttMessageType做不同逻辑处理
//TODO
MqttHeader mqttHeader = (MqttHeader)message.decodeCommandCustomHeader(MqttHeader.class);
MqttHeader mqttHeader = (MqttHeader) message.decodeCommandCustomHeader(MqttHeader.class);
MqttFixedHeader fixedHeader = new MqttFixedHeader(mqttHeader.getMessageType(),
mqttHeader.isDup(), mqttHeader.getQosLevel(), mqttHeader.isRetain(),
mqttHeader.getRemainingLength());
MqttMessage mqttMessage = null;
switch (mqttHeader.getMessageType()) {
case CONNECT:
MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
mqttHeader.getName(), mqttHeader.getVersion(), mqttHeader.isHasUserName(),
mqttHeader.isHasPassword(), mqttHeader.isWillRetain(),
mqttHeader.getWillQos(), mqttHeader.isWillFlag(),
mqttHeader.isCleanSession(), mqttHeader.getKeepAliveTimeSeconds());
MqttConnectPayload payload = decode(message.getBody(), MqttConnectPayload.class);
mqttMessage = new MqttConnectMessage(fixedHeader, variableHeader, payload);
case DISCONNECT:
}
return null;
return type2handler.get(mqttHeader.getMessageType()).handleMessage(mqttMessage, remotingChannel);
}
@Override
public boolean rejectRequest() {
return false;
}
private <T> T decode(final byte[] data, Class<T> classOfT) {
final String json = new String(data, Charset.forName(RemotingUtil.REMOTING_CHARSET));
return JSON.parseObject(json, classOfT);
}
public void registerMessageHanlder(MqttMessageType type, MessageHandler handler) {
type2handler.put(type, handler);
}
}
......@@ -18,6 +18,8 @@
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface MessageHandler {
......@@ -26,5 +28,5 @@ public interface MessageHandler {
*
* @param message
*/
void handleMessage(MqttMessage message);
RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel);
}
......@@ -22,9 +22,16 @@ import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.ClientManager;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
public class MqttConnectMessageHandler implements MessageHandler {
private final SnodeController snodeController;
private static final int MIN_AVAILABLE_VERSION = 3;
private static final int MAX_AVAILABLE_VERSION = 4;
......@@ -34,10 +41,14 @@ public class MqttConnectMessageHandler implements MessageHandler {
this.clientManager = clientManager;
}*/
@Override public void handleMessage(MqttMessage message) {
public MqttConnectMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
// MqttClient client = (MqttClient) message.getClient();
if (!(message instanceof MqttConnectMessage)) {
return;
return null;
}
MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) message;
MqttConnectPayload payload = mqttConnectMessage.payload();
......@@ -45,8 +56,20 @@ public class MqttConnectMessageHandler implements MessageHandler {
MqttConnectReturnCode returnCode;
MqttConnAckMessage ackMessage;
if (isConnected(remotingChannel, mqttConnectMessage.payload().clientIdentifier())) {
}
// ChannelHandlerContext ctx = client.getCtx();
return null;
}
private boolean isConnected(RemotingChannel remotingChannel, String clientId) {
ClientManager iotClientManager = snodeController.getIotClientManager();
Client client = iotClientManager.getClient(IOTClientManagerImpl.IOTGROUP, remotingChannel);
if (client != null && client.getClientId().equals(clientId) && client.isConnected()) {
return true;
}
return false;
}
private boolean isServiceAviable(MqttConnectMessage connectMessage) {
......
......@@ -18,14 +18,22 @@
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttDisconnectMessageHandler implements MessageHandler {
/* private ClientManager clientManager;
private final SnodeController snodeController;
public MqttDisconnectMessageHandler(ClientManager clientManager) {
this.clientManager = clientManager;
}*/
/* private ClientManager clientManager;
public MqttDisconnectMessageHandler(ClientManager clientManager) {
this.clientManager = clientManager;
}*/
public MqttDisconnectMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
/**
* handle the DISCONNECT message from the client
......@@ -37,8 +45,8 @@ public class MqttDisconnectMessageHandler implements MessageHandler {
* @param message
* @return
*/
@Override public void handleMessage(MqttMessage message) {
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
// TODO discard the Will Message and Will Topic
return null;
}
}
......@@ -18,22 +18,30 @@
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttMessageForwarder implements MessageHandler {
private final SnodeController snodeController;
/* private SubscriptionStore subscriptionStore;
public MqttMessageForwarder(SubscriptionStore subscriptionStore) {
this.subscriptionStore = subscriptionStore;
}*/
public MqttMessageForwarder(SnodeController snodeController) {
this.snodeController = snodeController;
}
/**
* handle PUBLISH message from client
*
* @param message
* @return whether the message is handled successfully
*/
@Override public void handleMessage(MqttMessage message) {
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
}
}
......@@ -18,16 +18,24 @@
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttMessageSender implements MessageHandler {
private final SnodeController snodeController;
public MqttMessageSender(SnodeController snodeController) {
this.snodeController = snodeController;
}
/**
* send the PUBLISH message to client
*
* @param message
* @return whether the message is handled successfully
*/
@Override public void handleMessage(MqttMessage message) {
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
}
}
......@@ -18,9 +18,17 @@
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttPingreqMessageHandler implements MessageHandler {
private final SnodeController snodeController;
public MqttPingreqMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
/**
* handle the PINGREQ message from client
* <ol>
......@@ -33,6 +41,7 @@ public class MqttPingreqMessageHandler implements MessageHandler {
* @param message
* @return
*/
@Override public void handleMessage(MqttMessage message) {
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
}
}
......@@ -18,9 +18,17 @@
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttPubackMessageHandler implements MessageHandler {
private final SnodeController snodeController;
public MqttPubackMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
/**
* handle the PUBACK message from the client
* <ol>
......@@ -30,7 +38,7 @@ public class MqttPubackMessageHandler implements MessageHandler {
* @param
* @return
*/
@Override public void handleMessage(MqttMessage message) {
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
}
}
......@@ -18,14 +18,23 @@
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttPubcompMessageHandler implements MessageHandler {
private final SnodeController snodeController;
public MqttPubcompMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
/**
* handle the PUBCOMP message from the client
* @param message
* @return
*/
@Override public void handleMessage(MqttMessage message) {
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
}
}
......@@ -18,6 +18,9 @@
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttPublishMessageHandler implements MessageHandler {
......@@ -26,9 +29,13 @@ public class MqttPublishMessageHandler implements MessageHandler {
public MqttPublishMessageHandler(MessageStore messageStore) {
this.messageStore = messageStore;
}*/
private final SnodeController snodeController;
@Override public void handleMessage(MqttMessage message) {
public MqttPublishMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
}
}
......@@ -18,14 +18,23 @@
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttPubrecMessageHandler implements MessageHandler {
private final SnodeController snodeController;
public MqttPubrecMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
/**
* handle the PUBREC message from the clinet
* @param message
* @return
*/
@Override public void handleMessage(MqttMessage message) {
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
}
}
......@@ -18,14 +18,24 @@
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttPubrelMessageHandler implements MessageHandler {
private final SnodeController snodeController;
public MqttPubrelMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
/**
* handle the PUBREL message from the client
* @param message
* @return
*/
@Override public void handleMessage(MqttMessage message) {
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
}
}
......@@ -18,6 +18,9 @@
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttSubscribeMessageHandler implements MessageHandler {
......@@ -26,7 +29,11 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
public MqttSubscribeMessageHandler(SubscriptionStore subscriptionStore) {
this.subscriptionStore = subscriptionStore;
}*/
private final SnodeController snodeController;
public MqttSubscribeMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
/**
* handle the SUBSCRIBE message from the client
* <ol>
......@@ -41,7 +48,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
* @param message the message wrapping MqttSubscriptionMessage
* @return
*/
@Override public void handleMessage(MqttMessage message) {
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
}
}
......@@ -18,6 +18,9 @@
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
/**
* handle the UNSUBSCRIBE message from the client
......@@ -35,9 +38,13 @@ public class MqttUnsubscribeMessagHandler implements MessageHandler {
public MqttUnsubscribeMessagHandler(SubscriptionStore subscriptionStore) {
this.subscriptionStore = subscriptionStore;
}*/
private final SnodeController snodeController;
public MqttUnsubscribeMessagHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
@Override
public void handleMessage(MqttMessage message) {
public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册