提交 0ac1e8f0 编写于 作者: Z zhangxin10

1. 切换采集服务的底层实现,采用Netty4.0.3

上级 96b19356
......@@ -31,6 +31,11 @@
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.33.Final</version>
</dependency>
</dependencies>
<build>
<plugins>
......
......@@ -3,109 +3,64 @@ package com.ai.cloud.skywalking.reciever;
import com.ai.cloud.skywalking.reciever.buffer.DataBufferThreadContainer;
import com.ai.cloud.skywalking.reciever.conf.Config;
import com.ai.cloud.skywalking.reciever.conf.ConfigInitializer;
import com.ai.cloud.skywalking.reciever.handler.CollectionServerDataHandler;
import com.ai.cloud.skywalking.reciever.persistance.PersistenceThreadLauncher;
import com.ai.cloud.skywalking.reciever.util.ByteArrayUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.CharsetUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
public class CollectionServer {
static Logger logger = LogManager.getLogger(CollectionServer.class);
private Selector selector;
static Map<Integer, ByteBuffer> byteBuffers = new LinkedHashMap<Integer, ByteBuffer>();
public CollectionServer() {
byteBuffers.put(512, ByteBuffer.allocate(512));
byteBuffers.put(128, ByteBuffer.allocate(128));
byteBuffers.put(32, ByteBuffer.allocate(32));
byteBuffers.put(8, ByteBuffer.allocate(8));
byteBuffers.put(2, ByteBuffer.allocate(2));
byteBuffers.put(1, ByteBuffer.allocate(1));
}
public void doCollect() throws IOException {
ServerSocketChannel serverSocketChannel = initServerSocketChannel();
ByteBuffer contextLengthBuffer = ByteBuffer.allocate(4);
while (selector.select() > 0) {
Iterator<?> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = (SelectionKey) iterator.next();
iterator.remove();
beginToRead(serverSocketChannel, key);
if (key.isReadable()) {
ByteChannel sc = (SocketChannel) key.channel();
try {
sc.read(contextLengthBuffer);
int length = ByteArrayUtil.byteArrayToInt(contextLengthBuffer.array(), 0);
if (length > 0) {
readDataFromSocketChannel(length, byteBuffers, sc);
public void doCollect() throws IOException, InterruptedException {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<io.netty.channel.socket.SocketChannel>() {
@Override
public void initChannel(io.netty.channel.socket.SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
p.addLast("frameEncoder", new LengthFieldPrepender(4));
p.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
p.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
p.addLast(new CollectionServerDataHandler());
}
} catch (IOException e) {
logger.error("The remote client disconnect service", e);
sc.close();
} finally {
contextLengthBuffer.clear();
}
}
}
}
}
});
public static void readDataFromSocketChannel(int length, Map<Integer, ByteBuffer> byteBuffers, ByteChannel byteChannel) {
int tmp = length;
byte[] tmpBytes = new byte[length];
int tmpLength = 0;
for (Map.Entry<Integer, ByteBuffer> entry : byteBuffers.entrySet()) {
int j = tmp / entry.getKey();
if (j == 0) {
continue;
}
for (int k = 0; k < j; k++) {
try {
byteChannel.read(entry.getValue());
System.arraycopy(entry.getValue().array(), 0, tmpBytes, tmpLength, entry.getValue().array().length);
} catch (IOException e) {
logger.error("Read data From socket channel", e);
} finally {
entry.getValue().clear();
}
tmpLength += entry.getKey();
}
tmp = tmp % entry.getKey();
ChannelFuture f = b.bind(Config.Server.PORT).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
DataBufferThreadContainer.getDataBufferThread().doCarry(tmpBytes);
}
private void beginToRead(ServerSocketChannel serverSocketChannel, SelectionKey key) throws IOException {
if (key.isAcceptable()) {
SocketChannel sc = serverSocketChannel.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
}
private ServerSocketChannel initServerSocketChannel() throws IOException {
selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress("0.0.0.0", Config.Server.PORT));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
logger.info("The service is listening on port {}", Config.Server.PORT);
return serverSocketChannel;
}
public static void main(String[] args) throws IOException, IllegalAccessException {
public static void main(String[] args) throws IOException, IllegalAccessException, InterruptedException {
logger.info("To initialize the collect server configuration parameters....");
initializeParam();
logger.info("To launch register persistence thread....");
......
package com.ai.cloud.skywalking.reciever.handler;
import com.ai.cloud.skywalking.reciever.buffer.DataBufferThreadContainer;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class CollectionServerDataHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
DataBufferThreadContainer.getDataBufferThread().doCarry(msg.toString().getBytes());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册