Apache License
Version 2.0, January 2004
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction, and
distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by the copyright
owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all other entities
that control, are controlled by, or are under common control with that entity.
For the purposes of this definition, "control" means (i) the power, direct or
indirect, to cause the direction or management of such entity, whether by
contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity exercising
permissions granted by this License.
"Source" form shall mean the preferred form for making modifications, including
but not limited to software source code, documentation source, and configuration
"Object" form shall mean any form resulting from mechanical transformation or
translation of a Source form, including but not limited to compiled object code,
generated documentation, and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or Object form, made
available under the License, as indicated by a copyright notice that is included
in or attached to the work (an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object form, that
is based on (or derived from) the Work and for which the editorial revisions,
annotations, elaborations, or other modifications represent, as a whole, an
original work of authorship. For the purposes of this License, Derivative Works
shall not include works that remain separable from, or merely link (or bind by
name) to the interfaces of, the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including the original version
of the Work and any modifications or additions to that Work or Derivative Works
thereof, that is intentionally submitted to Licensor for inclusion in the Work
by the copyright owner or by an individual or Legal Entity authorized to submit
on behalf of the copyright owner. For the purposes of this definition,
"submitted" means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems, and
issue tracking systems that are managed by, or on behalf of, the Licensor for
the purpose of discussing and improving the Work, but excluding communication
that is conspicuously marked or otherwise designated in writing by the copyright
owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity on behalf
of whom a Contribution has been received by Licensor and subsequently
incorporated within the Work.
2. Grant of Copyright License.
Subject to the terms and conditions of this License, each Contributor hereby
grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
irrevocable copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the Work and such
Derivative Works in Source or Object form.
3. Grant of Patent License.
Subject to the terms and conditions of this License, each Contributor hereby
grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
irrevocable (except as stated in this section) patent license to make, have
made, use, offer to sell, sell, import, and otherwise transfer the Work, where
such license applies only to those patent claims licensable by such Contributor
that are necessarily infringed by their Contribution(s) alone or by combination
of their Contribution(s) with the Work to which such Contribution(s) was
submitted. If You institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work or a
Contribution incorporated within the Work constitutes direct or contributory
patent infringement, then any patent licenses granted to You under this License
for that Work shall terminate as of the date such litigation is filed.
4. Redistribution.
You may reproduce and distribute copies of the Work or Derivative Works thereof
in any medium, with or without modifications, and in Source or Object form,
provided that You meet the following conditions:
You must give any other recipients of the Work or Derivative Works a copy of
this License; and
You must cause any modified files to carry prominent notices stating that You
changed the files; and
You must retain, in the Source form of any Derivative Works that You distribute,
all copyright, patent, trademark, and attribution notices from the Source form
of the Work, excluding those notices that do not pertain to any part of the
Derivative Works; and
If the Work includes a "NOTICE" text file as part of its distribution, then any
Derivative Works that You distribute must include a readable copy of the
attribution notices contained within such NOTICE file, excluding those notices
that do not pertain to any part of the Derivative Works, in at least one of the
following places: within a NOTICE text file distributed as part of the
Derivative Works; within the Source form or documentation, if provided along
with the Derivative Works; or, within a display generated by the Derivative
Works, if and wherever such third-party notices normally appear. The contents of
the NOTICE file are for informational purposes only and do not modify the
License. You may add Your own attribution notices within Derivative Works that
You distribute, alongside or as an addendum to the NOTICE text from the Work,
provided that such additional attribution notices cannot be construed as
modifying the License.
You may add Your own copyright statement to Your modifications and may provide
additional or different license terms and conditions for use, reproduction, or
distribution of Your modifications, or for any such Derivative Works as a whole,
provided Your use, reproduction, and distribution of the Work otherwise complies
with the conditions stated in this License.
5. Submission of Contributions.
Unless You explicitly state otherwise, any Contribution intentionally submitted
for inclusion in the Work by You to the Licensor shall be under the terms and
conditions of this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify the terms of
any separate license agreement you may have executed with Licensor regarding
such Contributions.
6. Trademarks.
This License does not grant permission to use the trade names, trademarks,
service marks, or product names of the Licensor, except as required for
reasonable and customary use in describing the origin of the Work and
reproducing the content of the NOTICE file.
7. Disclaimer of Warranty.
Unless required by applicable law or agreed to in writing, Licensor provides the
Work (and each Contributor provides its Contributions) on an "AS IS" BASIS,
including, without limitation, any warranties or conditions of TITLE,
solely responsible for determining the appropriateness of using or
redistributing the Work and assume any risks associated with Your exercise of
permissions under this License.
8. Limitation of Liability.
In no event and under no legal theory, whether in tort (including negligence),
contract, or otherwise, unless required by applicable law (such as deliberate
and grossly negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special, incidental,
or consequential damages of any character arising as a result of this License or
out of the use or inability to use the Work (including but not limited to
damages for loss of goodwill, work stoppage, computer failure or malfunction, or
any and all other commercial damages or losses), even if such Contributor has
been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability.
While redistributing the Work or Derivative Works thereof, You may choose to
offer, and charge a fee for, acceptance of support, warranty, indemnity, or
other liability obligations and/or rights consistent with this License. However,
in accepting such obligations, You may act only on Your own behalf and on Your
sole responsibility, not on behalf of any other Contributor, and only if You
agree to indemnify, defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason of your
accepting any such warranty or additional liability.
APPENDIX: How to apply the Apache License to your work
To apply the Apache License to your work, attach the following boilerplate
notice, with the fields enclosed by brackets "{}" replaced with your own
identifying information. (Don't include the brackets!) The text should be
enclosed in the appropriate comment syntax for the file format. We also
recommend that a file or class name and description of purpose be included on
the same "printed page" as the copyright notice for easier identification within
third-party archives.
Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net).
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
# mica mqtt 组件
基于 `t-io` 实现的 `mqtt` iot组件。
## 文档
- [mqtt 协议文档](https://github.com/mcxiaoke/mqtt)
## 代表
- 继续抽象,方便使用。
- 实现 `mqtt-broker` 功能。
## 参考
- [iot-mqtt-server](https://gitee.com/recallcode/iot-mqtt-server)
## 微信公众号
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
package net.dreamlu.iot.mqtt.codec;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
* ByteBuffer 工具
* @author L.cm
public class ByteBufferUtil {
* 空 byte 数组
public static final byte[] EMPTY = new byte[0];
* read byte
* @param buffer ByteBuffer
* @return byte
public static byte readByte(ByteBuffer buffer) {
return buffer.get();
* read unsigned byte
* @param buffer ByteBuffer
* @return short
public static short readUnsignedByte(ByteBuffer buffer) {
return (short) (readByte(buffer) & 0xFF);
* skip bytes
* @param buffer ByteBuffer
* @param skip skip bytes
* @return ByteBuffer
public static ByteBuffer skipBytes(ByteBuffer buffer, int skip) {
buffer.position(buffer.position() + skip);
return buffer;
public static String toString(ByteBuffer buffer) {
return toString(buffer, StandardCharsets.UTF_8);
public static String toString(ByteBuffer buffer, Charset charset) {
return new String(buffer.array(), buffer.position(), buffer.limit(), charset);
public static ByteBuffer clone(ByteBuffer original) {
ByteBuffer clone = ByteBuffer.allocate(original.capacity());
// copy from the beginning
return clone;
package net.dreamlu.iot.mqtt.codec;
* 解码异常
* @author L.cm
public class DecoderException extends RuntimeException {
private static final long serialVersionUID = 1L;
public DecoderException() {
public DecoderException(String message) {
public DecoderException(String message, Throwable cause) {
super(message, cause);
public DecoderException(Throwable cause) {
public DecoderException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
package net.dreamlu.iot.mqtt.codec;
import java.util.Objects;
* 解析结果集
* @author L.cm
public class DecoderResult {
public static final DecoderResult SUCCESS = new DecoderResult();
private final boolean success;
private final Throwable cause;
public DecoderResult() {
this(true, null);
public DecoderResult(Throwable cause) {
this(false, Objects.requireNonNull(cause, "cause is null"));
public DecoderResult(boolean success, Throwable cause) {
this.success = success;
this.cause = cause;
public static DecoderResult failure(Throwable cause) {
return new DecoderResult(cause);
public boolean isSuccess() {
return success;
public boolean isFailure() {
return !success;
public Throwable getCause() {
return cause;
package net.dreamlu.iot.mqtt.codec;
final class MqttCodecUtil {
private static final char[] TOPIC_WILDCARDS = {'#', '+'};
private static final int MIN_CLIENT_ID_LENGTH = 1;
private static final int MAX_CLIENT_ID_LENGTH = 23;
static boolean isValidPublishTopicName(String topicName) {
// publish topic name must not contain any wildcard
for (char c : TOPIC_WILDCARDS) {
if (topicName.indexOf(c) >= 0) {
return false;
return true;
static boolean isValidMessageId(int messageId) {
return messageId != 0;
static boolean isValidClientId(MqttVersion mqttVersion, String clientId) {
if (mqttVersion == MqttVersion.MQTT_3_1) {
return clientId != null && clientId.length() >= MIN_CLIENT_ID_LENGTH &&
clientId.length() <= MAX_CLIENT_ID_LENGTH;
if (mqttVersion == MqttVersion.MQTT_3_1_1) {
// In Client Identifier of MQTT 3.1.1 specification, The Server MAY allow ClientId’s
// that contain more than 23 encoded bytes. And, The Server MAY allow zero-length ClientId.
return clientId != null;
throw new IllegalArgumentException(mqttVersion + " is unknown mqtt version");
static MqttFixedHeader validateFixedHeader(MqttFixedHeader mqttFixedHeader) {
switch (mqttFixedHeader.messageType()) {
case PUBREL:
if (mqttFixedHeader.qosLevel() != MqttQoS.AT_LEAST_ONCE) {
throw new DecoderException(mqttFixedHeader.messageType().name() + " message must have QoS 1");
return mqttFixedHeader;
static MqttFixedHeader resetUnusedFields(MqttFixedHeader mqttFixedHeader) {
switch (mqttFixedHeader.messageType()) {
case PUBACK:
case PUBREC:
case SUBACK:
if (mqttFixedHeader.isDup() ||
mqttFixedHeader.qosLevel() != MqttQoS.AT_MOST_ONCE ||
mqttFixedHeader.isRetain()) {
return new MqttFixedHeader(
return mqttFixedHeader;
case PUBREL:
if (mqttFixedHeader.isRetain()) {
return new MqttFixedHeader(
return mqttFixedHeader;
return mqttFixedHeader;
private MqttCodecUtil() {
package net.dreamlu.iot.mqtt.codec;
* See <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#connack">MQTTV3.1/connack</a>
public final class MqttConnAckMessage extends MqttMessage {
public MqttConnAckMessage(MqttFixedHeader mqttFixedHeader,
MqttConnAckVariableHeader variableHeader) {
super(mqttFixedHeader, variableHeader);
public MqttConnAckVariableHeader variableHeader() {
return (MqttConnAckVariableHeader) super.variableHeader();
package net.dreamlu.iot.mqtt.codec;
* Variable header of {@link MqttConnectMessage}
public final class MqttConnAckVariableHeader {
private final MqttConnectReturnCode connectReturnCode;
private final boolean sessionPresent;
public MqttConnAckVariableHeader(MqttConnectReturnCode connectReturnCode,
boolean sessionPresent) {
this.connectReturnCode = connectReturnCode;
this.sessionPresent = sessionPresent;
public MqttConnectReturnCode connectReturnCode() {
return connectReturnCode;
public boolean isSessionPresent() {
return sessionPresent;
public String toString() {
return "MqttConnAckVariableHeader{" +
"connectReturnCode=" + connectReturnCode +
", sessionPresent=" + sessionPresent +
package net.dreamlu.iot.mqtt.codec;
* See <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#connect">MQTTV3.1/connect</a>
public final class MqttConnectMessage extends MqttMessage {
public MqttConnectMessage(MqttFixedHeader mqttFixedHeader,
MqttConnectVariableHeader variableHeader,
MqttConnectPayload payload) {
super(mqttFixedHeader, variableHeader, payload);
public MqttConnectVariableHeader variableHeader() {
return (MqttConnectVariableHeader) super.variableHeader();
public MqttConnectPayload payload() {
return (MqttConnectPayload) super.payload();
package net.dreamlu.iot.mqtt.codec;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
* Payload of {@link MqttConnectMessage}
public final class MqttConnectPayload {
private final String clientIdentifier;
private final String willTopic;
private final byte[] willMessage;
private final String userName;
private final byte[] password;
public MqttConnectPayload(String clientIdentifier,
String willTopic,
byte[] willMessage,
String userName,
byte[] password) {
this.clientIdentifier = clientIdentifier;
this.willTopic = willTopic;
this.willMessage = willMessage;
this.userName = userName;
this.password = password;
public String clientIdentifier() {
return clientIdentifier;
public String willTopic() {
return willTopic;
public byte[] willMessageInBytes() {
return willMessage;
public String userName() {
return userName;
public byte[] passwordInBytes() {
return password;
public String password() {
return password == null ? null : new String(password, StandardCharsets.UTF_8);
public String toString() {
return "MqttConnectPayload{" +
"clientIdentifier='" + clientIdentifier + '\'' +
", willTopic='" + willTopic + '\'' +
", willMessage=" + Arrays.toString(willMessage) +
", userName='" + userName + '\'' +
", password=" + password() +
package net.dreamlu.iot.mqtt.codec;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
* Return Code of {@link MqttConnAckMessage}
public enum MqttConnectReturnCode {
* 0 接受连接
* 1 连接被拒绝,协议版本不可接受
* 2 连接被拒绝,标识符被拒绝
* 3 连接被拒绝,服务器不可用
* 4 连接被拒绝,用户名或密码错误
* 5 连接被拒绝,未经授权
private static final Map<Byte, MqttConnectReturnCode> VALUE_TO_CODE_MAP;
static {
final Map<Byte, MqttConnectReturnCode> valueMap = new HashMap<>();
for (MqttConnectReturnCode code : values()) {
valueMap.put(code.byteValue, code);
VALUE_TO_CODE_MAP = Collections.unmodifiableMap(valueMap);
private final byte byteValue;
MqttConnectReturnCode(byte byteValue) {
this.byteValue = byteValue;
public byte byteValue() {
return byteValue;
public static MqttConnectReturnCode valueOf(byte b) {
if (VALUE_TO_CODE_MAP.containsKey(b)) {
return VALUE_TO_CODE_MAP.get(b);
throw new IllegalArgumentException("unknown connect return code: " + (b & 0xFF));
package net.dreamlu.iot.mqtt.codec;
* Variable Header for the {@link MqttConnectMessage}
public final class MqttConnectVariableHeader {
private final String name;
private final int version;
private final boolean hasUserName;
private final boolean hasPassword;
private final boolean isWillRetain;
private final int willQos;
private final boolean isWillFlag;
private final boolean isCleanSession;
private final int keepAliveTimeSeconds;
public MqttConnectVariableHeader(
String name,
int version,
boolean hasUserName,
boolean hasPassword,
boolean isWillRetain,
int willQos,
boolean isWillFlag,
boolean isCleanSession,
int keepAliveTimeSeconds) {
this.name = name;
this.version = version;
this.hasUserName = hasUserName;
this.hasPassword = hasPassword;
this.isWillRetain = isWillRetain;
this.willQos = willQos;
this.isWillFlag = isWillFlag;
this.isCleanSession = isCleanSession;
this.keepAliveTimeSeconds = keepAliveTimeSeconds;
public String name() {
return name;
public int version() {
return version;
public boolean hasUserName() {
return hasUserName;
public boolean hasPassword() {
return hasPassword;
public boolean isWillRetain() {
return isWillRetain;
public int willQos() {
return willQos;
public boolean isWillFlag() {
return isWillFlag;
public boolean isCleanSession() {
return isCleanSession;
public int keepAliveTimeSeconds() {
return keepAliveTimeSeconds;
public String toString() {
return "MqttConnectVariableHeader{" +
"name='" + name + '\'' +
", version=" + version +
", hasUserName=" + hasUserName +
", hasPassword=" + hasPassword +
", isWillRetain=" + isWillRetain +
", willQos=" + willQos +
", isWillFlag=" + isWillFlag +
", isCleanSession=" + isCleanSession +
", keepAliveTimeSeconds=" + keepAliveTimeSeconds +
package net.dreamlu.iot.mqtt.codec;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
* Decodes Mqtt messages from bytes, following
* <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">
* the MQTT protocol specification v3.1</a>
public final class MqttDecoder {
public static final MqttDecoder INSTANCE = new MqttDecoder();
private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;
private static final int MQTT_PROTOCOL_LENGTH = 2;
private final int maxBytesInMessage;
public MqttDecoder() {
public MqttDecoder(int maxBytesInMessage) {
this.maxBytesInMessage = maxBytesInMessage;
public boolean isSupport(ByteBuffer readBuffer) {
return readBuffer.limit() >= MQTT_PROTOCOL_LENGTH;
public MqttMessage decode(ByteBuffer readBuffer) {
// 1. 首先判断缓存中协议头是否读完(MQTT协议头为2字节)
if (readBuffer.limit() < MQTT_PROTOCOL_LENGTH) {
return null;
// 2. 解析 FixedHeader
MqttFixedHeader mqttFixedHeader;
int bytesRemainingInVariablePart;
try {
mqttFixedHeader = decodeFixedHeader(readBuffer);
bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
} catch (Exception cause) {
return MqttMessageFactory.newInvalidMessage(cause);
// 3. 解析头信息
Object variableHeader = null;
try {
Result<?> decodedVariableHeader = decodeVariableHeader(readBuffer, mqttFixedHeader);
variableHeader = decodedVariableHeader.value;
if (bytesRemainingInVariablePart > maxBytesInMessage) {
throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
} catch (Exception cause) {
return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
// 4. 解析消息体
final Result<?> decodedPayload;
try {
decodedPayload = decodePayload(readBuffer,
bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
if (bytesRemainingInVariablePart != 0) {
throw new DecoderException("non-zero remaining payload bytes: " +
bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
return MqttMessageFactory.newMessage(mqttFixedHeader, variableHeader, decodedPayload.value);
} catch (Throwable cause) {
return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
* Decodes the fixed header. It's one byte for the flags and then variable bytes for the remaining length.
* @param buffer the buffer to decode from
* @return the fixed header
private static MqttFixedHeader decodeFixedHeader(ByteBuffer buffer) {
short b1 = ByteBufferUtil.readUnsignedByte(buffer);
MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
boolean dupFlag = (b1 & 0x08) == 0x08;
int qosLevel = (b1 & 0x06) >> 1;
boolean retain = (b1 & 0x01) != 0;
int remainingLength = 0;
int multiplier = 1;
short digit;
int loops = 0;
do {
digit = buffer.get();
remainingLength += (digit & 127) * multiplier;
multiplier *= 128;
} while ((digit & 128) != 0 && loops < 4);
// MQTT protocol limits Remaining Length to 4 bytes
if (loops == 4 && (digit & 128) != 0) {
throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
MqttFixedHeader decodedFixedHeader = new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
return MqttCodecUtil.validateFixedHeader(MqttCodecUtil.resetUnusedFields(decodedFixedHeader));
* Decodes the variable header (if any)
* @param buffer the buffer to decode from
* @param mqttFixedHeader MqttFixedHeader of the same message
* @return the variable header
private static Result<?> decodeVariableHeader(ByteBuffer buffer, MqttFixedHeader mqttFixedHeader) {
switch (mqttFixedHeader.messageType()) {
return decodeConnectionVariableHeader(buffer);
return decodeConnAckVariableHeader(buffer);
case SUBACK:
case PUBACK:
case PUBREC:
case PUBREL:
return decodeMessageIdVariableHeader(buffer);
return decodePublishVariableHeader(buffer, mqttFixedHeader);
// Empty variable header
return new Result<>(null, 0);
return new Result<>(null, 0);
private static Result<MqttConnectVariableHeader> decodeConnectionVariableHeader(ByteBuffer buffer) {
final Result<String> protoString = decodeString(buffer);
int numberOfBytesConsumed = protoString.numberOfBytesConsumed;
final byte protocolLevel = buffer.get();
numberOfBytesConsumed += 1;
final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);
final int b1 = ByteBufferUtil.readUnsignedByte(buffer);
numberOfBytesConsumed += 1;
final Result<Integer> keepAlive = decodeMsbLsb(buffer);
numberOfBytesConsumed += keepAlive.numberOfBytesConsumed;
final boolean hasUserName = (b1 & 0x80) == 0x80;
final boolean hasPassword = (b1 & 0x40) == 0x40;
final boolean willRetain = (b1 & 0x20) == 0x20;
final int willQos = (b1 & 0x18) >> 3;
final boolean willFlag = (b1 & 0x04) == 0x04;
final boolean cleanSession = (b1 & 0x02) == 0x02;
if (mqttVersion == MqttVersion.MQTT_3_1_1) {
final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
if (!zeroReservedFlag) {
// MQTT v3.1.1: The Server MUST validate that the reserved flag in the CONNECT Control Packet is
// set to zero and disconnect the Client if it is not zero.
// See http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349230
throw new DecoderException("non-zero reserved flag");
final MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader(
return new Result<>(mqttConnectVariableHeader, numberOfBytesConsumed);
private static Result<MqttConnAckVariableHeader> decodeConnAckVariableHeader(ByteBuffer buffer) {
final boolean sessionPresent = (ByteBufferUtil.readUnsignedByte(buffer) & 0x01) == 0x01;
byte returnCode = ByteBufferUtil.readByte(buffer);
final int numberOfBytesConsumed = 2;
final MqttConnAckVariableHeader mqttConnAckVariableHeader =
new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent);
return new Result<>(mqttConnAckVariableHeader, numberOfBytesConsumed);
private static Result<MqttMessageIdVariableHeader> decodeMessageIdVariableHeader(ByteBuffer buffer) {
final Result<Integer> messageId = decodeMessageId(buffer);
return new Result<>(MqttMessageIdVariableHeader.from(messageId.value),
private static Result<MqttPublishVariableHeader> decodePublishVariableHeader(
ByteBuffer buffer,
MqttFixedHeader mqttFixedHeader) {
final Result<String> decodedTopic = decodeString(buffer);
if (!MqttCodecUtil.isValidPublishTopicName(decodedTopic.value)) {
throw new DecoderException("invalid publish topic name: " + decodedTopic.value + " (contains wildcards)");
int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed;
int messageId = -1;
if (mqttFixedHeader.qosLevel().value() > 0) {
final Result<Integer> decodedMessageId = decodeMessageId(buffer);
messageId = decodedMessageId.value;
numberOfBytesConsumed += decodedMessageId.numberOfBytesConsumed;
final MqttPublishVariableHeader mqttPublishVariableHeader =
new MqttPublishVariableHeader(decodedTopic.value, messageId);
return new Result<>(mqttPublishVariableHeader, numberOfBytesConsumed);
private static Result<Integer> decodeMessageId(ByteBuffer buffer) {
final Result<Integer> messageId = decodeMsbLsb(buffer);
if (!MqttCodecUtil.isValidMessageId(messageId.value)) {
throw new DecoderException("invalid messageId: " + messageId.value);
return messageId;
* Decodes the payload.
* @param buffer the buffer to decode from
* @param messageType type of the message being decoded
* @param bytesRemainingInVariablePart bytes remaining
* @param variableHeader variable header of the same message
* @return the payload
private static Result<?> decodePayload(
ByteBuffer buffer,
MqttMessageType messageType,
int bytesRemainingInVariablePart,
Object variableHeader) {
switch (messageType) {
return decodeConnectionPayload(buffer, (MqttConnectVariableHeader) variableHeader);
return decodeSubscribePayload(buffer, bytesRemainingInVariablePart);
case SUBACK:
return decodeSubackPayload(buffer, bytesRemainingInVariablePart);
return decodeUnsubscribePayload(buffer, bytesRemainingInVariablePart);
return decodePublishPayload(buffer, bytesRemainingInVariablePart);
// unknown payload , no byte consumed
return new Result<>(null, 0);
private static Result<MqttConnectPayload> decodeConnectionPayload(
ByteBuffer buffer,
MqttConnectVariableHeader mqttConnectVariableHeader) {
final Result<String> decodedClientId = decodeString(buffer);
final String decodedClientIdValue = decodedClientId.value;
final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
(byte) mqttConnectVariableHeader.version());
if (!MqttCodecUtil.isValidClientId(mqttVersion, decodedClientIdValue)) {
throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
Result<String> decodedWillTopic = null;
Result<byte[]> decodedWillMessage = null;
if (mqttConnectVariableHeader.isWillFlag()) {
decodedWillTopic = decodeString(buffer, 0, 32767);
numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
decodedWillMessage = decodeByteArray(buffer);
numberOfBytesConsumed += decodedWillMessage.numberOfBytesConsumed;
Result<String> decodedUserName = null;
Result<byte[]> decodedPassword = null;
if (mqttConnectVariableHeader.hasUserName()) {
decodedUserName = decodeString(buffer);
numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
if (mqttConnectVariableHeader.hasPassword()) {
decodedPassword = decodeByteArray(buffer);
numberOfBytesConsumed += decodedPassword.numberOfBytesConsumed;
final MqttConnectPayload mqttConnectPayload =
new MqttConnectPayload(
decodedWillTopic != null ? decodedWillTopic.value : null,
decodedWillMessage != null ? decodedWillMessage.value : null,
decodedUserName != null ? decodedUserName.value : null,
decodedPassword != null ? decodedPassword.value : null);
return new Result<>(mqttConnectPayload, numberOfBytesConsumed);
private static Result<MqttSubscribePayload> decodeSubscribePayload(
ByteBuffer buffer,
int bytesRemainingInVariablePart) {
final List<MqttTopicSubscription> subscribeTopics = new ArrayList<>();
int numberOfBytesConsumed = 0;
while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
final Result<String> decodedTopicName = decodeString(buffer);
numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
int qos = ByteBufferUtil.readUnsignedByte(buffer) & 0x03;
subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, MqttQoS.valueOf(qos)));
return new Result<>(new MqttSubscribePayload(subscribeTopics), numberOfBytesConsumed);
private static Result<MqttSubAckPayload> decodeSubackPayload(
ByteBuffer buffer,
int bytesRemainingInVariablePart) {
final List<Integer> grantedQos = new ArrayList<>();
int numberOfBytesConsumed = 0;
while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
int qos = ByteBufferUtil.readUnsignedByte(buffer);
if (qos != MqttQoS.FAILURE.value()) {
qos &= 0x03;
return new Result<>(new MqttSubAckPayload(grantedQos), numberOfBytesConsumed);
private static Result<MqttUnsubscribePayload> decodeUnsubscribePayload(
ByteBuffer buffer,
int bytesRemainingInVariablePart) {
final List<String> unsubscribeTopics = new ArrayList<>();
int numberOfBytesConsumed = 0;
while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
final Result<String> decodedTopicName = decodeString(buffer);
numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
return new Result<>(new MqttUnsubscribePayload(unsubscribeTopics), numberOfBytesConsumed);
private static Result<ByteBuffer> decodePublishPayload(ByteBuffer buffer, int bytesRemainingInVariablePart) {
byte[] slice = new byte[bytesRemainingInVariablePart];
buffer.get(slice, 0, bytesRemainingInVariablePart);
ByteBuffer byteBuffer = ByteBuffer.wrap(slice);
return new Result<>(byteBuffer, bytesRemainingInVariablePart);
private static Result<String> decodeString(ByteBuffer buffer) {
return decodeString(buffer, 0, Integer.MAX_VALUE);
private static Result<String> decodeString(ByteBuffer buffer, int minBytes, int maxBytes) {
final Result<Integer> decodedSize = decodeMsbLsb(buffer);
int size = decodedSize.value;
int numberOfBytesConsumed = decodedSize.numberOfBytesConsumed;
if (size < minBytes || size > maxBytes) {
ByteBufferUtil.skipBytes(buffer, size);
numberOfBytesConsumed += size;
return new Result<>(null, numberOfBytesConsumed);
String s = new String(buffer.array(), buffer.position(), size, StandardCharsets.UTF_8);
ByteBufferUtil.skipBytes(buffer, size);
numberOfBytesConsumed += size;
return new Result<>(s, numberOfBytesConsumed);
private static Result<byte[]> decodeByteArray(ByteBuffer buffer) {
final Result<Integer> decodedSize = decodeMsbLsb(buffer);
int size = decodedSize.value;
byte[] bytes = new byte[size];
// buffer.readBytes(bytes);
return new Result<>(bytes, decodedSize.numberOfBytesConsumed + size);
private static Result<Integer> decodeMsbLsb(ByteBuffer buffer) {
return decodeMsbLsb(buffer, 0, 65535);
private static Result<Integer> decodeMsbLsb(ByteBuffer buffer, int min, int max) {
short msbSize = ByteBufferUtil.readUnsignedByte(buffer);
short lsbSize = ByteBufferUtil.readUnsignedByte(buffer);
final int numberOfBytesConsumed = 2;
int result = msbSize << 8 | lsbSize;
if (result < min || result > max) {
result = -1;
return new Result<>(result, numberOfBytesConsumed);
private static final class Result<T> {
private final T value;
private final int numberOfBytesConsumed;
Result(T value, int numberOfBytesConsumed) {
this.value = value;
this.numberOfBytesConsumed = numberOfBytesConsumed;
package net.dreamlu.iot.mqtt.codec;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
* Encodes Mqtt messages into bytes following the protocol specification v3.1
* as described here <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">MQTTV3.1</a>
public final class MqttEncoder {
public static final MqttEncoder INSTANCE = new MqttEncoder();
* This is the main encoding method.
* It's only visible for testing.
* @param message MQTT message to encode
* @return ByteBuffer with encoded bytes
public ByteBuffer doEncode(MqttMessage message) {
switch (message.fixedHeader().messageType()) {
return encodeConnectMessage((MqttConnectMessage) message);
return encodeConnAckMessage((MqttConnAckMessage) message);
return encodePublishMessage((MqttPublishMessage) message);
return encodeSubscribeMessage((MqttSubscribeMessage) message);
return encodeUnsubscribeMessage((MqttUnsubscribeMessage) message);
case SUBACK:
return encodeSubAckMessage((MqttSubAckMessage) message);
case PUBACK:
case PUBREC:
case PUBREL:
return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(message);
return encodeMessageWithOnlySingleByteFixedHeader(message);
throw new IllegalArgumentException(
"Unknown message type: " + message.fixedHeader().messageType().value());
private static ByteBuffer encodeConnectMessage(MqttConnectMessage message) {
int payloadBufferSize = 0;
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttConnectVariableHeader variableHeader = message.variableHeader();
MqttConnectPayload payload = message.payload();
MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(),
(byte) variableHeader.version());
// as MQTT 3.1 & 3.1.1 spec, If the User Name Flag is set to 0, the Password Flag MUST be set to 0
if (!variableHeader.hasUserName() && variableHeader.hasPassword()) {
throw new DecoderException("Without a username, the password MUST be not set");
// Client id
String clientIdentifier = payload.clientIdentifier();
if (!MqttCodecUtil.isValidClientId(mqttVersion, clientIdentifier)) {
throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier);
byte[] clientIdentifierBytes = encodeStringUtf8(clientIdentifier);
payloadBufferSize += 2 + clientIdentifierBytes.length;
// Will topic and message
String willTopic = payload.willTopic();
byte[] willTopicBytes = willTopic != null ? encodeStringUtf8(willTopic) : ByteBufferUtil.EMPTY;
byte[] willMessage = payload.willMessageInBytes();
byte[] willMessageBytes = willMessage != null ? willMessage : ByteBufferUtil.EMPTY;
if (variableHeader.isWillFlag()) {
payloadBufferSize += 2 + willTopicBytes.length;
payloadBufferSize += 2 + willMessageBytes.length;
String userName = payload.userName();
byte[] userNameBytes = userName != null ? encodeStringUtf8(userName) : ByteBufferUtil.EMPTY;
if (variableHeader.hasUserName()) {
payloadBufferSize += 2 + userNameBytes.length;
byte[] password = payload.passwordInBytes();
byte[] passwordBytes = password != null ? password : ByteBufferUtil.EMPTY;
if (variableHeader.hasPassword()) {
payloadBufferSize += 2 + passwordBytes.length;
// Fixed header
byte[] protocolNameBytes = mqttVersion.protocolNameBytes();
int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4;
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuffer buf = ByteBuffer.allocate(fixedHeaderBufferSize + variablePartSize);
buf.put((byte) getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
buf.putShort((short) protocolNameBytes.length);
buf.put((byte) variableHeader.version());
buf.put((byte) getConnVariableHeaderFlag(variableHeader));
buf.putShort((short) variableHeader.keepAliveTimeSeconds());
// Payload
buf.putShort((short) clientIdentifierBytes.length);
buf.put(clientIdentifierBytes, 0, clientIdentifierBytes.length);
if (variableHeader.isWillFlag()) {
buf.putShort((short) willTopicBytes.length);
buf.put(willTopicBytes, 0, willTopicBytes.length);
buf.putShort((short) willMessageBytes.length);
buf.put(willMessageBytes, 0, willMessageBytes.length);
if (variableHeader.hasUserName()) {
buf.putShort((short) userNameBytes.length);
buf.put(userNameBytes, 0, userNameBytes.length);
if (variableHeader.hasPassword()) {
buf.putShort((short) passwordBytes.length);
buf.put(passwordBytes, 0, passwordBytes.length);
return buf;
private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {
int flagByte = 0;
if (variableHeader.hasUserName()) {
flagByte |= 0x80;
if (variableHeader.hasPassword()) {
flagByte |= 0x40;
if (variableHeader.isWillRetain()) {
flagByte |= 0x20;
flagByte |= (variableHeader.willQos() & 0x03) << 3;
if (variableHeader.isWillFlag()) {
flagByte |= 0x04;
if (variableHeader.isCleanSession()) {
flagByte |= 0x02;
return flagByte;
private static ByteBuffer encodeConnAckMessage(MqttConnAckMessage message) {
ByteBuffer buf = ByteBuffer.allocate(4);
buf.put((byte) getFixedHeaderByte1(message.fixedHeader()));
buf.put((byte) 2);
buf.put((byte) (message.variableHeader().isSessionPresent() ? 0x01 : 0x00));
return buf;
private static ByteBuffer encodeSubscribeMessage(MqttSubscribeMessage message) {
int variableHeaderBufferSize = 2;
int payloadBufferSize = 0;
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttMessageIdVariableHeader variableHeader = message.variableHeader();
MqttSubscribePayload payload = message.payload();
for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
String topicName = topic.topicName();
byte[] topicNameBytes = encodeStringUtf8(topicName);
payloadBufferSize += 2 + topicNameBytes.length;
payloadBufferSize += 1;
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuffer buf = ByteBuffer.allocate(fixedHeaderBufferSize + variablePartSize);
buf.put((byte) getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
// Variable Header
int messageId = variableHeader.messageId();
buf.putShort((short) messageId);
// Payload
for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
String topicName = topic.topicName();
byte[] topicNameBytes = encodeStringUtf8(topicName);
buf.putShort((short) topicNameBytes.length);
buf.put(topicNameBytes, 0, topicNameBytes.length);
buf.put((byte) topic.qualityOfService().value());
return buf;
private static ByteBuffer encodeUnsubscribeMessage(MqttUnsubscribeMessage message) {
int variableHeaderBufferSize = 2;
int payloadBufferSize = 0;
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttMessageIdVariableHeader variableHeader = message.variableHeader();
MqttUnsubscribePayload payload = message.payload();
for (String topicName : payload.topics()) {
byte[] topicNameBytes = encodeStringUtf8(topicName);
payloadBufferSize += 2 + topicNameBytes.length;
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuffer buf = ByteBuffer.allocate(fixedHeaderBufferSize + variablePartSize);
buf.put((byte) getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
// Variable Header
int messageId = variableHeader.messageId();
buf.putShort((short) messageId);
// Payload
for (String topicName : payload.topics()) {
byte[] topicNameBytes = encodeStringUtf8(topicName);
buf.putShort((short) topicNameBytes.length);
buf.put(topicNameBytes, 0, topicNameBytes.length);
return buf;
private static ByteBuffer encodeSubAckMessage(MqttSubAckMessage message) {
int variableHeaderBufferSize = 2;
int payloadBufferSize = message.payload().grantedQoSLevels().size();
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuffer buf = ByteBuffer.allocate(fixedHeaderBufferSize + variablePartSize);
buf.put((byte) getFixedHeaderByte1(message.fixedHeader()));
writeVariableLengthInt(buf, variablePartSize);
buf.putShort((short) message.variableHeader().messageId());
for (int qos : message.payload().grantedQoSLevels()) {
buf.put((byte) qos);
return buf;
private static ByteBuffer encodePublishMessage(MqttPublishMessage message) {
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttPublishVariableHeader variableHeader = message.variableHeader();
ByteBuffer payload = message.payload().duplicate();
String topicName = variableHeader.topicName();
byte[] topicNameBytes = encodeStringUtf8(topicName);
int variableHeaderBufferSize = 2 + topicNameBytes.length +
(mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0);
int payloadBufferSize = payload.limit();
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuffer buf = ByteBuffer.allocate(fixedHeaderBufferSize + variablePartSize);
buf.put((byte) getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
buf.putShort((short) topicNameBytes.length);
if (mqttFixedHeader.qosLevel().value() > 0) {
buf.putShort((short) variableHeader.packetId());
return buf;
private static ByteBuffer encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(MqttMessage message) {
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
int msgId = variableHeader.messageId();
// variable part only has a message id
int variableHeaderBufferSize = 2;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
ByteBuffer buf = ByteBuffer.allocate(fixedHeaderBufferSize + variableHeaderBufferSize);
buf.put((byte) getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variableHeaderBufferSize);
buf.putShort((short) msgId);
return buf;
private static ByteBuffer encodeMessageWithOnlySingleByteFixedHeader(MqttMessage message) {
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
ByteBuffer buf = ByteBuffer.allocate(2);
buf.put((byte) getFixedHeaderByte1(mqttFixedHeader));
buf.put((byte) 0);
return buf;
private static int getFixedHeaderByte1(MqttFixedHeader header) {
int ret = 0;
ret |= header.messageType().value() << 4;
if (header.isDup()) {
ret |= 0x08;
ret |= header.qosLevel().value() << 1;
if (header.isRetain()) {
ret |= 0x01;
return ret;
private static void writeVariableLengthInt(ByteBuffer buffer, int num) {
do {
int digit = num % 128;
num /= 128;
if (num > 0) {
digit |= 0x80;
buffer.put((byte) digit);
} while (num > 0);
private static int getVariableLengthInt(int num) {
int count = 0;
do {
num /= 128;
} while (num > 0);
return count;
private static byte[] encodeStringUtf8(String s) {
return s.getBytes(StandardCharsets.UTF_8);
package net.dreamlu.iot.mqtt.codec;
import java.util.Objects;
* See <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#fixed-header">
* MQTTV3.1/fixed-header</a>
public final class MqttFixedHeader {
private final MqttMessageType messageType;
private final boolean isDup;
private final MqttQoS qosLevel;
private final boolean isRetain;
private final int remainingLength;
public MqttFixedHeader(MqttMessageType messageType,
boolean isDup,
MqttQoS qosLevel,
boolean isRetain,
int remainingLength) {
this.messageType = Objects.requireNonNull(messageType, "messageType is null");
this.isDup = isDup;
this.qosLevel = Objects.requireNonNull(qosLevel, "qosLevel is null");
this.isRetain = isRetain;
this.remainingLength = remainingLength;
public MqttMessageType messageType() {
return messageType;
public boolean isDup() {
return isDup;
public MqttQoS qosLevel() {
return qosLevel;
public boolean isRetain() {
return isRetain;
public int remainingLength() {
return remainingLength;
public String toString() {
return "MqttFixedHeader{" +
"messageType=" + messageType +
", isDup=" + isDup +
", qosLevel=" + qosLevel +
", isRetain=" + isRetain +
", remainingLength=" + remainingLength +
package net.dreamlu.iot.mqtt.codec;
* A {@link MqttIdentifierRejectedException} which is thrown when a CONNECT request contains invalid client identifier.
public final class MqttIdentifierRejectedException extends DecoderException {
private static final long serialVersionUID = -1323503322689614981L;
* Creates a new instance
public MqttIdentifierRejectedException() {
* Creates a new instance
public MqttIdentifierRejectedException(String message, Throwable cause) {
super(message, cause);
* Creates a new instance
public MqttIdentifierRejectedException(String message) {
* Creates a new instance
public MqttIdentifierRejectedException(Throwable cause) {
package net.dreamlu.iot.mqtt.codec;
import org.tio.core.intf.Packet;
* Base class for all MQTT message types.
public class MqttMessage extends Packet {
private final MqttFixedHeader mqttFixedHeader;
private final Object variableHeader;
private final Object payload;
private final DecoderResult decoderResult;
// Constants for fixed-header only message types with all flags set to 0 (see
// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.2_-)
public static final MqttMessage PINGREQ = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGREQ, false,
MqttQoS.AT_MOST_ONCE, false, 0));
public static final MqttMessage PINGRESP = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false,
MqttQoS.AT_MOST_ONCE, false, 0));
public static final MqttMessage DISCONNECT = new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false,
MqttQoS.AT_MOST_ONCE, false, 0));
public MqttMessage(MqttFixedHeader mqttFixedHeader) {
this(mqttFixedHeader, null, null);
public MqttMessage(MqttFixedHeader mqttFixedHeader, Object variableHeader) {
this(mqttFixedHeader, variableHeader, null);
public MqttMessage(MqttFixedHeader mqttFixedHeader, Object variableHeader, Object payload) {
this(mqttFixedHeader, variableHeader, payload, DecoderResult.SUCCESS);
public MqttMessage(MqttFixedHeader mqttFixedHeader,
Object variableHeader,
Object payload,
DecoderResult decoderResult) {
this.mqttFixedHeader = mqttFixedHeader;
this.variableHeader = variableHeader;
this.payload = payload;
this.decoderResult = decoderResult;
public MqttFixedHeader fixedHeader() {
return mqttFixedHeader;
public Object variableHeader() {
return variableHeader;
public Object payload() {
return payload;
public DecoderResult decoderResult() {
return decoderResult;
public String toString() {
return "MqttMessage{" +
"fixedHeader=" + mqttFixedHeader +
", variableHeader=" + variableHeader +
", payload=" + payload +
package net.dreamlu.iot.mqtt.codec;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public final class MqttMessageBuilders {
public static final class PublishBuilder {
private String topic;
private boolean retained;
private MqttQoS qos;
private ByteBuffer payload;
private int messageId;
PublishBuilder() {
public PublishBuilder topicName(String topic) {
this.topic = topic;
return this;
public PublishBuilder retained(boolean retained) {
this.retained = retained;
return this;
public PublishBuilder qos(MqttQoS qos) {
this.qos = qos;
return this;
public PublishBuilder payload(ByteBuffer payload) {
this.payload = payload;
return this;
public PublishBuilder messageId(int messageId) {
this.messageId = messageId;
return this;
public MqttPublishMessage build() {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retained, 0);
MqttPublishVariableHeader mqttVariableHeader = new MqttPublishVariableHeader(topic, messageId);
return new MqttPublishMessage(mqttFixedHeader, mqttVariableHeader, ByteBufferUtil.clone(payload));
public static final class ConnectBuilder {
private MqttVersion version = MqttVersion.MQTT_3_1_1;
private String clientId;
private boolean cleanSession;
private boolean hasUser;
private boolean hasPassword;
private int keepAliveSecs;
private boolean willFlag;
private boolean willRetain;
private MqttQoS willQos = MqttQoS.AT_MOST_ONCE;
private String willTopic;
private byte[] willMessage;
private String username;
private byte[] password;
ConnectBuilder() {
public ConnectBuilder protocolVersion(MqttVersion version) {
this.version = version;
return this;
public ConnectBuilder clientId(String clientId) {
this.clientId = clientId;
return this;
public ConnectBuilder cleanSession(boolean cleanSession) {
this.cleanSession = cleanSession;
return this;
public ConnectBuilder keepAlive(int keepAliveSecs) {
this.keepAliveSecs = keepAliveSecs;
return this;
public ConnectBuilder willFlag(boolean willFlag) {
this.willFlag = willFlag;
return this;
public ConnectBuilder willQoS(MqttQoS willQos) {
this.willQos = willQos;
return this;
public ConnectBuilder willTopic(String willTopic) {
this.willTopic = willTopic;
return this;
public ConnectBuilder willMessage(byte[] willMessage) {
this.willMessage = willMessage;
return this;
public ConnectBuilder willRetain(boolean willRetain) {
this.willRetain = willRetain;
return this;
public ConnectBuilder hasUser(boolean value) {
this.hasUser = value;
return this;
public ConnectBuilder hasPassword(boolean value) {
this.hasPassword = value;
return this;
public ConnectBuilder username(String username) {
this.hasUser = username != null;
this.username = username;
return this;
public ConnectBuilder password(byte[] password) {
this.hasPassword = password != null;
this.password = password;
return this;
public MqttConnectMessage build() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttConnectVariableHeader mqttConnectVariableHeader =
new MqttConnectVariableHeader(
MqttConnectPayload mqttConnectPayload =
new MqttConnectPayload(clientId, willTopic, willMessage, username, password);
return new MqttConnectMessage(mqttFixedHeader, mqttConnectVariableHeader, mqttConnectPayload);
public static final class SubscribeBuilder {
private List<MqttTopicSubscription> subscriptions;
private int messageId;
SubscribeBuilder() {
public SubscribeBuilder addSubscription(MqttQoS qos, String topic) {
if (subscriptions == null) {
subscriptions = new ArrayList<>(5);
subscriptions.add(new MqttTopicSubscription(topic, qos));
return this;
public SubscribeBuilder messageId(int messageId) {
this.messageId = messageId;
return this;
public MqttSubscribeMessage build() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttVariableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttSubscribePayload mqttSubscribePayload = new MqttSubscribePayload(subscriptions);
return new MqttSubscribeMessage(mqttFixedHeader, mqttVariableHeader, mqttSubscribePayload);
public static final class UnsubscribeBuilder {
private List<String> topicFilters;
private int messageId;
UnsubscribeBuilder() {
public UnsubscribeBuilder addTopicFilter(String topic) {
if (topicFilters == null) {
topicFilters = new ArrayList<String>(5);
return this;
public UnsubscribeBuilder messageId(int messageId) {
this.messageId = messageId;
return this;
public MqttUnsubscribeMessage build() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttVariableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttUnsubscribePayload mqttSubscribePayload = new MqttUnsubscribePayload(topicFilters);
return new MqttUnsubscribeMessage(mqttFixedHeader, mqttVariableHeader, mqttSubscribePayload);
public static final class ConnAckBuilder {
private MqttConnectReturnCode returnCode;
private boolean sessionPresent;
ConnAckBuilder() {
public ConnAckBuilder returnCode(MqttConnectReturnCode returnCode) {
this.returnCode = returnCode;
return this;
public ConnAckBuilder sessionPresent(boolean sessionPresent) {
this.sessionPresent = sessionPresent;
return this;
public MqttConnAckMessage build() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttConnAckVariableHeader mqttConnAckVariableHeader =
new MqttConnAckVariableHeader(returnCode, sessionPresent);
return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
public static ConnectBuilder connect() {
return new ConnectBuilder();
public static ConnAckBuilder connAck() {
return new ConnAckBuilder();
public static PublishBuilder publish() {
return new PublishBuilder();
public static SubscribeBuilder subscribe() {
return new SubscribeBuilder();
public static UnsubscribeBuilder unsubscribe() {
return new UnsubscribeBuilder();
private MqttMessageBuilders() {
package net.dreamlu.iot.mqtt.codec;
import java.nio.ByteBuffer;
* Utility class with factory methods to create different types of MQTT messages.
public final class MqttMessageFactory {
public static MqttMessage newMessage(MqttFixedHeader mqttFixedHeader, Object variableHeader, Object payload) {
switch (mqttFixedHeader.messageType()) {
return new MqttConnectMessage(
(MqttConnectVariableHeader) variableHeader,
(MqttConnectPayload) payload);
return new MqttConnAckMessage(mqttFixedHeader, (MqttConnAckVariableHeader) variableHeader);
return new MqttSubscribeMessage(
(MqttMessageIdVariableHeader) variableHeader,
(MqttSubscribePayload) payload);
case SUBACK:
return new MqttSubAckMessage(
(MqttMessageIdVariableHeader) variableHeader,
(MqttSubAckPayload) payload);
return new MqttUnsubAckMessage(
(MqttMessageIdVariableHeader) variableHeader);
return new MqttUnsubscribeMessage(
(MqttMessageIdVariableHeader) variableHeader,
(MqttUnsubscribePayload) payload);
return new MqttPublishMessage(
(MqttPublishVariableHeader) variableHeader,
(ByteBuffer) payload);
case PUBACK:
return new MqttPubAckMessage(mqttFixedHeader, (MqttMessageIdVariableHeader) variableHeader);
case PUBREC:
case PUBREL:
return new MqttMessage(mqttFixedHeader, variableHeader);
return new MqttMessage(mqttFixedHeader);
throw new IllegalArgumentException("unknown message type: " + mqttFixedHeader.messageType());
public static MqttMessage newInvalidMessage(Throwable cause) {
return newInvalidMessage(null, null, cause);
public static MqttMessage newInvalidMessage(MqttFixedHeader mqttFixedHeader,
Object variableHeader,
Throwable cause) {
return new MqttMessage(mqttFixedHeader, variableHeader, null, DecoderResult.failure(cause));
private MqttMessageFactory() {
package net.dreamlu.iot.mqtt.codec;
* Variable Header containing only Message Id
* See <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#msg-id">MQTTV3.1/msg-id</a>
public final class MqttMessageIdVariableHeader {
private final int messageId;
public static MqttMessageIdVariableHeader from(int messageId) {
if (messageId < 1 || messageId > 0xffff) {
throw new IllegalArgumentException("messageId: " + messageId + " (expected: 1 ~ 65535)");
return new MqttMessageIdVariableHeader(messageId);
private MqttMessageIdVariableHeader(int messageId) {
this.messageId = messageId;
public int messageId() {
return messageId;
public String toString() {
return "MqttMessageIdVariableHeader{" +
"messageId=" + messageId +
package net.dreamlu.iot.mqtt.codec;
* MQTT Message Types.
public enum MqttMessageType {
* 连接服务端
* 确认连接请求
* 发布消息
* 发布确认
* 发布收到(QoS 2,第一步)
* 发布释放(QoS 2,第二步)
* 发布完成(QoS 2,第三步)
* 订阅主题
* 订阅确认
* 取消订阅
* 取消订阅确认
* 心跳请求
* 心跳响应
* 断开连接
private final int value;
MqttMessageType(int value) {
this.value = value;
public int value() {
return value;
public static MqttMessageType valueOf(int type) {
for (MqttMessageType t : values()) {
if (t.value == type) {
return t;
throw new IllegalArgumentException("unknown message type: " + type);
package net.dreamlu.iot.mqtt.codec;
* See <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#puback">MQTTV3.1/puback</a>
public final class MqttPubAckMessage extends MqttMessage {
public MqttPubAckMessage(MqttFixedHeader mqttFixedHeader, MqttMessageIdVariableHeader variableHeader) {
super(mqttFixedHeader, variableHeader);
public MqttMessageIdVariableHeader variableHeader() {
return (MqttMessageIdVariableHeader) super.variableHeader();
package net.dreamlu.iot.mqtt.codec;
import java.nio.ByteBuffer;
* See <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#publish">MQTTV3.1/publish</a>
public class MqttPublishMessage extends MqttMessage {
public MqttPublishMessage(MqttFixedHeader mqttFixedHeader,
MqttPublishVariableHeader variableHeader,
ByteBuffer payload) {
super(mqttFixedHeader, variableHeader, payload);
public MqttPublishVariableHeader variableHeader() {
return (MqttPublishVariableHeader) super.variableHeader();
public ByteBuffer payload() {
return (ByteBuffer) super.payload();
package net.dreamlu.iot.mqtt.codec;
* Variable Header of the {@link MqttPublishMessage}
public final class MqttPublishVariableHeader {
private final String topicName;
private final int packetId;
public MqttPublishVariableHeader(String topicName, int packetId) {
this.topicName = topicName;
this.packetId = packetId;
public String topicName() {
return topicName;
public int packetId() {
return packetId;
public String toString() {
return "MqttPublishVariableHeader{" +
"topicName='" + topicName + '\'' +
", packetId=" + packetId +
package net.dreamlu.iot.mqtt.codec;
public enum MqttQoS {
* QoS level 0 至多发送一次,发送即丢弃。没有确认消息,也不知道对方是否收到。
* QoS level 1 都要在可变头部中附加一个16位的消息ID,SUBSCRIBE 和 UNSUBSCRIBE 消息使用 QoS level 1。
* QoS level 2 仅仅在 PUBLISH 类型消息中出现,要求在可变头部中要附加消息ID。
* 失败
private final int value;
MqttQoS(int value) {
this.value = value;
public int value() {
return value;
public static MqttQoS valueOf(int value) {
for (MqttQoS q : values()) {
if (q.value == value) {
return q;
throw new IllegalArgumentException("invalid QoS: " + value);
package net.dreamlu.iot.mqtt.codec;
* See <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#suback">MQTTV3.1/suback</a>
public final class MqttSubAckMessage extends MqttMessage {
public MqttSubAckMessage(
MqttFixedHeader mqttFixedHeader,
MqttMessageIdVariableHeader variableHeader,
MqttSubAckPayload payload) {
super(mqttFixedHeader, variableHeader, payload);
public MqttMessageIdVariableHeader variableHeader() {
return (MqttMessageIdVariableHeader) super.variableHeader();
public MqttSubAckPayload payload() {
return (MqttSubAckPayload) super.payload();
package net.dreamlu.iot.mqtt.codec;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
* Payload of the {@link MqttSubAckMessage}
public class MqttSubAckPayload {
private final List<Integer> grantedQoSLevels;
public MqttSubAckPayload(int... grantedQoSLevels) {
Objects.requireNonNull(grantedQoSLevels, "grantedQoSLevels is null");
List<Integer> list = new ArrayList<>(grantedQoSLevels.length);
for (int v : grantedQoSLevels) {
this.grantedQoSLevels = Collections.unmodifiableList(list);
public MqttSubAckPayload(Iterable<Integer> grantedQoSLevels) {
Objects.requireNonNull(grantedQoSLevels, "grantedQoSLevels is null");
List<Integer> list = new ArrayList<>();
for (Integer v : grantedQoSLevels) {
if (v == null) {
this.grantedQoSLevels = Collections.unmodifiableList(list);
public List<Integer> grantedQoSLevels() {
return grantedQoSLevels;
public String toString() {
return "MqttSubAckPayload{" +
"grantedQoSLevels=" + grantedQoSLevels +
package net.dreamlu.iot.mqtt.codec;
* See <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#subscribe">
* MQTTV3.1/subscribe</a>
public final class MqttSubscribeMessage extends MqttMessage {
public MqttSubscribeMessage(MqttFixedHeader mqttFixedHeader,
MqttMessageIdVariableHeader variableHeader,
MqttSubscribePayload payload) {
super(mqttFixedHeader, variableHeader, payload);
public MqttMessageIdVariableHeader variableHeader() {
return (MqttMessageIdVariableHeader) super.variableHeader();
public MqttSubscribePayload payload() {
return (MqttSubscribePayload) super.payload();
package net.dreamlu.iot.mqtt.codec;
import java.util.Collections;
import java.util.List;
* Payload of the {@link MqttSubscribeMessage}
public final class MqttSubscribePayload {
private final List<MqttTopicSubscription> topicSubscriptions;
public MqttSubscribePayload(List<MqttTopicSubscription> topicSubscriptions) {
this.topicSubscriptions = Collections.unmodifiableList(topicSubscriptions);
public List<MqttTopicSubscription> topicSubscriptions() {
return topicSubscriptions;
public String toString() {
StringBuilder builder = new StringBuilder("MqttSubscribePayload[");
for (MqttTopicSubscription topicSubscription : topicSubscriptions) {
builder.append(topicSubscription).append(", ");
if (!topicSubscriptions.isEmpty()) {
builder.setLength(builder.length() - 2);
return builder.append(']').toString();
package net.dreamlu.iot.mqtt.codec;
* Contains a topic name and Qos Level.
* This is part of the {@link MqttSubscribePayload}
public final class MqttTopicSubscription {
private final String topicFilter;
private final MqttQoS qualityOfService;
public MqttTopicSubscription(String topicFilter, MqttQoS qualityOfService) {
this.topicFilter = topicFilter;
this.qualityOfService = qualityOfService;
public String topicName() {
return topicFilter;
public MqttQoS qualityOfService() {
return qualityOfService;
public String toString() {
return "MqttTopicSubscription{" +
"topicFilter='" + topicFilter + '\'' +
", qualityOfService=" + qualityOfService +
package net.dreamlu.iot.mqtt.codec;
* A {@link MqttUnacceptableProtocolVersionException} which is thrown when
* a CONNECT request contains unacceptable protocol version.
public final class MqttUnacceptableProtocolVersionException extends DecoderException {
private static final long serialVersionUID = 4914652213232455749L;
* Creates a new instance
public MqttUnacceptableProtocolVersionException() {
* Creates a new instance
public MqttUnacceptableProtocolVersionException(String message, Throwable cause) {
super(message, cause);
* Creates a new instance
public MqttUnacceptableProtocolVersionException(String message) {
* Creates a new instance
public MqttUnacceptableProtocolVersionException(Throwable cause) {
package net.dreamlu.iot.mqtt.codec;
* See <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#unsuback">MQTTV3.1/unsuback</a>
public final class MqttUnsubAckMessage extends MqttMessage {
public MqttUnsubAckMessage(MqttFixedHeader mqttFixedHeader,
MqttMessageIdVariableHeader variableHeader) {
super(mqttFixedHeader, variableHeader, null);
public MqttMessageIdVariableHeader variableHeader() {
return (MqttMessageIdVariableHeader) super.variableHeader();
package net.dreamlu.iot.mqtt.codec;
* See <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#unsubscribe">
* MQTTV3.1/unsubscribe</a>
public final class MqttUnsubscribeMessage extends MqttMessage {
public MqttUnsubscribeMessage(MqttFixedHeader mqttFixedHeader,
MqttMessageIdVariableHeader variableHeader,
MqttUnsubscribePayload payload) {
super(mqttFixedHeader, variableHeader, payload);
public MqttMessageIdVariableHeader variableHeader() {
return (MqttMessageIdVariableHeader) super.variableHeader();
public MqttUnsubscribePayload payload() {
return (MqttUnsubscribePayload) super.payload();
package net.dreamlu.iot.mqtt.codec;
import java.util.Collections;
import java.util.List;
* Payload of the {@link MqttUnsubscribeMessage}
public final class MqttUnsubscribePayload {
private final List<String> topics;
public MqttUnsubscribePayload(List<String> topics) {
this.topics = Collections.unmodifiableList(topics);
public List<String> topics() {
return topics;
public String toString() {
StringBuilder builder = new StringBuilder("MqttUnsubscribePayload[");
for (String topic : topics) {
builder.append("topicName = ").append(topic).append(", ");
if (!topics.isEmpty()) {
builder.setLength(builder.length() - 2);
return builder.append("}").toString();
package net.dreamlu.iot.mqtt.codec;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
* Mqtt version specific constant values used by multiple classes in mqtt-codec.
public enum MqttVersion {
* mqtt 协议版本
MQTT_3_1("MQIsdp", (byte) 3),
MQTT_3_1_1("MQTT", (byte) 4);
private final String name;
private final byte level;
MqttVersion(String protocolName, byte protocolLevel) {
name = Objects.requireNonNull(protocolName, "protocolName");
level = protocolLevel;
public String protocolName() {
return name;
public byte[] protocolNameBytes() {
return name.getBytes(StandardCharsets.UTF_8);
public byte protocolLevel() {
return level;
public static MqttVersion fromProtocolNameAndLevel(String protocolName, byte protocolLevel) {
for (MqttVersion mv : values()) {
if (mv.name.equals(protocolName)) {
if (mv.level == protocolLevel) {
return mv;
} else {
throw new MqttUnacceptableProtocolVersionException(protocolName + " and " + protocolLevel + " are not match");
throw new MqttUnacceptableProtocolVersionException(protocolName + "is unknown protocol name");
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.client.intf.ClientAioHandler;
import org.tio.core.ChannelContext;
import org.tio.core.TioConfig;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;
import org.tio.server.AcceptCompletionHandler;
import java.nio.ByteBuffer;
* mqtt 客户端处理
* @author L.cm
public class MqttClientAioHandler implements ClientAioHandler {
private static final Logger log = LoggerFactory.getLogger(AcceptCompletionHandler.class);
private final MqttDecoder mqttDecoder;
private final MqttEncoder mqttEncoder;
private final MqttClientProcessor processor;
public MqttClientAioHandler(MqttClientProcessor processor) {
this.mqttDecoder = MqttDecoder.INSTANCE;
this.mqttEncoder = MqttEncoder.INSTANCE;
this.processor =processor;
public Packet heartbeatPacket(ChannelContext channelContext) {
return MqttMessage.PINGREQ;
public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException {
return mqttDecoder.decode(buffer);
public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
return mqttEncoder.doEncode((MqttMessage) packet);
public void handler(Packet packet, ChannelContext context) throws Exception {
MqttMessage message = (MqttMessage) packet;
// 1. 先判断 mqtt 消息解析是否正常
DecoderResult decoderResult = message.decoderResult();
if (decoderResult.isFailure()) {
processFailure(context, message);
MqttFixedHeader fixedHeader = message.fixedHeader();
// 根据消息类型处理消息
MqttMessageType messageType = fixedHeader.messageType();
switch (messageType) {
processor.processConAck(context, (MqttConnAckMessage) message);
case SUBACK:
processor.processSubAck((MqttSubAckMessage) message);
processor.processPublish(context, (MqttPublishMessage) message);
processor.processUnSubAck((MqttUnsubAckMessage) message);
case PUBACK:
processor.processPubAck((MqttPubAckMessage) message);
case PUBREC:
processor.processPubRec(context, message);
case PUBREL:
processor.processPubRel(context, message);
* 处理失败
* @param context ChannelContext
* @param mqttMessage MqttMessage
private void processFailure(ChannelContext context, MqttMessage mqttMessage) {
// 客户端失败,我认为日志记录异常就行了
Throwable cause = mqttMessage.decoderResult().getCause();
log.error(cause.getMessage(), cause);
package net.dreamlu.iot.mqtt.core.client;
import lombok.RequiredArgsConstructor;
import net.dreamlu.iot.mqtt.codec.MqttConnectMessage;
import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import org.tio.client.DefaultClientAioListener;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import java.nio.charset.StandardCharsets;
* mqtt 客户端监听器
* @author L.cm
public class MqttClientAioListener extends DefaultClientAioListener {
private final String clientId;
private final String username;
private final String password;
public void onAfterConnected(ChannelContext context, boolean isConnected, boolean isReconnect) {
if (isConnected) {
// 1. 建立连接后发送 mqtt 连接的消息
MqttConnectMessage message = MqttMessageBuilders.connect()
Tio.send(context, message);
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.*;
import org.tio.core.ChannelContext;
* mqtt 客户端消息处理器
* @author L.cm
public interface MqttClientProcessor {
* 处理服务端链接 ack
* @param context ChannelContext
* @param message MqttConnAckMessage
void processConAck(ChannelContext context, MqttConnAckMessage message);
* 处理服务端订阅的 ack
* @param message MqttSubAckMessage
void processSubAck(MqttSubAckMessage message);
* 处理服务端 publish 的消息
* @param context ChannelContext
* @param message MqttPublishMessage
void processPublish(ChannelContext context, MqttPublishMessage message);
* 处理服务端解除订阅的 ack
* @param message MqttSubAckMessage
void processUnSubAck(MqttUnsubAckMessage message);
* 处理服务端 publish 的 ack
* @param message MqttPubAckMessage
void processPubAck(MqttPubAckMessage message);
* 处理服务端 publish rec
* @param context ChannelContext
* @param message MqttPubAckMessage
void processPubRec(ChannelContext context, MqttMessage message);
* 处理服务端 publish rel
* @param context ChannelContext
* @param message MqttPubAckMessage
void processPubRel(ChannelContext context, MqttMessage message);
* 处理服务端 publish comp
* @param message MqttPubAckMessage
void processPubComp(MqttMessage message);
package net.dreamlu.iot.mqtt.core.server;
import net.dreamlu.iot.mqtt.codec.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;
import org.tio.server.AcceptCompletionHandler;
import org.tio.server.intf.ServerAioHandler;
import java.nio.ByteBuffer;
* @author L.cm
public class MqttServerAioHandler implements ServerAioHandler {
private static final Logger log = LoggerFactory.getLogger(AcceptCompletionHandler.class);
private final MqttDecoder mqttDecoder;
private final MqttEncoder mqttEncoder;
private final MqttServerProcessor processor;
public MqttServerAioHandler(MqttServerProcessor processor) {
this.mqttDecoder = MqttDecoder.INSTANCE;
this.mqttEncoder = MqttEncoder.INSTANCE;
this.processor = processor;
* 根据ByteBuffer解码成业务需要的Packet对象.
* 如果收到的数据不全,导致解码失败,请返回null,在下次消息来时框架层会自动续上前面的收到的数据
* @param buffer 参与本次希望解码的ByteBuffer
* @param limit ByteBuffer的limit
* @param position ByteBuffer的position,不一定是0哦
* @param readableLength ByteBuffer参与本次解码的有效数据(= limit - position)
* @param channelContext ChannelContext
* @return Packet
* @throws AioDecodeException AioDecodeException
public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException {
return mqttDecoder.decode(buffer);
* 编码
* @param packet Packet
* @param tioConfig TioConfig
* @param channelContext ChannelContext
* @return ByteBuffer
public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
return mqttEncoder.doEncode((MqttMessage) packet);
* 处理消息包
* @param packet Packet
* @param context ChannelContext
* @throws Exception Exception
public void handler(Packet packet, ChannelContext context) throws Exception {
MqttMessage mqttMessage = (MqttMessage) packet;
// 1. 先判断 mqtt 消息解析是否正常
DecoderResult decoderResult = mqttMessage.decoderResult();
if (decoderResult.isFailure()) {
processFailure(context, mqttMessage);
MqttFixedHeader fixedHeader = mqttMessage.fixedHeader();
MqttMessageType messageType = fixedHeader.messageType();
log.debug("MqttMessageType:{}", messageType);
switch (messageType) {
processor.processConnect(context, (MqttConnectMessage) mqttMessage);
processor.processPublish(context, (MqttPublishMessage) mqttMessage);
case PUBACK:
processor.processPubAck(context, (MqttMessageIdVariableHeader) mqttMessage.variableHeader());
case PUBREC:
processor.processPubRec(context, (MqttMessageIdVariableHeader) mqttMessage.variableHeader());
case PUBREL:
processor.processPubRel(context, (MqttMessageIdVariableHeader) mqttMessage.variableHeader());
processor.processPubComp(context, (MqttMessageIdVariableHeader) mqttMessage.variableHeader());
processor.processSubscribe(context, (MqttSubscribeMessage) mqttMessage);
case SUBACK:
processor.processUnSubscribe(context, (MqttUnsubscribeMessage) mqttMessage);
* 处理失败
* @param context ChannelContext
* @param mqttMessage MqttMessage
private void processFailure(ChannelContext context, MqttMessage mqttMessage) {
Throwable cause = mqttMessage.decoderResult().getCause();
if (cause instanceof MqttUnacceptableProtocolVersionException) {
// 不支持的协议版本
MqttConnAckMessage message = MqttMessageBuilders.connAck()
Tio.send(context, message);
} else if (cause instanceof MqttIdentifierRejectedException) {
// 不合格的 clientId
MqttConnAckMessage message = MqttMessageBuilders.connAck()
Tio.send(context, message);
} else {
log.error(cause.getMessage(), cause);
// 发送断开连接,是否强制关闭客户端连接???
Tio.send(context, MqttMessage.DISCONNECT);
package net.dreamlu.iot.mqtt.core.server;
import org.tio.core.ChannelContext;
import org.tio.core.DefaultAioListener;
* mqtt 服务监听
* @author L.cm
public class MqttServerAioListener extends DefaultAioListener {
public boolean onHeartbeatTimeout(ChannelContext channelContext, Long interval, int heartbeatTimeoutCount) {
return true;
package net.dreamlu.iot.mqtt.core.server;
import net.dreamlu.iot.mqtt.codec.*;
import org.tio.core.ChannelContext;
* mqtt broker 处理器
* @author L.cm
public interface MqttServerProcessor {
* 处理链接
* @param context ChannelContext
* @param message MqttConnectMessage
void processConnect(ChannelContext context, MqttConnectMessage message);
* Publish
* @param context ChannelContext
* @param message MqttPublishMessage
void processPublish(ChannelContext context, MqttPublishMessage message);
* PubAck
* @param context ChannelContext
* @param variableHeader MqttMessageIdVariableHeader
void processPubAck(ChannelContext context, MqttMessageIdVariableHeader variableHeader);
* PubRec
* @param context ChannelContext
* @param variableHeader MqttMessageIdVariableHeader
void processPubRec(ChannelContext context, MqttMessageIdVariableHeader variableHeader);
* PubRel
* @param context ChannelContext
* @param variableHeader MqttMessageIdVariableHeader
void processPubRel(ChannelContext context, MqttMessageIdVariableHeader variableHeader);
* PubComp
* @param context ChannelContext
* @param variableHeader MqttMessageIdVariableHeader
void processPubComp(ChannelContext context, MqttMessageIdVariableHeader variableHeader);
* 监听
* @param context ChannelContext
* @param message MqttSubscribeMessage
void processSubscribe(ChannelContext context, MqttSubscribeMessage message);
* 取消监听
* @param context ChannelContext
* @param message MqttUnsubscribeMessage
void processUnSubscribe(ChannelContext context, MqttUnsubscribeMessage message);
* ping 消息处理
* @param context ChannelContext
void processPingReq(ChannelContext context);
* 断开连接
* @param context ChannelContext
void processDisConnect(ChannelContext context);
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
package net.dreamlu.iot.mqtt.client;
import net.dreamlu.iot.mqtt.codec.*;
import net.dreamlu.iot.mqtt.core.client.MqttClientProcessor;
import org.tio.core.ChannelContext;
import java.nio.ByteBuffer;
* 示例客户端处理
* @author L.cm
public class MqttClientProcessorImpl implements MqttClientProcessor {
public void processConAck(ChannelContext context, MqttConnAckMessage message) {
public void processSubAck(MqttSubAckMessage message) {
public void processPublish(ChannelContext context, MqttPublishMessage message) {
ByteBuffer byteBuffer = message.payload();
if (byteBuffer != null) {
public void processUnSubAck(MqttUnsubAckMessage message) {
public void processPubAck(MqttPubAckMessage message) {
public void processPubRec(ChannelContext context, MqttMessage message) {
public void processPubRel(ChannelContext context, MqttMessage message) {
public void processPubComp(MqttMessage message) {
package net.dreamlu.iot.mqtt.client;
import net.dreamlu.iot.mqtt.codec.*;
import net.dreamlu.iot.mqtt.core.client.MqttClientAioHandler;
import net.dreamlu.iot.mqtt.core.client.MqttClientAioListener;
import net.dreamlu.iot.mqtt.core.client.MqttClientProcessor;
import org.tio.client.ClientChannelContext;
import org.tio.client.ClientTioConfig;
import org.tio.client.ReconnConf;
import org.tio.client.TioClient;
import org.tio.client.intf.ClientAioHandler;
import org.tio.client.intf.ClientAioListener;
import org.tio.core.Node;
import org.tio.core.Tio;
import java.nio.ByteBuffer;
import java.util.Timer;
import java.util.TimerTask;
* 客户端测试
* @author L.cm
public class MqttClientTest {
public static void main(String[] args) throws Exception {
MqttClientProcessor processor = new MqttClientProcessorImpl();
ClientAioHandler clientAioHandler = new MqttClientAioHandler(processor);
ClientAioListener clientAioListener = new MqttClientAioListener("MqttClientTest", "admin", "123456");
ReconnConf reconnConf = new ReconnConf();
ClientTioConfig tioConfig = new ClientTioConfig(clientAioHandler, clientAioListener, reconnConf);
TioClient tioClient = new TioClient(tioConfig);
ClientChannelContext context = tioClient.connect(new Node("", 1883), 1000);
Timer timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
if (!context.isClosed) {
MqttPublishMessage message = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttPublishVariableHeader("testtopicxx", 0), ByteBuffer.wrap("mica最牛皮".getBytes()));
Tio.send(context, message);
}, 1000, 2000);
package net.dreamlu.iot.mqtt.example;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.codec.*;
import net.dreamlu.iot.mqtt.core.server.MqttServerProcessor;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.utils.hutool.StrUtil;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Collectors;
* mqtt broker 处理器
* @author L.cm
public class MqttBrokerProcessorImpl implements MqttServerProcessor {
private static final String MQTT_CLIENT_ID_KEY = "mqttClientId";
public void processConnect(ChannelContext context, MqttConnectMessage mqttMessage) {
MqttConnectPayload payload = mqttMessage.payload();
String clientId = payload.clientIdentifier();
// 1. 客户端必须提供clientId, 不管cleanSession是否为1, 此处没有参考标准协议实现
if (StrUtil.isBlank(clientId)) {
log.debug("CONNECT - clientId: {}", clientId);
// 2. 认证
String userName = payload.userName();
String password = payload.password();
boolean authResult = false;
if (false) {
MqttConnAckMessage message = MqttMessageBuilders.connAck()
Tio.send(context, message);
// 3. 设置 clientId
context.set(MQTT_CLIENT_ID_KEY, clientId);
// 4. 返回 ack
MqttMessage message = MqttMessageBuilders.connAck()
Tio.send(context, message);
private void refusedIdentifierRejected(ChannelContext context) {
MqttConnAckMessage message = MqttMessageBuilders.connAck()
Tio.send(context, message);
public void processPublish(ChannelContext context, MqttPublishMessage message) {
String clientId = (String) context.get(MQTT_CLIENT_ID_KEY);
log.debug("PUBLISH - clientId: {}", clientId);
MqttFixedHeader fixedHeader = message.fixedHeader();
ByteBuffer payload = message.payload();
if (payload != null) {
public void processPubAck(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
int messageId = variableHeader.messageId();
String clientId = (String) context.get(MQTT_CLIENT_ID_KEY);
log.debug("PUBACK - clientId: {}, messageId: {}", clientId, messageId);
public void processPubRec(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
String clientId = (String) context.get(MQTT_CLIENT_ID_KEY);
log.debug("PUBREC - clientId: {}, messageId: {}", clientId, variableHeader.messageId());
MqttMessage message = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(variableHeader.messageId()), null);
Tio.send(context, message);
public void processPubRel(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
String clientId = (String) context.get(MQTT_CLIENT_ID_KEY);
log.debug("PUBREL - clientId: {}, messageId: {}", clientId, variableHeader.messageId());
MqttMessage message = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(variableHeader.messageId()), null);
Tio.send(context, message);
public void processPubComp(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
int messageId = variableHeader.messageId();
String clientId = (String) context.get(MQTT_CLIENT_ID_KEY);
log.debug("PUBCOMP - clientId: {}, messageId: {}", clientId, messageId);
public void processSubscribe(ChannelContext context, MqttSubscribeMessage message) {
List<MqttTopicSubscription> topicSubscriptions = message.payload().topicSubscriptions();
List<Integer> mqttQoSList = topicSubscriptions.stream()
MqttMessage subAckMessage = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttSubAckPayload(mqttQoSList));
Tio.send(context, subAckMessage);
public void processUnSubscribe(ChannelContext context, MqttUnsubscribeMessage mqttMessage) {
String clientId = (String) context.get(MQTT_CLIENT_ID_KEY);
log.debug("UnSubscribe - clientId: {}", clientId);
MqttMessage message = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(mqttMessage.variableHeader().messageId()), null);
Tio.send(context, message);
public void processPingReq(ChannelContext context) {
String clientId = (String) context.get(MQTT_CLIENT_ID_KEY);
log.debug("PINGREQ - clientId: {}", clientId);
MqttMessage message = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0),
null, null);
Tio.send(context, message);
public void processDisConnect(ChannelContext context) {
String clientId = (String) context.get(MQTT_CLIENT_ID_KEY);
log.debug("DISCONNECT - clientId: {}", clientId);
package net.dreamlu.iot.mqtt.example;
import net.dreamlu.iot.mqtt.codec.*;
import net.dreamlu.iot.mqtt.core.server.MqttServerAioHandler;
import net.dreamlu.iot.mqtt.core.server.MqttServerAioListener;
import net.dreamlu.iot.mqtt.core.server.MqttServerProcessor;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.server.ServerTioConfig;
import org.tio.server.TioServer;
import org.tio.server.intf.ServerAioHandler;
import org.tio.server.intf.ServerAioListener;
import org.tio.utils.lock.SetWithLock;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
* mqtt 服务端测试
* @author L.cm
public class MqttServerTest {
public static void main(String[] args) throws IOException {
int socketPort = 1883;
MqttServerProcessor brokerHandler = new MqttBrokerProcessorImpl();
// 处理消息
ServerAioHandler handler = new MqttServerAioHandler(brokerHandler);
// 监听
ServerAioListener listener = new MqttServerAioListener();
// 配置
ServerTioConfig config = new ServerTioConfig("mqtt-server", handler, listener);
TioServer tioServer = new TioServer(config);
// 设置timeout
Timer timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
SetWithLock<ChannelContext> contextSet = Tio.getAll(config);
Set<ChannelContext> channelContexts = contextSet.getObj();
channelContexts.forEach(context -> {
MqttPublishMessage message = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttPublishVariableHeader("testtopic", 0), ByteBuffer.wrap("mica最牛皮".getBytes()));
Tio.send(context, message);
}, 1000, 2000);
// 启动
tioServer.start("", socketPort);
log4j.rootLogger = DEBUG
appender.console.type = Console
appender.console.name = STDOUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d %-5p %c{2} - %m%n
rootLogger.level = debug
rootLogger.appenderRef.stdout.ref = STDOUT
rootLogger.appenderRef.log.ref = log
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<description>Mica mqtt tools.</description>
