From a5a4cceb7aa9070181c3df531d11e97775f4ccca Mon Sep 17 00:00:00 2001 From: wusheng Date: Mon, 25 Jan 2016 17:24:53 +0800 Subject: [PATCH] =?UTF-8?q?1.=E4=BF=AE=E6=94=B9=E5=8F=91=E9=80=81=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cloud/skywalking/sender/DataSender.java | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/skywalking-api/src/main/java/com/ai/cloud/skywalking/sender/DataSender.java b/skywalking-api/src/main/java/com/ai/cloud/skywalking/sender/DataSender.java index 3e2d454dc1..c56422b490 100644 --- a/skywalking-api/src/main/java/com/ai/cloud/skywalking/sender/DataSender.java +++ b/skywalking-api/src/main/java/com/ai/cloud/skywalking/sender/DataSender.java @@ -1,25 +1,31 @@ package com.ai.cloud.skywalking.sender; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.logging.Logger; + import com.ai.cloud.io.netty.bootstrap.Bootstrap; -import com.ai.cloud.io.netty.channel.*; +import com.ai.cloud.io.netty.channel.Channel; +import com.ai.cloud.io.netty.channel.ChannelHandlerContext; +import com.ai.cloud.io.netty.channel.ChannelInboundHandlerAdapter; +import com.ai.cloud.io.netty.channel.ChannelInitializer; +import com.ai.cloud.io.netty.channel.ChannelOption; +import com.ai.cloud.io.netty.channel.ChannelPipeline; +import com.ai.cloud.io.netty.channel.EventLoopGroup; import com.ai.cloud.io.netty.channel.nio.NioEventLoopGroup; -import com.ai.cloud.io.netty.channel.socket.ServerSocketChannel; +import com.ai.cloud.io.netty.channel.socket.SocketChannel; import com.ai.cloud.io.netty.channel.socket.nio.NioSocketChannel; import com.ai.cloud.io.netty.handler.codec.LengthFieldBasedFrameDecoder; import com.ai.cloud.io.netty.handler.codec.LengthFieldPrepender; import com.ai.cloud.io.netty.handler.codec.bytes.ByteArrayDecoder; import com.ai.cloud.io.netty.handler.codec.bytes.ByteArrayEncoder; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.logging.Logger; - public class DataSender extends ChannelInboundHandlerAdapter implements IDataSender { private static Logger logger = Logger.getLogger(DataSender.class.getName()); private EventLoopGroup group; private SenderStatus status = SenderStatus.FAILED; private InetSocketAddress socketAddress; - private ChannelFuture channelFuture; + private Channel channel; public DataSender(String ip, int port) throws IOException { this(new InetSocketAddress(ip, port)); @@ -34,17 +40,23 @@ public class DataSender extends ChannelInboundHandlerAdapter implements IDataSen bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) - .handler(new ChannelInitializer() { + .handler(new ChannelInitializer() { @Override - protected void initChannel(ServerSocketChannel ch) throws Exception { + protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); p.addLast("frameEncoder", new LengthFieldPrepender(4)); p.addLast("decoder", new ByteArrayDecoder()); p.addLast("encoder", new ByteArrayEncoder()); + p.addLast(new ChannelInboundHandlerAdapter(){ + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + channel = ctx.channel(); + } + }); } }); - channelFuture = bootstrap.connect(address).sync(); + bootstrap.connect(address).sync(); } catch (Exception e) { status = SenderStatus.FAILED; } @@ -59,8 +71,8 @@ public class DataSender extends ChannelInboundHandlerAdapter implements IDataSen @Override public boolean send(String data) { try { - if (channelFuture != null) { - channelFuture.channel().writeAndFlush(data); + if (channel != null && channel.isActive()) { + channel.writeAndFlush(data.getBytes()); return true; } } catch (Exception e) { -- GitLab