提交 371ec416 编写于 作者: 如梦技术's avatar 如梦技术 🐛

添加 ByteBufferAllocator,支持堆内存和堆外内存。

上级 e34683f6
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.codec;
import java.nio.ByteBuffer;
/**
* ByteBufAllocator
*
* @author L.cm
*/
public enum ByteBufferAllocator {
/**
* 堆内存
*/
HEAP() {
@Override
public ByteBuffer allocate(int capacity) {
return ByteBuffer.allocate(capacity);
}
},
/**
* 直接内存
*/
DIRECT() {
@Override
public ByteBuffer allocate(int capacity) {
return ByteBuffer.allocateDirect(capacity);
}
};
public abstract ByteBuffer allocate(int capacity);
}
......@@ -38,47 +38,49 @@ public final class MqttEncoder {
* This is the main encoding method.
* It's only visible for testing.
*
* @param ctx ChannelContext
* @param message MQTT message to encode
* @param ctx ChannelContext
* @param message MQTT message to encode
* @param allocator ByteBuffer Allocator
* @return ByteBuf with encoded bytes
*/
public ByteBuffer doEncode(ChannelContext ctx, MqttMessage message) {
public ByteBuffer doEncode(ChannelContext ctx, MqttMessage message, ByteBufferAllocator allocator) {
switch (message.fixedHeader().messageType()) {
case CONNECT:
return encodeConnectMessage(ctx, (MqttConnectMessage) message);
return encodeConnectMessage(ctx, (MqttConnectMessage) message, allocator);
case CONNACK:
return encodeConnAckMessage(ctx, (MqttConnAckMessage) message);
return encodeConnAckMessage(ctx, (MqttConnAckMessage) message, allocator);
case PUBLISH:
return encodePublishMessage(ctx, (MqttPublishMessage) message);
return encodePublishMessage(ctx, (MqttPublishMessage) message, allocator);
case SUBSCRIBE:
return encodeSubscribeMessage(ctx, (MqttSubscribeMessage) message);
return encodeSubscribeMessage(ctx, (MqttSubscribeMessage) message, allocator);
case UNSUBSCRIBE:
return encodeUnsubscribeMessage(ctx, (MqttUnsubscribeMessage) message);
return encodeUnsubscribeMessage(ctx, (MqttUnsubscribeMessage) message, allocator);
case SUBACK:
return encodeSubAckMessage(ctx, (MqttSubAckMessage) message);
return encodeSubAckMessage(ctx, (MqttSubAckMessage) message, allocator);
case UNSUBACK:
if (message instanceof MqttUnsubAckMessage) {
return encodeUnsubAckMessage(ctx, (MqttUnsubAckMessage) message);
return encodeUnsubAckMessage(ctx, (MqttUnsubAckMessage) message, allocator);
}
return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(message);
return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(message, allocator);
case PUBACK:
case PUBREC:
case PUBREL:
case PUBCOMP:
return encodePubReplyMessage(ctx, message);
return encodePubReplyMessage(ctx, message, allocator);
case DISCONNECT:
case AUTH:
return encodeReasonCodePlusPropertiesMessage(ctx, message);
return encodeReasonCodePlusPropertiesMessage(ctx, message, allocator);
case PINGREQ:
case PINGRESP:
return encodeMessageWithOnlySingleByteFixedHeader(message);
return encodeMessageWithOnlySingleByteFixedHeader(message, allocator);
default:
throw new IllegalArgumentException("Unknown message type: " + message.fixedHeader().messageType().value());
}
}
private static ByteBuffer encodeConnectMessage(
ChannelContext ctx, MqttConnectMessage message) {
private static ByteBuffer encodeConnectMessage(ChannelContext ctx,
MqttConnectMessage message,
ByteBufferAllocator allocator) {
int payloadBufferSize = 0;
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
......@@ -138,7 +140,7 @@ public final class MqttEncoder {
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
// 申请 ByteBuffer
ByteBuffer buf = ByteBuffer.allocate(fixedHeaderBufferSize + variablePartSize);
ByteBuffer buf = allocator.allocate(fixedHeaderBufferSize + variablePartSize);
buf.put((byte) getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
buf.putShort((short) protocolNameBytes.length);
......@@ -192,11 +194,12 @@ public final class MqttEncoder {
return flagByte;
}
private static ByteBuffer encodeConnAckMessage(
ChannelContext ctx, MqttConnAckMessage message) {
private static ByteBuffer encodeConnAckMessage(ChannelContext ctx,
MqttConnAckMessage message,
ByteBufferAllocator allocator) {
final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
byte[] propertiesBytes = encodePropertiesIfNeeded(mqttVersion, message.variableHeader().properties());
ByteBuffer buf = ByteBuffer.allocate(4 + propertiesBytes.length);
ByteBuffer buf = allocator.allocate(4 + propertiesBytes.length);
buf.put((byte) getFixedHeaderByte1(message.fixedHeader()));
writeVariableLengthInt(buf, 2 + propertiesBytes.length);
buf.put((byte) (message.variableHeader().isSessionPresent() ? 0x01 : 0x00));
......@@ -205,8 +208,9 @@ public final class MqttEncoder {
return buf;
}
private static ByteBuffer encodeSubscribeMessage(
ChannelContext ctx, MqttSubscribeMessage message) {
private static ByteBuffer encodeSubscribeMessage(ChannelContext ctx,
MqttSubscribeMessage message,
ByteBufferAllocator allocator) {
MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
byte[] propertiesBytes = encodePropertiesIfNeeded(mqttVersion,
message.idAndPropertiesVariableHeader().properties());
......@@ -228,7 +232,7 @@ public final class MqttEncoder {
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuffer buf = ByteBuffer.allocate(fixedHeaderBufferSize + variablePartSize);
ByteBuffer buf = allocator.allocate(fixedHeaderBufferSize + variablePartSize);
buf.put((byte) getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
......@@ -259,8 +263,9 @@ public final class MqttEncoder {
return buf;
}
private static ByteBuffer encodeUnsubscribeMessage(
ChannelContext ctx, MqttUnsubscribeMessage message) {
private static ByteBuffer encodeUnsubscribeMessage(ChannelContext ctx,
MqttUnsubscribeMessage message,
ByteBufferAllocator allocator) {
MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
byte[] propertiesBytes = encodePropertiesIfNeeded(mqttVersion,
message.idAndPropertiesVariableHeader().properties());
......@@ -280,7 +285,7 @@ public final class MqttEncoder {
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuffer buf = ByteBuffer.allocate(fixedHeaderBufferSize + variablePartSize);
ByteBuffer buf = allocator.allocate(fixedHeaderBufferSize + variablePartSize);
buf.put((byte) getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
......@@ -299,8 +304,9 @@ public final class MqttEncoder {
return buf;
}
private static ByteBuffer encodeSubAckMessage(
ChannelContext ctx, MqttSubAckMessage message) {
private static ByteBuffer encodeSubAckMessage(ChannelContext ctx,
MqttSubAckMessage message,
ByteBufferAllocator allocator) {
MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
byte[] propertiesBytes = encodePropertiesIfNeeded(mqttVersion,
message.idAndPropertiesVariableHeader().properties());
......@@ -308,7 +314,7 @@ public final class MqttEncoder {
int payloadBufferSize = message.payload().grantedQoSLevels().size();
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuffer buf = ByteBuffer.allocate(fixedHeaderBufferSize + variablePartSize);
ByteBuffer buf = allocator.allocate(fixedHeaderBufferSize + variablePartSize);
buf.put((byte) getFixedHeaderByte1(message.fixedHeader()));
writeVariableLengthInt(buf, variablePartSize);
buf.putShort((short) message.variableHeader().messageId());
......@@ -319,8 +325,9 @@ public final class MqttEncoder {
return buf;
}
private static ByteBuffer encodeUnsubAckMessage(
ChannelContext ctx, MqttUnsubAckMessage message) {
private static ByteBuffer encodeUnsubAckMessage(ChannelContext ctx,
MqttUnsubAckMessage message,
ByteBufferAllocator allocator) {
if (message.variableHeader() instanceof MqttMessageIdAndPropertiesVariableHeader) {
MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
byte[] propertiesBytes = encodePropertiesIfNeeded(mqttVersion, message.idAndPropertiesVariableHeader().properties());
......@@ -330,7 +337,7 @@ public final class MqttEncoder {
int payloadBufferSize = payload == null ? 0 : payload.unsubscribeReasonCodes().size();
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuffer buf = ByteBuffer.allocate(fixedHeaderBufferSize + variablePartSize);
ByteBuffer buf = allocator.allocate(fixedHeaderBufferSize + variablePartSize);
buf.put((byte) getFixedHeaderByte1(message.fixedHeader()));
writeVariableLengthInt(buf, variablePartSize);
buf.putShort((short) message.variableHeader().messageId());
......@@ -343,12 +350,13 @@ public final class MqttEncoder {
}
return buf;
} else {
return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(message);
return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(message, allocator);
}
}
private static ByteBuffer encodePublishMessage(
ChannelContext ctx, MqttPublishMessage message) {
private static ByteBuffer encodePublishMessage(ChannelContext ctx,
MqttPublishMessage message,
ByteBufferAllocator allocator) {
MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttPublishVariableHeader variableHeader = message.variableHeader();
......@@ -366,7 +374,7 @@ public final class MqttEncoder {
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuffer buf = ByteBuffer.allocate(fixedHeaderBufferSize + variablePartSize);
ByteBuffer buf = allocator.allocate(fixedHeaderBufferSize + variablePartSize);
buf.put((byte) getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
buf.putShort((short) topicNameBytes.length);
......@@ -380,7 +388,8 @@ public final class MqttEncoder {
}
private static ByteBuffer encodePubReplyMessage(ChannelContext ctx,
MqttMessage message) {
MqttMessage message,
ByteBufferAllocator allocator) {
if (message.variableHeader() instanceof MqttPubReplyMessageVariableHeader) {
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttPubReplyMessageVariableHeader variableHeader =
......@@ -402,7 +411,7 @@ public final class MqttEncoder {
}
final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
ByteBuffer buf = ByteBuffer.allocate(fixedHeaderBufferSize + variableHeaderBufferSize);
ByteBuffer buf = allocator.allocate(fixedHeaderBufferSize + variableHeaderBufferSize);
buf.put((byte) getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variableHeaderBufferSize);
buf.putShort((short) variableHeader.messageId());
......@@ -412,24 +421,27 @@ public final class MqttEncoder {
buf.put(propertiesBytes);
return buf;
} else {
return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(message);
return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(message, allocator);
}
}
private static ByteBuffer encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(MqttMessage message) {
private static ByteBuffer encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(MqttMessage message,
ByteBufferAllocator allocator) {
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
int variableHeaderBufferSize = 2; // variable part only has a message id
// variable part only has a message id
int variableHeaderBufferSize = 2;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
ByteBuffer buf = ByteBuffer.allocate(fixedHeaderBufferSize + variableHeaderBufferSize);
ByteBuffer buf = allocator.allocate(fixedHeaderBufferSize + variableHeaderBufferSize);
buf.put((byte) getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variableHeaderBufferSize);
buf.putShort((short) variableHeader.messageId());
return buf;
}
private static ByteBuffer encodeReasonCodePlusPropertiesMessage(
ChannelContext ctx, MqttMessage message) {
private static ByteBuffer encodeReasonCodePlusPropertiesMessage(ChannelContext ctx,
MqttMessage message,
ByteBufferAllocator allocator) {
if (message.variableHeader() instanceof MqttReasonCodeAndPropertiesVariableHeader) {
MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
......@@ -451,7 +463,7 @@ public final class MqttEncoder {
variableHeaderBufferSize = 0;
}
final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
ByteBuffer buf = ByteBuffer.allocate(fixedHeaderBufferSize + variableHeaderBufferSize);
ByteBuffer buf = allocator.allocate(fixedHeaderBufferSize + variableHeaderBufferSize);
buf.put((byte) getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variableHeaderBufferSize);
if (includeReasonCode) {
......@@ -460,13 +472,14 @@ public final class MqttEncoder {
buf.put(propertiesBytes);
return buf;
} else {
return encodeMessageWithOnlySingleByteFixedHeader(message);
return encodeMessageWithOnlySingleByteFixedHeader(message, allocator);
}
}
private static ByteBuffer encodeMessageWithOnlySingleByteFixedHeader(MqttMessage message) {
private static ByteBuffer encodeMessageWithOnlySingleByteFixedHeader(MqttMessage message,
ByteBufferAllocator allocator) {
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
ByteBuffer buf = ByteBuffer.allocate(2);
ByteBuffer buf = allocator.allocate(2);
buf.put((byte) getFixedHeaderByte1(mqttFixedHeader));
buf.put((byte) 0);
return buf;
......
......@@ -20,7 +20,7 @@ import net.dreamlu.iot.mqtt.codec.*;
import org.tio.client.intf.ClientAioHandler;
import org.tio.core.ChannelContext;
import org.tio.core.TioConfig;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.exception.TioDecodeException;
import org.tio.core.intf.Packet;
import java.nio.ByteBuffer;
......@@ -33,11 +33,13 @@ import java.nio.ByteBuffer;
public class MqttClientAioHandler implements ClientAioHandler {
private final MqttDecoder mqttDecoder;
private final MqttEncoder mqttEncoder;
private final ByteBufferAllocator allocator;
private final MqttClientProcessor processor;
public MqttClientAioHandler(MqttClientProcessor processor) {
public MqttClientAioHandler(ByteBufferAllocator bufferAllocator, MqttClientProcessor processor) {
this.mqttDecoder = MqttDecoder.INSTANCE;
this.mqttEncoder = MqttEncoder.INSTANCE;
this.allocator = bufferAllocator;
this.processor = processor;
}
......@@ -47,13 +49,13 @@ public class MqttClientAioHandler implements ClientAioHandler {
}
@Override
public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException {
public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws TioDecodeException {
return mqttDecoder.decode(channelContext, buffer, limit, position, readableLength);
}
@Override
public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
return mqttEncoder.doEncode(channelContext, (MqttMessage) packet);
return mqttEncoder.doEncode(channelContext, (MqttMessage) packet, allocator);
}
@Override
......
......@@ -16,6 +16,7 @@
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.ByteBufferAllocator;
import net.dreamlu.iot.mqtt.codec.MqttProperties;
import net.dreamlu.iot.mqtt.codec.MqttVersion;
import org.tio.client.ClientChannelContext;
......@@ -99,6 +100,10 @@ public final class MqttClientCreator {
* mqtt5 properties
*/
private MqttProperties properties;
/**
* ByteBuffer Allocator
*/
private ByteBufferAllocator bufferAllocator = ByteBufferAllocator.HEAP;
protected String getIp() {
return ip;
......@@ -156,6 +161,10 @@ public final class MqttClientCreator {
return properties;
}
public ByteBufferAllocator getBufferAllocator() {
return bufferAllocator;
}
public MqttClientCreator ip(String ip) {
this.ip = ip;
return this;
......@@ -232,6 +241,11 @@ public final class MqttClientCreator {
return this;
}
public MqttClientCreator bufferAllocator(ByteBufferAllocator allocator) {
this.bufferAllocator = allocator;
return this;
}
public MqttClient connect() throws Exception {
// 1. 生成 默认的 clientId
String clientId = getClientId();
......@@ -244,13 +258,13 @@ public final class MqttClientCreator {
CountDownLatch connLatch = new CountDownLatch(1);
MqttClientProcessor processor = new DefaultMqttClientProcessor(subManage, connLatch);
// 2. 初始化 mqtt 处理器
ClientAioHandler clientAioHandler = new MqttClientAioHandler(Objects.requireNonNull(processor));
ClientAioHandler clientAioHandler = new MqttClientAioHandler(this.bufferAllocator, Objects.requireNonNull(processor));
ClientAioListener clientAioListener = new MqttClientAioListener(this);
// 3. 重连配置
ReconnConf reconnConf = null;
if (this.reconnect) {
if (reInterval != null && reInterval > 0) {
reconnConf = new ReconnConf(reInterval);
if (this.reInterval != null && this.reInterval > 0) {
reconnConf = new ReconnConf(this.reInterval);
} else {
reconnConf = new ReconnConf();
}
......
......@@ -22,7 +22,7 @@ import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.exception.TioDecodeException;
import org.tio.core.intf.Packet;
import org.tio.server.AcceptCompletionHandler;
import org.tio.server.intf.ServerAioHandler;
......@@ -37,11 +37,13 @@ public class MqttServerAioHandler implements ServerAioHandler {
private static final Logger log = LoggerFactory.getLogger(AcceptCompletionHandler.class);
private final MqttDecoder mqttDecoder;
private final MqttEncoder mqttEncoder;
private final ByteBufferAllocator allocator;
private final MqttServerProcessor processor;
public MqttServerAioHandler(MqttServerProcessor processor) {
public MqttServerAioHandler(ByteBufferAllocator bufferAllocator, MqttServerProcessor processor) {
this.mqttDecoder = MqttDecoder.INSTANCE;
this.mqttEncoder = MqttEncoder.INSTANCE;
this.allocator = bufferAllocator;
this.processor = processor;
}
......@@ -55,10 +57,10 @@ public class MqttServerAioHandler implements ServerAioHandler {
* @param readableLength ByteBuffer参与本次解码的有效数据(= limit - position)
* @param context ChannelContext
* @return Packet
* @throws AioDecodeException AioDecodeException
* @throws TioDecodeException TioDecodeException
*/
@Override
public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext context) throws AioDecodeException {
public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext context) throws TioDecodeException {
return mqttDecoder.decode(context, buffer, limit, position, readableLength);
}
......@@ -72,7 +74,7 @@ public class MqttServerAioHandler implements ServerAioHandler {
*/
@Override
public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext context) {
return mqttEncoder.doEncode(context, (MqttMessage) packet);
return mqttEncoder.doEncode(context, (MqttMessage) packet, allocator);
}
/**
......
......@@ -28,8 +28,9 @@ public class MqttServerTest {
public static void main(String[] args) throws IOException {
int socketPort = 1883;
MqttServerProcessor brokerHandler = new MqttBrokerProcessorImpl();
ByteBufferAllocator bufferAllocator = ByteBufferAllocator.HEAP;
// 处理消息
ServerAioHandler handler = new MqttServerAioHandler(brokerHandler);
ServerAioHandler handler = new MqttServerAioHandler(bufferAllocator, brokerHandler);
// 监听
ServerAioListener listener = new MqttServerAioListener();
// 配置
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册