diff --git a/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java b/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..270a74c231defb73755dc17fd606097432c6d0b1 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import org.apache.rocketmq.common.annotation.ImportantField; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; + +public class MqttConfig { + + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); + + private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); + + private int snodeHandleMqttThreadPoolQueueCapacity = 10000; + + private int snodeHandleMqttMessageMinPoolSize = 10; + + private int snodeHandleMqttMessageMaxPoolSize = 20; + + private long houseKeepingInterval = 10 * 1000; + + private int snodePushMqttMessageMinPoolSize = 10; + + private int snodePushMqttMessageMaxPoolSize = 20; + + private int snodePushMqttMessageThreadPoolQueueCapacity = 10000; + + private int listenPort = 1883; + /** + * Acl feature switch + */ + @ImportantField + private boolean aclEnable = false; + + public long getHouseKeepingInterval() { + return houseKeepingInterval; + } + + public void setHouseKeepingInterval(long houseKeepingInterval) { + this.houseKeepingInterval = houseKeepingInterval; + } + + /** + * This configurable item defines interval of topics registration of broker to name server. Allowing values are + * between 10, 000 and 60, 000 milliseconds. + */ + private int registerNameServerPeriod = 1000 * 30; + + public int getRegisterNameServerPeriod() { + return registerNameServerPeriod; + } + + public void setRegisterNameServerPeriod(int registerNameServerPeriod) { + this.registerNameServerPeriod = registerNameServerPeriod; + } + + @ImportantField + private boolean fetchNamesrvAddrByAddressServer = false; + + public static String localHostName() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + log.error("Failed to obtain the host name", e); + } + + return "DEFAULT_SNODE"; + } + + public boolean isFetchNamesrvAddrByAddressServer() { + return fetchNamesrvAddrByAddressServer; + } + + public void setFetchNamesrvAddrByAddressServer(boolean fetchNamesrvAddrByAddressServer) { + this.fetchNamesrvAddrByAddressServer = fetchNamesrvAddrByAddressServer; + } + + public int getListenPort() { + return listenPort; + } + + public String getRocketmqHome() { + return rocketmqHome; + } + + public void setRocketmqHome(String rocketmqHome) { + this.rocketmqHome = rocketmqHome; + } + + public void setListenPort(int listenPort) { + this.listenPort = listenPort; + } + + public int getSnodeHandleMqttThreadPoolQueueCapacity() { + return snodeHandleMqttThreadPoolQueueCapacity; + } + + public void setSnodeHandleMqttThreadPoolQueueCapacity(int snodeHandleMqttThreadPoolQueueCapacity) { + this.snodeHandleMqttThreadPoolQueueCapacity = snodeHandleMqttThreadPoolQueueCapacity; + } + + public int getSnodeHandleMqttMessageMinPoolSize() { + return snodeHandleMqttMessageMinPoolSize; + } + + public void setSnodeHandleMqttMessageMinPoolSize(int snodeHandleMqttMessageMinPoolSize) { + this.snodeHandleMqttMessageMinPoolSize = snodeHandleMqttMessageMinPoolSize; + } + + public int getSnodeHandleMqttMessageMaxPoolSize() { + return snodeHandleMqttMessageMaxPoolSize; + } + + public void setSnodeHandleMqttMessageMaxPoolSize(int snodeHandleMqttMessageMaxPoolSize) { + this.snodeHandleMqttMessageMaxPoolSize = snodeHandleMqttMessageMaxPoolSize; + } + + public int getSnodePushMqttMessageMinPoolSize() { + return snodePushMqttMessageMinPoolSize; + } + + public void setSnodePushMqttMessageMinPoolSize(int snodePushMqttMessageMinPoolSize) { + this.snodePushMqttMessageMinPoolSize = snodePushMqttMessageMinPoolSize; + } + + public int getSnodePushMqttMessageMaxPoolSize() { + return snodePushMqttMessageMaxPoolSize; + } + + public void setSnodePushMqttMessageMaxPoolSize(int snodePushMqttMessageMaxPoolSize) { + this.snodePushMqttMessageMaxPoolSize = snodePushMqttMessageMaxPoolSize; + } + + public int getSnodePushMqttMessageThreadPoolQueueCapacity() { + return snodePushMqttMessageThreadPoolQueueCapacity; + } + + public void setSnodePushMqttMessageThreadPoolQueueCapacity(int snodePushMqttMessageThreadPoolQueueCapacity) { + this.snodePushMqttMessageThreadPoolQueueCapacity = snodePushMqttMessageThreadPoolQueueCapacity; + } + + public boolean isAclEnable() { + return aclEnable; + } + + public void setAclEnable(boolean aclEnable) { + this.aclEnable = aclEnable; + } + +} diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java index f06fa0f40ee5c031e848b713fb09b54353b5b8b2..8716cdd9b0ce451945b68f7db68dfb74b80ccf35 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java @@ -325,7 +325,7 @@ public class SnodeController { } - public void registerProcessor() { + private void registerProcessor() { if (snodeServer != null) { this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor); this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor); @@ -333,6 +333,7 @@ public class SnodeController { this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor); + this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor); this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);