提交 e34683f6 编写于 作者: 如梦技术's avatar 如梦技术 🐛

测试阿里云 mqtt。

上级 0c8c6b3a
# mica mqtt 组件
基于 `t-io` 实现的 `mqtt` iot组件。
基于 `t-io` 实现的 `mqtt` 物联网组件。
## 使用
目前仅仅是试验性质,不过 `t-io` 确实很稳,`mica-mqtt-example` 中有 mqtt 服务端和客户端测试代码,main 方法运行即可。
目前仅仅是试验性质,不过 `t-io` 确实很稳,`mica-mqtt-example` 中有 `mqtt` 服务端和客户端测试代码, `main` 方法运行即可。
## 功能
- [x] 支持 MQTT v3.1、v3.1.1 以及 v5.0 协议。
- [x] MQTT client 客户端(演示)。
- [x] MQTT server 服务端(演示)。
- [ ] MQTT 客户端 topic sub,添加阿里云 mqtt demo。
- [ ] MQTT 服务端接续完善,精力有限,周期可能会长一些。
## 文档
- [mqtt 协议文档](https://github.com/mcxiaoke/mqtt)
## TODO
- 继续抽象,方便使用。
- 实现 `mqtt-broker` 功能。
## 说明
由于精力有限,今年都没太多时间来折腾该项目。
另外可以参考 zbus 来处理消息或者实现内部的集群。
## 参考vs借鉴
- [netty codec-mqtt](https://github.com/netty/netty/tree/4.1/codec-mqtt)
- [jmqtt](https://github.com/Cicizz/jmqtt)
......
......@@ -367,7 +367,7 @@ public final class MqttDecoder {
case SUBSCRIBE:
return decodeSubscribePayload(buffer, bytesRemainingInVariablePart);
case SUBACK:
return decodeSubackPayload(buffer, bytesRemainingInVariablePart);
return decodeSubAckPayload(buffer, bytesRemainingInVariablePart);
case UNSUBSCRIBE:
return decodeUnsubscribePayload(buffer, bytesRemainingInVariablePart);
case UNSUBACK:
......@@ -458,7 +458,7 @@ public final class MqttDecoder {
return new Result<>(new MqttSubscribePayload(subscribeTopics), numberOfBytesConsumed);
}
private static Result<MqttSubAckPayload> decodeSubackPayload(
private static Result<MqttSubAckPayload> decodeSubAckPayload(
ByteBuffer buffer,
int bytesRemainingInVariablePart) {
final List<Integer> grantedQos = new ArrayList<>(bytesRemainingInVariablePart);
......
......@@ -248,7 +248,7 @@ public final class MqttProperties {
if (!first) {
builder.append(", ");
}
builder.append(pair.key + "->" + pair.value);
builder.append(pair.key).append("->").append(pair.value);
first = false;
}
builder.append(")");
......@@ -315,7 +315,7 @@ public final class MqttProperties {
} else if (property.propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) {
List<IntegerProperty> subscriptionIds = this.subscriptionIds;
if (subscriptionIds == null) {
subscriptionIds = new ArrayList<IntegerProperty>(1);
subscriptionIds = new ArrayList<>(1);
this.subscriptionIds = subscriptionIds;
}
if (property instanceof IntegerProperty) {
......@@ -335,7 +335,7 @@ public final class MqttProperties {
public Collection<? extends MqttProperty> listAll() {
Map<Integer, MqttProperty> props = this.props;
if (props == null && subscriptionIds == null && userProperties == null) {
return Collections.<MqttProperty>emptyList();
return Collections.emptyList();
}
if (subscriptionIds == null && userProperties == null) {
return props.values();
......@@ -343,7 +343,7 @@ public final class MqttProperties {
if (props == null && userProperties == null) {
return subscriptionIds;
}
List<MqttProperty> propValues = new ArrayList<MqttProperty>(props != null ? props.size() : 1);
List<MqttProperty> propValues = new ArrayList<>(props != null ? props.size() : 1);
if (props != null) {
propValues.addAll(props.values());
}
......@@ -398,14 +398,14 @@ public final class MqttProperties {
*/
public List<? extends MqttProperty> getProperties(int propertyId) {
if (propertyId == MqttPropertyType.USER_PROPERTY.value) {
return userProperties == null ? Collections.<MqttProperty>emptyList() : userProperties;
return userProperties == null ? Collections.emptyList() : userProperties;
}
if (propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) {
return subscriptionIds == null ? Collections.<MqttProperty>emptyList() : subscriptionIds;
return subscriptionIds == null ? Collections.emptyList() : subscriptionIds;
}
Map<Integer, MqttProperty> props = this.props;
return (props == null || !props.containsKey(propertyId)) ?
Collections.<MqttProperty>emptyList() :
Collections.emptyList() :
Collections.singletonList(props.get(propertyId));
}
}
......@@ -23,6 +23,7 @@ import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
/**
* 默认的 mqtt 消息处理器
......@@ -31,11 +32,19 @@ import java.nio.ByteBuffer;
*/
public class DefaultMqttClientProcessor implements MqttClientProcessor {
private static final Logger logger = LoggerFactory.getLogger(DefaultMqttClientProcessor.class);
protected static final String MQTT_CONNECTED_KEY = "__MQTT_CONNECTED_KEY__";
private final MqttClientSubManage subManage;
private final CountDownLatch connLatch;
public DefaultMqttClientProcessor(MqttClientSubManage subManage) {
public DefaultMqttClientProcessor(MqttClientSubManage subManage,
CountDownLatch connLatch) {
this.subManage = subManage;
this.connLatch = connLatch;
}
@Override
public void processDecodeFailure(ChannelContext context, MqttMessage message, Throwable ex) {
// 客户端失败,默认记录异常日志
logger.error(ex.getMessage(), ex);
}
@Override
......@@ -43,8 +52,7 @@ public class DefaultMqttClientProcessor implements MqttClientProcessor {
MqttConnectReturnCode returnCode = message.variableHeader().connectReturnCode();
switch (message.variableHeader().connectReturnCode()) {
case CONNECTION_ACCEPTED:
// 标记为链接成功,只有链接成功之后才能 sub 和 pub
context.set(MQTT_CONNECTED_KEY, Boolean.TRUE);
connLatch.countDown();
logger.info("MQTT 连接成功!");
break;
case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD:
......@@ -53,8 +61,7 @@ public class DefaultMqttClientProcessor implements MqttClientProcessor {
case CONNECTION_REFUSED_SERVER_UNAVAILABLE:
case CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION:
default:
Tio.close(context, "MqttClient connect error.");
context.setClosed(true);
Tio.close(context, "MqttClient connect error error ReturnCode:" + returnCode);
throw new IllegalStateException("MqttClient connect error ReturnCode:" + returnCode);
}
}
......
......@@ -58,6 +58,12 @@ public final class MqttClient {
*/
public MqttClient subQos0(String topicFilter, MqttMessageListener listener) {
// TODO L.cm 对 topicFilter 校验
MqttSubscribeMessage message = MqttMessageBuilders.subscribe()
.addSubscription(MqttQoS.AT_MOST_ONCE, topicFilter)
.messageId(MqttClientMessageId.getId())
.build();
Tio.send(context, message);
// 绑定 subManage listener
return this;
}
......@@ -140,9 +146,13 @@ public final class MqttClient {
* @return 是否发送成功
*/
public Boolean publish(String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
MqttPublishMessage message = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retain, 0),
new MqttPublishVariableHeader(topic, 0), payload);
MqttPublishMessage message = MqttMessageBuilders.publish()
.topicName(topic)
.payload(payload)
.qos(qos)
.retained(retain)
.messageId(MqttClientMessageId.getId())
.build();
return Tio.send(context, message);
}
......@@ -187,14 +197,4 @@ public final class MqttClient {
return context;
}
/**
* 判断 mqtt 是否链接成功,仅仅在链接成功之后才能 sub 和 pub
*
* @return 是否成功
*/
public boolean isConnected() {
Boolean connected = (Boolean) context.get(DefaultMqttClientProcessor.MQTT_CONNECTED_KEY);
return Boolean.TRUE.equals(connected);
}
}
......@@ -17,14 +17,11 @@
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.client.intf.ClientAioHandler;
import org.tio.core.ChannelContext;
import org.tio.core.TioConfig;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;
import org.tio.server.AcceptCompletionHandler;
import java.nio.ByteBuffer;
......@@ -34,7 +31,6 @@ import java.nio.ByteBuffer;
* @author L.cm
*/
public class MqttClientAioHandler implements ClientAioHandler {
private static final Logger log = LoggerFactory.getLogger(AcceptCompletionHandler.class);
private final MqttDecoder mqttDecoder;
private final MqttEncoder mqttEncoder;
private final MqttClientProcessor processor;
......@@ -66,7 +62,7 @@ public class MqttClientAioHandler implements ClientAioHandler {
// 1. 先判断 mqtt 消息解析是否正常
DecoderResult decoderResult = message.decoderResult();
if (decoderResult.isFailure()) {
processFailure(message);
processor.processDecodeFailure(context, message, decoderResult.getCause());
return;
}
MqttFixedHeader fixedHeader = message.fixedHeader();
......@@ -102,15 +98,4 @@ public class MqttClientAioHandler implements ClientAioHandler {
}
}
/**
* 处理失败
*
* @param mqttMessage MqttMessage
*/
private void processFailure(MqttMessage mqttMessage) {
// 客户端失败,我认为日志记录异常就行了
Throwable cause = mqttMessage.decoderResult().getCause();
log.error(cause.getMessage(), cause);
}
}
......@@ -17,6 +17,7 @@
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import net.dreamlu.iot.mqtt.codec.MqttProperties;
import org.tio.client.DefaultClientAioListener;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
......@@ -60,7 +61,13 @@ public class MqttClientAioListener extends DefaultClientAioListener {
builder.willTopic(willMessage.getTopic())
.willMessage(willMessage.getMessage())
.willRetain(willMessage.isRetain())
.willQoS(willMessage.getQos());
.willQoS(willMessage.getQos())
.willProperties(willMessage.getWillProperties());
}
// 4. mqtt5 properties
MqttProperties properties = clientConfig.getProperties();
if (properties != null) {
builder.properties(properties);
}
// 4. 发送链接请求
Tio.send(context, builder.build());
......
......@@ -16,6 +16,7 @@
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.MqttProperties;
import net.dreamlu.iot.mqtt.codec.MqttVersion;
import org.tio.client.ClientChannelContext;
import org.tio.client.ClientTioConfig;
......@@ -28,6 +29,7 @@ import org.tio.core.ssl.SslConfig;
import org.tio.utils.hutool.StrUtil;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
/**
......@@ -93,6 +95,10 @@ public final class MqttClientCreator {
* 遗嘱消息
*/
private MqttWillMessage willMessage;
/**
* mqtt5 properties
*/
private MqttProperties properties;
protected String getIp() {
return ip;
......@@ -146,6 +152,10 @@ public final class MqttClientCreator {
return willMessage;
}
public MqttProperties getProperties() {
return properties;
}
public MqttClientCreator ip(String ip) {
this.ip = ip;
return this;
......@@ -217,8 +227,10 @@ public final class MqttClientCreator {
return willMessage(builder.build());
}
public MqttClientCreator properties(MqttProperties properties) {
this.properties = properties;
return this;
}
public MqttClient connect() throws Exception {
// 1. 生成 默认的 clientId
......@@ -229,7 +241,8 @@ public final class MqttClientCreator {
}
MqttClientSubManage subManage = new MqttClientSubManage();
// 客户端处理器
MqttClientProcessor processor = new DefaultMqttClientProcessor(subManage);
CountDownLatch connLatch = new CountDownLatch(1);
MqttClientProcessor processor = new DefaultMqttClientProcessor(subManage, connLatch);
// 2. 初始化 mqtt 处理器
ClientAioHandler clientAioHandler = new MqttClientAioHandler(Objects.requireNonNull(processor));
ClientAioListener clientAioListener = new MqttClientAioListener(this);
......@@ -245,6 +258,7 @@ public final class MqttClientCreator {
// 4. tioClient
TioClient tioClient = new TioClient(new ClientTioConfig(clientAioHandler, clientAioListener, reconnConf));
ClientChannelContext context = tioClient.connect(new Node(this.ip, this.port), this.timeout);
connLatch.await();
return new MqttClient(tioClient, this, context, subManage);
}
......
......@@ -37,13 +37,17 @@ public enum MqttClientMessageId {
this.value = value;
}
public MqttMessageIdVariableHeader getMessageId() {
public int getMessageId() {
this.value.compareAndSet(0xffff, 1);
return MqttMessageIdVariableHeader.from(this.value.getAndIncrement());
return this.value.getAndIncrement();
}
public static MqttMessageIdVariableHeader getId() {
public static int getId() {
return INSTANCE.getMessageId();
}
public static MqttMessageIdVariableHeader getVariableHeader() {
return MqttMessageIdVariableHeader.from(getId());
}
}
......@@ -26,6 +26,15 @@ import org.tio.core.ChannelContext;
*/
public interface MqttClientProcessor {
/**
* 处理编解码失败
*
* @param context ChannelContext
* @param message MqttMessage
* @param ex Throwable
*/
void processDecodeFailure(ChannelContext context, MqttMessage message, Throwable ex);
/**
* 处理服务端链接 ack
*
......
......@@ -16,6 +16,7 @@
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.MqttProperties;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import java.nio.charset.StandardCharsets;
......@@ -38,12 +39,17 @@ public final class MqttWillMessage {
* 如果遗嘱标志被设置为 false,遗嘱 QoS 也必须设置为 0。 如果遗嘱标志被设置为 true,遗嘱 QoS 的值可以等于 0,1,2。
*/
private final MqttQoS qos;
/**
* mqtt5 willProperties
*/
private final MqttProperties willProperties;
private MqttWillMessage(String topic, byte[] message, boolean retain, MqttQoS qos) {
private MqttWillMessage(String topic, byte[] message, boolean retain, MqttQoS qos, MqttProperties willProperties) {
this.topic = topic;
this.message = message;
this.retain = retain;
this.qos = qos;
this.willProperties = willProperties;
}
public String getTopic() {
......@@ -62,6 +68,10 @@ public final class MqttWillMessage {
return qos;
}
public MqttProperties getWillProperties() {
return willProperties;
}
public static MqttWillMessage.Builder builder() {
return new MqttWillMessage.Builder();
}
......@@ -71,6 +81,7 @@ public final class MqttWillMessage {
private byte[] message;
private boolean retain;
private MqttQoS qos;
private MqttProperties willProperties;
public Builder topic(String topic) {
this.topic = Objects.requireNonNull(topic);
......@@ -97,8 +108,13 @@ public final class MqttWillMessage {
return this;
}
public Builder willProperties(MqttProperties willProperties) {
this.willProperties = Objects.requireNonNull(willProperties);
return this;
}
public MqttWillMessage build() {
return new MqttWillMessage(this.topic, this.message, this.retain, this.qos);
return new MqttWillMessage(this.topic, this.message, this.retain, this.qos, this.willProperties);
}
}
......
package net.dreamlu.iot.mqtt.aliyun;
import net.dreamlu.iot.mqtt.core.client.MqttClient;
import java.nio.ByteBuffer;
/**
* 客户端测试
*
* @author L.cm
*/
public class MqttClientTest {
public static void main(String[] args) throws Exception {
String productKey = "g27jB42P9hm";
String deviceName = "3dbc1cb4";
String deviceSecret = "";
// 计算MQTT连接参数。
MqttSign sign = new MqttSign(productKey, deviceName, deviceSecret);
String username = sign.getUsername();
String password = sign.getPassword();
String clientId = sign.getClientId();
System.out.println("username: " + username);
System.out.println("password: " + password);
System.out.println("clientid: " + clientId);
// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.ip(productKey + ".iot-as-mqtt.cn-shanghai.aliyuncs.com")
.port(443)
.username(username)
.password(password)
.clientId(clientId)
.connect();
client.subQos0("/sys/g27jB42P9hm/3dbc1cb4/thing/event/property/post_reply", (topic, payload) -> {
});
String content = "{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":1}}";
client.publish("/sys/g27jB42P9hm/" + deviceName + "/thing/event/property/post", ByteBuffer.wrap(content.getBytes()));
}
}
package net.dreamlu.iot.mqtt.aliyun;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.math.BigInteger;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Objects;
/**
* 阿里云 mqtt 签名方式
*
* @author L.cm
*/
public class MqttSign {
/**
* 用户名
*/
private final String username;
/**
* 密码
*/
private final String password;
/**
* 客户端id
*/
private final String clientId;
public MqttSign(String productKey, String deviceName, String deviceSecret) {
Objects.requireNonNull(productKey, "productKey is null");
Objects.requireNonNull(deviceName, "deviceName is null");
Objects.requireNonNull(deviceSecret, "deviceSecret is null");
this.username = deviceName + '&' + productKey;
String timestamp = Long.toString(System.currentTimeMillis());
this.password = getPassword(productKey, deviceName, deviceSecret, timestamp);
this.clientId = getClientId(productKey, deviceName, timestamp);
}
public String getUsername() {
return username;
}
public String getPassword() {
return password;
}
public String getClientId() {
return clientId;
}
private static String getPassword(String productKey, String deviceName, String deviceSecret, String timestamp) {
String plainPasswd = "clientId" + productKey + '.' + deviceName + "deviceName" +
deviceName + "productKey" + productKey + "timestamp" + timestamp;
return hmacSha256(plainPasswd, deviceSecret);
}
private static String getClientId(String productKey, String deviceName, String timestamp) {
return productKey + '.' + deviceName + '|' + "timestamp=" + timestamp + ",_v=paho-java-1.0.0,securemode=2,signmethod=hmacsha256|";
}
private static String hmacSha256(String plainText, String key) {
if (plainText == null || key == null) {
return null;
}
try {
Mac mac = Mac.getInstance("HmacSHA256");
SecretKeySpec secretKeySpec = new SecretKeySpec(key.getBytes(), "HmacSHA256");
mac.init(secretKeySpec);
byte[] hmacResult = mac.doFinal(plainText.getBytes());
return String.format("%064x", new BigInteger(1, hmacResult));
} catch (NoSuchAlgorithmException | InvalidKeyException e) {
throw new RuntimeException(e);
}
}
}
package net.dreamlu.iot.mqtt.client;
import net.dreamlu.iot.mqtt.codec.MqttVersion;
import net.dreamlu.iot.mqtt.core.client.MqttClient;
import org.tio.client.ClientChannelContext;
import java.nio.ByteBuffer;
import java.util.Timer;
......@@ -15,38 +15,20 @@ import java.util.TimerTask;
public class MqttClientTest {
public static void main(String[] args) throws Exception {
String productKey = "g27jB42P9hm";
String deviceName = "3dbc1cb4";
String deviceSecret = "87b337020a99ddb9eab5bb68f8b2f891";
// 计算MQTT连接参数。
MqttSign sign = new MqttSign();
sign.calculate(productKey, deviceName, deviceSecret);
String username = sign.getUsername();
String password = sign.getPassword();
String clientId = sign.getClientid();
System.out.println("username: " + username);
System.out.println("password: " + password);
System.out.println("clientid: " + clientId);
// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.ip(productKey + ".iot-as-mqtt.cn-shanghai.aliyuncs.com")
.port(443)
.username(username)
.password(password)
.clientId(clientId)
.ip("127.0.0.1")
.username("admin")
.password("123456")
.protocolVersion(MqttVersion.MQTT_5)
.connect();
Thread.sleep(1000L);
// Timer timer = new Timer();
// timer.schedule(new TimerTask() {
// @Override
// public void run() {
String content = "{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":1}}";
client.publish("/sys/g27jB42P9hm/" + deviceName + "/thing/event/property/post", ByteBuffer.wrap(content.getBytes()));
// }
// }, 1000, 2000);
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
client.publish("testtopicxx", ByteBuffer.wrap("mica最牛皮".getBytes()));
}
}, 1000, 2000);
}
}
package net.dreamlu.iot.mqtt.client;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.math.BigInteger;
class CryptoUtil {
private static String hmac(String plainText, String key, String algorithm, String format) throws Exception {
if (plainText == null || key == null) {
return null;
}
byte[] hmacResult = null;
Mac mac = Mac.getInstance(algorithm);
SecretKeySpec secretKeySpec = new SecretKeySpec(key.getBytes(), algorithm);
mac.init(secretKeySpec);
hmacResult = mac.doFinal(plainText.getBytes());
return String.format(format, new BigInteger(1, hmacResult));
}
// public static String hmacMd5(String plainText, String key) throws Exception {
// return hmac(plainText,key,"HmacMD5","%032x");
// }
//
// public static String hmacSha1(String plainText, String key) throws Exception {
// return hmac(plainText,key,"HmacSHA1","%040x");
// }
public static String hmacSha256(String plainText, String key) throws Exception {
return hmac(plainText,key,"HmacSHA256","%064x");
}
}
public class MqttSign {
private String username = "";
private String password = "";
private String clientid = "";
public String getUsername() { return this.username;}
public String getPassword() { return this.password;}
public String getClientid() { return this.clientid;}
public void calculate(String productKey, String deviceName, String deviceSecret) {
if (productKey == null || deviceName == null || deviceSecret == null) {
return;
}
try {
//MQTT用户名
this.username = deviceName + "&" + productKey;
//MQTT密码
String timestamp = Long.toString(System.currentTimeMillis());
String plainPasswd = "clientId" + productKey + "." + deviceName + "deviceName" +
deviceName + "productKey" + productKey + "timestamp" + timestamp;
this.password = CryptoUtil.hmacSha256(plainPasswd, deviceSecret);
//MQTT ClientId
this.clientid = productKey + "." + deviceName + "|" + "timestamp=" + timestamp +
",_v=paho-java-1.0.0,securemode=2,signmethod=hmacsha256|";
}catch (Exception e) {
e.printStackTrace();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册