Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
b2169653
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
268
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
未验证
提交
b2169653
编写于
3月 26, 2019
作者:
H
Heng Du
提交者:
GitHub
3月 26, 2019
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #1116 from Aaron-He/mqtt
Implementation of handling MQTT client PINGREQ
上级
a3c17c66
36c21740
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
195 addition
and
3 deletion
+195
-3
mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java
...etmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java
+27
-1
mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java
.../rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java
+1
-0
mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPingreqMessageHandlerTest.java
...g/apache/rocketmq/mqtt/MqttPingreqMessageHandlerTest.java
+77
-0
remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java
...ing/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java
+2
-2
remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingReqEncodeDecode.java
...ng/transport/mqtt/dispatcher/MqttPingReqEncodeDecode.java
+45
-0
remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingRespEncodeDecode.java
...g/transport/mqtt/dispatcher/MqttPingRespEncodeDecode.java
+43
-0
未找到文件。
mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java
浏览文件 @
b2169653
...
...
@@ -17,14 +17,22 @@
package
org.apache.rocketmq.mqtt.mqtthandler.impl
;
import
io.netty.handler.codec.mqtt.MqttConnectMessage
;
import
io.netty.handler.codec.mqtt.MqttMessage
;
import
io.netty.handler.codec.mqtt.MqttMessageType
;
import
io.netty.handler.codec.mqtt.MqttPubAckMessage
;
import
org.apache.rocketmq.common.client.Client
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.mqtt.client.IOTClientManagerImpl
;
import
org.apache.rocketmq.mqtt.exception.WrongMessageTypeException
;
import
org.apache.rocketmq.mqtt.mqtthandler.MessageHandler
;
import
org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor
;
import
org.apache.rocketmq.remoting.RemotingChannel
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.apache.rocketmq.remoting.transport.mqtt.MqttHeader
;
public
class
MqttPingreqMessageHandler
implements
MessageHandler
{
private
static
final
InternalLogger
log
=
InternalLoggerFactory
.
getLogger
(
LoggerName
.
MQTT_LOGGER_NAME
);
...
...
@@ -43,6 +51,24 @@ public class MqttPingreqMessageHandler implements MessageHandler {
*/
@Override
public
RemotingCommand
handleMessage
(
MqttMessage
message
,
RemotingChannel
remotingChannel
)
{
return
null
;
IOTClientManagerImpl
iotClientManager
=
(
IOTClientManagerImpl
)
defaultMqttMessageProcessor
.
getIotClientManager
();
Client
client
=
iotClientManager
.
getClient
(
IOTClientManagerImpl
.
IOT_GROUP
,
remotingChannel
);
log
.
debug
(
"Handle MQTT client: {} Pingreq."
,
client
.
getClientId
());
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
MqttHeader
.
class
);
if
(
client
!=
null
&&
client
.
isConnected
())
{
client
.
setLastUpdateTimestamp
(
System
.
currentTimeMillis
());
MqttHeader
mqttHeader
=
(
MqttHeader
)
response
.
readCustomHeader
();
mqttHeader
.
setMessageType
(
MqttMessageType
.
PINGRESP
.
value
());
mqttHeader
.
setDup
(
false
);
mqttHeader
.
setQosLevel
(
0
);
mqttHeader
.
setRetain
(
false
);
mqttHeader
.
setRemainingLength
(
0
);
response
.
setCode
(
ResponseCode
.
SUCCESS
);
response
.
setRemark
(
null
);
return
response
;
}
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
response
.
setRemark
(
"MQTT Client is null or not connected"
);
return
response
;
}
}
mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java
浏览文件 @
b2169653
...
...
@@ -132,6 +132,7 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
break
;
case
UNSUBSCRIBE:
case
PINGREQ:
break
;
case
DISCONNECT:
}
return
type2handler
.
get
(
MqttMessageType
.
valueOf
(
mqttHeader
.
getMessageType
())).
handleMessage
(
mqttMessage
,
remotingChannel
);
...
...
mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPingreqMessageHandlerTest.java
0 → 100644
浏览文件 @
b2169653
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.mqtt
;
import
io.netty.handler.codec.mqtt.MqttMessage
;
import
org.apache.rocketmq.common.client.Client
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.mqtt.client.IOTClientManagerImpl
;
import
org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPingreqMessageHandler
;
import
org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor
;
import
org.apache.rocketmq.remoting.RemotingChannel
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.mockito.Mock
;
import
org.mockito.junit.MockitoJUnitRunner
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyLong
;
import
static
org
.
mockito
.
Mockito
.*;
import
static
org
.
junit
.
Assert
.*;
@RunWith
(
MockitoJUnitRunner
.
class
)
public
class
MqttPingreqMessageHandlerTest
{
@Mock
private
RemotingChannel
remotingChannel
;
@Mock
private
IOTClientManagerImpl
iotClientManager
;
@Mock
private
MqttMessage
mqttMessage
;
@Mock
private
Client
client
;
@Mock
private
DefaultMqttMessageProcessor
processor
;
private
MqttPingreqMessageHandler
mqttPingreqMessageHandler
;
@Before
public
void
init
()
{
mqttPingreqMessageHandler
=
new
MqttPingreqMessageHandler
(
processor
);
when
(
processor
.
getIotClientManager
()).
thenReturn
(
iotClientManager
);
when
(
iotClientManager
.
getClient
(
IOTClientManagerImpl
.
IOT_GROUP
,
remotingChannel
)).
thenReturn
(
client
);
when
(
client
.
getClientId
()).
thenReturn
(
"Mock Client"
);
}
@Test
public
void
testHandlerMessageReturnResp
()
{
when
(
client
.
isConnected
()).
thenReturn
(
true
);
RemotingCommand
response
=
mqttPingreqMessageHandler
.
handleMessage
(
mqttMessage
,
remotingChannel
);
verify
(
client
).
setLastUpdateTimestamp
(
anyLong
());
assertEquals
(
ResponseCode
.
SUCCESS
,
response
.
getCode
());
}
@Test
public
void
testHandlerMessageReturnNull
()
{
when
(
client
.
isConnected
()).
thenReturn
(
false
);
RemotingCommand
response
=
mqttPingreqMessageHandler
.
handleMessage
(
mqttMessage
,
remotingChannel
);
assertEquals
(
ResponseCode
.
SYSTEM_ERROR
,
response
.
getCode
());
}
}
remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java
浏览文件 @
b2169653
...
...
@@ -38,8 +38,8 @@ public class EncodeDecodeDispatcher {
encodeDecodeDispatcher
.
put
(
MqttMessageType
.
SUBACK
,
new
MqttSubackEncodeDecode
());
encodeDecodeDispatcher
.
put
(
MqttMessageType
.
UNSUBSCRIBE
,
new
MqttUnSubscribeEncodeDecode
());
encodeDecodeDispatcher
.
put
(
MqttMessageType
.
UNSUBACK
,
new
MqttUnSubackEncodeDecode
());
encodeDecodeDispatcher
.
put
(
MqttMessageType
.
PINGREQ
,
n
ull
);
encodeDecodeDispatcher
.
put
(
MqttMessageType
.
PINGRESP
,
n
ull
);
encodeDecodeDispatcher
.
put
(
MqttMessageType
.
PINGREQ
,
n
ew
MqttPingReqEncodeDecode
()
);
encodeDecodeDispatcher
.
put
(
MqttMessageType
.
PINGRESP
,
n
ew
MqttPingRespEncodeDecode
()
);
}
public
static
Map
<
MqttMessageType
,
Message2MessageEncodeDecode
>
getEncodeDecodeDispatcher
()
{
...
...
remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingReqEncodeDecode.java
0 → 100644
浏览文件 @
b2169653
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.remoting.transport.mqtt.dispatcher
;
import
io.netty.handler.codec.mqtt.MqttFixedHeader
;
import
io.netty.handler.codec.mqtt.MqttMessage
;
import
java.io.UnsupportedEncodingException
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.apache.rocketmq.remoting.transport.mqtt.MqttHeader
;
public
class
MqttPingReqEncodeDecode
implements
Message2MessageEncodeDecode
{
@Override
public
RemotingCommand
decode
(
MqttMessage
mqttMessage
)
{
MqttHeader
mqttHeader
=
new
MqttHeader
();
MqttFixedHeader
mqttFixedHeader
=
mqttMessage
.
fixedHeader
();
mqttHeader
.
setMessageType
(
mqttFixedHeader
.
messageType
().
value
());
mqttHeader
.
setDup
(
mqttFixedHeader
.
isDup
());
mqttHeader
.
setQosLevel
(
mqttFixedHeader
.
qosLevel
().
value
());
mqttHeader
.
setRetain
(
mqttFixedHeader
.
isRetain
());
mqttHeader
.
setRemainingLength
(
mqttFixedHeader
.
remainingLength
());
return
RemotingCommand
.
createRequestCommand
(
1000
,
mqttHeader
);
}
@Override
public
MqttMessage
encode
(
RemotingCommand
remotingCommand
)
throws
RemotingCommandException
,
UnsupportedEncodingException
{
return
null
;
}
}
remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingRespEncodeDecode.java
0 → 100644
浏览文件 @
b2169653
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.remoting.transport.mqtt.dispatcher
;
import
io.netty.handler.codec.mqtt.MqttFixedHeader
;
import
io.netty.handler.codec.mqtt.MqttMessage
;
import
io.netty.handler.codec.mqtt.MqttMessageType
;
import
io.netty.handler.codec.mqtt.MqttQoS
;
import
java.io.UnsupportedEncodingException
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.apache.rocketmq.remoting.transport.mqtt.MqttHeader
;
public
class
MqttPingRespEncodeDecode
implements
Message2MessageEncodeDecode
{
@Override
public
RemotingCommand
decode
(
MqttMessage
mqttMessage
)
{
return
null
;
}
@Override
public
MqttMessage
encode
(
RemotingCommand
remotingCommand
)
throws
RemotingCommandException
,
UnsupportedEncodingException
{
MqttHeader
mqttHeader
=
(
MqttHeader
)
remotingCommand
.
getCustomHeader
();
return
new
MqttMessage
(
new
MqttFixedHeader
(
MqttMessageType
.
PINGRESP
,
mqttHeader
.
isDup
(),
MqttQoS
.
valueOf
(
mqttHeader
.
getQosLevel
()),
mqttHeader
.
isRetain
(),
mqttHeader
.
getRemainingLength
()));
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录