extFields
+ 4 + ext;
return length;
}
```
接下来看 `NettyDecoder`
```java
/**
 * Inbound decoder that cuts the byte stream into length-prefixed frames and
 * converts each frame into a {@link RemotingCommand}.
 *
 * <p>The superclass is configured with lengthFieldOffset = 0,
 * lengthFieldLength = 4, lengthAdjustment = 0 and initialBytesToStrip = 4:
 * every frame starts with a 4-byte length field, and that field is removed
 * before the frame reaches {@link #decode}.
 */
public class NettyDecoder extends LengthFieldBasedFrameDecoder {

    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    /**
     * Upper bound on a single frame, in bytes. Read once from the system property
     * {@code com.rocketmq.remoting.frameMaxLength}; falls back to 16777216 when unset.
     */
    private static final int FRAME_MAX_LENGTH =
        Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));

    /** Creates a decoder for frames carrying a stripped 4-byte length prefix. */
    public NettyDecoder() {
        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
    }

    /**
     * Extracts one frame via the superclass and decodes it into a command.
     *
     * @param ctx handler context; its channel is closed if decoding fails
     * @param in  accumulated inbound bytes
     * @return the decoded command, or {@code null} when the superclass produced
     *         no frame or when decoding threw
     */
    @Override
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = null;
        try {
            frame = (ByteBuf) super.decode(ctx, in);
            if (frame != null) {
                // Hand the frame to the protocol decoder as a plain NIO buffer.
                return RemotingCommand.decode(frame.nioBuffer());
            }
        } catch (Exception e) {
            // A frame that cannot be decoded is treated as fatal for this connection.
            log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            RemotingUtil.closeChannel(ctx.channel());
        } finally {
            // Release the extracted frame on every path, success or failure.
            if (frame != null) {
                frame.release();
            }
        }
        return null;
    }
}
```
重点理解构造方法里的如下4个参数:
- lengthFieldOffset = 0
- lengthFieldLength = 4
- lengthAdjustment = 0
- initialBytesToStrip = 4
```java
/**
* A decoder that splits the received {@link ByteBuf}s dynamically by the
* value of the length field in the message. It is particularly useful when you
* decode a binary message which has an integer header field that represents the
* length of the message body or the whole message.
*
* 2 bytes length field at offset 0, strip header
*
* Because we can get the length of the content by calling
* {@link ByteBuf#readableBytes()}, you might want to strip the length
* field by specifying initialBytesToStrip. In this example, we
* specified 2, that is same with the length of the length field, to
* strip the first two bytes.
*
* lengthFieldOffset = 0
* lengthFieldLength = 2
* lengthAdjustment = 0
* initialBytesToStrip = 2 (= the length of the Length field)
*
* BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
* +--------+----------------+ +----------------+
* | Length | Actual Content |----->| Actual Content |
* | 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
* +--------+----------------+ +----------------+
*
*/
```
继续看 `RemotingCommand`
```java
/**
 * Decodes one frame (already stripped of its outer length prefix) into a command.
 *
 * <p>Layout read here: a 4-byte int that packs the serialize type and the
 * header length, then the header bytes, then whatever remains as the body.
 *
 * @param byteBuffer buffer positioned at the start of the frame; its limit is
 *                   taken as the total frame length
 * @return the decoded command, with {@code body} left {@code null} when the
 *         frame carries no body bytes
 */
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
    final int frameLength = byteBuffer.limit();

    // One int carries both the protocol type and the header length.
    final int packedHeaderInfo = byteBuffer.getInt();
    final int headerSize = getHeaderLength(packedHeaderInfo);

    final byte[] headerBytes = new byte[headerSize];
    byteBuffer.get(headerBytes);
    final RemotingCommand command = headerDecode(headerBytes, getProtocolType(packedHeaderInfo));

    // Body = everything after the 4-byte packed int and the header.
    final int bodySize = frameLength - 4 - headerSize;
    final byte[] bodyBytes = bodySize > 0 ? new byte[bodySize] : null;
    if (bodyBytes != null) {
        byteBuffer.get(bodyBytes);
    }

    command.body = bodyBytes;
    return command;
}
/**
 * Extracts the header length from the packed header int.
 *
 * @param length the packed int read from the frame
 * @return the low 24 bits of {@code length}
 */
public static int getHeaderLength(int length) {
    // Keep only the low three bytes; the top byte is masked off.
    final int lowThreeBytesMask = 0xFFFFFF;
    return length & lowThreeBytesMask;
}
/**
 * Decodes the header bytes with the decoder that matches the serialize type
 * and records that type on the resulting command.
 *
 * @param headerData raw header bytes
 * @param type       serialize type the header was encoded with
 * @return the decoded command, or {@code null} for a type other than
 *         {@code JSON} or {@code ROCKETMQ}
 */
private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
    final RemotingCommand decoded;
    switch (type) {
        case JSON:
            decoded = RemotingSerializable.decode(headerData, RemotingCommand.class);
            break;
        case ROCKETMQ:
            decoded = RocketMQSerializable.rocketMQProtocolDecode(headerData);
            break;
        default:
            // No decoder for this type.
            return null;
    }
    decoded.setSerializeTypeCurrentRPC(type);
    return decoded;
}
/**
 * Extracts the serialize type from the packed header int.
 *
 * @param source the packed int read from the frame
 * @return the {@link SerializeType} looked up from the top byte (bits 24-31) of {@code source}
 */
public static SerializeType getProtocolType(int source) {
    // Unsigned shift leaves only the top byte, so no extra mask is needed before the cast.
    final byte typeCode = (byte) (source >>> 24);
    return SerializeType.valueOf(typeCode);
}
```
header的decode细节在`RocketMQSerializable`里
```java
/**
 * Decodes a command header from its binary form.
 *
 * <p>Fields are read in this fixed order: code (short), language (byte),
 * version (short), opaque (int), flag (int), remark (int length + bytes),
 * extFields (int length + bytes). A length of zero or less means the
 * corresponding field is left unset.
 *
 * @param headerArray the encoded header bytes
 * @return a new command populated from {@code headerArray}
 */
public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) {
    final ByteBuffer reader = ByteBuffer.wrap(headerArray);
    final RemotingCommand command = new RemotingCommand();

    // Fixed-width fields. Code and version travel as shorts (~32767).
    command.setCode(reader.getShort());
    command.setLanguage(LanguageCode.valueOf(reader.get()));
    command.setVersion(reader.getShort());
    command.setOpaque(reader.getInt());
    command.setFlag(reader.getInt());

    // Remark: length-prefixed text.
    final int remarkSize = reader.getInt();
    if (remarkSize > 0) {
        final byte[] remarkBytes = new byte[remarkSize];
        reader.get(remarkBytes);
        command.setRemark(new String(remarkBytes, CHARSET_UTF8));
    }

    // Extension fields: length-prefixed serialized map.
    final int extFieldsSize = reader.getInt();
    if (extFieldsSize > 0) {
        final byte[] extFieldsRaw = new byte[extFieldsSize];
        reader.get(extFieldsRaw);
        command.setExtFields(mapDeserialize(extFieldsRaw));
    }

    return command;
}
```
### RocketMQ remoting 线程模型
"1 + N + M1 + M2"的Reactor多线程模型 (N,M1,M2可配)
| 线程数 | 线程名 | 线程具体说明 |
| :----- | :----------------------------- | :---------------------- |
| 1 | NettyBoss_%d | Reactor 主线程 |
| N | NettyServerEPOLLSelector_%d_%d | Reactor 线程池 |
| M1 | NettyServerCodecThread_%d | Worker线程池 |
| M2 | RemotingExecutorThread_%d | 业务processor处理线程池 |
![RocketMQ remoting 线程模型](https://github.com/javahongxi/static/blob/master/rocketmq_01.png?raw=true)
`NettyRemotingServer`
```java
ServerBootstrap childHandler =
// EventLoopGroup: bossGroup, workerGroup
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// DefaultEventExecutorGroup: optionalGroup (相当于netty提供的业务线程池)
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
// NettyServerHandler里有业务处理线程池
new NettyServerHandler()
);
}
});
```
- [《Netty权威指南》](http://e.jd.com/30186249.html) `e.jd.com`
- [开发者如何玩转 RocketMQ?附最全源码解读 【Remoting篇】](https://blog.csdn.net/javahongxi/article/details/86628470)