提交 185ba562 编写于 作者: 浅梦2013's avatar 浅梦2013

🐛 修复 java8 下 NoSuchMethodError: java.nio.ByteBuffer.xxx gitee #I43EZT

上级 f5bc0d74
......@@ -16,6 +16,7 @@
package net.dreamlu.iot.mqtt.codec;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
......@@ -33,6 +34,7 @@ public class ByteBufferUtil {
/**
* read byte
*
* @param buffer ByteBuffer
* @return byte
*/
......@@ -42,6 +44,7 @@ public class ByteBufferUtil {
/**
* read unsigned byte
*
* @param buffer ByteBuffer
* @return short
*/
......@@ -51,15 +54,82 @@ public class ByteBufferUtil {
/**
* skip bytes
*
* @param buffer ByteBuffer
* @param skip skip bytes
* @param skip skip bytes
* @return ByteBuffer
*/
public static ByteBuffer skipBytes(ByteBuffer buffer, int skip) {
buffer.position(buffer.position() + skip);
position(buffer, buffer.position() + skip);
return buffer;
}
/**
* position 为了兼容 java8,详见: https://gitee.com/596392912/mica-mqtt/issues/I43EZT
*
* @param buffer Buffer
* @param newPosition newPosition
*/
public static void position(Buffer buffer, int newPosition) {
((Buffer) buffer).position(newPosition);
}
/**
* limit 为了兼容 java8,详见: https://gitee.com/596392912/mica-mqtt/issues/I43EZT
*
* @param buffer Buffer
* @param newLimit newLimit
*/
public static void limit(Buffer buffer, int newLimit) {
((Buffer) buffer).limit(newLimit);
}
/**
* flip 为了兼容 java8,详见: https://gitee.com/596392912/mica-mqtt/issues/I43EZT
*
* @param buffer Buffer
*/
public static void flip(Buffer buffer) {
((Buffer) buffer).flip();
}
/**
* rewind 为了兼容 java8,详见: https://gitee.com/596392912/mica-mqtt/issues/I43EZT
*
* @param buffer Buffer
*/
public static void rewind(Buffer buffer) {
((Buffer) buffer).rewind();
}
/**
* mark 为了兼容 java8,详见: https://gitee.com/596392912/mica-mqtt/issues/I43EZT
*
* @param buffer Buffer
*/
public static void mark(Buffer buffer) {
((Buffer) buffer).mark();
}
/**
* reset 为了兼容 java8,详见: https://gitee.com/596392912/mica-mqtt/issues/I43EZT
*
* @param buffer Buffer
*/
public static void reset(Buffer buffer) {
((Buffer) buffer).reset();
}
/**
* clear 为了兼容 java8,详见: https://gitee.com/596392912/mica-mqtt/issues/I43EZT
*
* @param buffer Buffer
*/
public static void clear(Buffer buffer) {
((Buffer) buffer).clear();
}
public static String toString(ByteBuffer buffer) {
return toString(buffer, StandardCharsets.UTF_8);
}
......@@ -71,10 +141,10 @@ public class ByteBufferUtil {
public static ByteBuffer clone(ByteBuffer original) {
ByteBuffer clone = ByteBuffer.allocate(original.capacity());
// copy from the beginning
original.rewind();
rewind(original);
clone.put(original);
original.rewind();
clone.flip();
rewind(original);
flip(clone);
return clone;
}
......
......@@ -145,7 +145,7 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
}
pendingPublish.onPubAckReceived();
clientStore.removePendingPublish(messageId);
pendingPublish.getPayload().clear();
ByteBufferUtil.clear(pendingPublish.getPayload());
}
@Override
......@@ -185,7 +185,10 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
public void processPubComp(MqttMessage message) {
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
MqttPendingPublish pendingPublish = clientStore.getPendingPublish(messageId);
pendingPublish.getPayload().clear();
if (pendingPublish == null) {
return;
}
ByteBufferUtil.clear(pendingPublish.getPayload());
pendingPublish.onPubCompReceived();
clientStore.removePendingPublish(messageId);
}
......@@ -201,7 +204,7 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
final ByteBuffer payload = message.payload();
subscriptionList.forEach(subscription -> {
IMqttClientMessageListener listener = subscription.getListener();
payload.rewind();
ByteBufferUtil.rewind(payload);
listener.onMessage(topicName, payload);
});
}
......
package net.dreamlu.iot.mqtt.core.common;
import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
import net.dreamlu.iot.mqtt.codec.MqttMessage;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
......@@ -10,6 +11,7 @@ import java.util.function.Consumer;
/**
* MqttPendingPublish,参考于 netty-mqtt-client
*
* @author netty
*/
public final class MqttPendingPublish {
......@@ -40,7 +42,7 @@ public final class MqttPendingPublish {
public void startPublishRetransmissionTimer(ScheduledThreadPoolExecutor executor, Consumer<MqttMessage> sendPacket) {
this.pubRetryProcessor.setHandle(((fixedHeader, originalMessage) -> {
this.payload.rewind();
ByteBufferUtil.rewind(this.payload);
sendPacket.accept(new MqttPublishMessage(fixedHeader, originalMessage.variableHeader(), this.payload));
}));
this.pubRetryProcessor.start(executor);
......
......@@ -16,6 +16,7 @@
package net.dreamlu.iot.mqtt.core.server;
import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
......@@ -147,7 +148,7 @@ public final class MqttServer {
private boolean publish(ChannelContext context, String clientId, String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
boolean isHighLevelQoS = MqttQoS.AT_LEAST_ONCE == qos || MqttQoS.EXACTLY_ONCE == qos;
int messageId = isHighLevelQoS ? sessionManager.getMessageId(clientId) : -1;
payload.rewind();
ByteBufferUtil.rewind(payload);
MqttPublishMessage message = MqttMessageBuilders.publish()
.topicName(topic)
.payload(payload)
......
......@@ -167,7 +167,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
}
pendingPublish.onPubAckReceived();
sessionManager.removePendingPublish(clientId, messageId);
pendingPublish.getPayload().clear();
ByteBufferUtil.clear(pendingPublish.getPayload());
}
@Override
......@@ -218,7 +218,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
logger.debug("PubComp - clientId:{}, messageId: {}", clientId, messageId);
MqttPendingPublish pendingPublish = sessionManager.getPendingPublish(clientId, messageId);
if (pendingPublish != null) {
pendingPublish.getPayload().clear();
ByteBufferUtil.clear(pendingPublish.getPayload());
pendingPublish.onPubCompReceived();
sessionManager.removePendingPublish(clientId, messageId);
}
......
......@@ -95,7 +95,7 @@ public class MqttWsMsgHandler implements IWsMsgHandler {
return null;
}
// 重置 buffer
buffer.rewind();
ByteBufferUtil.rewind(buffer);
// 解析 mqtt 消息
MqttMessage mqttMessage = new MqttDecoder().decode(context, buffer, 0, 0, buffer.remaining());
if (mqttMessage == null) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册