diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java index 075252cb3a63a35b80b5fa951f8a4fde1eff4276..2cf695c588f0a61a6ae275ea2946f110ee7ed7a8 100644 --- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java @@ -46,7 +46,7 @@ public class HAService { private final AtomicInteger connectionCount = new AtomicInteger(0); - private final List connectionList = new LinkedList(); + private final List connectionList = new LinkedList<>(); private final AcceptSocketService acceptSocketService; @@ -170,17 +170,22 @@ public class HAService { return push2SlaveMaxOffset; } + /** + * Listens to slave connections to create {@link HAConnection}. + */ class AcceptSocketService extends ServiceThread { private ServerSocketChannel serverSocketChannel; private Selector selector; private final SocketAddress socketAddressListen; - public AcceptSocketService(final int port) { this.socketAddressListen = new InetSocketAddress(port); } - + /** + * Starts listening to slave connections. + * @throws Exception If fails. + */ public void beginAccept() throws Exception { this.serverSocketChannel = ServerSocketChannel.open(); this.selector = RemotingUtil.openSelector(); @@ -190,6 +195,7 @@ public class HAService { this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); } + /** {@inheritDoc} */ @Override public void shutdown(final boolean interrupt) { super.shutdown(interrupt); @@ -202,6 +208,7 @@ public class HAService { } } + /** {@inheritDoc} */ @Override public void run() { log.info(this.getServiceName() + " service started"); @@ -210,10 +217,12 @@ public class HAService { try { this.selector.select(1000); Set selected = this.selector.selectedKeys(); + if (selected != null) { for (SelectionKey k : selected) { if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { SocketChannel sc = ((ServerSocketChannel) k.channel()).accept(); + if (sc != null) { HAService.log.info("HAService receive new connection, " + sc.socket().getRemoteSocketAddress()); @@ -234,16 +243,15 @@ public class HAService { selected.clear(); } - } catch (Exception e) { log.error(this.getServiceName() + " service has exception.", e); } } - log.error(this.getServiceName() + " service end"); + log.info(this.getServiceName() + " service end"); } - + /** {@inheritDoc} */ @Override public String getServiceName() { return AcceptSocketService.class.getSimpleName(); @@ -256,8 +264,8 @@ public class HAService { class GroupTransferService extends ServiceThread { private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject(); - private volatile List requestsWrite = new ArrayList(); - private volatile List requestsRead = new ArrayList(); + private volatile List requestsWrite = new ArrayList<>(); + private volatile List requestsRead = new ArrayList<>(); public void putRequest(final GroupCommitRequest request) { @@ -333,7 +341,7 @@ public class HAService { class HAClient extends ServiceThread { private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4; - private final AtomicReference masterAddress = new AtomicReference(); + private final AtomicReference masterAddress = new AtomicReference<>(); private final ByteBuffer reportOffset = ByteBuffer.allocate(8); private SocketChannel socketChannel; private Selector selector;