提交 fed09763 编写于 作者: S shtykh_roman 提交者: Willem Jiang

[ROCKETMQ-13] Wrong log level for AcceptSocketService termination.

Additionally, added code comments and did a cleanup.

JIRA issue: https://issues.apache.org/jira/browse/ROCKETMQ-13
上级 626990cf
...@@ -46,7 +46,7 @@ public class HAService { ...@@ -46,7 +46,7 @@ public class HAService {
private final AtomicInteger connectionCount = new AtomicInteger(0); private final AtomicInteger connectionCount = new AtomicInteger(0);
private final List<HAConnection> connectionList = new LinkedList<HAConnection>(); private final List<HAConnection> connectionList = new LinkedList<>();
private final AcceptSocketService acceptSocketService; private final AcceptSocketService acceptSocketService;
...@@ -170,17 +170,22 @@ public class HAService { ...@@ -170,17 +170,22 @@ public class HAService {
return push2SlaveMaxOffset; return push2SlaveMaxOffset;
} }
/**
* Listens to slave connections to create {@link HAConnection}.
*/
class AcceptSocketService extends ServiceThread { class AcceptSocketService extends ServiceThread {
private ServerSocketChannel serverSocketChannel; private ServerSocketChannel serverSocketChannel;
private Selector selector; private Selector selector;
private final SocketAddress socketAddressListen; private final SocketAddress socketAddressListen;
public AcceptSocketService(final int port) { public AcceptSocketService(final int port) {
this.socketAddressListen = new InetSocketAddress(port); this.socketAddressListen = new InetSocketAddress(port);
} }
/**
* Starts listening to slave connections.
* @throws Exception If fails.
*/
public void beginAccept() throws Exception { public void beginAccept() throws Exception {
this.serverSocketChannel = ServerSocketChannel.open(); this.serverSocketChannel = ServerSocketChannel.open();
this.selector = RemotingUtil.openSelector(); this.selector = RemotingUtil.openSelector();
...@@ -190,6 +195,7 @@ public class HAService { ...@@ -190,6 +195,7 @@ public class HAService {
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
} }
/** {@inheritDoc} */
@Override @Override
public void shutdown(final boolean interrupt) { public void shutdown(final boolean interrupt) {
super.shutdown(interrupt); super.shutdown(interrupt);
...@@ -202,6 +208,7 @@ public class HAService { ...@@ -202,6 +208,7 @@ public class HAService {
} }
} }
/** {@inheritDoc} */
@Override @Override
public void run() { public void run() {
log.info(this.getServiceName() + " service started"); log.info(this.getServiceName() + " service started");
...@@ -210,10 +217,12 @@ public class HAService { ...@@ -210,10 +217,12 @@ public class HAService {
try { try {
this.selector.select(1000); this.selector.select(1000);
Set<SelectionKey> selected = this.selector.selectedKeys(); Set<SelectionKey> selected = this.selector.selectedKeys();
if (selected != null) { if (selected != null) {
for (SelectionKey k : selected) { for (SelectionKey k : selected) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept(); SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) { if (sc != null) {
HAService.log.info("HAService receive new connection, " HAService.log.info("HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress()); + sc.socket().getRemoteSocketAddress());
...@@ -234,16 +243,15 @@ public class HAService { ...@@ -234,16 +243,15 @@ public class HAService {
selected.clear(); selected.clear();
} }
} catch (Exception e) { } catch (Exception e) {
log.error(this.getServiceName() + " service has exception.", e); log.error(this.getServiceName() + " service has exception.", e);
} }
} }
log.error(this.getServiceName() + " service end"); log.info(this.getServiceName() + " service end");
} }
/** {@inheritDoc} */
@Override @Override
public String getServiceName() { public String getServiceName() {
return AcceptSocketService.class.getSimpleName(); return AcceptSocketService.class.getSimpleName();
...@@ -256,8 +264,8 @@ public class HAService { ...@@ -256,8 +264,8 @@ public class HAService {
class GroupTransferService extends ServiceThread { class GroupTransferService extends ServiceThread {
private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject(); private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>(); private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<>();
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>(); private volatile List<GroupCommitRequest> requestsRead = new ArrayList<>();
public void putRequest(final GroupCommitRequest request) { public void putRequest(final GroupCommitRequest request) {
...@@ -333,7 +341,7 @@ public class HAService { ...@@ -333,7 +341,7 @@ public class HAService {
class HAClient extends ServiceThread { class HAClient extends ServiceThread {
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4; private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
private final AtomicReference<String> masterAddress = new AtomicReference<String>(); private final AtomicReference<String> masterAddress = new AtomicReference<>();
private final ByteBuffer reportOffset = ByteBuffer.allocate(8); private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
private SocketChannel socketChannel; private SocketChannel socketChannel;
private Selector selector; private Selector selector;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册