提交 114b6ae0 编写于 作者: Y yukon

Minor polish

上级 0b88e66f
...@@ -20,37 +20,37 @@ package org.apache.rocketmq.remoting.api.buffer; ...@@ -20,37 +20,37 @@ package org.apache.rocketmq.remoting.api.buffer;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
public interface ByteBufferWrapper { public interface ByteBufferWrapper {
void writeByte(int index, byte data);
void writeByte(byte data); void writeByte(byte data);
byte readByte(); void writeByte(int index, byte data);
void writeInt(int data);
void writeBytes(byte[] data); void writeBytes(byte[] data);
void writeBytes(ByteBuffer data); void writeBytes(ByteBuffer data);
int readableBytes(); void writeInt(int data);
int readInt(); void writeShort(short value);
void writeLong(long id);
byte readByte();
void readBytes(byte[] dst); void readBytes(byte[] dst);
void readBytes(ByteBuffer dst); void readBytes(ByteBuffer dst);
int readerIndex(); short readShort();
void setReaderIndex(int readerIndex);
void writeLong(long id); int readInt();
long readLong(); long readLong();
void ensureCapacity(int capacity); int readableBytes();
short readShort(); int readerIndex();
void writeShort(short value); void setReaderIndex(int readerIndex);
void ensureCapacity(int capacity);
} }
...@@ -18,39 +18,27 @@ ...@@ -18,39 +18,27 @@
package org.apache.rocketmq.remoting.impl.buffer; package org.apache.rocketmq.remoting.impl.buffer;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper; import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper;
public class NettyByteBufferWrapper implements ByteBufferWrapper { public class NettyByteBufferWrapper implements ByteBufferWrapper {
private final ByteBuf buffer; private final ByteBuf buffer;
private final Channel channel;
public NettyByteBufferWrapper(ByteBuf buffer) { public NettyByteBufferWrapper(ByteBuf buffer) {
this(buffer, null);
}
public NettyByteBufferWrapper(ByteBuf buffer, Channel channel) {
this.channel = channel;
this.buffer = buffer; this.buffer = buffer;
} }
public void writeByte(int index, byte data) { @Override
buffer.writeByte(data);
}
public void writeByte(byte data) { public void writeByte(byte data) {
buffer.writeByte(data); buffer.writeByte(data);
} }
public byte readByte() { @Override
return buffer.readByte(); public void writeByte(int index, byte data) {
} buffer.writeByte(data);
public void writeInt(int data) {
buffer.writeInt(data);
} }
@Override
public void writeBytes(byte[] data) { public void writeBytes(byte[] data) {
buffer.writeBytes(data); buffer.writeBytes(data);
} }
...@@ -60,16 +48,24 @@ public class NettyByteBufferWrapper implements ByteBufferWrapper { ...@@ -60,16 +48,24 @@ public class NettyByteBufferWrapper implements ByteBufferWrapper {
buffer.writeBytes(data); buffer.writeBytes(data);
} }
public int readableBytes() { @Override
return buffer.readableBytes(); public void writeShort(final short value) {
buffer.writeShort(value);
} }
public int readInt() { @Override
return buffer.readInt(); public void writeInt(int data) {
buffer.writeInt(data);
} }
public void readBytes(byte[] dst) { @Override
buffer.readBytes(dst); public void writeLong(long value) {
buffer.writeLong(value);
}
@Override
public byte readByte() {
return buffer.readByte();
} }
@Override @Override
...@@ -77,17 +73,19 @@ public class NettyByteBufferWrapper implements ByteBufferWrapper { ...@@ -77,17 +73,19 @@ public class NettyByteBufferWrapper implements ByteBufferWrapper {
buffer.readBytes(dst); buffer.readBytes(dst);
} }
public int readerIndex() { @Override
return buffer.readerIndex(); public void readBytes(byte[] dst) {
buffer.readBytes(dst);
} }
public void setReaderIndex(int index) { @Override
buffer.setIndex(index, buffer.writerIndex()); public short readShort() {
return buffer.readShort();
} }
@Override @Override
public void writeLong(long value) { public int readInt() {
buffer.writeLong(value); return buffer.readInt();
} }
@Override @Override
...@@ -96,18 +94,23 @@ public class NettyByteBufferWrapper implements ByteBufferWrapper { ...@@ -96,18 +94,23 @@ public class NettyByteBufferWrapper implements ByteBufferWrapper {
} }
@Override @Override
public void ensureCapacity(int capacity) { public int readableBytes() {
buffer.capacity(capacity); return buffer.readableBytes();
} }
@Override @Override
public short readShort() { public int readerIndex() {
return buffer.readShort(); return buffer.readerIndex();
} }
@Override @Override
public void writeShort(final short value) { public void setReaderIndex(int index) {
buffer.writeShort(value); buffer.setIndex(index, buffer.writerIndex());
}
@Override
public void ensureCapacity(int capacity) {
buffer.capacity(capacity);
} }
} }
......
...@@ -77,7 +77,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { ...@@ -77,7 +77,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
private final Semaphore semaphoreAsync; private final Semaphore semaphoreAsync;
private final Map<Integer, ResponseResult> ackTables = new ConcurrentHashMap<Integer, ResponseResult>(256); private final Map<Integer, ResponseResult> ackTables = new ConcurrentHashMap<Integer, ResponseResult>(256);
private final Map<String, Pair<RequestProcessor, ExecutorService>> processorTables = new ConcurrentHashMap<String, Pair<RequestProcessor, ExecutorService>>(); private final Map<String, Pair<RequestProcessor, ExecutorService>> processorTables = new ConcurrentHashMap<String, Pair<RequestProcessor, ExecutorService>>();
private final AtomicLong count = new AtomicLong(0); private final AtomicLong responseCounter = new AtomicLong(0);
private final RemotingCommandFactory remotingCommandFactory; private final RemotingCommandFactory remotingCommandFactory;
private final String remotingInstanceId = UIDGenerator.instance().createUID(); private final String remotingInstanceId = UIDGenerator.instance().createUID();
...@@ -93,8 +93,13 @@ public abstract class NettyRemotingAbstract implements RemotingService { ...@@ -93,8 +93,13 @@ public abstract class NettyRemotingAbstract implements RemotingService {
NettyRemotingAbstract(RemotingConfig clientConfig, RemotingCommandFactoryMeta remotingCommandFactoryMeta) { NettyRemotingAbstract(RemotingConfig clientConfig, RemotingCommandFactoryMeta remotingCommandFactoryMeta) {
this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true); this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true);
this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true); this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true);
this.publicExecutor = ThreadUtils.newThreadPoolExecutor(clientConfig.getClientAsyncCallbackExecutorThreads(), clientConfig.getClientAsyncCallbackExecutorThreads(), 60, this.publicExecutor = ThreadUtils.newThreadPoolExecutor(
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10000), "PublicExecutor", true); clientConfig.getClientAsyncCallbackExecutorThreads(),
clientConfig.getClientAsyncCallbackExecutorThreads(),
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10000),
"PublicExecutor", true);
this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta); this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta);
} }
...@@ -237,9 +242,10 @@ public abstract class NettyRemotingAbstract implements RemotingService { ...@@ -237,9 +242,10 @@ public abstract class NettyRemotingAbstract implements RemotingService {
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
ackTables.remove(cmd.requestID()); ackTables.remove(cmd.requestID());
if (count.incrementAndGet() % 5000 == 0) if (responseCounter.incrementAndGet() % 5000 == 0) {
LOG.warn("REQUEST ID:{}, cost time:{}, ackTables.size:{}", cmd.requestID(), time - responseResult.getBeginTimestamp(), LOG.info("REQUEST ID:{}, cost time:{}, ackTables.size:{}", cmd.requestID(), time - responseResult.getBeginTimestamp(),
ackTables.size()); ackTables.size());
}
if (responseResult.getAsyncHandler() != null) { if (responseResult.getAsyncHandler() != null) {
boolean sameThread = false; boolean sameThread = false;
ExecutorService executor = this.getCallbackExecutor(); ExecutorService executor = this.getCallbackExecutor();
......
...@@ -44,7 +44,7 @@ public class Decoder extends ByteToMessageDecoder { ...@@ -44,7 +44,7 @@ public class Decoder extends ByteToMessageDecoder {
return; return;
} }
NettyByteBufferWrapper wrapper = new NettyByteBufferWrapper(in, ctx.channel()); NettyByteBufferWrapper wrapper = new NettyByteBufferWrapper(in);
Object msg = this.decode(ctx, wrapper); Object msg = this.decode(ctx, wrapper);
if (msg != null) { if (msg != null) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册