提交 9fc0ce45 编写于 作者: D duhenglucky

Polish snode register process

上级 8f517779
...@@ -41,7 +41,6 @@ public class SlowConsumerServiceImpl implements SlowConsumerService { ...@@ -41,7 +41,6 @@ public class SlowConsumerServiceImpl implements SlowConsumerService {
log.warn("[SlowConsumer] group: {}, lastAckedOffset:{} nowOffset:{} ", consumerGroup, ackedOffset, currentOffset); log.warn("[SlowConsumer] group: {}, lastAckedOffset:{} nowOffset:{} ", consumerGroup, ackedOffset, currentOffset);
return true; return true;
} }
return false; return false;
} }
......
...@@ -24,7 +24,7 @@ import org.apache.rocketmq.snode.SnodeController; ...@@ -24,7 +24,7 @@ import org.apache.rocketmq.snode.SnodeController;
public class MqttSubscribeMessageHandler implements MessageHandler { public class MqttSubscribeMessageHandler implements MessageHandler {
/* private SubscriptionStore subscriptionStore; /* private SubscriptionStore subscriptionStore;
public MqttSubscribeMessageHandler(SubscriptionStore subscriptionStore) { public MqttSubscribeMessageHandler(SubscriptionStore subscriptionStore) {
this.subscriptionStore = subscriptionStore; this.subscriptionStore = subscriptionStore;
...@@ -34,6 +34,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler { ...@@ -34,6 +34,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
public MqttSubscribeMessageHandler(SnodeController snodeController) { public MqttSubscribeMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController; this.snodeController = snodeController;
} }
/** /**
* handle the SUBSCRIBE message from the client * handle the SUBSCRIBE message from the client
* <ol> * <ol>
......
...@@ -26,10 +26,25 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; ...@@ -26,10 +26,25 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.config.SnodeConfig;
public interface NnodeService { public interface NnodeService {
void registerSnode(SnodeConfig snodeConfig); /**
* Register Snode to Nnode(Name server) includes information: snodeAddress, snodeName, snodeClusterName.
*
* @param snodeConfig {@link SnodeConfig}
*/
void registerSnode(SnodeConfig snodeConfig) throws Exception;
void updateNnodeAddressList(final String addrs); /**
* Update Nnode server address list.
*
* @param addresses Node name service list
*/
void updateNnodeAddressList(final String addresses);
/**
* Fetch Node server address
*
* @return Node address
*/
String fetchNnodeAdress(); String fetchNnodeAdress();
void updateTopicRouteDataByTopic(); void updateTopicRouteDataByTopic();
......
...@@ -244,12 +244,12 @@ public class EnodeServiceImpl implements EnodeService { ...@@ -244,12 +244,12 @@ public class EnodeServiceImpl implements EnodeService {
TopicConfig topicConfig) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { TopicConfig topicConfig) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName()); requestHeader.setTopic(topicConfig.getTopicName());
requestHeader.setPerm(topicConfig.getPerm());
requestHeader.setReadQueueNums(topicConfig.getReadQueueNums()); requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums()); requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
requestHeader.setPerm(topicConfig.getPerm()); requestHeader.setOrder(topicConfig.isOrder());
requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name()); requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag()); requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
requestHeader.setOrder(topicConfig.isOrder());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
return this.snodeController.getRemotingClient().invokeSync(address, return this.snodeController.getRemotingClient().invokeSync(address,
......
...@@ -58,7 +58,7 @@ public class NnodeServiceImpl implements NnodeService { ...@@ -58,7 +58,7 @@ public class NnodeServiceImpl implements NnodeService {
} }
@Override @Override
public void registerSnode(SnodeConfig snodeConfig) { public void registerSnode(SnodeConfig snodeConfig) throws Exception{
List<String> nnodeAddressList = this.snodeController.getRemotingClient().getNameServerAddressList(); List<String> nnodeAddressList = this.snodeController.getRemotingClient().getNameServerAddressList();
RemotingCommand remotingCommand = new RemotingCommand(); RemotingCommand remotingCommand = new RemotingCommand();
RegisterSnodeRequestHeader requestHeader = new RegisterSnodeRequestHeader(); RegisterSnodeRequestHeader requestHeader = new RegisterSnodeRequestHeader();
...@@ -75,6 +75,9 @@ public class NnodeServiceImpl implements NnodeService { ...@@ -75,6 +75,9 @@ public class NnodeServiceImpl implements NnodeService {
log.warn("Register Snode to Nnode addr: {} error, ex:{} ", nodeAddress, ex); log.warn("Register Snode to Nnode addr: {} error, ex:{} ", nodeAddress, ex);
} }
} }
} else {
log.warn("Nnode server list is null");
throw new RemotingSendRequestException("Nnode server list is null");
} }
} }
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.service.impl; package org.apache.rocketmq.snode.service.impl;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
...@@ -88,7 +89,11 @@ public class ScheduledServiceImpl implements ScheduledService { ...@@ -88,7 +89,11 @@ public class ScheduledServiceImpl implements ScheduledService {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override @Override
public void run() { public void run() {
try {
snodeController.getNnodeService().registerSnode(snodeConfig); snodeController.getNnodeService().registerSnode(snodeConfig);
} catch (Exception ex) {
log.warn("Register snode error", ex);
}
} }
}, 0, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); }, 0, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
......
...@@ -44,6 +44,7 @@ import static org.mockito.Mockito.when; ...@@ -44,6 +44,7 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class SendMessageProcessorTest { public class SendMessageProcessorTest {
private SendMessageProcessor sendMessageProcessor; private SendMessageProcessor sendMessageProcessor;
@Spy @Spy
...@@ -52,11 +53,9 @@ public class SendMessageProcessorTest { ...@@ -52,11 +53,9 @@ public class SendMessageProcessorTest {
@Mock @Mock
private RemotingChannel remotingChannel; private RemotingChannel remotingChannel;
private String topic = "SnodeTopic"; private String topic = "snodeTopic";
private String group = "SnodeGroup";
private String enodeName = "enodeName"; private String group = "snodeGroup";
@Mock @Mock
private EnodeService enodeService; private EnodeService enodeService;
......
...@@ -18,7 +18,7 @@ package org.apache.rocketmq.snode.service; ...@@ -18,7 +18,7 @@ package org.apache.rocketmq.snode.service;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
...@@ -53,6 +53,7 @@ import static org.mockito.Mockito.when; ...@@ -53,6 +53,7 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class EnodeServiceImplTest extends SnodeTestBase { public class EnodeServiceImplTest extends SnodeTestBase {
private EnodeService enodeService; private EnodeService enodeService;
@Spy @Spy
...@@ -66,20 +67,20 @@ public class EnodeServiceImplTest extends SnodeTestBase { ...@@ -66,20 +67,20 @@ public class EnodeServiceImplTest extends SnodeTestBase {
private String enodeName = "enodeName"; private String enodeName = "enodeName";
private String topic = "SnodeTopic"; private String topic = "snodeTopic";
private String group = "SnodeGroup"; private String group = "snodeGroup";
@Before @Before
public void init() { public void init() {
snodeController.setNnodeService(nnodeService);
snodeController.setRemotingClient(remotingClient);
enodeService = new EnodeServiceImpl(snodeController); enodeService = new EnodeServiceImpl(snodeController);
} }
@Test @Test
public void sendMessageTest() throws Exception { public void sendMessageTest() throws Exception {
snodeController.setNnodeService(nnodeService); when(snodeController.getNnodeService().getAddressByEnodeName(anyString(), anyBoolean())).thenReturn("127.0.0.1:10911");
snodeController.setRemotingClient(remotingClient);
when(snodeController.getNnodeService().getAddressByEnodeName(anyString(), anyBoolean())).thenReturn("1024");
CompletableFuture<RemotingCommand> responseCF = new CompletableFuture<>(); CompletableFuture<RemotingCommand> responseCF = new CompletableFuture<>();
doAnswer(new Answer() { doAnswer(new Answer() {
@Override @Override
...@@ -123,6 +124,15 @@ public class EnodeServiceImplTest extends SnodeTestBase { ...@@ -123,6 +124,15 @@ public class EnodeServiceImplTest extends SnodeTestBase {
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
} }
@Test
public void creatTopicTest() throws Exception {
when(snodeController.getNnodeService().getAddressByEnodeName(anyString(), anyBoolean())).thenReturn("127.0.0.1:10911");
when(snodeController.getRemotingClient().invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(createSuccessResponse());
TopicConfig topicConfig = new TopicConfig(topic, 1, 1, 2);
RemotingCommand response = enodeService.creatTopic(enodeName, topicConfig);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
private GetMessageResult createGetMessageResult() { private GetMessageResult createGetMessageResult() {
GetMessageResult getMessageResult = new GetMessageResult(); GetMessageResult getMessageResult = new GetMessageResult();
getMessageResult.setStatus(GetMessageStatus.FOUND); getMessageResult.setStatus(GetMessageStatus.FOUND);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.service;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.SnodeTestBase;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class NnodeServiceImplTest extends SnodeTestBase {
@Spy
private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
@Mock
private NettyRemotingClient remotingClient;
private NnodeService nnodeService;
@Before
public void init() {
snodeController.setRemotingClient(remotingClient);
nnodeService = new NnodeServiceImpl(snodeController);
}
@Test
public void registerSnodeSuccessTest() throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException {
when(snodeController.getRemotingClient().getNameServerAddressList()).thenReturn(createNnodeList());
when(snodeController.getRemotingClient().invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(createSuccessResponse());
try {
nnodeService.registerSnode(createSnodeConfig());
} catch (Exception ex) {
assertThat(ex).isNull();
}
}
private List createNnodeList() {
List<String> addresses = new ArrayList<>();
addresses.add("127.0.0.1:9876");
return addresses;
}
private SnodeConfig createSnodeConfig() {
SnodeConfig snodeConfig = new SnodeConfig();
snodeConfig.setClusterName("defaultCluster");
snodeConfig.setSnodeIP1("127.0.0.1:10911");
snodeConfig.setSnodeName("snode-a");
return snodeConfig;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册