提交 c4297921 编写于 作者: C chengxiangwang

add test case(unfinished)

上级 5481c2bc
...@@ -26,7 +26,7 @@ public class InFlightMessage { ...@@ -26,7 +26,7 @@ public class InFlightMessage {
private final String messageId; private final String messageId;
private final long queueOffset; private final long queueOffset;
InFlightMessage(String topic, Integer pushQos, byte[] body, BrokerData brokerData, String messageId, public InFlightMessage(String topic, Integer pushQos, byte[] body, BrokerData brokerData, String messageId,
long queueOffset) { long queueOffset) {
this.topic = topic; this.topic = topic;
this.pushQos = pushQos; this.pushQos = pushQos;
......
...@@ -45,6 +45,8 @@ import org.apache.rocketmq.remoting.netty.NettyChannelImpl; ...@@ -45,6 +45,8 @@ import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import static org.apache.rocketmq.mqtt.constant.MqttConstant.TOPIC_CLIENTID_SEPARATOR;
public class MQTTSession extends Client { public class MQTTSession extends Client {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
...@@ -110,6 +112,11 @@ public class MQTTSession extends Client { ...@@ -110,6 +112,11 @@ public class MQTTSession extends Client {
return Objects.equals(this.getClientId(), client.getClientId()); return Objects.equals(this.getClientId(), client.getClientId());
} }
@Override
public int hashCode() {
return Objects.hash(this.getClientId());
}
public boolean isConnected() { public boolean isConnected() {
return isConnected; return isConnected;
} }
...@@ -143,7 +150,7 @@ public class MQTTSession extends Client { ...@@ -143,7 +150,7 @@ public class MQTTSession extends Client {
inflightSlots.decrementAndGet(); inflightSlots.decrementAndGet();
mqttHeader.setPacketId(getNextPacketId()); mqttHeader.setPacketId(getNextPacketId());
inflightWindow.put(mqttHeader.getPacketId(), new InFlightMessage(mqttHeader.getTopicName(), mqttHeader.getQosLevel(), messageExt.getBody(), brokerData, messageExt.getMsgId(), messageExt.getQueueOffset())); inflightWindow.put(mqttHeader.getPacketId(), new InFlightMessage(mqttHeader.getTopicName(), mqttHeader.getQosLevel(), messageExt.getBody(), brokerData, messageExt.getMsgId(), messageExt.getQueueOffset()));
inflightTimeouts.add(new InFlightPacket(mqttHeader.getPacketId(), FLIGHT_BEFORE_RESEND_MS)); // inflightTimeouts.add(new InFlightPacket(mqttHeader.getPacketId(), FLIGHT_BEFORE_RESEND_MS));
put2processTable(((IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager()).getProcessTable(), brokerData.getBrokerName(), MqttUtil.getRootTopic(mqttHeader.getTopicName()), messageExt); put2processTable(((IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager()).getProcessTable(), brokerData.getBrokerName(), MqttUtil.getRootTopic(mqttHeader.getTopicName()), messageExt);
pushMessage2Client(mqttHeader, messageExt.getBody()); pushMessage2Client(mqttHeader, messageExt.getBody());
} }
...@@ -155,7 +162,7 @@ public class MQTTSession extends Client { ...@@ -155,7 +162,7 @@ public class MQTTSession extends Client {
ConcurrentHashMap<String, ConcurrentHashMap<String, TreeMap<Long, MessageExt>>> processTable = ((IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager()).getProcessTable(); ConcurrentHashMap<String, ConcurrentHashMap<String, TreeMap<Long, MessageExt>>> processTable = ((IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager()).getProcessTable();
ConcurrentHashMap<String, TreeMap<Long, MessageExt>> map = processTable.get(remove.getBrokerData().getBrokerName()); ConcurrentHashMap<String, TreeMap<Long, MessageExt>> map = processTable.get(remove.getBrokerData().getBrokerName());
if (map != null) { if (map != null) {
TreeMap<Long, MessageExt> treeMap = map.get(rootTopic + "@" + this.getClientId()); TreeMap<Long, MessageExt> treeMap = map.get(rootTopic + TOPIC_CLIENTID_SEPARATOR + this.getClientId());
if (treeMap != null) { if (treeMap != null) {
treeMap.remove(remove.getQueueOffset()); treeMap.remove(remove.getQueueOffset());
} }
...@@ -193,10 +200,10 @@ public class MQTTSession extends Client { ...@@ -193,10 +200,10 @@ public class MQTTSession extends Client {
MessageExt messageExt) { MessageExt messageExt) {
ConcurrentHashMap<String, TreeMap<Long, MessageExt>> map; ConcurrentHashMap<String, TreeMap<Long, MessageExt>> map;
TreeMap<Long, MessageExt> treeMap; TreeMap<Long, MessageExt> treeMap;
String offsetKey = rootTopic + "@" + this.getClientId(); String offsetKey = rootTopic + TOPIC_CLIENTID_SEPARATOR + this.getClientId();
if (processTable.contains(brokerName)) { if (processTable.containsKey(brokerName)) {
map = processTable.get(brokerName); map = processTable.get(brokerName);
if (map.contains(offsetKey)) { if (map.containsKey(offsetKey)) {
treeMap = map.get(offsetKey); treeMap = map.get(offsetKey);
treeMap.putIfAbsent(messageExt.getQueueOffset(), messageExt); treeMap.putIfAbsent(messageExt.getQueueOffset(), messageExt);
} else { } else {
...@@ -252,4 +259,11 @@ public class MQTTSession extends Client { ...@@ -252,4 +259,11 @@ public class MQTTSession extends Client {
return inflightWindow; return inflightWindow;
} }
public DelayQueue<InFlightPacket> getInflightTimeouts() {
return inflightTimeouts;
}
public Hashtable getInUsePacketIds() {
return inUsePacketIds;
}
} }
...@@ -25,6 +25,7 @@ public class MqttConstant { ...@@ -25,6 +25,7 @@ public class MqttConstant {
public static final String SUBSCRIPTION_FLAG_PLUS = "+"; public static final String SUBSCRIPTION_FLAG_PLUS = "+";
public static final String SUBSCRIPTION_FLAG_SHARP = "#"; public static final String SUBSCRIPTION_FLAG_SHARP = "#";
public static final String SUBSCRIPTION_SEPARATOR = "/"; public static final String SUBSCRIPTION_SEPARATOR = "/";
public static final String TOPIC_CLIENTID_SEPARATOR = "@";
public static final long DEFAULT_TIMEOUT_MILLS = 3000L; public static final long DEFAULT_TIMEOUT_MILLS = 3000L;
public static final String PROPERTY_MQTT_QOS = "PROPERTY_MQTT_QOS"; public static final String PROPERTY_MQTT_QOS = "PROPERTY_MQTT_QOS";
public static final AttributeKey<Client> MQTT_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("mqtt.client"); public static final AttributeKey<Client> MQTT_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("mqtt.client");
......
...@@ -20,6 +20,7 @@ package org.apache.rocketmq.mqtt.mqtthandler; ...@@ -20,6 +20,7 @@ package org.apache.rocketmq.mqtt.mqtthandler;
import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.HashSet; import java.util.HashSet;
...@@ -56,7 +57,7 @@ public interface MessageHandler { ...@@ -56,7 +57,7 @@ public interface MessageHandler {
if (topic2Clients.containsKey(MqttUtil.getRootTopic(topic))) { if (topic2Clients.containsKey(MqttUtil.getRootTopic(topic))) {
Set<Client> clients = topic2Clients.get(MqttUtil.getRootTopic(topic)); Set<Client> clients = topic2Clients.get(MqttUtil.getRootTopic(topic));
for (Client client : clients) { for (Client client : clients) {
if(((MQTTSession)client).isConnected()) { if (((MQTTSession) client).isConnected()) {
Subscription subscription = clientId2Subscription.get(client.getClientId()); Subscription subscription = clientId2Subscription.get(client.getClientId());
Enumeration<String> keys = subscription.getSubscriptionTable().keys(); Enumeration<String> keys = subscription.getSubscriptionTable().keys();
while (keys.hasMoreElements()) { while (keys.hasMoreElements()) {
...@@ -71,18 +72,18 @@ public interface MessageHandler { ...@@ -71,18 +72,18 @@ public interface MessageHandler {
return clientsTobePush; return clientsTobePush;
} }
default RemotingCommand doResponse(MqttFixedHeader fixedHeader) { default RemotingCommand doResponse(MqttFixedHeader fixedHeader, MqttPublishVariableHeader variableHeader) {
if (fixedHeader.qosLevel().value() > 0) { if (fixedHeader.qosLevel().value() > 0) {
RemotingCommand command = RemotingCommand.createResponseCommand(MqttHeader.class); RemotingCommand command = RemotingCommand.createResponseCommand(MqttHeader.class);
MqttHeader mqttHeader = (MqttHeader) command.readCustomHeader(); MqttHeader mqttHeader = (MqttHeader) command.readCustomHeader();
if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) { if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) {
mqttHeader.setMessageType(MqttMessageType.PUBACK.value()); mqttHeader.setMessageType(MqttMessageType.PUBACK.value());
mqttHeader.setDup(false); mqttHeader.setDup(false);
mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value()); mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value());
mqttHeader.setRetain(false); mqttHeader.setRetain(false);
mqttHeader.setRemainingLength(2); mqttHeader.setRemainingLength(2);
mqttHeader.setPacketId(0); mqttHeader.setPacketId(variableHeader.packetId());
} else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) { } else if (fixedHeader.qosLevel().equals(MqttQoS.EXACTLY_ONCE)) {
//PUBREC/PUBREL/PUBCOMP //PUBREC/PUBREL/PUBCOMP
} }
return command; return command;
......
...@@ -91,7 +91,7 @@ public class MqttMessageForwardHandler implements MessageHandler { ...@@ -91,7 +91,7 @@ public class MqttMessageForwardHandler implements MessageHandler {
//add task to orderedExecutor //add task to orderedExecutor
this.defaultMqttMessageProcessor.getOrderedExecutor().executeOrdered(client.getClientId(), SafeRunnable.safeRun(mqttPushTask)); this.defaultMqttMessageProcessor.getOrderedExecutor().executeOrdered(client.getClientId(), SafeRunnable.safeRun(mqttPushTask));
} }
return doResponse(fixedHeader); return doResponse(fixedHeader, variableHeader);
} }
return null; return null;
} }
......
...@@ -92,7 +92,7 @@ public class MqttPublishMessageHandler implements MessageHandler { ...@@ -92,7 +92,7 @@ public class MqttPublishMessageHandler implements MessageHandler {
MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) message; MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) message;
MqttFixedHeader fixedHeader = mqttPublishMessage.fixedHeader(); MqttFixedHeader fixedHeader = mqttPublishMessage.fixedHeader();
MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader(); MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader();
if (MqttUtil.isQosLegal(fixedHeader.qosLevel())) { if (!MqttUtil.isQosLegal(fixedHeader.qosLevel())) {
log.error("The QoS level should be 0 or 1 or 2. The connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString()); log.error("The QoS level should be 0 or 1 or 2. The connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString());
remotingChannel.close(); remotingChannel.close();
return null; return null;
...@@ -128,7 +128,7 @@ public class MqttPublishMessageHandler implements MessageHandler { ...@@ -128,7 +128,7 @@ public class MqttPublishMessageHandler implements MessageHandler {
} finally { } finally {
ReferenceCountUtil.release(message); ReferenceCountUtil.release(message);
} }
break;
case AT_LEAST_ONCE: case AT_LEAST_ONCE:
// Store msg and invoke callback to publish msg to subscribers // Store msg and invoke callback to publish msg to subscribers
// 1. Check if the root topic has been created // 1. Check if the root topic has been created
...@@ -183,11 +183,11 @@ public class MqttPublishMessageHandler implements MessageHandler { ...@@ -183,11 +183,11 @@ public class MqttPublishMessageHandler implements MessageHandler {
log.error("Store Qos=1 Message error: {}", ex); log.error("Store Qos=1 Message error: {}", ex);
} }
}); });
break;
case EXACTLY_ONCE: case EXACTLY_ONCE:
throw new MqttRuntimeException("Qos = 2 messages are not supported yet."); throw new MqttRuntimeException("Qos = 2 messages are not supported yet.");
} }
return doResponse(fixedHeader); return doResponse(fixedHeader, variableHeader);
} }
private void transferMessage(Set<String> snodeAddresses, String topic, byte[] body) throws MqttException { private void transferMessage(Set<String> snodeAddresses, String topic, byte[] body) throws MqttException {
......
...@@ -59,7 +59,8 @@ public class InnerMqttMessageProcessor implements RequestProcessor { ...@@ -59,7 +59,8 @@ public class InnerMqttMessageProcessor implements RequestProcessor {
private NnodeService nnodeService; private NnodeService nnodeService;
private MqttMessageForwardHandler mqttMessageForwardHandler; private MqttMessageForwardHandler mqttMessageForwardHandler;
public InnerMqttMessageProcessor(DefaultMqttMessageProcessor defaultMqttMessageProcessor, RemotingServer innerMqttRemotingServer) { public InnerMqttMessageProcessor(DefaultMqttMessageProcessor defaultMqttMessageProcessor,
RemotingServer innerMqttRemotingServer) {
this.defaultMqttMessageProcessor = defaultMqttMessageProcessor; this.defaultMqttMessageProcessor = defaultMqttMessageProcessor;
this.willMessageService = this.defaultMqttMessageProcessor.getWillMessageService(); this.willMessageService = this.defaultMqttMessageProcessor.getWillMessageService();
this.iotClientManager = this.defaultMqttMessageProcessor.getIotClientManager(); this.iotClientManager = this.defaultMqttMessageProcessor.getIotClientManager();
...@@ -73,14 +74,14 @@ public class InnerMqttMessageProcessor implements RequestProcessor { ...@@ -73,14 +74,14 @@ public class InnerMqttMessageProcessor implements RequestProcessor {
public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message) public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message)
throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
MqttHeader mqttHeader = (MqttHeader) message.readCustomHeader(); MqttHeader mqttHeader = (MqttHeader) message.readCustomHeader();
if(mqttHeader.getMessageType().equals(PUBLISH)){ if (mqttHeader.getMessageType().equals(PUBLISH)) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.valueOf(mqttHeader.getMessageType()), MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.valueOf(mqttHeader.getMessageType()),
mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()); mqttHeader.getRemainingLength());
MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId()); MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId());
MqttMessage mqttMessage = new MqttPublishMessage(fixedHeader, mqttPublishVariableHeader, Unpooled.copiedBuffer(message.getBody())); MqttMessage mqttMessage = new MqttPublishMessage(fixedHeader, mqttPublishVariableHeader, Unpooled.copiedBuffer(message.getBody()));
return mqttMessageForwardHandler.handleMessage(mqttMessage, remotingChannel); return mqttMessageForwardHandler.handleMessage(mqttMessage, remotingChannel);
}else{ } else {
return defaultMqttMessageProcessor.processRequest(remotingChannel, message); return defaultMqttMessageProcessor.processRequest(remotingChannel, message);
} }
......
...@@ -113,7 +113,7 @@ public class MqttPushTask implements Runnable { ...@@ -113,7 +113,7 @@ public class MqttPushTask implements Runnable {
//push message if in-flight window has slot(not full) //push message if in-flight window has slot(not full)
client.pushMessageQos1(mqttHeader, messageExt, brokerData); client.pushMessageQos1(mqttHeader, messageExt, brokerData);
} }
if (inflightFullFlag == true) { if (inflightFullFlag) {
break; break;
} }
maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic); maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic);
......
...@@ -39,7 +39,7 @@ public class MqttUtil { ...@@ -39,7 +39,7 @@ public class MqttUtil {
if (!qos.equals(MqttQoS.AT_LEAST_ONCE) && !qos.equals(MqttQoS.AT_MOST_ONCE) && !qos.equals(MqttQoS.EXACTLY_ONCE)) { if (!qos.equals(MqttQoS.AT_LEAST_ONCE) && !qos.equals(MqttQoS.AT_MOST_ONCE) && !qos.equals(MqttQoS.EXACTLY_ONCE)) {
return false; return false;
} }
return false; return true;
} }
public static boolean isMatch(String topicFiter, String topic) { public static boolean isMatch(String topicFiter, String topic) {
......
...@@ -36,10 +36,6 @@ public class MathUtils { ...@@ -36,10 +36,6 @@ public class MathUtils {
return mod; return mod;
} }
public static int findNextPositivePowerOfTwo(final int value) {
return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
}
/** /**
* Current time from some arbitrary time base in the past, counting in * Current time from some arbitrary time base in the past, counting in
* nanoseconds, and not affected by settimeofday or similar system clock * nanoseconds, and not affected by settimeofday or similar system clock
...@@ -55,17 +51,6 @@ public class MathUtils { ...@@ -55,17 +51,6 @@ public class MathUtils {
return System.nanoTime(); return System.nanoTime();
} }
/**
* Milliseconds elapsed since the time specified, the input is nanoTime
* the only conversion happens when computing the elapsed time.
*
* @param startNanoTime the start of the interval that we are measuring
* @return elapsed time in milliseconds.
*/
public static long elapsedMSec(long startNanoTime) {
return (System.nanoTime() - startNanoTime) / NANOSECONDS_PER_MILLISECOND;
}
/** /**
* Microseconds elapsed since the time specified, the input is nanoTime * Microseconds elapsed since the time specified, the input is nanoTime
* the only conversion happens when computing the elapsed time. * the only conversion happens when computing the elapsed time.
......
/* /*
*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
......
...@@ -19,12 +19,13 @@ package org.apache.rocketmq.mqtt; ...@@ -19,12 +19,13 @@ package org.apache.rocketmq.mqtt;
import io.netty.handler.codec.mqtt.MqttConnectPayload; import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.UnsupportedEncodingException; import org.apache.rocketmq.common.exception.MQClientException;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.MqttRemotingServer; import org.apache.rocketmq.remoting.transport.mqtt.MqttRemotingServer;
...@@ -37,6 +38,8 @@ import org.mockito.junit.MockitoJUnitRunner; ...@@ -37,6 +38,8 @@ import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class DefaultMqttMessageProcessorTest { public class DefaultMqttMessageProcessorTest {
@Mock
private DefaultMqttMessageProcessor defaultMqttMessageProcessor; private DefaultMqttMessageProcessor defaultMqttMessageProcessor;
@Mock @Mock
...@@ -53,11 +56,10 @@ public class DefaultMqttMessageProcessorTest { ...@@ -53,11 +56,10 @@ public class DefaultMqttMessageProcessorTest {
@Before @Before
public void init() { public void init() {
defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(new MqttConfig(), mqttRemotingServer);
} }
@Test @Test
public void testProcessRequest() throws RemotingCommandException, UnsupportedEncodingException { public void testProcessRequest() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
RemotingCommand request = createMqttConnectMesssageCommand(); RemotingCommand request = createMqttConnectMesssageCommand();
defaultMqttMessageProcessor.processRequest(remotingChannel, request); defaultMqttMessageProcessor.processRequest(remotingChannel, request);
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.client.ClientRole;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.mqtt.client.InFlightMessage;
import org.apache.rocketmq.mqtt.client.MQTTSession;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import static org.junit.Assert.assertEquals;
@RunWith(MockitoJUnitRunner.class)
public class MQTTSessionTest {
private DefaultMqttMessageProcessor defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(new MqttConfig(), new SnodeConfig(), null, null, null);
private MQTTSession mqttSession = new MQTTSession("testClient", ClientRole.IOTCLIENT, new HashSet<String>() {
{
add("IOT_GROUP");
}
}, true, true, null, System.currentTimeMillis(), defaultMqttMessageProcessor);
private ConcurrentHashMap<String, ConcurrentHashMap<String, TreeMap<Long, MessageExt>>> processTable = new ConcurrentHashMap<>();
@Test
public void test_put2processTable() throws Exception {
Method method = MQTTSession.class.getDeclaredMethod("put2processTable", ConcurrentHashMap.class, String.class, String.class, MessageExt.class);
for (int i = 0; i < 5; i++) {
MessageExt messageExt = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null);
messageExt.setQueueOffset(i);
method.setAccessible(true);
method.invoke(mqttSession, processTable, "broker1", "topic" + i, messageExt);
}
MessageExt messageExt_0 = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null);
messageExt_0.setQueueOffset(10);
method.invoke(mqttSession, processTable, "broker2", "topic0", messageExt_0);
MessageExt messageExt_1 = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null);
messageExt_1.setQueueOffset(11);
method.invoke(mqttSession, processTable, "broker2", "topic0", messageExt_1);
assertEquals(2, processTable.size());
assertEquals(5, processTable.get("broker1").size());
assertEquals(2, processTable.get("broker2").get("topic0@testClient").size());
}
@Test
public void test_pushMessageQos1() {
MqttHeader mqttHeader = new MqttHeader();
mqttHeader.setTopicName("topicTest");
mqttHeader.setQosLevel(1);
MessageExt messageExt = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null);
messageExt.setTopic("topicTest");
messageExt.setBody("Hello".getBytes());
BrokerData brokerData = new BrokerData("DefaultCluster", "broker1", null);
for (int i = 0; i < 10; i++) {
mqttSession.pushMessageQos1(mqttHeader, messageExt, brokerData);
assertEquals(i + 1, mqttHeader.getPacketId().intValue());
System.out.println(mqttHeader.getPacketId());
}
assertEquals(0, mqttSession.getInflightSlots().get());
assertEquals(10, mqttSession.getInflightWindow().size());
assertEquals(10, mqttSession.getInflightTimeouts().size());
mqttSession.pushMessageQos1(mqttHeader, messageExt, brokerData);
assertEquals(0, mqttSession.getInflightSlots().get());
assertEquals(10, mqttSession.getInflightWindow().size());
assertEquals(10, mqttSession.getInflightTimeouts().size());
}
@Test
public void test_pubAckReceived() {
MqttHeader mqttHeader = new MqttHeader();
mqttHeader.setTopicName("topicTest");
mqttHeader.setQosLevel(1);
MessageExt messageExt = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null);
messageExt.setTopic("topicTest");
messageExt.setBody("Hello".getBytes());
BrokerData brokerData = new BrokerData("DefaultCluster", "broker1", null);
for (int i = 0; i < 2; i++) {
mqttSession.pushMessageQos1(mqttHeader, messageExt, brokerData);
}
InFlightMessage inFlightMessage = mqttSession.pubAckReceived(1);
assertEquals(9, mqttSession.getInflightSlots().intValue());
assertEquals(1, mqttSession.getInflightWindow().size());
assertEquals(false, mqttSession.getInUsePacketIds().containsKey(1));
}
}
...@@ -22,6 +22,8 @@ import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; ...@@ -22,6 +22,8 @@ import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttConnectMessageHandler; import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttConnectMessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
...@@ -42,6 +44,7 @@ public class MqttConnectMessageHandlerTest { ...@@ -42,6 +44,7 @@ public class MqttConnectMessageHandlerTest {
@Test @Test
public void testHandlerMessage() throws Exception { public void testHandlerMessage() throws Exception {
this.defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(new MqttConfig(), new SnodeConfig(), null, null, null);
MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler(defaultMqttMessageProcessor); MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler(defaultMqttMessageProcessor);
MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes()); MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes());
MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(new MqttFixedHeader( MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(new MqttFixedHeader(
......
...@@ -18,23 +18,23 @@ ...@@ -18,23 +18,23 @@
package org.apache.rocketmq.mqtt; package org.apache.rocketmq.mqtt;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.client.MQTTSession;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPingreqMessageHandler; import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPingreqMessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.verify;
import static org.junit.Assert.*; import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class MqttPingreqMessageHandlerTest { public class MqttPingreqMessageHandlerTest {
...@@ -45,7 +45,7 @@ public class MqttPingreqMessageHandlerTest { ...@@ -45,7 +45,7 @@ public class MqttPingreqMessageHandlerTest {
@Mock @Mock
private MqttMessage mqttMessage; private MqttMessage mqttMessage;
@Mock @Mock
private Client client; private MQTTSession client;
@Mock @Mock
private DefaultMqttMessageProcessor processor; private DefaultMqttMessageProcessor processor;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.client.ClientRole;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.client.InFlightMessage;
import org.apache.rocketmq.mqtt.client.MQTTSession;
import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubackMessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@RunWith(MockitoJUnitRunner.class)
public class MqttPubackMessageHandlerTest {
@Rule
public final ExpectedException exception = ExpectedException.none();
private DefaultMqttMessageProcessor defaultMqttMessageProcessor;
private MqttPubackMessageHandler mqttPubackMessageHandler;
@Mock
private RemotingChannel remotingChannel;
@Before
public void before() {
defaultMqttMessageProcessor = Mockito.spy(new DefaultMqttMessageProcessor(new MqttConfig(), new SnodeConfig(), null, null, null));
mqttPubackMessageHandler = new MqttPubackMessageHandler(defaultMqttMessageProcessor);
}
@Test
public void test_handleMessage_wrongMessageType() {
MqttMessage mqttMessage = new MqttConnectMessage(new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 10), new MqttConnectVariableHeader("name", 0, false, false, false, 1, false, false, 10), new MqttConnectPayload("client1", null, (byte[]) null, null, null));
exception.expect(WrongMessageTypeException.class);
mqttPubackMessageHandler.handleMessage(mqttMessage, remotingChannel);
}
@Test
public void test_handleMessage() {
MqttMessage mqttMessage = new MqttPubAckMessage(new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 10), MqttMessageIdVariableHeader.from(1));
IOTClientManagerImpl iotClientManager = Mockito.mock(IOTClientManagerImpl.class);
Mockito.when((IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager()).thenReturn(iotClientManager);
MQTTSession mqttSession = Mockito.spy(new MQTTSession("client1", ClientRole.IOTCLIENT, null, true, true, remotingChannel, System.currentTimeMillis(), defaultMqttMessageProcessor));
Mockito.when(iotClientManager.getClient(anyString(), any(RemotingChannel.class))).thenReturn(mqttSession);
InFlightMessage inFlightMessage = Mockito.spy(new InFlightMessage("topicTest", 0, "Hello".getBytes(), null, null, 0));
doReturn(inFlightMessage).when(mqttSession).pubAckReceived(anyInt());
RemotingCommand remotingCommand = mqttPubackMessageHandler.handleMessage(mqttMessage, remotingChannel);
assert remotingCommand == null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.ClientRole;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.common.protocol.heartbeat.MqttSubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.service.EnodeService;
import org.apache.rocketmq.common.service.NnodeService;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.client.MQTTSession;
import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPublishMessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.MqttRemotingServer;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
@RunWith(MockitoJUnitRunner.class)
public class MqttPublishMessageHandlerTest {
@Rule
public final ExpectedException exception = ExpectedException.none();
@Mock
private MqttRemotingServer mqttRemotingServer;
@Mock
private EnodeService enodeService;
@Mock
private NnodeService nnodeService;
private DefaultMqttMessageProcessor defaultMqttMessageProcessor;
private MqttPublishMessageHandler mqttPublishMessageHandler;
@Mock
private RemotingChannel remotingChannel;
private MqttMessage mqttPublishMessage;
@Before
public void before() {
defaultMqttMessageProcessor = Mockito.spy(new DefaultMqttMessageProcessor(new MqttConfig(), new SnodeConfig(), mqttRemotingServer, enodeService, nnodeService));
mqttPublishMessageHandler = new MqttPublishMessageHandler(defaultMqttMessageProcessor);
}
@Test
public void test_handleMessage_wrongMessageType() throws Exception {
MqttMessage mqttMessage = new MqttConnectMessage(new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 10), new MqttConnectVariableHeader("name", 0, false, false, false, 1, false, false, 10), new MqttConnectPayload("client1", null, (byte[]) null, null, null));
exception.expect(WrongMessageTypeException.class);
mqttPublishMessageHandler.handleMessage(mqttMessage, remotingChannel);
}
@Test
public void test_handleMessage_wrongQos() throws Exception {
mqttPublishMessage = new MqttPublishMessage(new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.FAILURE, false, 10), new MqttPublishVariableHeader("topicTest", 1), Unpooled.copiedBuffer("Hello".getBytes()));
Mockito.when(remotingChannel.toString()).thenReturn("testRemotingChannel");
RemotingCommand remotingCommand = mqttPublishMessageHandler.handleMessage(mqttPublishMessage, remotingChannel);
assert remotingCommand == null;
Mockito.verify(remotingChannel).close();
}
@Test
public void test_handleMessage_qos0() throws Exception {
prepareSubscribeData();
mqttPublishMessage = new MqttPublishMessage(new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 10), new MqttPublishVariableHeader("topic/c", 1), Unpooled.copiedBuffer("Hello".getBytes()));
Mockito.when(remotingChannel.toString()).thenReturn("testRemotingChannel");
RemotingCommand remotingCommand = mqttPublishMessageHandler.handleMessage(mqttPublishMessage, remotingChannel);
assert remotingCommand == null;
}
@Test
public void test_handleMessage_qos1() throws Exception {
prepareSubscribeData();
mqttPublishMessage = new MqttPublishMessage(new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 10), new MqttPublishVariableHeader("topic/c", 1), Unpooled.copiedBuffer("Hello".getBytes()));
Mockito.when(remotingChannel.toString()).thenReturn("testRemotingChannel");
TopicRouteData topicRouteData = buildTopicRouteData();
Mockito.when(nnodeService.getTopicRouteDataByTopic(anyString(), anyBoolean())).thenReturn(topicRouteData);
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
// RemotingCommand response = Mockito.mock(RemotingCommand.class);
Mockito.when(this.defaultMqttMessageProcessor.getEnodeService().sendMessage(any(RemotingChannel.class), anyString(), any(RemotingCommand.class))).thenReturn(future);
// doAnswer(mock -> future.complete(response)).when(this.defaultMqttMessageProcessor.getEnodeService().sendMessage(any(RemotingChannel.class), anyString(), any(RemotingCommand.class)));
RemotingCommand remotingCommand = mqttPublishMessageHandler.handleMessage(mqttPublishMessage, remotingChannel);
assert remotingCommand != null;
MqttHeader mqttHeader = (MqttHeader) remotingCommand.readCustomHeader();
assertEquals(1, mqttHeader.getPacketId().intValue());
assertEquals(MqttQoS.AT_MOST_ONCE, mqttHeader.getQosLevel());
assertEquals(false, mqttHeader.isDup());
assertEquals(false, mqttHeader.isRetain());
assertEquals(2, mqttHeader.getRemainingLength());
}
private TopicRouteData buildTopicRouteData() {
TopicRouteData topicRouteData = new TopicRouteData();
List<BrokerData> brokerDataList = new ArrayList<>();
brokerDataList.add(new BrokerData("DefaultCluster", "broker1", null));
topicRouteData.setBrokerDatas(brokerDataList);
return topicRouteData;
}
private void prepareSubscribeData() {
IOTClientManagerImpl manager = (IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager();
ConcurrentHashMap<String, Subscription> subscriptions = manager.getClientId2Subscription();
ConcurrentHashMap<String, Set<Client>> topic2Clients = manager.getTopic2Clients();
Subscription subscription1 = new Subscription();
subscription1.setCleanSession(true);
subscription1.getSubscriptionTable().put("topic/a", new MqttSubscriptionData(1, "client1", "topic/a"));
subscription1.getSubscriptionTable().put("topic/+", new MqttSubscriptionData(1, "client1", "topic/+"));
subscriptions.put("client1", subscription1);
Subscription subscription2 = new Subscription();
subscription2.setCleanSession(true);
subscription2.getSubscriptionTable().put("topic/b", new MqttSubscriptionData(1, "client2", "topic/b"));
subscription2.getSubscriptionTable().put("topic/+", new MqttSubscriptionData(1, "client2", "topic/+"));
subscriptions.put("client2", subscription2);
Subscription subscription3 = new Subscription();
subscription3.setCleanSession(true);
subscription3.getSubscriptionTable().put("test/c", new MqttSubscriptionData(1, "client3", "topic/c"));
subscription3.getSubscriptionTable().put("test/d", new MqttSubscriptionData(1, "client3", "topic/d"));
subscriptions.put("client3", subscription3);
Set<Client> clients_1 = new HashSet<>();
Set<String> groups = new HashSet<String>();
groups.add("IOT_GROUP");
clients_1.add(new MQTTSession("client1", ClientRole.IOTCLIENT, groups, true, true, remotingChannel, System.currentTimeMillis(), defaultMqttMessageProcessor));
clients_1.add(new MQTTSession("client2", ClientRole.IOTCLIENT, groups, true, true, remotingChannel, System.currentTimeMillis(), defaultMqttMessageProcessor));
Set<Client> clients_2 = new HashSet<>();
groups.add("IOT_GROUP");
clients_1.add(new MQTTSession("client3", ClientRole.IOTCLIENT, groups, true, true, remotingChannel, System.currentTimeMillis(), defaultMqttMessageProcessor));
topic2Clients.put("topic", clients_1);
topic2Clients.put("test", clients_2);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class MqttSubscribeMessageHandlerTest {
}
...@@ -58,7 +58,7 @@ public class SnodeStartup { ...@@ -58,7 +58,7 @@ public class SnodeStartup {
private static final String DEFAULT_MQTT_CONFIG_FILE = "/conf/mqtt.properties"; private static final String DEFAULT_MQTT_CONFIG_FILE = "/conf/mqtt.properties";
private static String mqttConfigFileName = System.getProperty("rocketmq.mqtt.config", DEFAULT_MQTT_CONFIG_FILE); private static String mqttConfigFileName = System.getProperty("rocketmq.mqtt.config", DEFAULT_MQTT_CONFIG_FILE);
public static void main(String[] args) throws IOException, JoranException { public static void main(String[] args) throws IOException, JoranException, CloneNotSupportedException {
SnodeConfig snodeConfig = loadConfig(args); SnodeConfig snodeConfig = loadConfig(args);
MqttConfig mqttConfig = loadMqttConfig(snodeConfig); MqttConfig mqttConfig = loadMqttConfig(snodeConfig);
if (snodeConfig.isEmbeddedModeEnable()) { if (snodeConfig.isEmbeddedModeEnable()) {
...@@ -163,7 +163,7 @@ public class SnodeStartup { ...@@ -163,7 +163,7 @@ public class SnodeStartup {
return mqttConfig; return mqttConfig;
} }
public static SnodeController createSnodeController(SnodeConfig snodeConfig, MqttConfig mqttConfig) throws JoranException { public static SnodeController createSnodeController(SnodeConfig snodeConfig, MqttConfig mqttConfig) throws JoranException, CloneNotSupportedException {
final SnodeController snodeController = new SnodeController(snodeConfig, mqttConfig); final SnodeController snodeController = new SnodeController(snodeConfig, mqttConfig);
......
...@@ -73,6 +73,11 @@ public class LocalEnodeServiceImpl implements EnodeService { ...@@ -73,6 +73,11 @@ public class LocalEnodeServiceImpl implements EnodeService {
return completableFuture; return completableFuture;
} }
@Override public RemotingCommand pullMessageSync(RemotingChannel remotingChannel, String enodeName,
RemotingCommand request) {
return null;
}
@Override public RemotingCommand creatRetryTopic(RemotingChannel remotingChannel, String enodeName, @Override public RemotingCommand creatRetryTopic(RemotingChannel remotingChannel, String enodeName,
RemotingCommand request) { RemotingCommand request) {
try { try {
......
...@@ -18,30 +18,55 @@ package org.apache.rocketmq.snode; ...@@ -18,30 +18,55 @@ package org.apache.rocketmq.snode;
import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.ServerConfig;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Spy;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
public class SnodeControllerTest { public class SnodeControllerTest {
@Spy
private ServerConfig serverConfig = new ServerConfig();
@Spy
private ClientConfig clientConfig = new ClientConfig();
@Spy
private ServerConfig mqttServerConfig = new ServerConfig();
@Spy
private ClientConfig mqttClientConfig = new ClientConfig();
private SnodeController snodeController;
@Before
public void init() throws CloneNotSupportedException {
SnodeConfig snodeConfig = new SnodeConfig();
snodeConfig.setNettyClientConfig(clientConfig);
serverConfig.setListenPort(10912);
snodeConfig.setNettyServerConfig(serverConfig);
MqttConfig mqttConfig = new MqttConfig();
mqttServerConfig.setListenPort(mqttConfig.getListenPort());
mqttConfig.setMqttClientConfig(mqttClientConfig);
mqttConfig.setMqttServerConfig(mqttServerConfig);
snodeController = new SnodeController(snodeConfig, mqttConfig);
}
@Test @Test
public void testSnodeRestart() { public void testSnodeRestart() {
ServerConfig serverConfig = new ServerConfig();
serverConfig.setListenPort(10912);
SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
assertThat(snodeController.initialize()); assertThat(snodeController.initialize());
snodeController.start(); snodeController.start();
snodeController.shutdown(); snodeController.shutdown();
} }
@Test @Test
public void testSnodeRestartWithAclEnable() { public void testSnodeRestartWithAclEnable() throws CloneNotSupportedException {
System.setProperty("rocketmq.home.dir", "src/test/resources"); System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml"); System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml");
SnodeConfig snodeConfig = new SnodeConfig(); SnodeConfig snodeConfig = new SnodeConfig();
snodeConfig.setAclEnable(true); snodeConfig.setAclEnable(true);
SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
assertThat(snodeController.initialize()); assertThat(snodeController.initialize());
snodeController.start(); snodeController.start();
snodeController.shutdown(); snodeController.shutdown();
......
...@@ -23,13 +23,15 @@ import org.apache.rocketmq.common.SnodeConfig; ...@@ -23,13 +23,15 @@ import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.service.EnodeService;
import org.apache.rocketmq.common.service.NnodeService;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.CodecHelper; import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.common.service.EnodeService;
import org.apache.rocketmq.common.service.NnodeService;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
...@@ -47,7 +49,15 @@ public class SendMessageProcessorTest { ...@@ -47,7 +49,15 @@ public class SendMessageProcessorTest {
private SendMessageProcessor sendMessageProcessor; private SendMessageProcessor sendMessageProcessor;
@Spy @Spy
private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); private ServerConfig serverConfig = new ServerConfig();
@Spy
private ClientConfig clientConfig = new ClientConfig();
@Spy
private ServerConfig mqttServerConfig = new ServerConfig();
@Spy
private ClientConfig mqttClientConfig = new ClientConfig();
private SnodeController snodeController;
@Mock @Mock
private RemotingChannel remotingChannel; private RemotingChannel remotingChannel;
...@@ -62,8 +72,22 @@ public class SendMessageProcessorTest { ...@@ -62,8 +72,22 @@ public class SendMessageProcessorTest {
@Mock @Mock
private NnodeService nnodeService; private NnodeService nnodeService;
public SendMessageProcessorTest() {
}
@Before @Before
public void init() { public void init() throws CloneNotSupportedException {
SnodeConfig snodeConfig = new SnodeConfig();
serverConfig.setListenPort(snodeConfig.getListenPort());
snodeConfig.setNettyClientConfig(clientConfig);
snodeConfig.setNettyServerConfig(serverConfig);
MqttConfig mqttConfig = new MqttConfig();
mqttServerConfig.setListenPort(mqttConfig.getListenPort());
mqttConfig.setMqttClientConfig(mqttClientConfig);
mqttConfig.setMqttServerConfig(mqttServerConfig);
snodeController = new SnodeController(snodeConfig, mqttConfig);
snodeController.setNnodeService(nnodeService); snodeController.setNnodeService(nnodeService);
snodeController.setEnodeService(enodeService); snodeController.setEnodeService(enodeService);
sendMessageProcessor = new SendMessageProcessor(snodeController); sendMessageProcessor = new SendMessageProcessor(snodeController);
...@@ -73,7 +97,7 @@ public class SendMessageProcessorTest { ...@@ -73,7 +97,7 @@ public class SendMessageProcessorTest {
public void testSendMessageV2ProcessRequest() throws RemotingCommandException { public void testSendMessageV2ProcessRequest() throws RemotingCommandException {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
RemotingCommand request = createSendMesssageV2Command(); RemotingCommand request = createSendMesssageV2Command();
when(this.snodeController.getEnodeService().sendMessage(null, anyString(), any(RemotingCommand.class))).thenReturn(future); when(this.snodeController.getEnodeService().sendMessage(any(RemotingChannel.class), anyString(), any(RemotingCommand.class))).thenReturn(future);
sendMessageProcessor.processRequest(remotingChannel, request); sendMessageProcessor.processRequest(remotingChannel, request);
} }
...@@ -82,7 +106,7 @@ public class SendMessageProcessorTest { ...@@ -82,7 +106,7 @@ public class SendMessageProcessorTest {
snodeController.setEnodeService(enodeService); snodeController.setEnodeService(enodeService);
CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
RemotingCommand request = createSendBatchMesssageCommand(); RemotingCommand request = createSendBatchMesssageCommand();
when(this.snodeController.getEnodeService().sendMessage(null, anyString(), any(RemotingCommand.class))).thenReturn(future); when(this.snodeController.getEnodeService().sendMessage(any(RemotingChannel.class), anyString(), any(RemotingCommand.class))).thenReturn(future);
sendMessageProcessor.processRequest(remotingChannel, request); sendMessageProcessor.processRequest(remotingChannel, request);
} }
......
...@@ -21,6 +21,8 @@ import java.util.List; ...@@ -21,6 +21,8 @@ import java.util.List;
import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.service.NnodeService; import org.apache.rocketmq.common.service.NnodeService;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
...@@ -46,15 +48,37 @@ import static org.mockito.Mockito.when; ...@@ -46,15 +48,37 @@ import static org.mockito.Mockito.when;
public class NnodeServiceImplTest extends SnodeTestBase { public class NnodeServiceImplTest extends SnodeTestBase {
@Spy @Spy
private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); private ServerConfig serverConfig = new ServerConfig();
@Spy
private ClientConfig clientConfig = new ClientConfig();
@Spy
private ServerConfig mqttServerConfig = new ServerConfig();
@Spy
private ClientConfig mqttClientConfig = new ClientConfig();
private SnodeController snodeController;
@Mock @Mock
private NettyRemotingClient remotingClient; private NettyRemotingClient remotingClient;
private NnodeService nnodeService; private NnodeService nnodeService;
public NnodeServiceImplTest() {
}
@Before @Before
public void init() { public void init() throws CloneNotSupportedException {
SnodeConfig snodeConfig = new SnodeConfig();
serverConfig.setListenPort(snodeConfig.getListenPort());
snodeConfig.setNettyClientConfig(clientConfig);
snodeConfig.setNettyServerConfig(serverConfig);
MqttConfig mqttConfig = new MqttConfig();
mqttServerConfig.setListenPort(mqttConfig.getListenPort());
mqttConfig.setMqttClientConfig(mqttClientConfig);
mqttConfig.setMqttServerConfig(mqttServerConfig);
snodeController = new SnodeController(snodeConfig, mqttConfig);
snodeController.setRemotingClient(remotingClient); snodeController.setRemotingClient(remotingClient);
nnodeService = new NnodeServiceImpl(snodeController); nnodeService = new NnodeServiceImpl(snodeController);
} }
......
...@@ -25,7 +25,9 @@ import org.apache.rocketmq.common.protocol.ResponseCode; ...@@ -25,7 +25,9 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.service.EnodeService; import org.apache.rocketmq.common.service.EnodeService;
import org.apache.rocketmq.common.service.NnodeService; import org.apache.rocketmq.common.service.NnodeService;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient; import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
...@@ -57,7 +59,15 @@ public class RemoteEnodeServiceImplTest extends SnodeTestBase { ...@@ -57,7 +59,15 @@ public class RemoteEnodeServiceImplTest extends SnodeTestBase {
private EnodeService enodeService; private EnodeService enodeService;
@Spy @Spy
private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); private ServerConfig serverConfig = new ServerConfig();
@Spy
private ClientConfig clientConfig = new ClientConfig();
@Spy
private ServerConfig mqttServerConfig = new ServerConfig();
@Spy
private ClientConfig mqttClientConfig = new ClientConfig();
private SnodeController snodeController;
@Mock @Mock
private NnodeService nnodeService; private NnodeService nnodeService;
...@@ -71,8 +81,22 @@ public class RemoteEnodeServiceImplTest extends SnodeTestBase { ...@@ -71,8 +81,22 @@ public class RemoteEnodeServiceImplTest extends SnodeTestBase {
private String group = "snodeGroup"; private String group = "snodeGroup";
public RemoteEnodeServiceImplTest() {
}
@Before @Before
public void init() { public void init() throws CloneNotSupportedException {
SnodeConfig snodeConfig = new SnodeConfig();
serverConfig.setListenPort(snodeConfig.getListenPort());
snodeConfig.setNettyClientConfig(clientConfig);
snodeConfig.setNettyServerConfig(serverConfig);
MqttConfig mqttConfig = new MqttConfig();
mqttServerConfig.setListenPort(mqttConfig.getListenPort());
mqttConfig.setMqttClientConfig(mqttClientConfig);
mqttConfig.setMqttServerConfig(mqttServerConfig);
snodeController = new SnodeController(snodeConfig, mqttConfig);
snodeController.setNnodeService(nnodeService); snodeController.setNnodeService(nnodeService);
snodeController.setRemotingClient(remotingClient); snodeController.setRemotingClient(remotingClient);
enodeService = new RemoteEnodeServiceImpl(snodeController); enodeService = new RemoteEnodeServiceImpl(snodeController);
......
...@@ -18,6 +18,8 @@ package org.apache.rocketmq.snode.service; ...@@ -18,6 +18,8 @@ package org.apache.rocketmq.snode.service;
import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.SlowConsumerService; import org.apache.rocketmq.snode.client.SlowConsumerService;
import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl; import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl;
...@@ -36,8 +38,17 @@ import static org.mockito.Mockito.when; ...@@ -36,8 +38,17 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class SlowConsumerServiceImplTest { public class SlowConsumerServiceImplTest {
@Spy
private ServerConfig serverConfig = new ServerConfig();
@Spy
private ClientConfig clientConfig = new ClientConfig();
@Spy @Spy
private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); private ServerConfig mqttServerConfig = new ServerConfig();
@Spy
private ClientConfig mqttClientConfig = new ClientConfig();
private SnodeController snodeController;
private final String enodeName = "testEndoe"; private final String enodeName = "testEndoe";
...@@ -52,8 +63,22 @@ public class SlowConsumerServiceImplTest { ...@@ -52,8 +63,22 @@ public class SlowConsumerServiceImplTest {
@Mock @Mock
private ConsumerOffsetManager consumerOffsetManager; private ConsumerOffsetManager consumerOffsetManager;
public SlowConsumerServiceImplTest() {
}
@Before @Before
public void init() { public void init() throws CloneNotSupportedException {
SnodeConfig snodeConfig = new SnodeConfig();
serverConfig.setListenPort(snodeConfig.getListenPort());
snodeConfig.setNettyClientConfig(clientConfig);
snodeConfig.setNettyServerConfig(serverConfig);
MqttConfig mqttConfig = new MqttConfig();
mqttServerConfig.setListenPort(mqttConfig.getListenPort());
mqttConfig.setMqttClientConfig(mqttClientConfig);
mqttConfig.setMqttServerConfig(mqttServerConfig);
snodeController = new SnodeController(snodeConfig, mqttConfig);
slowConsumerService = new SlowConsumerServiceImpl(snodeController); slowConsumerService = new SlowConsumerServiceImpl(snodeController);
} }
......
...@@ -46,6 +46,10 @@ ...@@ -46,6 +46,10 @@
<artifactId>truth</artifactId> <artifactId>truth</artifactId>
<version>0.30</version> <version>0.30</version>
</dependency> </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-common</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>
......
...@@ -49,7 +49,7 @@ public class ChinaPropIT extends BaseConf { ...@@ -49,7 +49,7 @@ public class ChinaPropIT extends BaseConf {
/** /**
* @since version3.4.6 * @since version3.4.6
*/ */
@Test(expected = org.apache.rocketmq.client.exception.MQBrokerException.class) @Test(expected = org.apache.rocketmq.common.exception.MQBrokerException.class)
public void testSend20kChinaPropMsg() throws Exception { public void testSend20kChinaPropMsg() throws Exception {
Message msg = MessageFactory.getRandomMessage(topic); Message msg = MessageFactory.getRandomMessage(topic);
msg.putUserProperty("key", RandomUtils.getCheseWord(32 * 1024 + 1)); msg.putUserProperty("key", RandomUtils.getCheseWord(32 * 1024 + 1));
......
...@@ -59,45 +59,45 @@ public class MessageExceptionIT extends BaseConf { ...@@ -59,45 +59,45 @@ public class MessageExceptionIT extends BaseConf {
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
} }
@Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) @Test(expected = org.apache.rocketmq.common.exception.MQClientException.class)
public void testSynSendNullMessage() throws Exception { public void testSynSendNullMessage() throws Exception {
producer.send((Message) null); producer.send((Message) null);
} }
@Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) @Test(expected = org.apache.rocketmq.common.exception.MQClientException.class)
public void testSynSendNullBodyMessage() throws Exception { public void testSynSendNullBodyMessage() throws Exception {
Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes());
msg.setBody(null); msg.setBody(null);
producer.send(msg); producer.send(msg);
} }
@Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) @Test(expected = org.apache.rocketmq.common.exception.MQClientException.class)
public void testSynSendZeroSizeBodyMessage() throws Exception { public void testSynSendZeroSizeBodyMessage() throws Exception {
Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes());
msg.setBody(new byte[0]); msg.setBody(new byte[0]);
producer.send(msg); producer.send(msg);
} }
@Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) @Test(expected = org.apache.rocketmq.common.exception.MQClientException.class)
public void testSynSendOutOfSizeBodyMessage() throws Exception { public void testSynSendOutOfSizeBodyMessage() throws Exception {
Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes());
msg.setBody(new byte[1024 * 1024 * 4 + 1]); msg.setBody(new byte[1024 * 1024 * 4 + 1]);
producer.send(msg); producer.send(msg);
} }
@Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) @Test(expected = org.apache.rocketmq.common.exception.MQClientException.class)
public void testSynSendNullTopicMessage() throws Exception { public void testSynSendNullTopicMessage() throws Exception {
Message msg = new Message(null, RandomUtils.getStringByUUID().getBytes()); Message msg = new Message(null, RandomUtils.getStringByUUID().getBytes());
producer.send(msg); producer.send(msg);
} }
@Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) @Test(expected = org.apache.rocketmq.common.exception.MQClientException.class)
public void testSynSendBlankTopicMessage() throws Exception { public void testSynSendBlankTopicMessage() throws Exception {
Message msg = new Message("", RandomUtils.getStringByUUID().getBytes()); Message msg = new Message("", RandomUtils.getStringByUUID().getBytes());
producer.send(msg); producer.send(msg);
} }
@Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) @Test(expected = org.apache.rocketmq.common.exception.MQClientException.class)
public void testSend128kMsg() throws Exception { public void testSend128kMsg() throws Exception {
Message msg = new Message(topic, Message msg = new Message(topic,
RandomUtils.getStringWithNumber(1024 * 1024 * 4 + 1).getBytes()); RandomUtils.getStringWithNumber(1024 * 1024 * 4 + 1).getBytes());
......
...@@ -55,7 +55,7 @@ public class OneWaySendExceptionIT extends BaseConf { ...@@ -55,7 +55,7 @@ public class OneWaySendExceptionIT extends BaseConf {
producer.sendOneway(msg, messageQueue); producer.sendOneway(msg, messageQueue);
} }
@Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) @Test(expected = org.apache.rocketmq.common.exception.MQClientException.class)
public void testSendSelectorNull() throws Exception { public void testSendSelectorNull() throws Exception {
Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes());
DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr); DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
...@@ -63,7 +63,7 @@ public class OneWaySendExceptionIT extends BaseConf { ...@@ -63,7 +63,7 @@ public class OneWaySendExceptionIT extends BaseConf {
producer.sendOneway(msg, selector, 100); producer.sendOneway(msg, selector, 100);
} }
@Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) @Test(expected = org.apache.rocketmq.common.exception.MQClientException.class)
public void testSelectorThrowsException() throws Exception { public void testSelectorThrowsException() throws Exception {
Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes());
DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr); DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册