提交 25ba6fab 编写于 作者: C chengxiangwang

optimize logic of push message to consumers;add MQTTSession(extends Client)

上级 f8da6ffa
...@@ -41,7 +41,17 @@ public class Client { ...@@ -41,7 +41,17 @@ public class Client {
private String snodeAddress; private String snodeAddress;
public Client() {
}
public Client(String clientId, ClientRole clientRole, Set<String> groups, RemotingChannel remotingChannel,
long lastUpdateTimestamp) {
this.clientId = clientId;
this.clientRole = clientRole;
this.groups = groups;
this.remotingChannel = remotingChannel;
this.lastUpdateTimestamp = lastUpdateTimestamp;
}
public ClientRole getClientRole() { public ClientRole getClientRole() {
return clientRole; return clientRole;
......
...@@ -17,16 +17,15 @@ ...@@ -17,16 +17,15 @@
package org.apache.rocketmq.mqtt.client; package org.apache.rocketmq.mqtt.client;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttQoS;
public class InFlightMessage { public class InFlightMessage {
final String topic; final String topic;
final MqttQoS publishingQos; final Integer pushQos;
final ByteBuf payload; final ByteBuf payload;
InFlightMessage(String topic, MqttQoS publishingQos, ByteBuf payload) { InFlightMessage(String topic, Integer pushQos, ByteBuf payload) {
this.topic = topic; this.topic = topic;
this.publishingQos = publishingQos; this.pushQos = pushQos;
this.payload = payload; this.payload = payload;
} }
} }
...@@ -17,21 +17,20 @@ ...@@ -17,21 +17,20 @@
package org.apache.rocketmq.mqtt.client; package org.apache.rocketmq.mqtt.client;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import java.util.Enumeration;
import java.util.HashMap; import java.util.HashMap;
import java.util.Hashtable; import java.util.Hashtable;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.Set;
import java.util.concurrent.DelayQueue; import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed; import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.client.Client; import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.client.ClientRole;
import org.apache.rocketmq.mqtt.exception.MqttRuntimeException; import org.apache.rocketmq.mqtt.exception.MqttRuntimeException;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
public class MQTTSession extends Client { public class MQTTSession extends Client {
...@@ -41,6 +40,7 @@ public class MQTTSession extends Client { ...@@ -41,6 +40,7 @@ public class MQTTSession extends Client {
private final AtomicInteger inflightSlots = new AtomicInteger(10); private final AtomicInteger inflightSlots = new AtomicInteger(10);
private final Map<Integer, InFlightMessage> inflightWindow = new HashMap<>(); private final Map<Integer, InFlightMessage> inflightWindow = new HashMap<>();
private final DelayQueue<InFlightPacket> inflightTimeouts = new DelayQueue<>(); private final DelayQueue<InFlightPacket> inflightTimeouts = new DelayQueue<>();
private static final int FLIGHT_BEFORE_RESEND_MS = 5_000;
private final AtomicInteger lastPacketId = new AtomicInteger(0); private final AtomicInteger lastPacketId = new AtomicInteger(0);
private Hashtable inUsePacketIds = new Hashtable(); private Hashtable inUsePacketIds = new Hashtable();
private int nextPacketId = 0; private int nextPacketId = 0;
...@@ -74,6 +74,13 @@ public class MQTTSession extends Client { ...@@ -74,6 +74,13 @@ public class MQTTSession extends Client {
} }
} }
public MQTTSession(String clientId, ClientRole clientRole, Set<String> groups, boolean isConnected, boolean cleanSession,
RemotingChannel remotingChannel, long lastUpdateTimestamp) {
super(clientId, clientRole, groups, remotingChannel, lastUpdateTimestamp);
this.isConnected = isConnected;
this.cleanSession = cleanSession;
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) { if (this == o) {
...@@ -114,21 +121,17 @@ public class MQTTSession extends Client { ...@@ -114,21 +121,17 @@ public class MQTTSession extends Client {
DefaultMqttMessageProcessor defaultMqttMessageProcessor) { DefaultMqttMessageProcessor defaultMqttMessageProcessor) {
if (mqttHeader.getQosLevel() > 0) { if (mqttHeader.getQosLevel() > 0) {
// IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager();
// ConcurrentHashMap<String, Subscription> clientId2Subscription = iotClientManager.getClientId2Subscription();
// Subscription subscription = clientId2Subscription.get(this.getClientId());
// Enumeration<String> topicFilters = subscription.getSubscriptionTable().keys();
// while (topicFilters.hasMoreElements()) {
// String topicFilter = topicFilters.nextElement();
// }
inflightSlots.decrementAndGet(); inflightSlots.decrementAndGet();
mqttHeader.setPacketId(getNextPacketId()); mqttHeader.setPacketId(getNextPacketId());
inflightWindow.put(mqttHeader.getPacketId(), new InFlightMessage(mqttHeader.getTopicName(), )); inflightWindow.put(mqttHeader.getPacketId(), new InFlightMessage(mqttHeader.getTopicName(), mqttHeader.getQosLevel(), payload));
inflightTimeouts.add(new InFlightPacket(mqttHeader.getPacketId(), FLIGHT_BEFORE_RESEND_MS));
} }
defaultMqttMessageProcessor.getMqttPushService().pushMessageQos(mqttHeader, payload, this); defaultMqttMessageProcessor.getMqttPushService().pushMessageQos(mqttHeader, payload, this);
} }
public void pubAckReceived(int ackPacketId) {
inflightWindow.remove(ackPacketId);
inflightSlots.incrementAndGet();
}
private synchronized void releasePacketId(int msgId) { private synchronized void releasePacketId(int msgId) {
this.inUsePacketIds.remove(new Integer(msgId)); this.inUsePacketIds.remove(new Integer(msgId));
} }
......
...@@ -26,5 +26,6 @@ public class MqttConstant { ...@@ -26,5 +26,6 @@ public class MqttConstant {
public static final String SUBSCRIPTION_FLAG_SHARP = "#"; public static final String SUBSCRIPTION_FLAG_SHARP = "#";
public static final String SUBSCRIPTION_SEPARATOR = "/"; public static final String SUBSCRIPTION_SEPARATOR = "/";
public static final long DEFAULT_TIMEOUT_MILLS = 3000L; public static final long DEFAULT_TIMEOUT_MILLS = 3000L;
public static final String PROPERTY_MQTT_QOS = "PROPERTY_MQTT_QOS";
public static final AttributeKey<Client> MQTT_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("mqtt.client"); public static final AttributeKey<Client> MQTT_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("mqtt.client");
} }
...@@ -117,17 +117,11 @@ public class MqttConnectMessageHandler implements MessageHandler { ...@@ -117,17 +117,11 @@ public class MqttConnectMessageHandler implements MessageHandler {
} }
} }
MQTTSession client = new MQTTSession(); MQTTSession client = new MQTTSession(payload.clientIdentifier(), ClientRole.IOTCLIENT, new HashSet<String>() {
client.setClientId(payload.clientIdentifier());
client.setClientRole(ClientRole.IOTCLIENT);
client.setGroups(new HashSet<String>() {
{ {
add("IOT_GROUP"); add("IOT_GROUP");
} }
}); }, true, mqttConnectMessage.variableHeader().isCleanSession(), remotingChannel, System.currentTimeMillis());
client.setConnected(true);
client.setRemotingChannel(remotingChannel);
client.setLastUpdateTimestamp(System.currentTimeMillis());
//register remotingChannel<--->client //register remotingChannel<--->client
iotClientManager.register(IOTClientManagerImpl.IOT_GROUP, client); iotClientManager.register(IOTClientManagerImpl.IOT_GROUP, client);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册