diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java index b758ac70609ff3b7b2da8abc34468c6fa9a479de..2b3793d4640c38238541eaa1b582776801cd77bf 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java @@ -160,6 +160,12 @@ public class LocalFileOffsetStore implements OffsetStore { } + @Override + public void updateConsumeOffsetToSnode(final MessageQueue mq, final long offset, final boolean isOneway) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + + } + @Override public void updateConsumeOffsetToBroker(final MessageQueue mq, final long offset, final boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java index 9deed0e3dfe8b400ef4aafa77523bcd70ffee833..16ab829f9d850bd5cb3880980bc01d7b45c00432 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java @@ -71,4 +71,12 @@ public interface OffsetStore { */ void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + + /** + * @param mq + * @param offset + * @param isOneway + */ + void updateConsumeOffsetToSnode(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, + MQBrokerException, InterruptedException, MQClientException; } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index d7ec1693aa70fcd4c36641545c92401a5eaddfcf..23d33cfbc459c466ebd81dafa7ef9954e35ee5c2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -87,7 +87,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { } case READ_FROM_STORE: { try { - long brokerOffset = this.fetchConsumeOffsetFromBroker(mq); + long brokerOffset = this.fetchConsumeOffsetFromSnode(mq); AtomicLong offset = new AtomicLong(brokerOffset); this.updateOffset(mq, offset.get(), false); return brokerOffset; @@ -195,14 +195,14 @@ public class RemoteBrokerOffsetStore implements OffsetStore { private void updateConsumeOffsetToSnode(MessageQueue mq, long offset) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - updateConsumeOffsetToBroker(mq, offset, true); + updateConsumeOffsetToSnode(mq, offset, true); } /** * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized. */ @Override - public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, + public void updateConsumeOffsetToSnode(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); @@ -226,10 +226,65 @@ public class RemoteBrokerOffsetStore implements OffsetStore { snodeAddr, requestHeader, 1000 * 5); } } else { - throw new MQClientException("Update offset to Broker[" + mq.getBrokerName() + "] failed, Snode is null.", null); + throw new MQClientException("Update offset to Snode[" + mq.getBrokerName() + "] failed, Snode is null.", null); + } + } + + /** + * Preserved firstly,Compatible with RocketMQ 4.X Version + */ + @Override + public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, + boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); + if (null == findBrokerResult) { + this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); + findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); + } + + if (findBrokerResult != null) { + UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); + requestHeader.setTopic(mq.getTopic()); + requestHeader.setConsumerGroup(this.groupName); + requestHeader.setQueueId(mq.getQueueId()); + requestHeader.setCommitOffset(offset); + requestHeader.setEnodeName(mq.getBrokerName()); + if (isOneway) { + this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway( + findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); + } else { + this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset( + findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); + } + } else { + throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); } } + private long fetchConsumeOffsetFromSnode(MessageQueue mq) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException { + String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); + if (null == snodeAddr) { + this.mQClientFactory.updateSnodeInfoFromNameServer(); + snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); + } + + if (snodeAddr != null) { + QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader(); + requestHeader.setTopic(mq.getTopic()); + requestHeader.setConsumerGroup(this.groupName); + requestHeader.setQueueId(mq.getQueueId()); + requestHeader.setEnodeName(mq.getBrokerName()); + return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset( + snodeAddr, requestHeader, 1000 * 5); + } else { + throw new MQClientException("Get Offset from Snode[" + mq.getBrokerName() + "] failed, Snode is not exist", null); + } + } + + /** + * Preserved firstly,Compatible with RocketMQ 4.X Version + */ private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 0080f1ee534ac5ecce2c04ca230c89124b6ff598..3fdc05d03fd3f52ef72a39ed0e0b039470b03e3a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -1194,7 +1194,7 @@ public class MQClientAPIImpl { public SnodeClusterInfo getSnodeClusterInfo( //Todo Redifine snode exception final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, - RemotingSendRequestException, RemotingConnectException , MQBrokerException{ + RemotingSendRequestException, RemotingConnectException , MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SNODE_CLUSTER_INFO, null); RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 20a72d8e832db88db5ab8b1574532f6f55901e32..829158be0cbaadfce35ced6519039e1e315264f8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -480,9 +480,9 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { sendMessageBack(msg, delayLevel, brokerName, this.defaultMQPullConsumer.getConsumerGroup()); } - public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, + public void updateConsumeOffsetToSnode(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway); + this.offsetStore.updateConsumeOffsetToSnode(mq, offset, isOneway); } public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup) diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java index 569055dea2566068b0ca165d893b710dcd08b751..f060ad8e446bfe97a2e568392cd6442727c12f9b 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java @@ -77,6 +77,8 @@ public class DefaultMQPullConsumerTest { field.set(mQClientFactory, mQClientAPIImpl); when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false)); + when(mQClientFactory.findSnodeAddressInPublish()).thenReturn("127.0.0.1:10911"); + } @After diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index 7224794982bc459dbf68a384d399a958682355fc..90de649396c31d4efb2bc851bc035b6fc306e5c4 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -147,6 +147,8 @@ public class DefaultMQPushConsumerTest { }); doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); + doReturn("127.0.0.1:10911").when(mQClientFactory).findSnodeAddressInPublish(); + Set messageQueueSet = new HashSet(); messageQueueSet.add(createPullRequest().getMessageQueue()); pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet); diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java index 64d64f29a9489fbc77577c71170e552718be3263..d8bf38201a1545a657541d13507829ac446ffd3d 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java @@ -20,7 +20,6 @@ import java.util.Collections; import java.util.HashSet; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; -import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.message.MessageQueue; @@ -34,7 +33,6 @@ import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; - import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -58,8 +56,8 @@ public class RemoteBrokerOffsetStoreTest { System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + ".rocketmq_offsets"); String clientId = new ClientConfig().buildMQClientId() + "#TestNamespace" + System.currentTimeMillis(); when(mQClientFactory.getClientId()).thenReturn(clientId); - when(mQClientFactory.findBrokerAddressInAdmin(brokerName)).thenReturn(new FindBrokerResult("127.0.0.1", false)); when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mqClientAPI); + when(mQClientFactory.findSnodeAddressInPublish()).thenReturn("127.0.0.1:10911"); } @Test diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index e2793f565e6cf1974519624ec8ea7fcacebb9f99..210c05ea3c003d7025c5d8c55df2b0ec437b8f4b 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -106,6 +106,8 @@ public class DefaultMQProducerTest { when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) .thenReturn(createSendResult(SendStatus.SEND_OK)); + when(mQClientFactory.findSnodeAddressInPublish()).thenReturn("127.0.0.1:10911"); + } @After diff --git a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java similarity index 98% rename from snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java rename to common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java index 6c51fadb765d43a9d02d450119d95144e2549f59..fd3b299b13e9ca3ac44f280ed0f0ca5e6978bcbf 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java @@ -14,21 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.config; +package org.apache.rocketmq.common; import java.net.InetAddress; import java.net.UnknownHostException; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingUtil; -import static org.apache.rocketmq.client.ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY; public class SnodeConfig { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); + + public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel"; private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java index 18fc905f0a4518b75ce87db0f7ef97f7de2875f4..a366cc7d54560171e6b586037d8f7878f8074bd7 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java @@ -24,6 +24,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.acl.AccessValidator; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.RequestCode; @@ -55,7 +56,6 @@ import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; import org.apache.rocketmq.snode.client.impl.ProducerManagerImpl; import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl; import org.apache.rocketmq.snode.client.impl.SubscriptionManagerImpl; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.offset.ConsumerOffsetManager; import org.apache.rocketmq.snode.processor.ConsumerManageProcessor; import org.apache.rocketmq.snode.processor.DefaultMqttMessageProcessor; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java index d6060a90c6a58262a4d1798fcba5e342e76e9ed1..232def9ca35ce031fa7c696bca75d9db62c49eb6 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java @@ -26,11 +26,13 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; @@ -39,7 +41,6 @@ import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.common.TlsMode; import org.apache.rocketmq.remoting.netty.TlsSystemConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.srvutil.ServerUtil; import org.slf4j.LoggerFactory; @@ -123,19 +124,31 @@ public class SnodeStartup { nettyClientConfig, snodeConfig); + boolean initResult = snodeController.initialize(); + if (!initResult) { + snodeController.shutdown(); + System.exit(-3); + } + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { private volatile boolean hasShutdown = false; + private AtomicInteger shutdownTimes = new AtomicInteger(0); @Override public void run() { synchronized (this) { + log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet()); + if (!this.hasShutdown) { this.hasShutdown = true; + long beginTime = System.currentTimeMillis(); snodeController.shutdown(); + long consumingTimeTotal = System.currentTimeMillis() - beginTime; + log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal); } } } - })); + },"ShutdownHook")); return snodeController; } @@ -143,6 +156,15 @@ public class SnodeStartup { Option opt = new Option("c", "configFile", true, "SNode config properties file"); opt.setRequired(false); options.addOption(opt); + + opt = new Option("p", "printConfigItem", false, "Print all config item"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("m", "printImportantConfig", false, "Print important config item"); + opt.setRequired(false); + options.addOption(opt); + return options; } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java index ceba1fc1b324651223a5126b68752e75e38d7469..5e75322c4bca9cf4036dc6605746bce06aae0f3d 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java @@ -28,7 +28,7 @@ import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.snode.constant.SnodeConstant; public class ClientHousekeepingService implements ChannelEventListener { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private final ClientManager producerManager; private final ClientManager consumerManager; private final ClientManager iotClientManager; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java index a3e29c0c1cde02d3772d6e57b3436c8e8c5766bc..03e21549c77fe7667ce43a4e97c7600a8c0c1577 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java @@ -43,7 +43,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.SnodeController; public class ConsumerManageProcessor implements RequestProcessor { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private final SnodeController snodeController; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java index 1a3ced1ac10fc83ef03215187d10c44ba9dbf0d2..1ca672ffc12006567601d39a9f943422a5746233 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java @@ -28,6 +28,9 @@ import java.io.UnsupportedEncodingException; import java.nio.charset.Charset; import java.util.HashMap; import java.util.Map; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.common.RemotingUtil; @@ -38,6 +41,7 @@ import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload; import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler; public class DefaultMqttMessageProcessor implements RequestProcessor { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private Map type2handler = new HashMap<>(); private static final int MIN_AVAILABLE_VERSION = 3; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java index 84f224b234a6743ed3a4e88c739284da8c3b6d55..f5a66000c0565903f726e922598e346b0bf0078f 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java @@ -18,12 +18,12 @@ package org.apache.rocketmq.snode.service; import java.util.Set; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; -import org.apache.rocketmq.snode.config.SnodeConfig; public interface NnodeService { /** diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java index 8dcdf0c61dba53ebe9fd71455dd7679c2cda9098..575855523c87eedc64b9a181c40f7a33f70b483f 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java @@ -22,6 +22,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.namesrv.TopAddressing; import org.apache.rocketmq.common.protocol.RequestCode; @@ -37,7 +38,6 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.constant.SnodeConstant; import org.apache.rocketmq.snode.service.NnodeService; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java index 685af3f05abb2533772dcc5613d59a7b65f2dd0c..0e3479aca81d3a78affb825337d04cbc2e1b0acc 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java @@ -20,6 +20,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData; @@ -28,7 +29,6 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.service.ScheduledService; public class ScheduledServiceImpl implements ScheduledService { diff --git a/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java b/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java index ff5235b2dff234aa318911628175537282d097df..414f06bb9e8220569ddb5d23f8b9262d82783b9e 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java @@ -16,9 +16,9 @@ */ package org.apache.rocketmq.snode; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ServerConfig; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java index 2a2a4459c28f3103c12ef8f9e9e4bab1652479fa..71863068be97c02aa1e90989e63c288a821a91dc 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java @@ -22,6 +22,7 @@ import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; import java.io.UnsupportedEncodingException; import java.nio.charset.Charset; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.RemotingChannel; @@ -33,7 +34,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler; import org.junit.Before; import org.junit.Test; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java index b610057fecfd0c2abccc67eac1a7bb3da447ce5e..8846ef1f1d8cf4ccc9d5675e8fff8b87107fa6ed 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java @@ -22,11 +22,11 @@ import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler; import org.junit.Test; import org.junit.runner.RunWith; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java index a7a26672bd89e0303970e8374b602aa69826b1ab..2573f0272f24b5317b4a2d5c1afa95fac9707737 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.processor; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; @@ -28,7 +29,6 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.CodecHelper; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.service.EnodeService; import org.apache.rocketmq.snode.service.NnodeService; import org.junit.Before; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java index c228a83f733c48837c8ff242562b85189dac920e..4255c773afb1a34f2213c06036fcdd6c4a3a053b 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.service; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; @@ -30,7 +31,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeTestBase; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageStatus; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java index 26c528ab54a9467ea5f02d5876ddd736d3ef4921..ff736c294a6378d9f8f8bd719ccf80bbd1b12e8e 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.service; import java.util.ArrayList; import java.util.List; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.exception.RemotingConnectException; @@ -27,7 +28,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeTestBase; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl; import org.junit.Before; import org.junit.Test; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java index 6a25009e04f8db0ef02a8e84fe81993359b602e4..5338a23dec28c55c687d787794e27d63b00f3c66 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java @@ -16,12 +16,12 @@ */ package org.apache.rocketmq.snode.service; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.client.SlowConsumerService; import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.offset.ConsumerOffsetManager; import org.junit.Before; import org.junit.Test;