From 114b6ae083ed5338e2b59a501ec08ec23c3e2ece Mon Sep 17 00:00:00 2001 From: yukon Date: Wed, 20 Sep 2017 17:06:04 +0800 Subject: [PATCH] Minor polish --- .../api/buffer/ByteBufferWrapper.java | 28 +++---- .../impl/buffer/NettyByteBufferWrapper.java | 73 ++++++++++--------- .../impl/netty/NettyRemotingAbstract.java | 16 ++-- .../remoting/impl/netty/handler/Decoder.java | 2 +- 4 files changed, 64 insertions(+), 55 deletions(-) diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java index 7cae3ac6..7360c88c 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java @@ -20,37 +20,37 @@ package org.apache.rocketmq.remoting.api.buffer; import java.nio.ByteBuffer; public interface ByteBufferWrapper { - void writeByte(int index, byte data); - void writeByte(byte data); - byte readByte(); - - void writeInt(int data); + void writeByte(int index, byte data); void writeBytes(byte[] data); void writeBytes(ByteBuffer data); - int readableBytes(); + void writeInt(int data); - int readInt(); + void writeShort(short value); + + void writeLong(long id); + + byte readByte(); void readBytes(byte[] dst); void readBytes(ByteBuffer dst); - int readerIndex(); - - void setReaderIndex(int readerIndex); + short readShort(); - void writeLong(long id); + int readInt(); long readLong(); - void ensureCapacity(int capacity); + int readableBytes(); - short readShort(); + int readerIndex(); - void writeShort(short value); + void setReaderIndex(int readerIndex); + + void ensureCapacity(int capacity); } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java index e17bcfdc..5a714521 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java @@ -18,39 +18,27 @@ package org.apache.rocketmq.remoting.impl.buffer; import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; import java.nio.ByteBuffer; import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper; public class NettyByteBufferWrapper implements ByteBufferWrapper { private final ByteBuf buffer; - private final Channel channel; public NettyByteBufferWrapper(ByteBuf buffer) { - this(buffer, null); - } - - public NettyByteBufferWrapper(ByteBuf buffer, Channel channel) { - this.channel = channel; this.buffer = buffer; } - public void writeByte(int index, byte data) { - buffer.writeByte(data); - } - + @Override public void writeByte(byte data) { buffer.writeByte(data); } - public byte readByte() { - return buffer.readByte(); - } - - public void writeInt(int data) { - buffer.writeInt(data); + @Override + public void writeByte(int index, byte data) { + buffer.writeByte(data); } + @Override public void writeBytes(byte[] data) { buffer.writeBytes(data); } @@ -60,16 +48,24 @@ public class NettyByteBufferWrapper implements ByteBufferWrapper { buffer.writeBytes(data); } - public int readableBytes() { - return buffer.readableBytes(); + @Override + public void writeShort(final short value) { + buffer.writeShort(value); } - public int readInt() { - return buffer.readInt(); + @Override + public void writeInt(int data) { + buffer.writeInt(data); } - public void readBytes(byte[] dst) { - buffer.readBytes(dst); + @Override + public void writeLong(long value) { + buffer.writeLong(value); + } + + @Override + public byte readByte() { + return buffer.readByte(); } @Override @@ -77,17 +73,19 @@ public class NettyByteBufferWrapper implements ByteBufferWrapper { buffer.readBytes(dst); } - public int readerIndex() { - return buffer.readerIndex(); + @Override + public void readBytes(byte[] dst) { + buffer.readBytes(dst); } - public void setReaderIndex(int index) { - buffer.setIndex(index, buffer.writerIndex()); + @Override + public short readShort() { + return buffer.readShort(); } @Override - public void writeLong(long value) { - buffer.writeLong(value); + public int readInt() { + return buffer.readInt(); } @Override @@ -96,18 +94,23 @@ public class NettyByteBufferWrapper implements ByteBufferWrapper { } @Override - public void ensureCapacity(int capacity) { - buffer.capacity(capacity); + public int readableBytes() { + return buffer.readableBytes(); } @Override - public short readShort() { - return buffer.readShort(); + public int readerIndex() { + return buffer.readerIndex(); } @Override - public void writeShort(final short value) { - buffer.writeShort(value); + public void setReaderIndex(int index) { + buffer.setIndex(index, buffer.writerIndex()); + } + + @Override + public void ensureCapacity(int capacity) { + buffer.capacity(capacity); } } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java index 1af62cb6..4c22e7ce 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java @@ -77,7 +77,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { private final Semaphore semaphoreAsync; private final Map ackTables = new ConcurrentHashMap(256); private final Map> processorTables = new ConcurrentHashMap>(); - private final AtomicLong count = new AtomicLong(0); + private final AtomicLong responseCounter = new AtomicLong(0); private final RemotingCommandFactory remotingCommandFactory; private final String remotingInstanceId = UIDGenerator.instance().createUID(); @@ -93,8 +93,13 @@ public abstract class NettyRemotingAbstract implements RemotingService { NettyRemotingAbstract(RemotingConfig clientConfig, RemotingCommandFactoryMeta remotingCommandFactoryMeta) { this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true); this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true); - this.publicExecutor = ThreadUtils.newThreadPoolExecutor(clientConfig.getClientAsyncCallbackExecutorThreads(), clientConfig.getClientAsyncCallbackExecutorThreads(), 60, - TimeUnit.SECONDS, new ArrayBlockingQueue(10000), "PublicExecutor", true); + this.publicExecutor = ThreadUtils.newThreadPoolExecutor( + clientConfig.getClientAsyncCallbackExecutorThreads(), + clientConfig.getClientAsyncCallbackExecutorThreads(), + 60, + TimeUnit.SECONDS, + new ArrayBlockingQueue(10000), + "PublicExecutor", true); this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta); } @@ -237,9 +242,10 @@ public abstract class NettyRemotingAbstract implements RemotingService { long time = System.currentTimeMillis(); ackTables.remove(cmd.requestID()); - if (count.incrementAndGet() % 5000 == 0) - LOG.warn("REQUEST ID:{}, cost time:{}, ackTables.size:{}", cmd.requestID(), time - responseResult.getBeginTimestamp(), + if (responseCounter.incrementAndGet() % 5000 == 0) { + LOG.info("REQUEST ID:{}, cost time:{}, ackTables.size:{}", cmd.requestID(), time - responseResult.getBeginTimestamp(), ackTables.size()); + } if (responseResult.getAsyncHandler() != null) { boolean sameThread = false; ExecutorService executor = this.getCallbackExecutor(); diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java index 87a09125..ec1d69db 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java @@ -44,7 +44,7 @@ public class Decoder extends ByteToMessageDecoder { return; } - NettyByteBufferWrapper wrapper = new NettyByteBufferWrapper(in, ctx.channel()); + NettyByteBufferWrapper wrapper = new NettyByteBufferWrapper(in); Object msg = this.decode(ctx, wrapper); if (msg != null) { -- GitLab