提交 a5a4cceb 编写于 作者: wu-sheng's avatar wu-sheng

1.修改发送代码。

上级 b29bc628
package com.ai.cloud.skywalking.sender;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.logging.Logger;
import com.ai.cloud.io.netty.bootstrap.Bootstrap;
import com.ai.cloud.io.netty.channel.*;
import com.ai.cloud.io.netty.channel.Channel;
import com.ai.cloud.io.netty.channel.ChannelHandlerContext;
import com.ai.cloud.io.netty.channel.ChannelInboundHandlerAdapter;
import com.ai.cloud.io.netty.channel.ChannelInitializer;
import com.ai.cloud.io.netty.channel.ChannelOption;
import com.ai.cloud.io.netty.channel.ChannelPipeline;
import com.ai.cloud.io.netty.channel.EventLoopGroup;
import com.ai.cloud.io.netty.channel.nio.NioEventLoopGroup;
import com.ai.cloud.io.netty.channel.socket.ServerSocketChannel;
import com.ai.cloud.io.netty.channel.socket.SocketChannel;
import com.ai.cloud.io.netty.channel.socket.nio.NioSocketChannel;
import com.ai.cloud.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import com.ai.cloud.io.netty.handler.codec.LengthFieldPrepender;
import com.ai.cloud.io.netty.handler.codec.bytes.ByteArrayDecoder;
import com.ai.cloud.io.netty.handler.codec.bytes.ByteArrayEncoder;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.logging.Logger;
public class DataSender extends ChannelInboundHandlerAdapter implements IDataSender {
private static Logger logger = Logger.getLogger(DataSender.class.getName());
private EventLoopGroup group;
private SenderStatus status = SenderStatus.FAILED;
private InetSocketAddress socketAddress;
private ChannelFuture channelFuture;
private Channel channel;
public DataSender(String ip, int port) throws IOException {
this(new InetSocketAddress(ip, port));
......@@ -34,17 +40,23 @@ public class DataSender extends ChannelInboundHandlerAdapter implements IDataSen
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<ServerSocketChannel>() {
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(ServerSocketChannel ch) throws Exception {
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
p.addLast("frameEncoder", new LengthFieldPrepender(4));
p.addLast("decoder", new ByteArrayDecoder());
p.addLast("encoder", new ByteArrayEncoder());
p.addLast(new ChannelInboundHandlerAdapter(){
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
channel = ctx.channel();
}
});
}
});
channelFuture = bootstrap.connect(address).sync();
bootstrap.connect(address).sync();
} catch (Exception e) {
status = SenderStatus.FAILED;
}
......@@ -59,8 +71,8 @@ public class DataSender extends ChannelInboundHandlerAdapter implements IDataSen
@Override
public boolean send(String data) {
try {
if (channelFuture != null) {
channelFuture.channel().writeAndFlush(data);
if (channel != null && channel.isActive()) {
channel.writeAndFlush(data.getBytes());
return true;
}
} catch (Exception e) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册