提交 2f20770b 编写于 作者: 浅梦2013's avatar 浅梦2013

mica-mqtt client 优化

上级 f39bbf1c
......@@ -160,6 +160,10 @@ public final class MqttClientCreator {
* groupExecutor
*/
private ThreadPoolExecutor groupExecutor;
/**
* scheduledExecutor
*/
private ScheduledThreadPoolExecutor scheduledExecutor;
public String getName() {
return name;
......@@ -273,6 +277,10 @@ public final class MqttClientCreator {
return groupExecutor;
}
public ScheduledThreadPoolExecutor getScheduledExecutor() {
return scheduledExecutor;
}
public MqttClientCreator name(String name) {
this.name = name;
return this;
......@@ -423,6 +431,11 @@ public final class MqttClientCreator {
return this;
}
public MqttClientCreator scheduledExecutor(ScheduledThreadPoolExecutor scheduledExecutor) {
this.scheduledExecutor = scheduledExecutor;
return this;
}
public MqttClient connect() {
// 1. 生成 默认的 clientId
String clientId = getClientId();
......@@ -446,11 +459,14 @@ public final class MqttClientCreator {
if (this.groupExecutor == null) {
this.groupExecutor = ThreadUtil.getGroupExecutor(2);
}
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, DefaultThreadFactory.getInstance("MqttClient"));
IMqttClientProcessor processor = new DefaultMqttClientProcessor(this, executor);
// scheduledExecutor
if (this.scheduledExecutor == null) {
this.scheduledExecutor = new ScheduledThreadPoolExecutor(2, DefaultThreadFactory.getInstance("MqttClient"));
}
IMqttClientProcessor processor = new DefaultMqttClientProcessor(this, this.scheduledExecutor);
// 4. 初始化 mqtt 处理器
ClientAioHandler clientAioHandler = new MqttClientAioHandler(this, processor);
ClientAioListener clientAioListener = new MqttClientAioListener(this, executor);
ClientAioListener clientAioListener = new MqttClientAioListener(this, this.scheduledExecutor);
// 5. 重连配置
ReconnConf reconnConf = null;
if (this.reconnect) {
......@@ -471,7 +487,7 @@ public final class MqttClientCreator {
try {
TioClient tioClient = new TioClient(tioConfig);
tioClient.asynConnect(new Node(this.ip, this.port), this.timeout);
return new MqttClient(tioClient, this, executor);
return new MqttClient(tioClient, this, this.scheduledExecutor);
} catch (Exception e) {
throw new IllegalStateException("Mica mqtt client start fail.", e);
}
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.client.listener;
import net.dreamlu.iot.mqtt.core.client.IMqttClientConnectListener;
import net.dreamlu.iot.mqtt.server.MqttClientTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.tio.core.ChannelContext;
/**
* 客户端连接状态监听
*
* @author L.cm
*/
@Service
public class MqttClientConnectListener implements IMqttClientConnectListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientTest.class);
@Override
public void onConnected(ChannelContext context, boolean isReconnect) {
if (isReconnect) {
logger.info("重连 mqtt 服务器重连成功...");
} else {
logger.info("连接 mqtt 服务器成功...");
}
}
@Override
public void onDisconnect(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) {
logger.error("mqtt 链接断开 remark:{} isRemove:{}", remark, isRemove, throwable);
}
}
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.server.listener;
import net.dreamlu.iot.mqtt.core.server.MqttConst;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.server.MqttClientTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.tio.core.ChannelContext;
/**
* mqtt 连接状态
*
* @author L.cm
*/
@Service
public class MqttConnectStatusListener implements IMqttConnectStatusListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientTest.class);
@Override
public void online(ChannelContext context, String clientId) {
String username = (String) context.get(MqttConst.USER_NAME_KEY);
logger.info("Mqtt clientId:{} username:{} online...", clientId, username);
}
@Override
public void offline(ChannelContext context, String clientId) {
String username = (String) context.get(MqttConst.USER_NAME_KEY);
logger.info("Mqtt clientId:{} username:{} offline...", clientId, username);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册