提交 c74727bb 编写于 作者: 浅梦2013's avatar 浅梦2013

开始 2.2.5-SNAPSHOT

上级 a69b6938
...@@ -29,7 +29,7 @@ import org.tio.utils.timer.TimerTaskService; ...@@ -29,7 +29,7 @@ import org.tio.utils.timer.TimerTaskService;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
...@@ -44,7 +44,7 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor { ...@@ -44,7 +44,7 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
private final IMqttClientConnectListener connectListener; private final IMqttClientConnectListener connectListener;
private final IMqttClientMessageIdGenerator messageIdGenerator; private final IMqttClientMessageIdGenerator messageIdGenerator;
private final TimerTaskService taskService; private final TimerTaskService taskService;
private final ThreadPoolExecutor executor; private final ExecutorService executor;
public DefaultMqttClientProcessor(MqttClientCreator mqttClientCreator) { public DefaultMqttClientProcessor(MqttClientCreator mqttClientCreator) {
this.reSubscribeBatchSize = mqttClientCreator.getReSubscribeBatchSize(); this.reSubscribeBatchSize = mqttClientCreator.getReSubscribeBatchSize();
......
...@@ -28,7 +28,7 @@ import org.tio.core.Tio; ...@@ -28,7 +28,7 @@ import org.tio.core.Tio;
import org.tio.utils.hutool.StrUtil; import org.tio.utils.hutool.StrUtil;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ExecutorService;
/** /**
* mqtt 客户端监听器 * mqtt 客户端监听器
...@@ -39,7 +39,7 @@ public class MqttClientAioListener extends DefaultTioClientListener { ...@@ -39,7 +39,7 @@ public class MqttClientAioListener extends DefaultTioClientListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientAioListener.class); private static final Logger logger = LoggerFactory.getLogger(MqttClientAioListener.class);
private final MqttClientCreator clientCreator; private final MqttClientCreator clientCreator;
private final IMqttClientConnectListener connectListener; private final IMqttClientConnectListener connectListener;
private final ThreadPoolExecutor executor; private final ExecutorService executor;
public MqttClientAioListener(MqttClientCreator clientCreator) { public MqttClientAioListener(MqttClientCreator clientCreator) {
this.clientCreator = clientCreator; this.clientCreator = clientCreator;
......
...@@ -27,6 +27,7 @@ import org.tio.client.intf.TioClientHandler; ...@@ -27,6 +27,7 @@ import org.tio.client.intf.TioClientHandler;
import org.tio.client.intf.TioClientListener; import org.tio.client.intf.TioClientListener;
import org.tio.core.TioConfig; import org.tio.core.TioConfig;
import org.tio.core.ssl.SslConfig; import org.tio.core.ssl.SslConfig;
import org.tio.utils.Threads;
import org.tio.utils.buffer.ByteBufferAllocator; import org.tio.utils.buffer.ByteBufferAllocator;
import org.tio.utils.hutool.StrUtil; import org.tio.utils.hutool.StrUtil;
import org.tio.utils.thread.pool.SynThreadPoolExecutor; import org.tio.utils.thread.pool.SynThreadPoolExecutor;
...@@ -34,6 +35,7 @@ import org.tio.utils.timer.DefaultTimerTaskService; ...@@ -34,6 +35,7 @@ import org.tio.utils.timer.DefaultTimerTaskService;
import org.tio.utils.timer.TimerTaskService; import org.tio.utils.timer.TimerTaskService;
import java.io.InputStream; import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Consumer; import java.util.function.Consumer;
...@@ -170,7 +172,7 @@ public final class MqttClientCreator { ...@@ -170,7 +172,7 @@ public final class MqttClientCreator {
/** /**
* mqttExecutor * mqttExecutor
*/ */
private ThreadPoolExecutor mqttExecutor; private ExecutorService mqttExecutor;
/** /**
* taskService * taskService
*/ */
...@@ -296,7 +298,7 @@ public final class MqttClientCreator { ...@@ -296,7 +298,7 @@ public final class MqttClientCreator {
return groupExecutor; return groupExecutor;
} }
public ThreadPoolExecutor getMqttExecutor() { public ExecutorService getMqttExecutor() {
return mqttExecutor; return mqttExecutor;
} }
...@@ -510,11 +512,11 @@ public final class MqttClientCreator { ...@@ -510,11 +512,11 @@ public final class MqttClientCreator {
} }
// tioExecutor // tioExecutor
if (this.tioExecutor == null) { if (this.tioExecutor == null) {
this.tioExecutor = ThreadUtil.getTioExecutor(3); this.tioExecutor = Threads.getTioExecutor(3);
} }
// groupExecutor // groupExecutor
if (this.groupExecutor == null) { if (this.groupExecutor == null) {
this.groupExecutor = ThreadUtil.getGroupExecutor(2); this.groupExecutor = Threads.getGroupExecutor(2);
} }
// mqttExecutor // mqttExecutor
if (this.mqttExecutor == null) { if (this.mqttExecutor == null) {
......
...@@ -21,6 +21,7 @@ import org.tio.utils.thread.pool.DefaultThreadFactory; ...@@ -21,6 +21,7 @@ import org.tio.utils.thread.pool.DefaultThreadFactory;
import org.tio.utils.thread.pool.SynThreadPoolExecutor; import org.tio.utils.thread.pool.SynThreadPoolExecutor;
import org.tio.utils.thread.pool.TioCallerRunsPolicy; import org.tio.utils.thread.pool.TioCallerRunsPolicy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -57,13 +58,7 @@ public final class ThreadUtil { ...@@ -57,13 +58,7 @@ public final class ThreadUtil {
* @return ThreadPoolExecutor * @return ThreadPoolExecutor
*/ */
public static ThreadPoolExecutor getGroupExecutor(int groupPoolSize) { public static ThreadPoolExecutor getGroupExecutor(int groupPoolSize) {
String threadName = "tio-group"; return Threads.getGroupExecutor(groupPoolSize);
DefaultThreadFactory threadFactory = DefaultThreadFactory.getInstance(threadName, Thread.MAX_PRIORITY);
LinkedBlockingQueue<Runnable> runnableQueue = new LinkedBlockingQueue<>();
ThreadPoolExecutor groupExecutor = new ThreadPoolExecutor(groupPoolSize, groupPoolSize,
Threads.KEEP_ALIVE_TIME, TimeUnit.SECONDS, runnableQueue, threadFactory, new TioCallerRunsPolicy());
groupExecutor.prestartCoreThread();
return groupExecutor;
} }
/** /**
...@@ -73,13 +68,7 @@ public final class ThreadUtil { ...@@ -73,13 +68,7 @@ public final class ThreadUtil {
* @return SynThreadPoolExecutor * @return SynThreadPoolExecutor
*/ */
public static SynThreadPoolExecutor getTioExecutor(int tioPoolSize) { public static SynThreadPoolExecutor getTioExecutor(int tioPoolSize) {
String threadName = "tio-worker"; return Threads.getTioExecutor(tioPoolSize);
LinkedBlockingQueue<Runnable> runnableQueue = new LinkedBlockingQueue<>();
DefaultThreadFactory defaultThreadFactory = DefaultThreadFactory.getInstance(threadName, Thread.MAX_PRIORITY);
SynThreadPoolExecutor tioExecutor = new SynThreadPoolExecutor(tioPoolSize, tioPoolSize,
Threads.KEEP_ALIVE_TIME, runnableQueue, defaultThreadFactory, new TioCallerRunsPolicy());
tioExecutor.prestartCoreThread();
return tioExecutor;
} }
/** /**
...@@ -88,7 +77,7 @@ public final class ThreadUtil { ...@@ -88,7 +77,7 @@ public final class ThreadUtil {
* @param poolSize 业务线程池大小 * @param poolSize 业务线程池大小
* @return ThreadPoolExecutor * @return ThreadPoolExecutor
*/ */
public static ThreadPoolExecutor getMqttExecutor(int poolSize) { public static ExecutorService getMqttExecutor(int poolSize) {
String threadName = "mqtt-worker"; String threadName = "mqtt-worker";
LinkedBlockingQueue<Runnable> runnableQueue = new LinkedBlockingQueue<>(); LinkedBlockingQueue<Runnable> runnableQueue = new LinkedBlockingQueue<>();
DefaultThreadFactory defaultThreadFactory = DefaultThreadFactory.getInstance(threadName, Thread.MAX_PRIORITY); DefaultThreadFactory defaultThreadFactory = DefaultThreadFactory.getInstance(threadName, Thread.MAX_PRIORITY);
......
...@@ -31,7 +31,7 @@ import org.tio.server.DefaultTioServerListener; ...@@ -31,7 +31,7 @@ import org.tio.server.DefaultTioServerListener;
import org.tio.utils.hutool.StrUtil; import org.tio.utils.hutool.StrUtil;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ExecutorService;
/** /**
* mqtt 服务监听 * mqtt 服务监听
...@@ -45,9 +45,9 @@ public class MqttServerAioListener extends DefaultTioServerListener { ...@@ -45,9 +45,9 @@ public class MqttServerAioListener extends DefaultTioServerListener {
private final IMqttMessageDispatcher messageDispatcher; private final IMqttMessageDispatcher messageDispatcher;
private final IMqttConnectStatusListener connectStatusListener; private final IMqttConnectStatusListener connectStatusListener;
private final MqttMessageInterceptors messageInterceptors; private final MqttMessageInterceptors messageInterceptors;
private final ThreadPoolExecutor executor; private final ExecutorService executor;
public MqttServerAioListener(MqttServerCreator serverCreator, ThreadPoolExecutor executor) { public MqttServerAioListener(MqttServerCreator serverCreator, ExecutorService executor) {
this.messageStore = serverCreator.getMessageStore(); this.messageStore = serverCreator.getMessageStore();
this.sessionManager = serverCreator.getSessionManager(); this.sessionManager = serverCreator.getSessionManager();
this.messageDispatcher = serverCreator.getMessageDispatcher(); this.messageDispatcher = serverCreator.getMessageDispatcher();
......
...@@ -57,7 +57,7 @@ import org.tio.utils.timer.TimerTaskService; ...@@ -57,7 +57,7 @@ import org.tio.utils.timer.TimerTaskService;
import java.io.InputStream; import java.io.InputStream;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ExecutorService;
import java.util.function.Consumer; import java.util.function.Consumer;
/** /**
...@@ -202,6 +202,10 @@ public class MqttServerCreator { ...@@ -202,6 +202,10 @@ public class MqttServerCreator {
* taskService * taskService
*/ */
private TimerTaskService taskService; private TimerTaskService taskService;
/**
* 业务消费线程
*/
private ExecutorService mqttExecutor;
/** /**
* json 处理器 * json 处理器
*/ */
...@@ -533,6 +537,15 @@ public class MqttServerCreator { ...@@ -533,6 +537,15 @@ public class MqttServerCreator {
return this; return this;
} }
public ExecutorService getMqttExecutor() {
return mqttExecutor;
}
public MqttServerCreator mqttExecutor(ExecutorService mqttExecutor) {
this.mqttExecutor = mqttExecutor;
return this;
}
public JsonAdapter getJsonAdapter() { public JsonAdapter getJsonAdapter() {
return jsonAdapter; return jsonAdapter;
} }
...@@ -567,7 +580,9 @@ public class MqttServerCreator { ...@@ -567,7 +580,9 @@ public class MqttServerCreator {
this.taskService = new DefaultTimerTaskService(200L, 60); this.taskService = new DefaultTimerTaskService(200L, 60);
} }
// 业务线程池 // 业务线程池
ThreadPoolExecutor mqttExecutor = ThreadUtil.getMqttExecutor(Threads.MAX_POOL_SIZE_FOR_TIO); if (this.mqttExecutor == null) {
this.mqttExecutor = ThreadUtil.getMqttExecutor(Threads.MAX_POOL_SIZE_FOR_TIO);
}
// AckService // AckService
DefaultMqttServerProcessor serverProcessor = new DefaultMqttServerProcessor(this, this.taskService, mqttExecutor); DefaultMqttServerProcessor serverProcessor = new DefaultMqttServerProcessor(this, this.taskService, mqttExecutor);
// 1. 处理消息 // 1. 处理消息
......
...@@ -45,7 +45,7 @@ import org.tio.utils.timer.TimerTaskService; ...@@ -45,7 +45,7 @@ import org.tio.utils.timer.TimerTaskService;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ExecutorService;
/** /**
* mqtt broker 处理器 * mqtt broker 处理器
...@@ -75,11 +75,11 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { ...@@ -75,11 +75,11 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
private final IMqttSessionListener sessionListener; private final IMqttSessionListener sessionListener;
private final IMqttMessageListener messageListener; private final IMqttMessageListener messageListener;
private final TimerTaskService taskService; private final TimerTaskService taskService;
private final ThreadPoolExecutor executor; private final ExecutorService executor;
public DefaultMqttServerProcessor(MqttServerCreator serverCreator, public DefaultMqttServerProcessor(MqttServerCreator serverCreator,
TimerTaskService taskService, TimerTaskService taskService,
ThreadPoolExecutor executor) { ExecutorService executor) {
this.serverCreator = serverCreator; this.serverCreator = serverCreator;
this.heartbeatTimeout = serverCreator.getHeartbeatTimeout() == null ? DEFAULT_HEARTBEAT_TIMEOUT : serverCreator.getHeartbeatTimeout(); this.heartbeatTimeout = serverCreator.getHeartbeatTimeout() == null ? DEFAULT_HEARTBEAT_TIMEOUT : serverCreator.getHeartbeatTimeout();
this.messageStore = serverCreator.getMessageStore(); this.messageStore = serverCreator.getMessageStore();
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
<properties> <properties>
<!-- mica-mqtt version --> <!-- mica-mqtt version -->
<revision>2.2.4</revision> <revision>2.2.5-SNAPSHOT</revision>
<!-- java version --> <!-- java version -->
<java.version>1.8</java.version> <java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册