提交 49a374f5 编写于 作者: 浅梦2013's avatar 浅梦2013

代码优化。

上级 6ee3d65b
......@@ -76,7 +76,12 @@ public class MqttWsMsgHandler implements IWsMsgHandler {
*/
@Override
public void onAfterHandshaked(HttpRequest request, HttpResponse response, ChannelContext context) {
// 在连接中添加 WriteBuffer 用来处理半包消息
WriteBuffer wsBody = (WriteBuffer) context.get(MQTT_WS_MSG_BODY_KEY);
if (wsBody == null) {
wsBody = new WriteBuffer();
context.set(MQTT_WS_MSG_BODY_KEY, wsBody);
}
}
/**
......@@ -84,26 +89,12 @@ public class MqttWsMsgHandler implements IWsMsgHandler {
*/
@Override
public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext context) {
WriteBuffer wsBody = (WriteBuffer) context.get(MQTT_WS_MSG_BODY_KEY);
if (wsBody == null) {
wsBody = new WriteBuffer();
context.set(MQTT_WS_MSG_BODY_KEY, wsBody);
}
wsBody.writeBytes(bytes);
byte[] array = wsBody.toArray();
int length = array.length;
if (length < 2) {
return null;
}
ByteBuffer buffer = ByteBuffer.wrap(array);
int mqttLength = getMqttLength(buffer);
if (length < mqttLength + 2) {
ByteBuffer buffer = getMqttBody(bytes, context);
if (buffer == null) {
return null;
}
// 数据已经读取完毕
wsBody.reset();
buffer.rewind();
MqttMessage mqttMessage = new MqttDecoder().decode(context, buffer, 0, 0, length);
// 解析 mqtt 消息
MqttMessage mqttMessage = new MqttDecoder().decode(context, buffer, 0, 0, buffer.remaining());
if (mqttMessage == null) {
return null;
}
......@@ -188,6 +179,30 @@ public class MqttWsMsgHandler implements IWsMsgHandler {
return null;
}
/**
* 读取 mqtt 消息体处理半包的情况
* @param bytes 消息类容
* @param context ChannelContext
* @return ByteBuffer
*/
private static synchronized ByteBuffer getMqttBody(byte[] bytes, ChannelContext context) {
WriteBuffer wsBody = (WriteBuffer) context.get(MQTT_WS_MSG_BODY_KEY);
wsBody.writeBytes(bytes);
int length = wsBody.size();
if (length < 2) {
return null;
}
ByteBuffer buffer = wsBody.toBuffer();
int mqttLength = getMqttLength(buffer) + 2;
if (length < mqttLength) {
return null;
}
// 数据已经读取完毕
wsBody.reset();
buffer.rewind();
return buffer;
}
/**
* 解析 mqtt 消息长度
*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册