提交 4b446d77 编写于 作者: S shroman 提交者: vongosling

ROCKETMQ-2 [ROCKETMQ-2] Closing the channel closes apache/incubator-rocketmq#1

上级 63803056
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.rocketmq.broker;
import com.alibaba.rocketmq.common.BrokerConfig;
import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
import com.alibaba.rocketmq.store.config.MessageStoreConfig;
import org.junit.Test;
/**
* @author shtykh_roman
*/
public class BrokerControllerTest {
private static final int RESTART_NUM = 3;
/**
* Tests if the controller can be properly stopped and started.
*
* @throws Exception If fails.
*/
@Test
public void testRestart() throws Exception {
for (int i = 0; i < RESTART_NUM; i++) {
BrokerController brokerController = new BrokerController(//
new BrokerConfig(), //
new NettyServerConfig(), //
new NettyClientConfig(), //
new MessageStoreConfig());
boolean initResult = brokerController.initialize();
System.out.println("initialize " + initResult);
brokerController.start();
brokerController.shutdown();
}
}
}
......@@ -116,7 +116,7 @@ public class HAService {
// this.groupTransferService.notifyTransferSome();
// }
public void start() {
public void start() throws Exception {
this.acceptSocketService.beginAccept();
this.acceptSocketService.start();
this.groupTransferService.start();
......@@ -181,20 +181,26 @@ public class HAService {
}
public void beginAccept() {
public void beginAccept() throws Exception {
this.serverSocketChannel = ServerSocketChannel.open();
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
this.serverSocketChannel.socket().bind(this.socketAddressListen);
this.serverSocketChannel.configureBlocking(false);
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
@Override
public void shutdown(final boolean interrupt) {
super.shutdown(interrupt);
try {
this.serverSocketChannel = ServerSocketChannel.open();
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
this.serverSocketChannel.socket().bind(this.socketAddressListen);
this.serverSocketChannel.configureBlocking(false);
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
} catch (Exception e) {
log.error("beginAccept exception", e);
serverSocketChannel.close();
}
catch (IOException e) {
log.error("AcceptSocketService shutdown exception", e);
}
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册