diff --git a/server/src/main/java/com/alibaba/otter/canal/server/netty/CanalServerWithNetty.java b/server/src/main/java/com/alibaba/otter/canal/server/netty/CanalServerWithNetty.java index 873bea70cf8ca2d450cec10dbba7127263b770db..57289f7a7ae1385bc00a24109b8daddc260c11fd 100644 --- a/server/src/main/java/com/alibaba/otter/canal/server/netty/CanalServerWithNetty.java +++ b/server/src/main/java/com/alibaba/otter/canal/server/netty/CanalServerWithNetty.java @@ -9,6 +9,8 @@ import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.group.ChannelGroup; +import org.jboss.netty.channel.group.DefaultChannelGroup; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import com.alibaba.otter.canal.common.AbstractCanalLifeCycle; @@ -32,6 +34,7 @@ public class CanalServerWithNetty extends AbstractCanalLifeCycle implements Cana private int port; private Channel serverChannel = null; private ServerBootstrap bootstrap = null; + private ChannelGroup childGroups = null; //socket channel container, used to close sockets explicitly. private static class SingletonHolder { @@ -40,6 +43,7 @@ public class CanalServerWithNetty extends AbstractCanalLifeCycle implements Cana private CanalServerWithNetty(){ this.embeddedServer = CanalServerWithEmbedded.instance(); + this.childGroups = new DefaultChannelGroup(); } public static CanalServerWithNetty instance() { @@ -55,6 +59,19 @@ public class CanalServerWithNetty extends AbstractCanalLifeCycle implements Cana this.bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); + /* + * enable keep-alive mechanism, handle abnormal network connection scenarios on OS level. + * the threshold parameters are depended on OS. + * e.g. On Linux: + * net.ipv4.tcp_keepalive_time = 300 + * net.ipv4.tcp_keepalive_probes = 2 + * net.ipv4.tcp_keepalive_intvl = 30 + */ + bootstrap.setOption("child.keepAlive", true); + /* + * optional parameter. + */ + bootstrap.setOption("child.tcpNoDelay", true); // 构造对应的pipeline bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @@ -62,7 +79,8 @@ public class CanalServerWithNetty extends AbstractCanalLifeCycle implements Cana public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipelines = Channels.pipeline(); pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder()); - pipelines.addLast(HandshakeInitializationHandler.class.getName(), new HandshakeInitializationHandler()); + //support to maintain child socket channel. + pipelines.addLast(HandshakeInitializationHandler.class.getName(), new HandshakeInitializationHandler(childGroups)); pipelines.addLast(ClientAuthenticationHandler.class.getName(), new ClientAuthenticationHandler(embeddedServer)); @@ -87,6 +105,11 @@ public class CanalServerWithNetty extends AbstractCanalLifeCycle implements Cana this.serverChannel.close().awaitUninterruptibly(1000); } + //close sockets explicitly to reduce socket channel hung in complicated network environment. + if (this.childGroups != null) { + this.childGroups.close().awaitUninterruptibly(5000); + } + if (this.bootstrap != null) { this.bootstrap.releaseExternalResources(); } diff --git a/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/ClientAuthenticationHandler.java b/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/ClientAuthenticationHandler.java index 0656e8aa5d3b4c46e84a05fa9d8b4ed532939600..73bcd5d256c190da08a6117df9f84df9c783d947 100644 --- a/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/ClientAuthenticationHandler.java +++ b/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/ClientAuthenticationHandler.java @@ -1,5 +1,7 @@ package com.alibaba.otter.canal.server.netty.handler; +import java.util.concurrent.TimeUnit; + import org.apache.commons.lang.StringUtils; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.ChannelFuture; @@ -87,10 +89,11 @@ public class ClientAuthenticationHandler extends SimpleChannelHandler { if (clientAuth.getNetWriteTimeout() > 0) { writeTimeout = clientAuth.getNetWriteTimeout(); } + //fix bug: soTimeout parameter's unit from connector is millseconds. IdleStateHandler idleStateHandler = new IdleStateHandler(NettyUtils.hashedWheelTimer, readTimeout, writeTimeout, - 0); + 0, TimeUnit.MILLISECONDS); ctx.getPipeline().addBefore(SessionHandler.class.getName(), IdleStateHandler.class.getName(), idleStateHandler); diff --git a/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/HandshakeInitializationHandler.java b/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/HandshakeInitializationHandler.java index b65ff2a1aaf6ccf6c7a47ef2bd21bd97a8ece9df..438e508f33a9d2be7fb383575f8f71d89fb2e457 100644 --- a/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/HandshakeInitializationHandler.java +++ b/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/HandshakeInitializationHandler.java @@ -3,6 +3,7 @@ package com.alibaba.otter.canal.server.netty.handler; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.SimpleChannelHandler; +import org.jboss.netty.channel.group.ChannelGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,10 +19,21 @@ import com.alibaba.otter.canal.server.netty.NettyUtils; * @version 1.0.0 */ public class HandshakeInitializationHandler extends SimpleChannelHandler { + //support to maintain socket channel. + private ChannelGroup childGroups; + + public HandshakeInitializationHandler(ChannelGroup childGroups) { + this.childGroups = childGroups; + } private static final Logger logger = LoggerFactory.getLogger(HandshakeInitializationHandler.class); public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { + //add new socket channel in channel container, used to manage sockets. + if(childGroups != null) { + childGroups.add(ctx.getChannel()); + } + byte[] body = Packet.newBuilder() .setType(CanalPacket.PacketType.HANDSHAKE) .setBody(Handshake.newBuilder().build().toByteString())