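# RPC Transporter & Codec & Serialization

![img](https://github.com/javahongxi/static/blob/master/dubbo_01.jpg)

The diagram above shows Dubbo's remoting design, but the layering it illustrates is a useful general reference.

The rest of this article walks through the codec and the threading model of RocketMQ's remoting module.

RocketMQ's wire protocol has the following format:

4 bytes | 1 + 3 bytes | variable | variable
------ | ------ | ------ | ------
Message length | Serialization type + header length | Header data | Body data

("Variable" is used in contrast to fixed length: the size of the field is not fixed.)

| Header field | Type | In a request | In a response |
| :--------- | :----------- | :----------------------------------------------------------- | :--------------------------------------- |
| code | int | Request operation code; the responder dispatches to different business logic depending on the code | Response code. 0 means success, any non-zero value indicates an error |
| language | LanguageCode | Implementation language of the requester | Implementation language of the responder |
| version | int | Version of the requesting program | Version of the responding program |
| opaque | int | Equivalent to a requestId: it identifies a request among the requests on the same connection and is matched against the value in the response | Returned unmodified |
| flag | int | Flag that distinguishes a normal RPC from a oneway RPC | Flag that distinguishes a normal RPC from a oneway RPC |
| remark | String | Custom text message | Custom text message |
| extFields | HashMap | Custom extension fields of the request | Custom extension fields of the response |

> Netty ships several commonly used frame decoders for handling partial packets, such as LineBasedFrameDecoder, DelimiterBasedFrameDecoder, and LengthFieldBasedFrameDecoder. There are typically four ways to tell where one complete message ends:
1. Fixed length. For example, every 120 bytes form one complete message, and shorter messages are left-padded with zeros. Decoding is simple: read the specified number of bytes, then decode.
2. CRLF line endings, as in the FTP protocol. This approach is mostly used by text protocols.
3. A delimiter that marks the end of a complete message.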
4. An explicit length field that states the size of the complete message.

RocketMQ's codec classes are `NettyEncoder` and `NettyDecoder`. `NettyDecoder` extends LengthFieldBasedFrameDecoder, so RocketMQ uses the fourth approach.

Let's look at `NettyEncoder` first:

```java
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            // Length field, protocol mark, and header data
            ByteBuffer header = remotingCommand.encodeHeader();
            out.writeBytes(header);
            // Body data, if present
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                out.writeBytes(body);
            }
        } catch (Exception e) {
            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            if (remotingCommand != null) {
                log.error(remotingCommand.toString());
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }
}
```

The request/response field definitions and the encode/decode logic all live in `RemotingCommand`. Next, encodeHeader():

```java
public ByteBuffer encodeHeader() {
    return encodeHeader(this.body != null ? this.body.length : 0);
}

public ByteBuffer encodeHeader(final int bodyLength) {
    // 1> header length size (1+3)
    int length = 4;

    // 2> header data length
    byte[] headerData;
    headerData = this.headerEncode();

    length += headerData.length;

    // 3> body data length
    length += bodyLength;

    // 4 bytes for the message length field, plus (length - bodyLength);
    // the body itself is written separately by NettyEncoder
    ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);

    // length
    result.putInt(length);

    // header length
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

    // header data
    result.put(headerData);

    result.flip();

    return result;
}

// The first byte holds the serialization type; the remaining three bytes hold the header length
public static byte[] markProtocolType(int source, SerializeType type) {
    byte[] result = new byte[4];

    result[0] = type.getCode();
    result[1] = (byte) ((source >> 16) & 0xFF);
    result[2] = (byte) ((source >> 8) & 0xFF);
    result[3] = (byte) (source & 0xFF);
    return result;
}
```

The details of header encoding are in `RocketMQSerializable`:

```java
public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
    // String remark
    byte[] remarkBytes = null;
    int remarkLen = 0;
    if (cmd.getRemark() != null && cmd.getRemark().length() > 0) {
        remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8);
        remarkLen = remarkBytes.length;
    }

    // HashMap<String, String> extFields
    byte[] extFieldsBytes = null;
    int extLen = 0;
    if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) {
        extFieldsBytes = mapSerialize(cmd.getExtFields());
        extLen = extFieldsBytes.length;
    }

    int totalLen = calTotalLen(remarkLen, extLen);

    ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);
    // int code(~32767)
    headerBuffer.putShort((short) cmd.getCode());
    // LanguageCode language
    headerBuffer.put(cmd.getLanguage().getCode());
    // int version(~32767)
    headerBuffer.putShort((short) cmd.getVersion());
    // int opaque
    headerBuffer.putInt(cmd.getOpaque());
    // int flag
    headerBuffer.putInt(cmd.getFlag());
    // String remark
    if (remarkBytes != null) {
        headerBuffer.putInt(remarkBytes.length);
        headerBuffer.put(remarkBytes);
    } else {
        headerBuffer.putInt(0);
    }
    // HashMap<String, String> extFields;
    if (extFieldsBytes != null) {
        headerBuffer.putInt(extFieldsBytes.length);
        headerBuffer.put(extFieldsBytes);
    } else {
        headerBuffer.putInt(0);
    }

    return headerBuffer.array();
}

public static byte[] mapSerialize(HashMap<String, String> map) {
    // keySize+key+valSize+val
    if (null == map || map.isEmpty())
        return null;

    int totalLength = 0;
    int kvLength;
    Iterator<Map.Entry<String, String>> it = map.entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<String, String> entry = it.next();
        if (entry.getKey() != null && entry.getValue() != null) {
            kvLength =
                // keySize + Key
                2 + entry.getKey().getBytes(CHARSET_UTF8).length
                    // valSize + val
                    + 4 + entry.getValue().getBytes(CHARSET_UTF8).length;
            totalLength += kvLength;
        }
    }

    ByteBuffer content = ByteBuffer.allocate(totalLength);
    byte[] key;
    byte[] val;
    it = map.entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<String, String> entry = it.next();
        if (entry.getKey() != null && entry.getValue() != null) {
            key = entry.getKey().getBytes(CHARSET_UTF8);
            val = entry.getValue().getBytes(CHARSET_UTF8);

            content.putShort((short) key.length);
            content.put(key);

            content.putInt(val.length);
            content.put(val);
        }
    }

    return content.array();
}

private static int calTotalLen(int remark, int ext) {
    // int code(~32767)
    int length = 2
        // LanguageCode language
        + 1
        // int version(~32767)
        + 2
        // int opaque
        + 4
        // int flag
        + 4
        // String remark
        + 4 + remark
        // HashMap<String, String> extFields
        + 4 + ext;

    return length;
}
```

Next, `NettyDecoder`:

```java
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    // Maximum frame size, 16 MB by default
    private static final int FRAME_MAX_LENGTH =
        Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));

    public NettyDecoder() {
        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
    }

    @Override
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = null;
        try {
            // Returns null until a complete frame has arrived
            frame = (ByteBuf) super.decode(ctx, in);
            if (null == frame) {
                return null;
            }

            ByteBuffer byteBuffer = frame.nioBuffer();

            return RemotingCommand.decode(byteBuffer);
        } catch (Exception e) {
            log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            RemotingUtil.closeChannel(ctx.channel());
        } finally {
            if (null != frame) {
                frame.release();
            }
        }

        return null;
    }
}
```

The key to understanding the constructor is these four parameters:

- lengthFieldOffset = 0
- lengthFieldLength = 4
- lengthAdjustment = 0
- initialBytesToStrip = 4

They match the frame layout: the length field sits at offset 0 and is 4 bytes wide, its value needs no adjustment, and those 4 bytes are stripped before the frame is handed on, so the buffer passed to `RemotingCommand.decode` starts at the 1 + 3 byte field. The Javadoc of LengthFieldBasedFrameDecoder shows the same configuration with a 2-byte length field:

```java
/**
 * A decoder that splits the received {@link ByteBuf}s dynamically by the
 * value of the length field in the message.  It is particularly useful when you
 * decode a binary message which has an integer header field that represents the
 * length of the message body or the whole message.
 *

 * 2 bytes length field at offset 0, strip header
 *
 * Because we can get the length of the content by calling
 * {@link ByteBuf#readableBytes()}, you might want to strip the length
 * field by specifying initialBytesToStrip.  In this example, we
 * specified 2, that is same with the length of the length field, to
 * strip the first two bytes.
 *
 * lengthFieldOffset   = 0
 * lengthFieldLength   = 2
 * lengthAdjustment    = 0
 * initialBytesToStrip = 2 (= the length of the Length field)
 *
 * BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)
 * +--------+----------------+      +----------------+
 * | Length | Actual Content |----->| Actual Content |
 * | 0x000C | "HELLO, WORLD" |      | "HELLO, WORLD" |
 * +--------+----------------+      +----------------+
 * 
 */
```

Next, `RemotingCommand`:

```java
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
    // The 4-byte length field has already been stripped, so limit() is the message length
    int length = byteBuffer.limit();
    int oriHeaderLen = byteBuffer.getInt();
    int headerLength = getHeaderLength(oriHeaderLen);

    byte[] headerData = new byte[headerLength];
    byteBuffer.get(headerData);

    RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

    int bodyLength = length - 4 - headerLength;
    byte[] bodyData = null;
    if (bodyLength > 0) {
        bodyData = new byte[bodyLength];
        byteBuffer.get(bodyData);
    }
    cmd.body = bodyData;

    return cmd;
}

// The low three bytes hold the header length
public static int getHeaderLength(int length) {
    return length & 0xFFFFFF;
}

private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
    switch (type) {
        case JSON:
            RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
            resultJson.setSerializeTypeCurrentRPC(type);
            return resultJson;
        case ROCKETMQ:
            RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
            resultRMQ.setSerializeTypeCurrentRPC(type);
            return resultRMQ;
        default:
            break;
    }

    return null;
}

// The high byte holds the serialization type
public static SerializeType getProtocolType(int source) {
    return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
}
```

The details of header decoding are in `RocketMQSerializable`:

```java
public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) {
    RemotingCommand cmd = new RemotingCommand();
    ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);
    // int code(~32767)
    cmd.setCode(headerBuffer.getShort());
    // LanguageCode language
    cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));
    // int version(~32767)
    cmd.setVersion(headerBuffer.getShort());
    // int opaque
    cmd.setOpaque(headerBuffer.getInt());
    // int flag
    cmd.setFlag(headerBuffer.getInt());
    // String remark
    int remarkLength = headerBuffer.getInt();
    if (remarkLength > 0) {
        byte[] remarkContent = new byte[remarkLength];
        headerBuffer.get(remarkContent);
        cmd.setRemark(new String(remarkContent, CHARSET_UTF8));
    }

    // HashMap<String, String> extFields
    int extFieldsLength = headerBuffer.getInt();
    if (extFieldsLength > 0) {
        byte[] extFieldsBytes = new byte[extFieldsLength];
        headerBuffer.get(extFieldsBytes);
        cmd.setExtFields(mapDeserialize(extFieldsBytes));
    }
    return cmd;
}
```

### RocketMQ remoting threading model

A "1 + N + M1 + M2" multi-threaded Reactor model (N, M1, and M2 are configurable).

| Thread count | Thread name | Description |
| :----- | :----------------------------- | :---------------------- |
| 1 | NettyBoss_%d | Reactor main thread |
| N | NettyServerEPOLLSelector_%d_%d | Reactor thread pool |
| M1 | NettyServerCodecThread_%d | Worker thread pool |
| M2 | RemotingExecutorThread_%d | Thread pool for business processors |

![img](https://github.com/javahongxi/static/blob/master/rocketmq_01.png)

`NettyRemotingServer`

```java
ServerBootstrap childHandler =
    // EventLoopGroup: bossGroup, workerGroup
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .option(ChannelOption.SO_REUSEADDR, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                    // DefaultEventExecutorGroup: optionalGroup (effectively a business thread pool provided by Netty)
                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                        new HandshakeHandler(TlsSystemConfig.tlsMode))
                    .addLast(defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                        new NettyConnectManageHandler(),
                        // The business processing thread pool sits behind NettyServerHandler
                        new NettyServerHandler()
                    );
            }
        });
```

- [《Netty权威指南》 (Netty: The Definitive Guide)](http://e.jd.com/30186249.html) `e.jd.com`
- [开发者如何玩转 RocketMQ?附最全源码解读 【Remoting篇】 (How developers can get the most out of RocketMQ, with a full source walkthrough: Remoting)](https://blog.csdn.net/javahongxi/article/details/86628470)
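
To make the frame layout from the protocol table concrete (4-byte message length, 1 + 3 byte type and header length, header data, body data), here is a minimal sketch that uses only `java.nio.ByteBuffer`. `FrameLayoutSketch` and its methods are hypothetical names for illustration and are not part of RocketMQ. The bit operations mirror `markProtocolType`, `getHeaderLength`, and `getProtocolType` quoted earlier, and the serialization type code and header bytes used in `main` are placeholders.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustrative helper (hypothetical, not a RocketMQ class): packs and unpacks one frame. */
public class FrameLayoutSketch {

    /** Builds: 4-byte length | 1-byte type + 3-byte header length | header | body. */
    public static ByteBuffer encode(byte serializeType, byte[] header, byte[] body) {
        // The length field counts everything that follows it
        int length = 4 + header.length + body.length;
        ByteBuffer frame = ByteBuffer.allocate(4 + length);
        frame.putInt(length);
        // High byte: serialization type; low three bytes: header length
        frame.putInt((serializeType << 24) | (header.length & 0xFFFFFF));
        frame.put(header);
        frame.put(body);
        frame.flip();
        return frame;
    }

    /** Reads one complete frame and returns { {type}, header, body }. */
    public static byte[][] decode(ByteBuffer frame) {
        // With initialBytesToStrip = 4 the Netty decoder would drop this field;
        // the sketch reads it by hand instead
        int length = frame.getInt();
        int mark = frame.getInt();
        byte type = (byte) ((mark >> 24) & 0xFF);
        int headerLength = mark & 0xFFFFFF;

        byte[] header = new byte[headerLength];
        frame.get(header);

        byte[] body = new byte[length - 4 - headerLength];
        frame.get(body);
        return new byte[][] {{type}, header, body};
    }

    public static void main(String[] args) {
        // Placeholder header and body content, chosen only for the demo
        byte[] header = "{\"code\":10}".getBytes(StandardCharsets.UTF_8);
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);

        ByteBuffer frame = encode((byte) 0, header, body);
        System.out.println("frame bytes = " + frame.remaining());

        byte[][] parts = decode(frame);
        System.out.println("type   = " + parts[0][0]);
        System.out.println("header = " + new String(parts[1], StandardCharsets.UTF_8));
        System.out.println("body   = " + new String(parts[2], StandardCharsets.UTF_8));
    }
}
```

Packing the type and the header length into a single int keeps the fixed part of the frame at 8 bytes, at the cost of limiting the header to what fits in 24 bits.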