From 9fc0ce454f558f8d385116da60a812868388945a Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 31 Jan 2019 11:03:29 +0800 Subject: [PATCH] Polish snode register process --- .../client/impl/SlowConsumerServiceImpl.java | 1 - .../MqttSubscribeMessageHandler.java | 9 +- .../rocketmq/snode/service/NnodeService.java | 23 ++++- .../snode/service/impl/EnodeServiceImpl.java | 4 +- .../snode/service/impl/NnodeServiceImpl.java | 5 +- .../service/impl/ScheduledServiceImpl.java | 7 +- .../processor/SendMessageProcessorTest.java | 9 +- .../snode/service/EnodeServiceImplTest.java | 22 +++-- .../snode/service/NnodeServiceImplTest.java | 87 +++++++++++++++++++ 9 files changed, 143 insertions(+), 24 deletions(-) create mode 100644 snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java index b96f4ee3..d49a4dad 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java @@ -41,7 +41,6 @@ public class SlowConsumerServiceImpl implements SlowConsumerService { log.warn("[SlowConsumer] group: {}, lastAckedOffset:{} nowOffset:{} ", consumerGroup, ackedOffset, currentOffset); return true; } - return false; } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java index 2867a975..f264077d 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java @@ -24,16 +24,17 @@ import org.apache.rocketmq.snode.SnodeController; public class MqttSubscribeMessageHandler implements MessageHandler { -/* private SubscriptionStore subscriptionStore; + /* private SubscriptionStore subscriptionStore; - public MqttSubscribeMessageHandler(SubscriptionStore subscriptionStore) { - this.subscriptionStore = subscriptionStore; - }*/ + public MqttSubscribeMessageHandler(SubscriptionStore subscriptionStore) { + this.subscriptionStore = subscriptionStore; + }*/ private final SnodeController snodeController; public MqttSubscribeMessageHandler(SnodeController snodeController) { this.snodeController = snodeController; } + /** * handle the SUBSCRIBE message from the client *
    diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java index 1c358b78..84f224b2 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java @@ -26,10 +26,25 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.snode.config.SnodeConfig; public interface NnodeService { - void registerSnode(SnodeConfig snodeConfig); - - void updateNnodeAddressList(final String addrs); - + /** + * Register Snode to Nnode(Name server) includes information: snodeAddress, snodeName, snodeClusterName. + * + * @param snodeConfig {@link SnodeConfig} + */ + void registerSnode(SnodeConfig snodeConfig) throws Exception; + + /** + * Update Nnode server address list. + * + * @param addresses Node name service list + */ + void updateNnodeAddressList(final String addresses); + + /** + * Fetch Node server address + * + * @return Node address + */ String fetchNnodeAdress(); void updateTopicRouteDataByTopic(); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java index 3a1a7fb1..e1a02c01 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java @@ -244,12 +244,12 @@ public class EnodeServiceImpl implements EnodeService { TopicConfig topicConfig) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); requestHeader.setTopic(topicConfig.getTopicName()); + requestHeader.setPerm(topicConfig.getPerm()); requestHeader.setReadQueueNums(topicConfig.getReadQueueNums()); requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums()); - requestHeader.setPerm(topicConfig.getPerm()); + requestHeader.setOrder(topicConfig.isOrder()); requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name()); requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag()); - requestHeader.setOrder(topicConfig.isOrder()); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader); String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); return this.snodeController.getRemotingClient().invokeSync(address, diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java index d30543e6..d593613b 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java @@ -58,7 +58,7 @@ public class NnodeServiceImpl implements NnodeService { } @Override - public void registerSnode(SnodeConfig snodeConfig) { + public void registerSnode(SnodeConfig snodeConfig) throws Exception{ List nnodeAddressList = this.snodeController.getRemotingClient().getNameServerAddressList(); RemotingCommand remotingCommand = new RemotingCommand(); RegisterSnodeRequestHeader requestHeader = new RegisterSnodeRequestHeader(); @@ -75,6 +75,9 @@ public class NnodeServiceImpl implements NnodeService { log.warn("Register Snode to Nnode addr: {} error, ex:{} ", nodeAddress, ex); } } + } else { + log.warn("Nnode server list is null"); + throw new RemotingSendRequestException("Nnode server list is null"); } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java index 5370e64e..685af3f0 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java @@ -15,6 +15,7 @@ * limitations under the License. */ package org.apache.rocketmq.snode.service.impl; + import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; @@ -88,7 +89,11 @@ public class ScheduledServiceImpl implements ScheduledService { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { - snodeController.getNnodeService().registerSnode(snodeConfig); + try { + snodeController.getNnodeService().registerSnode(snodeConfig); + } catch (Exception ex) { + log.warn("Register snode error", ex); + } } }, 0, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java index 448abc9f..a7a26672 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java @@ -44,6 +44,7 @@ import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class SendMessageProcessorTest { + private SendMessageProcessor sendMessageProcessor; @Spy @@ -52,12 +53,10 @@ public class SendMessageProcessorTest { @Mock private RemotingChannel remotingChannel; - private String topic = "SnodeTopic"; - - private String group = "SnodeGroup"; - - private String enodeName = "enodeName"; + private String topic = "snodeTopic"; + private String group = "snodeGroup"; + @Mock private EnodeService enodeService; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java index 3cd41257..c228a83f 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java @@ -18,7 +18,7 @@ package org.apache.rocketmq.snode.service; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; @@ -53,6 +53,7 @@ import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class EnodeServiceImplTest extends SnodeTestBase { + private EnodeService enodeService; @Spy @@ -66,20 +67,20 @@ public class EnodeServiceImplTest extends SnodeTestBase { private String enodeName = "enodeName"; - private String topic = "SnodeTopic"; + private String topic = "snodeTopic"; - private String group = "SnodeGroup"; + private String group = "snodeGroup"; @Before public void init() { + snodeController.setNnodeService(nnodeService); + snodeController.setRemotingClient(remotingClient); enodeService = new EnodeServiceImpl(snodeController); } @Test public void sendMessageTest() throws Exception { - snodeController.setNnodeService(nnodeService); - snodeController.setRemotingClient(remotingClient); - when(snodeController.getNnodeService().getAddressByEnodeName(anyString(), anyBoolean())).thenReturn("1024"); + when(snodeController.getNnodeService().getAddressByEnodeName(anyString(), anyBoolean())).thenReturn("127.0.0.1:10911"); CompletableFuture responseCF = new CompletableFuture<>(); doAnswer(new Answer() { @Override @@ -123,6 +124,15 @@ public class EnodeServiceImplTest extends SnodeTestBase { assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } + @Test + public void creatTopicTest() throws Exception { + when(snodeController.getNnodeService().getAddressByEnodeName(anyString(), anyBoolean())).thenReturn("127.0.0.1:10911"); + when(snodeController.getRemotingClient().invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(createSuccessResponse()); + TopicConfig topicConfig = new TopicConfig(topic, 1, 1, 2); + RemotingCommand response = enodeService.creatTopic(enodeName, topicConfig); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + private GetMessageResult createGetMessageResult() { GetMessageResult getMessageResult = new GetMessageResult(); getMessageResult.setStatus(GetMessageStatus.FOUND); diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java new file mode 100644 index 00000000..45a19dd8 --- /dev/null +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.snode.service; + +import java.util.ArrayList; +import java.util.List; +import org.apache.rocketmq.remoting.ClientConfig; +import org.apache.rocketmq.remoting.ServerConfig; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient; +import org.apache.rocketmq.snode.SnodeController; +import org.apache.rocketmq.snode.SnodeTestBase; +import org.apache.rocketmq.snode.config.SnodeConfig; +import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class NnodeServiceImplTest extends SnodeTestBase { + + @Spy + private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig()); + + @Mock + private NettyRemotingClient remotingClient; + + private NnodeService nnodeService; + + @Before + public void init() { + snodeController.setRemotingClient(remotingClient); + nnodeService = new NnodeServiceImpl(snodeController); + } + + @Test + public void registerSnodeSuccessTest() throws InterruptedException, RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException { + when(snodeController.getRemotingClient().getNameServerAddressList()).thenReturn(createNnodeList()); + when(snodeController.getRemotingClient().invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(createSuccessResponse()); + try { + nnodeService.registerSnode(createSnodeConfig()); + } catch (Exception ex) { + assertThat(ex).isNull(); + } + } + + private List createNnodeList() { + List addresses = new ArrayList<>(); + addresses.add("127.0.0.1:9876"); + return addresses; + } + + private SnodeConfig createSnodeConfig() { + SnodeConfig snodeConfig = new SnodeConfig(); + snodeConfig.setClusterName("defaultCluster"); + snodeConfig.setSnodeIP1("127.0.0.1:10911"); + snodeConfig.setSnodeName("snode-a"); + return snodeConfig; + } +} -- GitLab