提交 3629a0a7 编写于 作者: C chengxiangwang

add new module rocketmq-mqtt

上级 a1f4358e
...@@ -50,6 +50,8 @@ public class MqttConfig { ...@@ -50,6 +50,8 @@ public class MqttConfig {
@ImportantField @ImportantField
private boolean aclEnable = false; private boolean aclEnable = false;
private long houseKeepingInterval = 10 * 1000;
public int getListenPort() { public int getListenPort() {
return listenPort; return listenPort;
} }
...@@ -130,4 +132,12 @@ public class MqttConfig { ...@@ -130,4 +132,12 @@ public class MqttConfig {
public void setPushMqttMessageThreadPoolQueueCapacity(int pushMqttMessageThreadPoolQueueCapacity) { public void setPushMqttMessageThreadPoolQueueCapacity(int pushMqttMessageThreadPoolQueueCapacity) {
this.pushMqttMessageThreadPoolQueueCapacity = pushMqttMessageThreadPoolQueueCapacity; this.pushMqttMessageThreadPoolQueueCapacity = pushMqttMessageThreadPoolQueueCapacity;
} }
public long getHouseKeepingInterval() {
return houseKeepingInterval;
}
public void setHouseKeepingInterval(long houseKeepingInterval) {
this.houseKeepingInterval = houseKeepingInterval;
}
} }
...@@ -14,13 +14,12 @@ ...@@ -14,13 +14,12 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.client; package org.apache.rocketmq.common.client;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.serialize.LanguageCode; import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.snode.client.impl.ClientRole;
public class Client { public class Client {
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.client; package org.apache.rocketmq.common.client;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.client.impl; package org.apache.rocketmq.common.client;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
...@@ -33,8 +33,6 @@ import org.apache.rocketmq.remoting.RemotingChannel; ...@@ -33,8 +33,6 @@ import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.ClientManager;
public abstract class ClientManagerImpl implements ClientManager { public abstract class ClientManagerImpl implements ClientManager {
...@@ -45,7 +43,7 @@ public abstract class ClientManagerImpl implements ClientManager { ...@@ -45,7 +43,7 @@ public abstract class ClientManagerImpl implements ClientManager {
.newSingleThreadScheduledExecutor( .newSingleThreadScheduledExecutor(
new ThreadFactoryImpl("ClientHousekeepingScheduledThread")); new ThreadFactoryImpl("ClientHousekeepingScheduledThread"));
private final ConcurrentHashMap<String/*Producer or Consumer Group*/, ConcurrentHashMap<RemotingChannel, Client>> groupClientTable = new ConcurrentHashMap<>( private final ConcurrentHashMap<String/*Producer or Consumer Group*/, ConcurrentHashMap<RemotingChannel, Client>> groupClientTable = new ConcurrentHashMap<String, ConcurrentHashMap<RemotingChannel, Client>>(
1024); 1024);
public abstract void onClosed(String group, RemotingChannel remotingChannel); public abstract void onClosed(String group, RemotingChannel remotingChannel);
...@@ -176,7 +174,7 @@ public abstract class ClientManagerImpl implements ClientManager { ...@@ -176,7 +174,7 @@ public abstract class ClientManagerImpl implements ClientManager {
public List<RemotingChannel> getChannels(String groupId) { public List<RemotingChannel> getChannels(String groupId) {
if (groupId != null) { if (groupId != null) {
List<RemotingChannel> result = new ArrayList<>(); List<RemotingChannel> result = new ArrayList<RemotingChannel>();
ConcurrentHashMap channelsMap = this.groupClientTable.get(groupId); ConcurrentHashMap channelsMap = this.groupClientTable.get(groupId);
if (channelsMap != null) { if (channelsMap != null) {
result.addAll(this.groupClientTable.get(groupId).keySet()); result.addAll(this.groupClientTable.get(groupId).keySet());
...@@ -189,7 +187,7 @@ public abstract class ClientManagerImpl implements ClientManager { ...@@ -189,7 +187,7 @@ public abstract class ClientManagerImpl implements ClientManager {
@Override @Override
public List<String> getAllClientId(String groupId) { public List<String> getAllClientId(String groupId) {
List<String> result = new ArrayList<>(); List<String> result = new ArrayList<String>();
Map<RemotingChannel, Client> channelClientMap = this.groupClientTable.get(groupId); Map<RemotingChannel, Client> channelClientMap = this.groupClientTable.get(groupId);
if (channelClientMap != null) { if (channelClientMap != null) {
Iterator<Map.Entry<RemotingChannel, Client>> it = channelClientMap.entrySet() Iterator<Map.Entry<RemotingChannel, Client>> it = channelClientMap.entrySet()
......
package org.apache.rocketmq.snode.client.impl;/* /*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.client;/*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.client.impl; package org.apache.rocketmq.common.client;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
...@@ -27,7 +27,7 @@ public class Subscription { ...@@ -27,7 +27,7 @@ public class Subscription {
private volatile MessageModel messageModel; private volatile MessageModel messageModel;
private volatile ConsumeFromWhere consumeFromWhere; private volatile ConsumeFromWhere consumeFromWhere;
private volatile boolean cleanSession; private volatile boolean cleanSession;
ConcurrentHashMap<String/*Topic*/, SubscriptionData> subscriptionTable = new ConcurrentHashMap<>(); ConcurrentHashMap<String/*Topic*/, SubscriptionData> subscriptionTable = new ConcurrentHashMap<String, SubscriptionData>();
private volatile long lastUpdateTimestamp = System.currentTimeMillis(); private volatile long lastUpdateTimestamp = System.currentTimeMillis();
public SubscriptionData getSubscriptionData(String topic) { public SubscriptionData getSubscriptionData(String topic) {
......
...@@ -38,4 +38,5 @@ public class LoggerName { ...@@ -38,4 +38,5 @@ public class LoggerName {
public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark"; public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark";
public static final String FILTER_LOGGER_NAME = "RocketmqFilter"; public static final String FILTER_LOGGER_NAME = "RocketmqFilter";
public static final String SNODE_LOGGER_NAME = "RocketmqSnode"; public static final String SNODE_LOGGER_NAME = "RocketmqSnode";
public static final String MQTT_LOGGER_NAME = "RocketmqMQTT";
} }
...@@ -15,4 +15,4 @@ ...@@ -15,4 +15,4 @@
namesrvAddr=localhost:9876 namesrvAddr=localhost:9876
clusterName = DefaultCluster clusterName = DefaultCluster
snodeName = snode-a snodeName = snode-a
embeddedModeEnable = false
...@@ -67,6 +67,7 @@ ...@@ -67,6 +67,7 @@
<include>org.apache.rocketmq:rocketmq-namesrv</include> <include>org.apache.rocketmq:rocketmq-namesrv</include>
<include>org.apache.rocketmq:rocketmq-example</include> <include>org.apache.rocketmq:rocketmq-example</include>
<include>org.apache.rocketmq:rocketmq-openmessaging</include> <include>org.apache.rocketmq:rocketmq-openmessaging</include>
<include>org.apache.rocketmq:rocketmq-mqtt</include>
<include>org.apache.rocketmq:rocketmq-snode</include> <include>org.apache.rocketmq:rocketmq-snode</include>
</includes> </includes>
<binaries> <binaries>
......
...@@ -50,17 +50,17 @@ public class MqttSampleConsumer { ...@@ -50,17 +50,17 @@ public class MqttSampleConsumer {
log.info("Connected"); log.info("Connected");
sampleClient.setCallback(new MqttCallback() { sampleClient.setCallback(new MqttCallback() {
@Override public void connectionLost(Throwable throwable) { @Override public void connectionLost(Throwable throwable) {
System.out.println("connection lost." + throwable.getLocalizedMessage()); log.info("connection lost." + throwable.getLocalizedMessage());
} }
@Override public void messageArrived(String s, MqttMessage message) throws Exception { @Override public void messageArrived(String s, MqttMessage message) throws Exception {
System.out.println(message.toString()); log.info(message.toString());
// System.exit(0); // System.exit(0);
} }
@Override public void deliveryComplete(IMqttDeliveryToken token) { @Override public void deliveryComplete(IMqttDeliveryToken token) {
try { try {
System.out.println("delivery complete." + token.getMessage().toString()); log.info("delivery complete." + token.getMessage().toString());
} catch (MqttException e) { } catch (MqttException e) {
e.printStackTrace(); e.printStackTrace();
} }
......
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.1-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-mqtt</artifactId>
<name>rocketmq-mqtt ${project.version}</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-common</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-store</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-remoting</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-srvutil</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-filter</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-acl</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_httpserver</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_hotspot</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-broker</artifactId>
</dependency>
</dependencies>
</project>
...@@ -14,34 +14,33 @@ ...@@ -14,34 +14,33 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.client.impl; package org.apache.rocketmq.mqtt.client;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.ClientManagerImpl;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
public class IOTClientManagerImpl extends ClientManagerImpl { public class IOTClientManagerImpl extends ClientManagerImpl {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
public static final String IOT_GROUP = "IOT_GROUP"; public static final String IOT_GROUP = "IOT_GROUP";
private final SnodeController snodeController;
private final ConcurrentHashMap<String/*root topic*/, ConcurrentHashMap<Client, Set<SubscriptionData>>> topic2SubscriptionTable = new ConcurrentHashMap<>( private final ConcurrentHashMap<String/*root topic*/, ConcurrentHashMap<Client, Set<SubscriptionData>>> topic2SubscriptionTable = new ConcurrentHashMap<>(
1024); 1024);
private final ConcurrentHashMap<String/*clientId*/, Subscription> clientId2Subscription = new ConcurrentHashMap<>(1024); private final ConcurrentHashMap<String/*clientId*/, Subscription> clientId2Subscription = new ConcurrentHashMap<>(1024);
public IOTClientManagerImpl(SnodeController snodeController) { public IOTClientManagerImpl() {
this.snodeController = snodeController;
} }
public void onUnsubscribe(Client client, List<String> topics) { public void onUnsubscribe(Client client, List<String> topics) {
...@@ -89,6 +88,9 @@ public class IOTClientManagerImpl extends ClientManagerImpl { ...@@ -89,6 +88,9 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
} }
iterator1.remove(); iterator1.remove();
} }
if (next.getValue() == null || next.getValue().size() == 0) {
iterator.remove();
}
} }
//remove offline messages //remove offline messages
} }
...@@ -97,10 +99,6 @@ public class IOTClientManagerImpl extends ClientManagerImpl { ...@@ -97,10 +99,6 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
return clientId2Subscription.get(clientId); return clientId2Subscription.get(clientId);
} }
public SnodeController getSnodeController() {
return snodeController;
}
public ConcurrentHashMap<String, ConcurrentHashMap<Client, Set<SubscriptionData>>> getTopic2SubscriptionTable() { public ConcurrentHashMap<String, ConcurrentHashMap<Client, Set<SubscriptionData>>> getTopic2SubscriptionTable() {
return topic2SubscriptionTable; return topic2SubscriptionTable;
} }
...@@ -112,5 +110,4 @@ public class IOTClientManagerImpl extends ClientManagerImpl { ...@@ -112,5 +110,4 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
public void initSubscription(String clientId, Subscription subscription) { public void initSubscription(String clientId, Subscription subscription) {
clientId2Subscription.put(clientId, subscription); clientId2Subscription.put(clientId, subscription);
} }
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt.client;
import io.netty.channel.Channel;
import io.netty.util.Attribute;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.ClientManager;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.constant.MqttConstant;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
public class MqttClientHousekeepingService implements ChannelEventListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private final ClientManager iotClientManager;
public MqttClientHousekeepingService(final ClientManager iotClientManager) {
this.iotClientManager = iotClientManager;
}
public void start(long interval) {
this.iotClientManager.startScan(interval);
}
public void shutdown() {
this.iotClientManager.shutdown();
}
private Client getClient(RemotingChannel remotingChannel) {
if (remotingChannel instanceof NettyChannelImpl) {
Channel channel = ((NettyChannelImpl) remotingChannel).getChannel();
Attribute<Client> clientAttribute = channel.attr(MqttConstant.MQTT_CLIENT_ATTRIBUTE_KEY);
if (clientAttribute != null) {
Client client = clientAttribute.get();
return client;
}
}
log.warn("RemotingChannel type error: {}", remotingChannel.getClass());
return null;
}
private void closeChannel(String remoteAddress, RemotingChannel remotingChannel) {
Client client = getClient(remotingChannel);
if (client != null) {
switch (client.getClientRole()) {
case IOTCLIENT:
this.iotClientManager.onClose(client.getGroups(), remotingChannel);
return;
default:
}
}
log.warn("Close channel without any role");
}
@Override
public void onChannelConnect(String remoteAddr, RemotingChannel channel) {
log.info("Remoting channel connected: {}", RemotingHelper.parseChannelRemoteAddr(channel.remoteAddress()));
}
@Override
public void onChannelClose(String remoteAddr, RemotingChannel remotingChannel) {
log.info("Remoting channel closed: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()));
closeChannel(remoteAddr, remotingChannel);
}
@Override
public void onChannelException(String remoteAddr, RemotingChannel remotingChannel) {
log.info("Remoting channel exception: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()));
closeChannel(remoteAddr, remotingChannel);
}
@Override
public void onChannelIdle(String remoteAddr, RemotingChannel remotingChannel) {
log.info("Remoting channel idle: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()));
closeChannel(remoteAddr, remotingChannel);
}
}
...@@ -15,11 +15,16 @@ ...@@ -15,11 +15,16 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.constant; package org.apache.rocketmq.mqtt.constant;
import io.netty.util.AttributeKey;
import org.apache.rocketmq.common.client.Client;
public class MqttConstant { public class MqttConstant {
public static final int MAX_SUPPORTED_QOS = 0; public static final int MAX_SUPPORTED_QOS = 0;
public static final String SUBSCRIPTION_FLAG_PLUS = "+"; public static final String SUBSCRIPTION_FLAG_PLUS = "+";
public static final String SUBSCRIPTION_FLAG_SHARP = "#"; public static final String SUBSCRIPTION_FLAG_SHARP = "#";
public static final String SUBSCRIPTION_SEPARATOR = "/"; public static final String SUBSCRIPTION_SEPARATOR = "/";
public static final long DEFAULT_TIMEOUT_MILLS = 3000L;
public static final AttributeKey<Client> MQTT_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("mqtt.client");
} }
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.exception; package org.apache.rocketmq.mqtt.exception;
public class MqttConnectException extends RuntimeException { public class MqttConnectException extends RuntimeException {
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.exception; package org.apache.rocketmq.mqtt.exception;
public class WrongMessageTypeException extends RuntimeException { public class WrongMessageTypeException extends RuntimeException {
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.processor.mqtthandler; package org.apache.rocketmq.mqtt.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.processor.mqtthandler; package org.apache.rocketmq.mqtt.mqtthandler.impl;
import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload; import io.netty.handler.codec.mqtt.MqttConnectPayload;
...@@ -25,32 +25,32 @@ import io.netty.handler.codec.mqtt.MqttMessageType; ...@@ -25,32 +25,32 @@ import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.HashSet; import java.util.HashSet;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.ClientManager;
import org.apache.rocketmq.common.client.ClientRole;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.mqtt.WillMessage; import org.apache.rocketmq.common.message.mqtt.WillMessage;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.exception.MqttConnectException;
import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.ClientManager;
import org.apache.rocketmq.snode.client.impl.ClientRole;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
import org.apache.rocketmq.snode.client.impl.Subscription;
import org.apache.rocketmq.snode.exception.MqttConnectException;
import org.apache.rocketmq.snode.exception.WrongMessageTypeException;
public class MqttConnectMessageHandler implements MessageHandler { public class MqttConnectMessageHandler implements MessageHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeController snodeController;
private static final int MIN_AVAILABLE_VERSION = 3; private static final int MIN_AVAILABLE_VERSION = 3;
private static final int MAX_AVAILABLE_VERSION = 4; private static final int MAX_AVAILABLE_VERSION = 4;
private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
public MqttConnectMessageHandler(SnodeController snodeController) { public MqttConnectMessageHandler(DefaultMqttMessageProcessor defaultMqttMessageProcessor) {
this.snodeController = snodeController; this.defaultMqttMessageProcessor = defaultMqttMessageProcessor;
} }
@Override @Override
...@@ -97,7 +97,7 @@ public class MqttConnectMessageHandler implements MessageHandler { ...@@ -97,7 +97,7 @@ public class MqttConnectMessageHandler implements MessageHandler {
remotingChannel.close(); remotingChannel.close();
return null; return null;
} }
IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager(); IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager();
//set Session Present according to whether the server has already stored Session State for the clientId //set Session Present according to whether the server has already stored Session State for the clientId
if (mqttConnectMessage.variableHeader().isCleanSession()) { if (mqttConnectMessage.variableHeader().isCleanSession()) {
mqttHeader.setSessionPresent(false); mqttHeader.setSessionPresent(false);
...@@ -120,7 +120,11 @@ public class MqttConnectMessageHandler implements MessageHandler { ...@@ -120,7 +120,11 @@ public class MqttConnectMessageHandler implements MessageHandler {
Client client = new Client(); Client client = new Client();
client.setClientId(payload.clientIdentifier()); client.setClientId(payload.clientIdentifier());
client.setClientRole(ClientRole.IOTCLIENT); client.setClientRole(ClientRole.IOTCLIENT);
client.setGroups(new HashSet<String>(){{add("IOT_GROUP");}}); client.setGroups(new HashSet<String>() {
{
add("IOT_GROUP");
}
});
client.setConnected(true); client.setConnected(true);
client.setRemotingChannel(remotingChannel); client.setRemotingChannel(remotingChannel);
client.setLastUpdateTimestamp(System.currentTimeMillis()); client.setLastUpdateTimestamp(System.currentTimeMillis());
...@@ -138,7 +142,7 @@ public class MqttConnectMessageHandler implements MessageHandler { ...@@ -138,7 +142,7 @@ public class MqttConnectMessageHandler implements MessageHandler {
willMessage.setWillTopic(payload.willTopic()); willMessage.setWillTopic(payload.willTopic());
willMessage.setRetain(mqttConnectMessage.variableHeader().isWillRetain()); willMessage.setRetain(mqttConnectMessage.variableHeader().isWillRetain());
willMessage.setBody(payload.willMessageInBytes()); willMessage.setBody(payload.willMessageInBytes());
snodeController.getWillMessageService().saveWillMessage(client.getClientId(), willMessage); defaultMqttMessageProcessor.getWillMessageService().saveWillMessage(client.getClientId(), willMessage);
} }
mqttHeader.setConnectReturnCode(MqttConnectReturnCode.CONNECTION_ACCEPTED.name()); mqttHeader.setConnectReturnCode(MqttConnectReturnCode.CONNECTION_ACCEPTED.name());
...@@ -148,7 +152,7 @@ public class MqttConnectMessageHandler implements MessageHandler { ...@@ -148,7 +152,7 @@ public class MqttConnectMessageHandler implements MessageHandler {
} }
private boolean alreadyStoredSession(String clientId) { private boolean alreadyStoredSession(String clientId) {
IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager(); IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager();
Subscription subscription = iotClientManager.getSubscriptionByClientId(clientId); Subscription subscription = iotClientManager.getSubscriptionByClientId(clientId);
if (subscription == null) { if (subscription == null) {
return false; return false;
...@@ -164,7 +168,7 @@ public class MqttConnectMessageHandler implements MessageHandler { ...@@ -164,7 +168,7 @@ public class MqttConnectMessageHandler implements MessageHandler {
} }
private boolean isConnected(RemotingChannel remotingChannel, String clientId) { private boolean isConnected(RemotingChannel remotingChannel, String clientId) {
ClientManager iotClientManager = snodeController.getIotClientManager(); ClientManager iotClientManager = defaultMqttMessageProcessor.getIotClientManager();
Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
if (client != null && client.getClientId().equals(clientId) && client.isConnected()) { if (client != null && client.getClientId().equals(clientId) && client.isConnected()) {
return true; return true;
......
...@@ -15,27 +15,27 @@ ...@@ -15,27 +15,27 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.processor.mqtthandler; package org.apache.rocketmq.mqtt.mqtthandler.impl;
import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
public class MqttDisconnectMessageHandler implements MessageHandler { public class MqttDisconnectMessageHandler implements MessageHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); public MqttDisconnectMessageHandler(DefaultMqttMessageProcessor processor) {
private final SnodeController snodeController; this.defaultMqttMessageProcessor = processor;
public MqttDisconnectMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
} }
/** /**
...@@ -56,10 +56,10 @@ public class MqttDisconnectMessageHandler implements MessageHandler { ...@@ -56,10 +56,10 @@ public class MqttDisconnectMessageHandler implements MessageHandler {
} }
//discard will message associated with the current connection(client) //discard will message associated with the current connection(client)
Client client = snodeController.getIotClientManager() Client client = defaultMqttMessageProcessor.getIotClientManager()
.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); .getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
if (client != null) { if (client != null) {
snodeController.getWillMessageService().deleteWillMessage(client.getClientId()); defaultMqttMessageProcessor.getWillMessageService().deleteWillMessage(client.getClientId());
} }
client.setConnected(false); client.setConnected(false);
if (remotingChannel.isActive()) { if (remotingChannel.isActive()) {
......
...@@ -15,24 +15,23 @@ ...@@ -15,24 +15,23 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.processor.mqtthandler; package org.apache.rocketmq.mqtt.mqtthandler.impl;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttMessageForwarder implements MessageHandler { public class MqttMessageForwarder implements MessageHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
private final SnodeController snodeController; public MqttMessageForwarder(DefaultMqttMessageProcessor processor) {
/* private SubscriptionStore subscriptionStore; this.defaultMqttMessageProcessor = processor;
public MqttMessageForwarder(SubscriptionStore subscriptionStore) {
this.subscriptionStore = subscriptionStore;
}*/
public MqttMessageForwarder(SnodeController snodeController) {
this.snodeController = snodeController;
} }
/** /**
......
...@@ -15,20 +15,25 @@ ...@@ -15,20 +15,25 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.processor.mqtthandler; package org.apache.rocketmq.mqtt.mqtthandler.impl;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttMessageSender implements MessageHandler { public class MqttMessageSender implements MessageHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
private final SnodeController snodeController; public MqttMessageSender(DefaultMqttMessageProcessor processor) {
this.defaultMqttMessageProcessor = processor;
public MqttMessageSender(SnodeController snodeController) {
this.snodeController = snodeController;
} }
/** /**
* send the PUBLISH message to client * send the PUBLISH message to client
* *
......
...@@ -15,29 +15,28 @@ ...@@ -15,29 +15,28 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.processor.mqtthandler; package org.apache.rocketmq.mqtt.mqtthandler.impl;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttPingreqMessageHandler implements MessageHandler { public class MqttPingreqMessageHandler implements MessageHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
private final SnodeController snodeController; public MqttPingreqMessageHandler(DefaultMqttMessageProcessor processor) {
this.defaultMqttMessageProcessor = processor;
public MqttPingreqMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
} }
/** /**
* handle the PINGREQ message from client * handle the PINGREQ message from client <ol> <li>check client exists</li> <li>check client is connected</li>
* <ol> * <li>generate the PINGRESP message</li> <li>send the PINGRESP message to the client</li> </ol>
* <li>check client exists</li>
* <li>check client is connected</li>
* <li>generate the PINGRESP message</li>
* <li>send the PINGRESP message to the client</li>
* </ol>
* *
* @param message * @param message
* @return * @return
......
...@@ -15,26 +15,30 @@ ...@@ -15,26 +15,30 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.processor.mqtthandler; package org.apache.rocketmq.mqtt.mqtthandler.impl;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttPubackMessageHandler implements MessageHandler { public class MqttPubackMessageHandler implements MessageHandler {
private final SnodeController snodeController; private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
public MqttPubackMessageHandler(SnodeController snodeController) { public MqttPubackMessageHandler(DefaultMqttMessageProcessor processor) {
this.snodeController = snodeController; this.defaultMqttMessageProcessor = processor;
} }
/** /**
* handle the PUBACK message from the client * handle the PUBACK message from the client <ol> <li>remove the message from the published in-flight messages</li>
* <ol> * <li>ack the message in the MessageStore</li> </ol>
* <li>remove the message from the published in-flight messages</li> *
* <li>ack the message in the MessageStore</li>
* </ol>
* @param * @param
* @return * @return
*/ */
......
...@@ -15,22 +15,28 @@ ...@@ -15,22 +15,28 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.processor.mqtthandler; package org.apache.rocketmq.mqtt.mqtthandler.impl;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttPubcompMessageHandler implements MessageHandler { public class MqttPubcompMessageHandler implements MessageHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
private final SnodeController snodeController; public MqttPubcompMessageHandler(DefaultMqttMessageProcessor processor) {
this.defaultMqttMessageProcessor = processor;
public MqttPubcompMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
} }
/** /**
* handle the PUBCOMP message from the client * handle the PUBCOMP message from the client
*
* @param message * @param message
* @return * @return
*/ */
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.processor.mqtthandler; package org.apache.rocketmq.mqtt.mqtthandler.impl;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader;
...@@ -27,21 +27,20 @@ import io.netty.handler.codec.mqtt.MqttQoS; ...@@ -27,21 +27,20 @@ import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException;
import org.apache.rocketmq.snode.exception.WrongMessageTypeException; import org.apache.rocketmq.mqtt.util.MqttUtil;
import org.apache.rocketmq.snode.util.MqttUtil;
public class MqttPublishMessageHandler implements MessageHandler { public class MqttPublishMessageHandler implements MessageHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); public MqttPublishMessageHandler(DefaultMqttMessageProcessor processor) {
this.defaultMqttMessageProcessor = processor;
private final SnodeController snodeController;
public MqttPublishMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
} }
@Override @Override
...@@ -61,7 +60,7 @@ public class MqttPublishMessageHandler implements MessageHandler { ...@@ -61,7 +60,7 @@ public class MqttPublishMessageHandler implements MessageHandler {
ByteBuf payload = mqttPublishMessage.payload(); ByteBuf payload = mqttPublishMessage.payload();
if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) { if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) {
snodeController.getMqttPushService().pushMessageQos0(variableHeader.topicName(), payload); defaultMqttMessageProcessor.getMqttPushService().pushMessageQos0(variableHeader.topicName(), payload);
} else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) { } else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) {
//Push messages to subscribers and add it to IN-FLIGHT messages //Push messages to subscribers and add it to IN-FLIGHT messages
} }
......
...@@ -15,22 +15,28 @@ ...@@ -15,22 +15,28 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.processor.mqtthandler; package org.apache.rocketmq.mqtt.mqtthandler.impl;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttPubrecMessageHandler implements MessageHandler { public class MqttPubrecMessageHandler implements MessageHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
private final SnodeController snodeController; public MqttPubrecMessageHandler(DefaultMqttMessageProcessor processor) {
this.defaultMqttMessageProcessor = processor;
public MqttPubrecMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
} }
/** /**
* handle the PUBREC message from the clinet * handle the PUBREC message from the clinet
*
* @param message * @param message
* @return * @return
*/ */
......
...@@ -15,23 +15,28 @@ ...@@ -15,23 +15,28 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.processor.mqtthandler; package org.apache.rocketmq.mqtt.mqtthandler.impl;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttPubrelMessageHandler implements MessageHandler { public class MqttPubrelMessageHandler implements MessageHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
private final SnodeController snodeController; public MqttPubrelMessageHandler(DefaultMqttMessageProcessor processor) {
this.defaultMqttMessageProcessor = processor;
public MqttPubrelMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
} }
/** /**
* handle the PUBREL message from the client * handle the PUBREL message from the client
*
* @param message * @param message
* @return * @return
*/ */
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.processor.mqtthandler; package org.apache.rocketmq.mqtt.mqtthandler.impl;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttMessageType;
...@@ -29,31 +29,32 @@ import java.util.HashSet; ...@@ -29,31 +29,32 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.heartbeat.MqttSubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.MqttSubscriptionData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.constant.MqttConstant;
import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.mqtt.util.MqttUtil;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil; import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
import org.apache.rocketmq.snode.client.impl.Subscription;
import org.apache.rocketmq.snode.constant.MqttConstant;
import org.apache.rocketmq.snode.exception.WrongMessageTypeException;
import org.apache.rocketmq.snode.util.MqttUtil;
public class MqttSubscribeMessageHandler implements MessageHandler { public class MqttSubscribeMessageHandler implements MessageHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private final SnodeController snodeController; private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
public MqttSubscribeMessageHandler(SnodeController snodeController) { public MqttSubscribeMessageHandler(DefaultMqttMessageProcessor processor) {
this.snodeController = snodeController; this.defaultMqttMessageProcessor = processor;
} }
/** /**
...@@ -72,7 +73,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler { ...@@ -72,7 +73,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
} }
MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) message; MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) message;
MqttSubscribePayload payload = mqttSubscribeMessage.payload(); MqttSubscribePayload payload = mqttSubscribeMessage.payload();
IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager(); IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager();
Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
if (client == null) { if (client == null) {
log.error("Can't find associated client, the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString()); log.error("Can't find associated client, the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString());
...@@ -135,7 +136,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler { ...@@ -135,7 +136,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
//2.update topic2SubscriptionTable //2.update topic2SubscriptionTable
String rootTopic = MqttUtil.getRootTopic(mqttTopicSubscription.topicName()); String rootTopic = MqttUtil.getRootTopic(mqttTopicSubscription.topicName());
ConcurrentHashMap<Client, Set<SubscriptionData>> client2SubscriptionData = topic2SubscriptionTable.get(rootTopic); ConcurrentHashMap<Client, Set<SubscriptionData>> client2SubscriptionData = topic2SubscriptionTable.get(rootTopic);
if (client2SubscriptionData == null) { if (client2SubscriptionData == null || client2SubscriptionData.size() == 0) {
client2SubscriptionData = new ConcurrentHashMap<>(); client2SubscriptionData = new ConcurrentHashMap<>();
ConcurrentHashMap<Client, Set<SubscriptionData>> prev = topic2SubscriptionTable.putIfAbsent(rootTopic, client2SubscriptionData); ConcurrentHashMap<Client, Set<SubscriptionData>> prev = topic2SubscriptionTable.putIfAbsent(rootTopic, client2SubscriptionData);
if (prev != null) { if (prev != null) {
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.processor.mqtthandler; package org.apache.rocketmq.mqtt.mqtthandler.impl;
import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
...@@ -27,31 +27,31 @@ import java.util.Iterator; ...@@ -27,31 +27,31 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.mqtt.util.MqttUtil;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
import org.apache.rocketmq.snode.client.impl.Subscription;
import org.apache.rocketmq.snode.exception.WrongMessageTypeException;
import org.apache.rocketmq.snode.util.MqttUtil;
/** /**
* handle the UNSUBSCRIBE message from the client * handle the UNSUBSCRIBE message from the client
*/ */
public class MqttUnsubscribeMessagHandler implements MessageHandler { public class MqttUnsubscribeMessagHandler implements MessageHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); public MqttUnsubscribeMessagHandler(DefaultMqttMessageProcessor processor) {
private final SnodeController snodeController; this.defaultMqttMessageProcessor = processor;
public MqttUnsubscribeMessagHandler(SnodeController snodeController) {
this.snodeController = snodeController;
} }
@Override @Override
...@@ -73,7 +73,7 @@ public class MqttUnsubscribeMessagHandler implements MessageHandler { ...@@ -73,7 +73,7 @@ public class MqttUnsubscribeMessagHandler implements MessageHandler {
remotingChannel.close(); remotingChannel.close();
return null; return null;
} }
IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager(); IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager();
Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
if (client == null) { if (client == null) {
log.error("Can't find associated client, the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString()); log.error("Can't find associated client, the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString());
...@@ -98,7 +98,7 @@ public class MqttUnsubscribeMessagHandler implements MessageHandler { ...@@ -98,7 +98,7 @@ public class MqttUnsubscribeMessagHandler implements MessageHandler {
} }
private void doUnsubscribe(Client client, List<String> topics, IOTClientManagerImpl iotClientManager) { private void doUnsubscribe(Client client, List<String> topics, IOTClientManagerImpl iotClientManager) {
ConcurrentHashMap<String, Subscription> clientId2Subscription = iotClientManager.getClientId2Subscription(); ConcurrentHashMap<String, Subscription> clientId2Subscription = iotClientManager.getClientId2Subscription();
ConcurrentHashMap<String, ConcurrentHashMap<Client, Set<SubscriptionData>>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable(); ConcurrentHashMap<String, ConcurrentHashMap<Client, Set<SubscriptionData>>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable();
for (String topicFilter : topics) { for (String topicFilter : topics) {
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.processor; package org.apache.rocketmq.mqtt.processor;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectMessage;
...@@ -33,55 +33,73 @@ import io.netty.handler.codec.mqtt.MqttSubscribePayload; ...@@ -33,55 +33,73 @@ import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.client.ClientManager;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.client.MqttClientHousekeepingService;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttConnectMessageHandler;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttDisconnectMessageHandler;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPingreqMessageHandler;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubackMessageHandler;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubcompMessageHandler;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPublishMessageHandler;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubrecMessageHandler;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubrelMessageHandler;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttSubscribeMessageHandler;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttUnsubscribeMessagHandler;
import org.apache.rocketmq.mqtt.service.WillMessageService;
import org.apache.rocketmq.mqtt.service.impl.MqttPushServiceImpl;
import org.apache.rocketmq.mqtt.service.impl.WillMessageServiceImpl;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil; import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttDisconnectMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPingreqMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubackMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubcompMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPublishMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubrecMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubrelMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttSubscribeMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttUnsubscribeMessagHandler;
public class DefaultMqttMessageProcessor implements RequestProcessor { public class DefaultMqttMessageProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private Map<MqttMessageType, MessageHandler> type2handler = new HashMap<>(); private Map<MqttMessageType, MessageHandler> type2handler = new HashMap<>();
private final SnodeController snodeController;
private static final int MIN_AVAILABLE_VERSION = 3; private static final int MIN_AVAILABLE_VERSION = 3;
private static final int MAX_AVAILABLE_VERSION = 4; private static final int MAX_AVAILABLE_VERSION = 4;
private WillMessageService willMessageService;
private MqttPushServiceImpl mqttPushService;
private ClientManager iotClientManager;
private RemotingServer mqttRemotingServer;
private MqttClientHousekeepingService mqttClientHousekeepingService;
private MqttConfig mqttConfig;
public DefaultMqttMessageProcessor(MqttConfig mqttConfig, RemotingServer mqttRemotingServer) {
this.willMessageService = new WillMessageServiceImpl();
this.mqttPushService = new MqttPushServiceImpl(this, mqttConfig);
this.iotClientManager = new IOTClientManagerImpl();
this.mqttRemotingServer = mqttRemotingServer;
this.mqttClientHousekeepingService = new MqttClientHousekeepingService(iotClientManager);
this.mqttClientHousekeepingService.start(mqttConfig.getHouseKeepingInterval());
public DefaultMqttMessageProcessor(SnodeController snodeController) {
this.snodeController = snodeController;
registerMessageHandler(MqttMessageType.CONNECT, registerMessageHandler(MqttMessageType.CONNECT,
new MqttConnectMessageHandler(this.snodeController)); new MqttConnectMessageHandler(this));
registerMessageHandler(MqttMessageType.DISCONNECT, registerMessageHandler(MqttMessageType.DISCONNECT,
new MqttDisconnectMessageHandler(this.snodeController)); new MqttDisconnectMessageHandler(this));
registerMessageHandler(MqttMessageType.PINGREQ, registerMessageHandler(MqttMessageType.PINGREQ,
new MqttPingreqMessageHandler(this.snodeController)); new MqttPingreqMessageHandler(this));
registerMessageHandler(MqttMessageType.PUBLISH, registerMessageHandler(MqttMessageType.PUBLISH,
new MqttPublishMessageHandler(this.snodeController)); new MqttPublishMessageHandler(this));
registerMessageHandler(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this.snodeController)); registerMessageHandler(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this));
registerMessageHandler(MqttMessageType.PUBCOMP, registerMessageHandler(MqttMessageType.PUBCOMP,
new MqttPubcompMessageHandler(this.snodeController)); new MqttPubcompMessageHandler(this));
registerMessageHandler(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this.snodeController)); registerMessageHandler(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this));
registerMessageHandler(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this.snodeController)); registerMessageHandler(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this));
registerMessageHandler(MqttMessageType.SUBSCRIBE, registerMessageHandler(MqttMessageType.SUBSCRIBE,
new MqttSubscribeMessageHandler(this.snodeController)); new MqttSubscribeMessageHandler(this));
registerMessageHandler(MqttMessageType.UNSUBSCRIBE, registerMessageHandler(MqttMessageType.UNSUBSCRIBE,
new MqttUnsubscribeMessagHandler(this.snodeController)); new MqttUnsubscribeMessagHandler(this));
} }
@Override @Override
...@@ -127,4 +145,24 @@ public class DefaultMqttMessageProcessor implements RequestProcessor { ...@@ -127,4 +145,24 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
private void registerMessageHandler(MqttMessageType type, MessageHandler handler) { private void registerMessageHandler(MqttMessageType type, MessageHandler handler) {
type2handler.put(type, handler); type2handler.put(type, handler);
} }
public WillMessageService getWillMessageService() {
return willMessageService;
}
public MqttPushServiceImpl getMqttPushService() {
return mqttPushService;
}
public ClientManager getIotClientManager() {
return iotClientManager;
}
public MqttConfig getMqttConfig() {
return mqttConfig;
}
public RemotingServer getMqttRemotingServer() {
return mqttRemotingServer;
}
} }
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.service; package org.apache.rocketmq.mqtt.service;
import org.apache.rocketmq.common.message.mqtt.WillMessage; import org.apache.rocketmq.common.message.mqtt.WillMessage;
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.service.impl; package org.apache.rocketmq.mqtt.service.impl;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttMessageType;
...@@ -27,37 +27,38 @@ import java.util.concurrent.ConcurrentHashMap; ...@@ -27,37 +27,38 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.constant.MqttConstant;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.mqtt.util.MqttUtil;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
import org.apache.rocketmq.snode.constant.SnodeConstant;
import org.apache.rocketmq.snode.util.MqttUtil;
public class MqttPushServiceImpl { public class MqttPushServiceImpl {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private SnodeController snodeController;
private ExecutorService pushMqttMessageExecutorService; private ExecutorService pushMqttMessageExecutorService;
private DefaultMqttMessageProcessor defaultMqttMessageProcessor;
public MqttPushServiceImpl(final SnodeController snodeController) { public MqttPushServiceImpl(DefaultMqttMessageProcessor defaultMqttMessageProcessor, MqttConfig mqttConfig) {
this.snodeController = snodeController; this.defaultMqttMessageProcessor = defaultMqttMessageProcessor;
pushMqttMessageExecutorService = ThreadUtils.newThreadPoolExecutor( pushMqttMessageExecutorService = ThreadUtils.newThreadPoolExecutor(
this.snodeController.getMqttConfig().getPushMqttMessageMinPoolSize(), mqttConfig.getPushMqttMessageMinPoolSize(),
this.snodeController.getMqttConfig().getPushMqttMessageMaxPoolSize(), mqttConfig.getPushMqttMessageMaxPoolSize(),
3000, 3000,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(this.snodeController.getMqttConfig().getPushMqttMessageThreadPoolQueueCapacity()), new ArrayBlockingQueue<>(mqttConfig.getPushMqttMessageThreadPoolQueueCapacity()),
"pushMqttMessageThread", "pushMqttMessageThread",
false); false);
} }
...@@ -86,7 +87,7 @@ public class MqttPushServiceImpl { ...@@ -86,7 +87,7 @@ public class MqttPushServiceImpl {
RemotingCommand requestCommand = buildRequestCommand(topic, qos, retain, packetId); RemotingCommand requestCommand = buildRequestCommand(topic, qos, retain, packetId);
//find those clients publishing the message to //find those clients publishing the message to
IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager(); IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager();
ConcurrentHashMap<String, ConcurrentHashMap<Client, Set<SubscriptionData>>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable(); ConcurrentHashMap<String, ConcurrentHashMap<Client, Set<SubscriptionData>>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable();
Set<Client> clients = new HashSet<>(); Set<Client> clients = new HashSet<>();
if (topic2SubscriptionTable.containsKey(MqttUtil.getRootTopic(topic))) { if (topic2SubscriptionTable.containsKey(MqttUtil.getRootTopic(topic))) {
...@@ -109,12 +110,11 @@ public class MqttPushServiceImpl { ...@@ -109,12 +110,11 @@ public class MqttPushServiceImpl {
byte[] body = new byte[message.readableBytes()]; byte[] body = new byte[message.readableBytes()];
message.readBytes(body); message.readBytes(body);
requestCommand.setBody(body); requestCommand.setBody(body);
snodeController.getMqttRemotingServer().push(remotingChannel, requestCommand, SnodeConstant.DEFAULT_TIMEOUT_MILLS); defaultMqttMessageProcessor.getMqttRemotingServer().push(remotingChannel, requestCommand, MqttConstant.DEFAULT_TIMEOUT_MILLS);
} }
} catch (Exception ex) { } catch (Exception ex) {
log.warn("Exception was thrown when pushing MQTT message to topic: {}, exception={}", topic, ex.getMessage()); log.warn("Exception was thrown when pushing MQTT message to topic: {}, exception={}", topic, ex.getMessage());
} finally { } finally {
System.out.println("Release Bytebuf");
ReferenceCountUtil.release(message); ReferenceCountUtil.release(message);
} }
} else { } else {
......
...@@ -15,20 +15,17 @@ ...@@ -15,20 +15,17 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.service.impl; package org.apache.rocketmq.mqtt.service.impl;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.message.mqtt.WillMessage; import org.apache.rocketmq.common.message.mqtt.WillMessage;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.mqtt.service.WillMessageService;
import org.apache.rocketmq.snode.service.WillMessageService;
public class WillMessageServiceImpl implements WillMessageService { public class WillMessageServiceImpl implements WillMessageService {
private static ConcurrentHashMap<String/*clientId*/, WillMessage> willMessageTable = new ConcurrentHashMap<>(); private static ConcurrentHashMap<String/*clientId*/, WillMessage> willMessageTable = new ConcurrentHashMap<>();
private final SnodeController snodeController;
public WillMessageServiceImpl(SnodeController snodeController) { public WillMessageServiceImpl() {
this.snodeController = snodeController;
} }
@Override @Override
......
...@@ -15,11 +15,11 @@ ...@@ -15,11 +15,11 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.util; package org.apache.rocketmq.mqtt.util;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.UUID; import java.util.UUID;
import org.apache.rocketmq.snode.constant.MqttConstant; import org.apache.rocketmq.mqtt.constant.MqttConstant;
public class MqttUtil { public class MqttUtil {
......
...@@ -14,38 +14,37 @@ ...@@ -14,38 +14,37 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.processor; package org.apache.rocketmq.mqtt;
import io.netty.handler.codec.mqtt.MqttConnectPayload; import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.MqttRemotingServer;
import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil; import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
import org.apache.rocketmq.snode.SnodeController;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class DefaultMqttMessageProcessorTest { public class DefaultMqttMessageProcessorTest {
private DefaultMqttMessageProcessor defaultMqttMessageProcessor; private DefaultMqttMessageProcessor defaultMqttMessageProcessor;
@Spy
private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
@Mock @Mock
private RemotingChannel remotingChannel; private RemotingChannel remotingChannel;
@Mock
private MqttRemotingServer mqttRemotingServer;
private String topic = "SnodeTopic"; private String topic = "SnodeTopic";
private String group = "SnodeGroup"; private String group = "SnodeGroup";
...@@ -54,7 +53,7 @@ public class DefaultMqttMessageProcessorTest { ...@@ -54,7 +53,7 @@ public class DefaultMqttMessageProcessorTest {
@Before @Before
public void init() { public void init() {
defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(snodeController); defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(new MqttConfig(), mqttRemotingServer);
} }
@Test @Test
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.processor; package org.apache.rocketmq.mqtt;
import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload; import io.netty.handler.codec.mqtt.MqttConnectPayload;
...@@ -22,11 +22,9 @@ import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; ...@@ -22,11 +22,9 @@ import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttConnectMessageHandler;
import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
...@@ -38,10 +36,13 @@ public class MqttConnectMessageHandlerTest { ...@@ -38,10 +36,13 @@ public class MqttConnectMessageHandlerTest {
@Mock @Mock
private RemotingChannel remotingChannel; private RemotingChannel remotingChannel;
@Mock
private DefaultMqttMessageProcessor defaultMqttMessageProcessor;
@Test @Test
public void testHandlerMessage() throws Exception { public void testHandlerMessage() throws Exception {
MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler(new SnodeController(new SnodeConfig(), new MqttConfig())); MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler(defaultMqttMessageProcessor);
MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes()); MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes());
MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(new MqttFixedHeader( MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(new MqttFixedHeader(
MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200), new MqttConnectVariableHeader(null, 4, false, false, false, 0, false, false, 50), new MqttConnectPayload("abcd", "ttest", "message".getBytes(), "user", "password".getBytes())); MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200), new MqttConnectVariableHeader(null, 4, false, false, false, 0, false, false, 50), new MqttConnectPayload("abcd", "ttest", "message".getBytes(), "user", "password".getBytes()));
......
...@@ -14,20 +14,18 @@ ...@@ -14,20 +14,18 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.processor; package org.apache.rocketmq.mqtt;
import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.message.mqtt.WillMessage; import org.apache.rocketmq.common.message.mqtt.WillMessage;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttDisconnectMessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttDisconnectMessageHandler;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
...@@ -39,17 +37,19 @@ public class MqttDisconnectMessageHandlerTest { ...@@ -39,17 +37,19 @@ public class MqttDisconnectMessageHandlerTest {
@Mock @Mock
private RemotingChannel remotingChannel; private RemotingChannel remotingChannel;
@Mock
private DefaultMqttMessageProcessor defaultMqttMessageProcessor;
@Test @Test
public void testHandlerMessage() throws Exception { public void testHandlerMessage() throws Exception {
SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
MqttDisconnectMessageHandler mqttDisconnectMessageHandler = new MqttDisconnectMessageHandler( MqttDisconnectMessageHandler mqttDisconnectMessageHandler = new MqttDisconnectMessageHandler(
snodeController); defaultMqttMessageProcessor);
Client client = new Client(); Client client = new Client();
client.setRemotingChannel(remotingChannel); client.setRemotingChannel(remotingChannel);
client.setClientId("123456"); client.setClientId("123456");
snodeController.getIotClientManager().register(IOTClientManagerImpl.IOT_GROUP, client); defaultMqttMessageProcessor.getIotClientManager().register(IOTClientManagerImpl.IOT_GROUP, client);
snodeController.getWillMessageService().saveWillMessage("123456", new WillMessage()); defaultMqttMessageProcessor.getWillMessageService().saveWillMessage("123456", new WillMessage());
MqttMessage mqttDisconnectMessage = new MqttMessage(new MqttFixedHeader( MqttMessage mqttDisconnectMessage = new MqttMessage(new MqttFixedHeader(
MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200)); MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200));
......
...@@ -14,31 +14,24 @@ ...@@ -14,31 +14,24 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.service; package org.apache.rocketmq.mqtt;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.message.mqtt.WillMessage; import org.apache.rocketmq.common.message.mqtt.WillMessage;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.mqtt.service.WillMessageService;
import org.apache.rocketmq.snode.SnodeTestBase; import org.apache.rocketmq.mqtt.service.impl.WillMessageServiceImpl;
import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class WillMessageServiceImplTest extends SnodeTestBase { public class WillMessageServiceImplTest {
@Spy
private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
private WillMessageService willMessageService; private WillMessageService willMessageService;
@Before @Before
public void init() { public void init() {
willMessageService = new WillMessageServiceImpl(snodeController); willMessageService = new WillMessageServiceImpl();
} }
@Test @Test
......
...@@ -126,6 +126,7 @@ ...@@ -126,6 +126,7 @@
<module>distribution</module> <module>distribution</module>
<module>openmessaging</module> <module>openmessaging</module>
<module>logging</module> <module>logging</module>
<module>mqtt</module>
<module>snode</module> <module>snode</module>
<module>acl</module> <module>acl</module>
</modules> </modules>
...@@ -533,6 +534,11 @@ ...@@ -533,6 +534,11 @@
<artifactId>rocketmq-snode</artifactId> <artifactId>rocketmq-snode</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-mqtt</artifactId>
<version>${project.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>
......
...@@ -97,6 +97,10 @@ ...@@ -97,6 +97,10 @@
<groupId>${project.groupId}</groupId> <groupId>${project.groupId}</groupId>
<artifactId>rocketmq-broker</artifactId> <artifactId>rocketmq-broker</artifactId>
</dependency> </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-mqtt</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>
......
...@@ -27,11 +27,13 @@ import org.apache.rocketmq.broker.BrokerStartup; ...@@ -27,11 +27,13 @@ import org.apache.rocketmq.broker.BrokerStartup;
import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.client.ClientManager;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.RemotingClientFactory; import org.apache.rocketmq.remoting.RemotingClientFactory;
...@@ -48,18 +50,15 @@ import org.apache.rocketmq.remoting.interceptor.ResponseContext; ...@@ -48,18 +50,15 @@ import org.apache.rocketmq.remoting.interceptor.ResponseContext;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.util.ServiceProvider; import org.apache.rocketmq.remoting.util.ServiceProvider;
import org.apache.rocketmq.snode.client.ClientHousekeepingService; import org.apache.rocketmq.snode.client.ClientHousekeepingService;
import org.apache.rocketmq.snode.client.ClientManager;
import org.apache.rocketmq.snode.client.SlowConsumerService; import org.apache.rocketmq.snode.client.SlowConsumerService;
import org.apache.rocketmq.snode.client.SubscriptionGroupManager; import org.apache.rocketmq.snode.client.SubscriptionGroupManager;
import org.apache.rocketmq.snode.client.SubscriptionManager; import org.apache.rocketmq.snode.client.SubscriptionManager;
import org.apache.rocketmq.snode.client.impl.ConsumerManagerImpl; import org.apache.rocketmq.snode.client.impl.ConsumerManagerImpl;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
import org.apache.rocketmq.snode.client.impl.ProducerManagerImpl; import org.apache.rocketmq.snode.client.impl.ProducerManagerImpl;
import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl; import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl;
import org.apache.rocketmq.snode.client.impl.SubscriptionManagerImpl; import org.apache.rocketmq.snode.client.impl.SubscriptionManagerImpl;
import org.apache.rocketmq.snode.offset.ConsumerOffsetManager; import org.apache.rocketmq.snode.offset.ConsumerOffsetManager;
import org.apache.rocketmq.snode.processor.ConsumerManageProcessor; import org.apache.rocketmq.snode.processor.ConsumerManageProcessor;
import org.apache.rocketmq.snode.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.snode.processor.HeartbeatProcessor; import org.apache.rocketmq.snode.processor.HeartbeatProcessor;
import org.apache.rocketmq.snode.processor.PullMessageProcessor; import org.apache.rocketmq.snode.processor.PullMessageProcessor;
import org.apache.rocketmq.snode.processor.SendMessageProcessor; import org.apache.rocketmq.snode.processor.SendMessageProcessor;
...@@ -69,16 +68,13 @@ import org.apache.rocketmq.snode.service.MetricsService; ...@@ -69,16 +68,13 @@ import org.apache.rocketmq.snode.service.MetricsService;
import org.apache.rocketmq.snode.service.NnodeService; import org.apache.rocketmq.snode.service.NnodeService;
import org.apache.rocketmq.snode.service.PushService; import org.apache.rocketmq.snode.service.PushService;
import org.apache.rocketmq.snode.service.ScheduledService; import org.apache.rocketmq.snode.service.ScheduledService;
import org.apache.rocketmq.snode.service.WillMessageService;
import org.apache.rocketmq.snode.service.impl.ClientServiceImpl; import org.apache.rocketmq.snode.service.impl.ClientServiceImpl;
import org.apache.rocketmq.snode.service.impl.LocalEnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.LocalEnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl; import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl;
import org.apache.rocketmq.snode.service.impl.MqttPushServiceImpl;
import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.PushServiceImpl; import org.apache.rocketmq.snode.service.impl.PushServiceImpl;
import org.apache.rocketmq.snode.service.impl.RemoteEnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.RemoteEnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl; import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl;
import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl;
public class SnodeController { public class SnodeController {
...@@ -105,7 +101,7 @@ public class SnodeController { ...@@ -105,7 +101,7 @@ public class SnodeController {
private ScheduledService scheduledService; private ScheduledService scheduledService;
private ClientManager producerManager; private ClientManager producerManager;
private ClientManager consumerManager; private ClientManager consumerManager;
private ClientManager iotClientManager; // private ClientManager iotClientManager;
private SubscriptionManager subscriptionManager; private SubscriptionManager subscriptionManager;
private ClientHousekeepingService clientHousekeepingService; private ClientHousekeepingService clientHousekeepingService;
private SubscriptionGroupManager subscriptionGroupManager; private SubscriptionGroupManager subscriptionGroupManager;
...@@ -122,8 +118,8 @@ public class SnodeController { ...@@ -122,8 +118,8 @@ public class SnodeController {
private ClientService clientService; private ClientService clientService;
private SlowConsumerService slowConsumerService; private SlowConsumerService slowConsumerService;
private MetricsService metricsService; private MetricsService metricsService;
private WillMessageService willMessageService; // private WillMessageService willMessageService;
private MqttPushServiceImpl mqttPushService; // private MqttPushServiceImpl mqttPushService;
private final ScheduledExecutorService scheduledExecutorService = Executors private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( .newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
...@@ -153,7 +149,12 @@ public class SnodeController { ...@@ -153,7 +149,12 @@ public class SnodeController {
if (this.mqttRemotingClient != null) { if (this.mqttRemotingClient != null) {
this.mqttRemotingClient.init(this.mqttClientConfig, null); this.mqttRemotingClient.init(this.mqttClientConfig, null);
} }
this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer(
RemotingUtil.MQTT_PROTOCOL);
if (this.mqttRemotingServer != null) {
this.mqttRemotingServer.init(this.mqttServerConfig, this.clientHousekeepingService);
this.mqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
}
this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor( this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(), snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(), snodeConfig.getSnodeSendMessageMaxPoolSize(),
...@@ -211,19 +212,19 @@ public class SnodeController { ...@@ -211,19 +212,19 @@ public class SnodeController {
this.sendMessageProcessor = new SendMessageProcessor(this); this.sendMessageProcessor = new SendMessageProcessor(this);
this.heartbeatProcessor = new HeartbeatProcessor(this); this.heartbeatProcessor = new HeartbeatProcessor(this);
this.pullMessageProcessor = new PullMessageProcessor(this); this.pullMessageProcessor = new PullMessageProcessor(this);
this.defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(this); this.defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(this.mqttConfig, mqttRemotingServer);
this.pushService = new PushServiceImpl(this); this.pushService = new PushServiceImpl(this);
this.clientService = new ClientServiceImpl(this); this.clientService = new ClientServiceImpl(this);
this.subscriptionManager = new SubscriptionManagerImpl(); this.subscriptionManager = new SubscriptionManagerImpl();
this.producerManager = new ProducerManagerImpl(); this.producerManager = new ProducerManagerImpl();
this.consumerManager = new ConsumerManagerImpl(this); this.consumerManager = new ConsumerManagerImpl(this);
this.iotClientManager = new IOTClientManagerImpl(this); // this.iotClientManager = new IOTClientManagerImpl(this);
this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager, this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager,
this.consumerManager, this.iotClientManager); this.consumerManager, null);
this.slowConsumerService = new SlowConsumerServiceImpl(this); this.slowConsumerService = new SlowConsumerServiceImpl(this);
this.metricsService = new MetricsServiceImpl(); this.metricsService = new MetricsServiceImpl();
this.willMessageService = new WillMessageServiceImpl(this); // this.willMessageService = new WillMessageServiceImpl(this);
this.mqttPushService = new MqttPushServiceImpl(this); // this.mqttPushService = new MqttPushServiceImpl(this);
} }
public SnodeConfig getSnodeConfig() { public SnodeConfig getSnodeConfig() {
...@@ -258,12 +259,6 @@ public class SnodeController { ...@@ -258,12 +259,6 @@ public class SnodeController {
this.snodeServer.init(this.nettyServerConfig, this.clientHousekeepingService); this.snodeServer.init(this.nettyServerConfig, this.clientHousekeepingService);
this.snodeServer.registerInterceptorGroup(this.remotingServerInterceptorGroup); this.snodeServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
} }
this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer(
RemotingUtil.MQTT_PROTOCOL);
if (this.mqttRemotingServer != null) {
this.mqttRemotingServer.init(this.mqttServerConfig, this.clientHousekeepingService);
this.mqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
}
registerProcessor(); registerProcessor();
return true; return true;
} }
...@@ -507,13 +502,13 @@ public class SnodeController { ...@@ -507,13 +502,13 @@ public class SnodeController {
this.consumerManager = consumerManager; this.consumerManager = consumerManager;
} }
public ClientManager getIotClientManager() { /* public ClientManager getIotClientManager() {
return iotClientManager; return iotClientManager;
} }
public void setIotClientManager(ClientManager iotClientManager) { public void setIotClientManager(ClientManager iotClientManager) {
this.iotClientManager = iotClientManager; this.iotClientManager = iotClientManager;
} }*/
public SubscriptionManager getSubscriptionManager() { public SubscriptionManager getSubscriptionManager() {
return subscriptionManager; return subscriptionManager;
...@@ -551,7 +546,7 @@ public class SnodeController { ...@@ -551,7 +546,7 @@ public class SnodeController {
this.metricsService = metricsService; this.metricsService = metricsService;
} }
public WillMessageService getWillMessageService() { /* public WillMessageService getWillMessageService() {
return willMessageService; return willMessageService;
} }
...@@ -566,5 +561,5 @@ public class SnodeController { ...@@ -566,5 +561,5 @@ public class SnodeController {
public void setMqttPushService(MqttPushServiceImpl mqttPushService) { public void setMqttPushService(MqttPushServiceImpl mqttPushService) {
this.mqttPushService = mqttPushService; this.mqttPushService = mqttPushService;
} }*/
} }
...@@ -18,6 +18,8 @@ package org.apache.rocketmq.snode.client; ...@@ -18,6 +18,8 @@ package org.apache.rocketmq.snode.client;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.util.Attribute; import io.netty.util.Attribute;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.ClientManager;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
...@@ -49,7 +51,7 @@ public class ClientHousekeepingService implements ChannelEventListener { ...@@ -49,7 +51,7 @@ public class ClientHousekeepingService implements ChannelEventListener {
public void shutdown() { public void shutdown() {
this.producerManager.shutdown(); this.producerManager.shutdown();
this.consumerManager.shutdown(); this.consumerManager.shutdown();
this.iotClientManager.shutdown(); // this.iotClientManager.shutdown();
} }
private Client getClient(RemotingChannel remotingChannel) { private Client getClient(RemotingChannel remotingChannel) {
...@@ -75,9 +77,9 @@ public class ClientHousekeepingService implements ChannelEventListener { ...@@ -75,9 +77,9 @@ public class ClientHousekeepingService implements ChannelEventListener {
case Producer: case Producer:
this.producerManager.onClose(client.getGroups(), remotingChannel); this.producerManager.onClose(client.getGroups(), remotingChannel);
return; return;
case IOTCLIENT: // case IOTCLIENT:
this.iotClientManager.onClose(client.getGroups(), remotingChannel); // this.iotClientManager.onClose(client.getGroups(), remotingChannel);
return; // return;
default: default:
} }
} }
......
...@@ -17,13 +17,13 @@ ...@@ -17,13 +17,13 @@
package org.apache.rocketmq.snode.client; package org.apache.rocketmq.snode.client;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.snode.client.impl.Subscription;
public interface SubscriptionManager { public interface SubscriptionManager {
boolean subscribe(String groupId, Set<SubscriptionData> subscriptionDataSet, ConsumeType consumeType, boolean subscribe(String groupId, Set<SubscriptionData> subscriptionDataSet, ConsumeType consumeType,
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
*/ */
package org.apache.rocketmq.snode.client.impl; package org.apache.rocketmq.snode.client.impl;
import org.apache.rocketmq.common.client.ClientManagerImpl;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
*/ */
package org.apache.rocketmq.snode.client.impl; package org.apache.rocketmq.snode.client.impl;
import org.apache.rocketmq.common.client.ClientManagerImpl;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
public class ProducerManagerImpl extends ClientManagerImpl { public class ProducerManagerImpl extends ClientManagerImpl {
......
...@@ -21,6 +21,7 @@ import java.util.Iterator; ...@@ -21,6 +21,7 @@ import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
package org.apache.rocketmq.snode.constant; package org.apache.rocketmq.snode.constant;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import org.apache.rocketmq.snode.client.Client; import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.snode.client.impl.ClientRole; import org.apache.rocketmq.common.client.ClientRole;
public class SnodeConstant { public class SnodeConstant {
public static final long HEARTBEAT_TIME_OUT = 3000; public static final long HEARTBEAT_TIME_OUT = 3000;
......
...@@ -20,6 +20,8 @@ import io.netty.channel.Channel; ...@@ -20,6 +20,8 @@ import io.netty.channel.Channel;
import io.netty.util.Attribute; import io.netty.util.Attribute;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.ClientRole;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
...@@ -41,8 +43,6 @@ import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; ...@@ -41,8 +43,6 @@ import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.ClientRole;
import org.apache.rocketmq.snode.constant.SnodeConstant; import org.apache.rocketmq.snode.constant.SnodeConstant;
public class HeartbeatProcessor implements RequestProcessor { public class HeartbeatProcessor implements RequestProcessor {
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.rocketmq.snode.processor; package org.apache.rocketmq.snode.processor;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
...@@ -36,7 +37,6 @@ import org.apache.rocketmq.remoting.interceptor.RequestContext; ...@@ -36,7 +37,6 @@ import org.apache.rocketmq.remoting.interceptor.RequestContext;
import org.apache.rocketmq.remoting.interceptor.ResponseContext; import org.apache.rocketmq.remoting.interceptor.ResponseContext;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.impl.Subscription;
public class PullMessageProcessor implements RequestProcessor { public class PullMessageProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
......
...@@ -24,6 +24,8 @@ import java.util.concurrent.ExecutorService; ...@@ -24,6 +24,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
...@@ -40,8 +42,6 @@ import org.apache.rocketmq.remoting.common.RemotingUtil; ...@@ -40,8 +42,6 @@ import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.Subscription;
import org.apache.rocketmq.snode.constant.SnodeConstant; import org.apache.rocketmq.snode.constant.SnodeConstant;
import org.apache.rocketmq.snode.service.PushService; import org.apache.rocketmq.snode.service.PushService;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册