diff --git a/README.md b/README.md index 374792a9100b3f89548ca477e77109206e0d3108..84622c5c765bb3e908331b43cd5b6ebbc53f4e4a 100644 --- a/README.md +++ b/README.md @@ -1,21 +1,19 @@ # mica mqtt 组件 -基于 `t-io` 实现的 `mqtt` iot组件。 +基于 `t-io` 实现的 `mqtt` 物联网组件。 ## 使用 -目前仅仅是试验性质,不过 `t-io` 确实很稳,`mica-mqtt-example` 中有 mqtt 服务端和客户端测试代码,main 方法运行即可。 +目前仅仅是试验性质,不过 `t-io` 确实很稳,`mica-mqtt-example` 中有 `mqtt` 服务端和客户端测试代码, `main` 方法运行即可。 + +## 功能 +- [x] 支持 MQTT v3.1、v3.1.1 以及 v5.0 协议。 +- [x] MQTT client 客户端(演示)。 +- [x] MQTT server 服务端(演示)。 +- [ ] MQTT 客户端 topic sub,添加阿里云 mqtt demo。 +- [ ] MQTT 服务端接续完善,精力有限,周期可能会长一些。 ## 文档 - [mqtt 协议文档](https://github.com/mcxiaoke/mqtt) -## TODO -- 继续抽象,方便使用。 -- 实现 `mqtt-broker` 功能。 - -## 说明 -由于精力有限,今年都没太多时间来折腾该项目。 - -另外可以参考 zbus 来处理消息或者实现内部的集群。 - ## 参考vs借鉴 - [netty codec-mqtt](https://github.com/netty/netty/tree/4.1/codec-mqtt) - [jmqtt](https://github.com/Cicizz/jmqtt) diff --git a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttDecoder.java b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttDecoder.java index aa701cf3d3a4b5cf1129570e2836c6c8b98afc83..0ce66a0dbe9048f56911478729d29bcf3dcfdc46 100644 --- a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttDecoder.java +++ b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttDecoder.java @@ -367,7 +367,7 @@ public final class MqttDecoder { case SUBSCRIBE: return decodeSubscribePayload(buffer, bytesRemainingInVariablePart); case SUBACK: - return decodeSubackPayload(buffer, bytesRemainingInVariablePart); + return decodeSubAckPayload(buffer, bytesRemainingInVariablePart); case UNSUBSCRIBE: return decodeUnsubscribePayload(buffer, bytesRemainingInVariablePart); case UNSUBACK: @@ -458,7 +458,7 @@ public final class MqttDecoder { return new Result<>(new MqttSubscribePayload(subscribeTopics), numberOfBytesConsumed); } - private static Result decodeSubackPayload( + private static Result decodeSubAckPayload( ByteBuffer buffer, int bytesRemainingInVariablePart) { final List grantedQos = new ArrayList<>(bytesRemainingInVariablePart); diff --git a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttProperties.java b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttProperties.java index cb12d510106b6801b16ee77e505080db67f1e73a..143f287227d86231f32e517d74354da664c2577a 100644 --- a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttProperties.java +++ b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttProperties.java @@ -248,7 +248,7 @@ public final class MqttProperties { if (!first) { builder.append(", "); } - builder.append(pair.key + "->" + pair.value); + builder.append(pair.key).append("->").append(pair.value); first = false; } builder.append(")"); @@ -315,7 +315,7 @@ public final class MqttProperties { } else if (property.propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) { List subscriptionIds = this.subscriptionIds; if (subscriptionIds == null) { - subscriptionIds = new ArrayList(1); + subscriptionIds = new ArrayList<>(1); this.subscriptionIds = subscriptionIds; } if (property instanceof IntegerProperty) { @@ -335,7 +335,7 @@ public final class MqttProperties { public Collection listAll() { Map props = this.props; if (props == null && subscriptionIds == null && userProperties == null) { - return Collections.emptyList(); + return Collections.emptyList(); } if (subscriptionIds == null && userProperties == null) { return props.values(); @@ -343,7 +343,7 @@ public final class MqttProperties { if (props == null && userProperties == null) { return subscriptionIds; } - List propValues = new ArrayList(props != null ? props.size() : 1); + List propValues = new ArrayList<>(props != null ? props.size() : 1); if (props != null) { propValues.addAll(props.values()); } @@ -398,14 +398,14 @@ public final class MqttProperties { */ public List getProperties(int propertyId) { if (propertyId == MqttPropertyType.USER_PROPERTY.value) { - return userProperties == null ? Collections.emptyList() : userProperties; + return userProperties == null ? Collections.emptyList() : userProperties; } if (propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) { - return subscriptionIds == null ? Collections.emptyList() : subscriptionIds; + return subscriptionIds == null ? Collections.emptyList() : subscriptionIds; } Map props = this.props; return (props == null || !props.containsKey(propertyId)) ? - Collections.emptyList() : + Collections.emptyList() : Collections.singletonList(props.get(propertyId)); } } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java index 50ea1e211fcb372fe09c4451c70bd0cc432923c8..06da55682cd91931524eedbeafb3e08ac854576d 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java @@ -23,6 +23,7 @@ import org.tio.core.ChannelContext; import org.tio.core.Tio; import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; /** * 默认的 mqtt 消息处理器 @@ -31,11 +32,19 @@ import java.nio.ByteBuffer; */ public class DefaultMqttClientProcessor implements MqttClientProcessor { private static final Logger logger = LoggerFactory.getLogger(DefaultMqttClientProcessor.class); - protected static final String MQTT_CONNECTED_KEY = "__MQTT_CONNECTED_KEY__"; private final MqttClientSubManage subManage; + private final CountDownLatch connLatch; - public DefaultMqttClientProcessor(MqttClientSubManage subManage) { + public DefaultMqttClientProcessor(MqttClientSubManage subManage, + CountDownLatch connLatch) { this.subManage = subManage; + this.connLatch = connLatch; + } + + @Override + public void processDecodeFailure(ChannelContext context, MqttMessage message, Throwable ex) { + // 客户端失败,默认记录异常日志 + logger.error(ex.getMessage(), ex); } @Override @@ -43,8 +52,7 @@ public class DefaultMqttClientProcessor implements MqttClientProcessor { MqttConnectReturnCode returnCode = message.variableHeader().connectReturnCode(); switch (message.variableHeader().connectReturnCode()) { case CONNECTION_ACCEPTED: - // 标记为链接成功,只有链接成功之后才能 sub 和 pub - context.set(MQTT_CONNECTED_KEY, Boolean.TRUE); + connLatch.countDown(); logger.info("MQTT 连接成功!"); break; case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD: @@ -53,8 +61,7 @@ public class DefaultMqttClientProcessor implements MqttClientProcessor { case CONNECTION_REFUSED_SERVER_UNAVAILABLE: case CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION: default: - Tio.close(context, "MqttClient connect error."); - context.setClosed(true); + Tio.close(context, "MqttClient connect error error ReturnCode:" + returnCode); throw new IllegalStateException("MqttClient connect error ReturnCode:" + returnCode); } } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java index 295833b4da8c6f32b0f47506000bff6fbdfd2034..121ed061350c9e97c38ac727b2621222e6c8cdc9 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java @@ -58,6 +58,12 @@ public final class MqttClient { */ public MqttClient subQos0(String topicFilter, MqttMessageListener listener) { // TODO L.cm 对 topicFilter 校验 + MqttSubscribeMessage message = MqttMessageBuilders.subscribe() + .addSubscription(MqttQoS.AT_MOST_ONCE, topicFilter) + .messageId(MqttClientMessageId.getId()) + .build(); + Tio.send(context, message); + // 绑定 subManage listener return this; } @@ -140,9 +146,13 @@ public final class MqttClient { * @return 是否发送成功 */ public Boolean publish(String topic, ByteBuffer payload, MqttQoS qos, boolean retain) { - MqttPublishMessage message = (MqttPublishMessage) MqttMessageFactory.newMessage( - new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retain, 0), - new MqttPublishVariableHeader(topic, 0), payload); + MqttPublishMessage message = MqttMessageBuilders.publish() + .topicName(topic) + .payload(payload) + .qos(qos) + .retained(retain) + .messageId(MqttClientMessageId.getId()) + .build(); return Tio.send(context, message); } @@ -187,14 +197,4 @@ public final class MqttClient { return context; } - /** - * 判断 mqtt 是否链接成功,仅仅在链接成功之后才能 sub 和 pub - * - * @return 是否成功 - */ - public boolean isConnected() { - Boolean connected = (Boolean) context.get(DefaultMqttClientProcessor.MQTT_CONNECTED_KEY); - return Boolean.TRUE.equals(connected); - } - } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientAioHandler.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientAioHandler.java index e3a836ddbeda3a0e9e25eab9c54bfd357f40e542..49becdcc79804f52513faf7340b437f857a5a417 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientAioHandler.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientAioHandler.java @@ -17,14 +17,11 @@ package net.dreamlu.iot.mqtt.core.client; import net.dreamlu.iot.mqtt.codec.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.tio.client.intf.ClientAioHandler; import org.tio.core.ChannelContext; import org.tio.core.TioConfig; import org.tio.core.exception.AioDecodeException; import org.tio.core.intf.Packet; -import org.tio.server.AcceptCompletionHandler; import java.nio.ByteBuffer; @@ -34,7 +31,6 @@ import java.nio.ByteBuffer; * @author L.cm */ public class MqttClientAioHandler implements ClientAioHandler { - private static final Logger log = LoggerFactory.getLogger(AcceptCompletionHandler.class); private final MqttDecoder mqttDecoder; private final MqttEncoder mqttEncoder; private final MqttClientProcessor processor; @@ -66,7 +62,7 @@ public class MqttClientAioHandler implements ClientAioHandler { // 1. 先判断 mqtt 消息解析是否正常 DecoderResult decoderResult = message.decoderResult(); if (decoderResult.isFailure()) { - processFailure(message); + processor.processDecodeFailure(context, message, decoderResult.getCause()); return; } MqttFixedHeader fixedHeader = message.fixedHeader(); @@ -102,15 +98,4 @@ public class MqttClientAioHandler implements ClientAioHandler { } } - /** - * 处理失败 - * - * @param mqttMessage MqttMessage - */ - private void processFailure(MqttMessage mqttMessage) { - // 客户端失败,我认为日志记录异常就行了 - Throwable cause = mqttMessage.decoderResult().getCause(); - log.error(cause.getMessage(), cause); - } - } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientAioListener.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientAioListener.java index f2f660f4dbc1d995e01fd322ded65ee3c8dcd1ef..29dbab24524e40066e1c8eca73ff0681818f317d 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientAioListener.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientAioListener.java @@ -17,6 +17,7 @@ package net.dreamlu.iot.mqtt.core.client; import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders; +import net.dreamlu.iot.mqtt.codec.MqttProperties; import org.tio.client.DefaultClientAioListener; import org.tio.core.ChannelContext; import org.tio.core.Tio; @@ -60,7 +61,13 @@ public class MqttClientAioListener extends DefaultClientAioListener { builder.willTopic(willMessage.getTopic()) .willMessage(willMessage.getMessage()) .willRetain(willMessage.isRetain()) - .willQoS(willMessage.getQos()); + .willQoS(willMessage.getQos()) + .willProperties(willMessage.getWillProperties()); + } + // 4. mqtt5 properties + MqttProperties properties = clientConfig.getProperties(); + if (properties != null) { + builder.properties(properties); } // 4. 发送链接请求 Tio.send(context, builder.build()); diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientCreator.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientCreator.java index 943c6f53acffd80d9ed605684f90bb728f554d05..6fc9347b51b220e8f19134b4e14b91bda0b9d094 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientCreator.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientCreator.java @@ -16,6 +16,7 @@ package net.dreamlu.iot.mqtt.core.client; +import net.dreamlu.iot.mqtt.codec.MqttProperties; import net.dreamlu.iot.mqtt.codec.MqttVersion; import org.tio.client.ClientChannelContext; import org.tio.client.ClientTioConfig; @@ -28,6 +29,7 @@ import org.tio.core.ssl.SslConfig; import org.tio.utils.hutool.StrUtil; import java.util.Objects; +import java.util.concurrent.CountDownLatch; import java.util.function.Consumer; /** @@ -93,6 +95,10 @@ public final class MqttClientCreator { * 遗嘱消息 */ private MqttWillMessage willMessage; + /** + * mqtt5 properties + */ + private MqttProperties properties; protected String getIp() { return ip; @@ -146,6 +152,10 @@ public final class MqttClientCreator { return willMessage; } + public MqttProperties getProperties() { + return properties; + } + public MqttClientCreator ip(String ip) { this.ip = ip; return this; @@ -217,8 +227,10 @@ public final class MqttClientCreator { return willMessage(builder.build()); } - - + public MqttClientCreator properties(MqttProperties properties) { + this.properties = properties; + return this; + } public MqttClient connect() throws Exception { // 1. 生成 默认的 clientId @@ -229,7 +241,8 @@ public final class MqttClientCreator { } MqttClientSubManage subManage = new MqttClientSubManage(); // 客户端处理器 - MqttClientProcessor processor = new DefaultMqttClientProcessor(subManage); + CountDownLatch connLatch = new CountDownLatch(1); + MqttClientProcessor processor = new DefaultMqttClientProcessor(subManage, connLatch); // 2. 初始化 mqtt 处理器 ClientAioHandler clientAioHandler = new MqttClientAioHandler(Objects.requireNonNull(processor)); ClientAioListener clientAioListener = new MqttClientAioListener(this); @@ -245,6 +258,7 @@ public final class MqttClientCreator { // 4. tioClient TioClient tioClient = new TioClient(new ClientTioConfig(clientAioHandler, clientAioListener, reconnConf)); ClientChannelContext context = tioClient.connect(new Node(this.ip, this.port), this.timeout); + connLatch.await(); return new MqttClient(tioClient, this, context, subManage); } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientMessageId.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientMessageId.java index bb090fd80c9ec37180ade25fe86ff0f01cfa9918..ffd75a62edbe83fa211dde76b75468cfc2141813 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientMessageId.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientMessageId.java @@ -37,13 +37,17 @@ public enum MqttClientMessageId { this.value = value; } - public MqttMessageIdVariableHeader getMessageId() { + public int getMessageId() { this.value.compareAndSet(0xffff, 1); - return MqttMessageIdVariableHeader.from(this.value.getAndIncrement()); + return this.value.getAndIncrement(); } - public static MqttMessageIdVariableHeader getId() { + public static int getId() { return INSTANCE.getMessageId(); } + public static MqttMessageIdVariableHeader getVariableHeader() { + return MqttMessageIdVariableHeader.from(getId()); + } + } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientProcessor.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientProcessor.java index 6a063536497edb5c5675fb0b7cd9d32e41564cf6..6036fd834cf65bdcb7b16737a1663d750fcb5695 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientProcessor.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientProcessor.java @@ -26,6 +26,15 @@ import org.tio.core.ChannelContext; */ public interface MqttClientProcessor { + /** + * 处理编解码失败 + * + * @param context ChannelContext + * @param message MqttMessage + * @param ex Throwable + */ + void processDecodeFailure(ChannelContext context, MqttMessage message, Throwable ex); + /** * 处理服务端链接 ack * diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttWillMessage.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttWillMessage.java index 804f4b25dfd66e161fc196c5b3e667da49f540a8..e184660a7ba3cd8a06e2105aaa13e7819ab7cfb1 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttWillMessage.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttWillMessage.java @@ -16,6 +16,7 @@ package net.dreamlu.iot.mqtt.core.client; +import net.dreamlu.iot.mqtt.codec.MqttProperties; import net.dreamlu.iot.mqtt.codec.MqttQoS; import java.nio.charset.StandardCharsets; @@ -38,12 +39,17 @@ public final class MqttWillMessage { * 如果遗嘱标志被设置为 false,遗嘱 QoS 也必须设置为 0。 如果遗嘱标志被设置为 true,遗嘱 QoS 的值可以等于 0,1,2。 */ private final MqttQoS qos; + /** + * mqtt5 willProperties + */ + private final MqttProperties willProperties; - private MqttWillMessage(String topic, byte[] message, boolean retain, MqttQoS qos) { + private MqttWillMessage(String topic, byte[] message, boolean retain, MqttQoS qos, MqttProperties willProperties) { this.topic = topic; this.message = message; this.retain = retain; this.qos = qos; + this.willProperties = willProperties; } public String getTopic() { @@ -62,6 +68,10 @@ public final class MqttWillMessage { return qos; } + public MqttProperties getWillProperties() { + return willProperties; + } + public static MqttWillMessage.Builder builder() { return new MqttWillMessage.Builder(); } @@ -71,6 +81,7 @@ public final class MqttWillMessage { private byte[] message; private boolean retain; private MqttQoS qos; + private MqttProperties willProperties; public Builder topic(String topic) { this.topic = Objects.requireNonNull(topic); @@ -97,8 +108,13 @@ public final class MqttWillMessage { return this; } + public Builder willProperties(MqttProperties willProperties) { + this.willProperties = Objects.requireNonNull(willProperties); + return this; + } + public MqttWillMessage build() { - return new MqttWillMessage(this.topic, this.message, this.retain, this.qos); + return new MqttWillMessage(this.topic, this.message, this.retain, this.qos, this.willProperties); } } diff --git a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/aliyun/MqttClientTest.java b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/aliyun/MqttClientTest.java new file mode 100644 index 0000000000000000000000000000000000000000..d889e4cd7698d6934378d9a0a5c67cec8003a438 --- /dev/null +++ b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/aliyun/MqttClientTest.java @@ -0,0 +1,44 @@ +package net.dreamlu.iot.mqtt.aliyun; + +import net.dreamlu.iot.mqtt.core.client.MqttClient; + +import java.nio.ByteBuffer; + +/** + * 客户端测试 + * + * @author L.cm + */ +public class MqttClientTest { + + public static void main(String[] args) throws Exception { + String productKey = "g27jB42P9hm"; + String deviceName = "3dbc1cb4"; + String deviceSecret = ""; + // 计算MQTT连接参数。 + MqttSign sign = new MqttSign(productKey, deviceName, deviceSecret); + + String username = sign.getUsername(); + String password = sign.getPassword(); + String clientId = sign.getClientId(); + System.out.println("username: " + username); + System.out.println("password: " + password); + System.out.println("clientid: " + clientId); + + // 初始化 mqtt 客户端 + MqttClient client = MqttClient.create() + .ip(productKey + ".iot-as-mqtt.cn-shanghai.aliyuncs.com") + .port(443) + .username(username) + .password(password) + .clientId(clientId) + .connect(); + + client.subQos0("/sys/g27jB42P9hm/3dbc1cb4/thing/event/property/post_reply", (topic, payload) -> { + + }); + + String content = "{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":1}}"; + client.publish("/sys/g27jB42P9hm/" + deviceName + "/thing/event/property/post", ByteBuffer.wrap(content.getBytes())); + } +} diff --git a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/aliyun/MqttSign.java b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/aliyun/MqttSign.java new file mode 100644 index 0000000000000000000000000000000000000000..febb2e95a577ce001922006c68473534a39c054f --- /dev/null +++ b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/aliyun/MqttSign.java @@ -0,0 +1,76 @@ +package net.dreamlu.iot.mqtt.aliyun; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import java.math.BigInteger; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; +import java.util.Objects; + +/** + * 阿里云 mqtt 签名方式 + * + * @author L.cm + */ +public class MqttSign { + /** + * 用户名 + */ + private final String username; + /** + * 密码 + */ + private final String password; + /** + * 客户端id + */ + private final String clientId; + + public MqttSign(String productKey, String deviceName, String deviceSecret) { + Objects.requireNonNull(productKey, "productKey is null"); + Objects.requireNonNull(deviceName, "deviceName is null"); + Objects.requireNonNull(deviceSecret, "deviceSecret is null"); + this.username = deviceName + '&' + productKey; + String timestamp = Long.toString(System.currentTimeMillis()); + this.password = getPassword(productKey, deviceName, deviceSecret, timestamp); + this.clientId = getClientId(productKey, deviceName, timestamp); + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + public String getClientId() { + return clientId; + } + + private static String getPassword(String productKey, String deviceName, String deviceSecret, String timestamp) { + String plainPasswd = "clientId" + productKey + '.' + deviceName + "deviceName" + + deviceName + "productKey" + productKey + "timestamp" + timestamp; + return hmacSha256(plainPasswd, deviceSecret); + } + + private static String getClientId(String productKey, String deviceName, String timestamp) { + return productKey + '.' + deviceName + '|' + "timestamp=" + timestamp + ",_v=paho-java-1.0.0,securemode=2,signmethod=hmacsha256|"; + } + + private static String hmacSha256(String plainText, String key) { + if (plainText == null || key == null) { + return null; + } + try { + Mac mac = Mac.getInstance("HmacSHA256"); + SecretKeySpec secretKeySpec = new SecretKeySpec(key.getBytes(), "HmacSHA256"); + mac.init(secretKeySpec); + byte[] hmacResult = mac.doFinal(plainText.getBytes()); + return String.format("%064x", new BigInteger(1, hmacResult)); + } catch (NoSuchAlgorithmException | InvalidKeyException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/MqttClientTest.java b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/MqttClientTest.java index ee7eacc66520764e4e7f3853a6c91f26a6adf44c..a72925a00fec6bcf71d2e706e251e16c205785bf 100644 --- a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/MqttClientTest.java +++ b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/MqttClientTest.java @@ -1,7 +1,7 @@ package net.dreamlu.iot.mqtt.client; +import net.dreamlu.iot.mqtt.codec.MqttVersion; import net.dreamlu.iot.mqtt.core.client.MqttClient; -import org.tio.client.ClientChannelContext; import java.nio.ByteBuffer; import java.util.Timer; @@ -15,38 +15,20 @@ import java.util.TimerTask; public class MqttClientTest { public static void main(String[] args) throws Exception { - String productKey = "g27jB42P9hm"; - String deviceName = "3dbc1cb4"; - String deviceSecret = "87b337020a99ddb9eab5bb68f8b2f891"; - - // 计算MQTT连接参数。 - MqttSign sign = new MqttSign(); - sign.calculate(productKey, deviceName, deviceSecret); - - String username = sign.getUsername(); - String password = sign.getPassword(); - String clientId = sign.getClientid(); - System.out.println("username: " + username); - System.out.println("password: " + password); - System.out.println("clientid: " + clientId); - // 初始化 mqtt 客户端 MqttClient client = MqttClient.create() - .ip(productKey + ".iot-as-mqtt.cn-shanghai.aliyuncs.com") - .port(443) - .username(username) - .password(password) - .clientId(clientId) + .ip("127.0.0.1") + .username("admin") + .password("123456") + .protocolVersion(MqttVersion.MQTT_5) .connect(); - Thread.sleep(1000L); -// Timer timer = new Timer(); -// timer.schedule(new TimerTask() { -// @Override -// public void run() { - String content = "{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":1}}"; - client.publish("/sys/g27jB42P9hm/" + deviceName + "/thing/event/property/post", ByteBuffer.wrap(content.getBytes())); -// } -// }, 1000, 2000); + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + client.publish("testtopicxx", ByteBuffer.wrap("mica最牛皮".getBytes())); + } + }, 1000, 2000); } } diff --git a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/MqttSign.java b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/MqttSign.java deleted file mode 100644 index 315c8a4a1c655400b4d155600e50916d54be6386..0000000000000000000000000000000000000000 --- a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/MqttSign.java +++ /dev/null @@ -1,70 +0,0 @@ -package net.dreamlu.iot.mqtt.client; - -import javax.crypto.Mac; -import javax.crypto.spec.SecretKeySpec; -import java.math.BigInteger; - -class CryptoUtil { - private static String hmac(String plainText, String key, String algorithm, String format) throws Exception { - if (plainText == null || key == null) { - return null; - } - - byte[] hmacResult = null; - - Mac mac = Mac.getInstance(algorithm); - SecretKeySpec secretKeySpec = new SecretKeySpec(key.getBytes(), algorithm); - mac.init(secretKeySpec); - hmacResult = mac.doFinal(plainText.getBytes()); - return String.format(format, new BigInteger(1, hmacResult)); - } - -// public static String hmacMd5(String plainText, String key) throws Exception { -// return hmac(plainText,key,"HmacMD5","%032x"); -// } -// -// public static String hmacSha1(String plainText, String key) throws Exception { -// return hmac(plainText,key,"HmacSHA1","%040x"); -// } - - public static String hmacSha256(String plainText, String key) throws Exception { - return hmac(plainText,key,"HmacSHA256","%064x"); - } -} - -public class MqttSign { - private String username = ""; - - private String password = ""; - - private String clientid = ""; - - public String getUsername() { return this.username;} - - public String getPassword() { return this.password;} - - public String getClientid() { return this.clientid;} - - public void calculate(String productKey, String deviceName, String deviceSecret) { - if (productKey == null || deviceName == null || deviceSecret == null) { - return; - } - - try { - //MQTT用户名 - this.username = deviceName + "&" + productKey; - - //MQTT密码 - String timestamp = Long.toString(System.currentTimeMillis()); - String plainPasswd = "clientId" + productKey + "." + deviceName + "deviceName" + - deviceName + "productKey" + productKey + "timestamp" + timestamp; - this.password = CryptoUtil.hmacSha256(plainPasswd, deviceSecret); - - //MQTT ClientId - this.clientid = productKey + "." + deviceName + "|" + "timestamp=" + timestamp + - ",_v=paho-java-1.0.0,securemode=2,signmethod=hmacsha256|"; - }catch (Exception e) { - e.printStackTrace(); - } - } -}