diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java index 6ce542a5dd629a684bc642062a9e9dccd5a5419a..91b6c8181ec524ea15b07c0adf9a079e09be1558 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; @@ -34,9 +35,9 @@ import org.slf4j.LoggerFactory; public class ConsumerGroupInfo { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final String groupName; - private final ConcurrentHashMap subscriptionTable = + private final ConcurrentMap subscriptionTable = new ConcurrentHashMap(); - private final ConcurrentHashMap channelInfoTable = + private final ConcurrentMap channelInfoTable = new ConcurrentHashMap(16); private volatile ConsumeType consumeType; private volatile MessageModel messageModel; @@ -63,11 +64,11 @@ public class ConsumerGroupInfo { return null; } - public ConcurrentHashMap getSubscriptionTable() { + public ConcurrentMap getSubscriptionTable() { return subscriptionTable; } - public ConcurrentHashMap getChannelInfoTable() { + public ConcurrentMap getChannelInfoTable() { return channelInfoTable; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index a5ddec8cc2ac88fcbd7f5cc44f5b8af128253358..4a262e52286eb401521c8e2e83c55d8df736d290 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; @@ -35,7 +36,7 @@ import org.slf4j.LoggerFactory; public class ConsumerManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; - private final ConcurrentHashMap consumerTable = + private final ConcurrentMap consumerTable = new ConcurrentHashMap(1024); private final ConsumerIdsChangeListener consumerIdsChangeListener; @@ -145,7 +146,7 @@ public class ConsumerManager { Entry next = it.next(); String group = next.getKey(); ConsumerGroupInfo consumerGroupInfo = next.getValue(); - ConcurrentHashMap channelInfoTable = + ConcurrentMap channelInfoTable = consumerGroupInfo.getChannelInfoTable(); Iterator> itChannel = channelInfoTable.entrySet().iterator(); @@ -176,7 +177,7 @@ public class ConsumerManager { Iterator> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry entry = it.next(); - ConcurrentHashMap subscriptionTable = + ConcurrentMap subscriptionTable = entry.getValue().getSubscriptionTable(); if (subscriptionTable.containsKey(topic)) { groups.add(entry.getKey()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java index 863da627e7db64f8bc9321574a1af5f9e4737dab..65b444e6e4baf62a3ee22ddf66644d703862b842 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java @@ -25,7 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; @@ -189,7 +189,7 @@ public class Broker2Client { this.brokerController.getConsumerManager().getConsumerGroupInfo(group); if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) { - ConcurrentHashMap channelInfoTable = + ConcurrentMap channelInfoTable = consumerGroupInfo.getChannelInfoTable(); for (Map.Entry entry : channelInfoTable.entrySet()) { int version = entry.getValue().getVersion(); @@ -252,7 +252,7 @@ public class Broker2Client { Map> consumerStatusTable = new HashMap>(); - ConcurrentHashMap channelInfoTable = + ConcurrentMap channelInfoTable = this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable(); if (null == channelInfoTable || channelInfoTable.isEmpty()) { result.setCode(ResponseCode.SYSTEM_ERROR); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java index 98aceb632108410fbbb868ad0a1a255865719436..ed5a87582946b355ebbeda9757b5f8410ff112fd 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.client.rebalance; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.common.constant.LoggerName; @@ -31,7 +32,7 @@ public class RebalanceLockManager { private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty( "rocketmq.broker.rebalance.lockMaxLiveTime", "60000")); private final Lock lock = new ReentrantLock(); - private final ConcurrentHashMap> mqLockTable = + private final ConcurrentMap> mqLockTable = new ConcurrentHashMap>(1024); public boolean tryLock(final String group, final MessageQueue mq, final String clientId) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java index 7f790af624f14c44946a9a7cf4f752e663d4ae7d..f50db86308c8cdeed6a8dceacaf199c6535545c4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.broker.filter; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; @@ -45,7 +46,7 @@ public class ConsumerFilterManager extends ConfigManager { private static final long MS_24_HOUR = 24 * 3600 * 1000; - private ConcurrentHashMap + private ConcurrentMap filterDataByTopic = new ConcurrentHashMap(256); private transient BrokerController brokerController; @@ -316,7 +317,7 @@ public class ConsumerFilterManager extends ConfigManager { } } - public ConcurrentHashMap getFilterDataByTopic() { + public ConcurrentMap getFilterDataByTopic() { return filterDataByTopic; } @@ -326,7 +327,7 @@ public class ConsumerFilterManager extends ConfigManager { public static class FilterDataMapByTopic { - private ConcurrentHashMap + private ConcurrentMap groupFilterData = new ConcurrentHashMap(); private String topic; @@ -452,7 +453,7 @@ public class ConsumerFilterManager extends ConfigManager { return this.groupFilterData.get(consumerGroup); } - public final ConcurrentHashMap getGroupFilterData() { + public final ConcurrentMap getGroupFilterData() { return this.groupFilterData; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java index b935bc8bebf3010c8969e97baf2c9392445bf7a3..52cb9199931a03fe251c459dd5f3ed92d9bf00b9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -38,7 +39,7 @@ public class FilterServerManager { public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000; private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private final ConcurrentHashMap filterServerTable = + private final ConcurrentMap filterServerTable = new ConcurrentHashMap(16); private final BrokerController brokerController; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java index 71f56a4bd36b68af5c3805f3cac8faf2f4c1543a..b1bd86f5a783c5552bae20001e13fec6df07beeb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.SystemClock; @@ -33,7 +34,7 @@ public class PullRequestHoldService extends ServiceThread { private static final String TOPIC_QUEUEID_SEPARATOR = "@"; private final BrokerController brokerController; private final SystemClock systemClock = new SystemClock(); - private ConcurrentHashMap pullRequestTable = + private ConcurrentMap pullRequestTable = new ConcurrentHashMap(1024); public PullRequestHoldService(final BrokerController brokerController) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index 769c4ad0fb2dbecfb0965156e64343052a2fe48a..57565a6401edfe0d3ce58bd3e56ad56d6a3136bf 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; @@ -36,8 +37,8 @@ public class ConsumerOffsetManager extends ConfigManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final String TOPIC_GROUP_SEPARATOR = "@"; - private ConcurrentHashMap> offsetTable = - new ConcurrentHashMap>(512); + private ConcurrentMap> offsetTable = + new ConcurrentHashMap>(512); private transient BrokerController brokerController; @@ -49,9 +50,9 @@ public class ConsumerOffsetManager extends ConfigManager { } public void scanUnsubscribedTopic() { - Iterator>> it = this.offsetTable.entrySet().iterator(); + Iterator>> it = this.offsetTable.entrySet().iterator(); while (it.hasNext()) { - Entry> next = it.next(); + Entry> next = it.next(); String topicAtGroup = next.getKey(); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); if (arrays.length == 2) { @@ -67,7 +68,7 @@ public class ConsumerOffsetManager extends ConfigManager { } } - private boolean offsetBehindMuchThanData(final String topic, ConcurrentHashMap table) { + private boolean offsetBehindMuchThanData(final String topic, ConcurrentMap table) { Iterator> it = table.entrySet().iterator(); boolean result = !table.isEmpty(); @@ -84,9 +85,9 @@ public class ConsumerOffsetManager extends ConfigManager { public Set whichTopicByConsumer(final String group) { Set topics = new HashSet(); - Iterator>> it = this.offsetTable.entrySet().iterator(); + Iterator>> it = this.offsetTable.entrySet().iterator(); while (it.hasNext()) { - Entry> next = it.next(); + Entry> next = it.next(); String topicAtGroup = next.getKey(); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); if (arrays.length == 2) { @@ -102,9 +103,9 @@ public class ConsumerOffsetManager extends ConfigManager { public Set whichGroupByTopic(final String topic) { Set groups = new HashSet(); - Iterator>> it = this.offsetTable.entrySet().iterator(); + Iterator>> it = this.offsetTable.entrySet().iterator(); while (it.hasNext()) { - Entry> next = it.next(); + Entry> next = it.next(); String topicAtGroup = next.getKey(); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); if (arrays.length == 2) { @@ -124,7 +125,7 @@ public class ConsumerOffsetManager extends ConfigManager { } private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) { - ConcurrentHashMap map = this.offsetTable.get(key); + ConcurrentMap map = this.offsetTable.get(key); if (null == map) { map = new ConcurrentHashMap(32); map.put(queueId, offset); @@ -140,7 +141,7 @@ public class ConsumerOffsetManager extends ConfigManager { public long queryOffset(final String group, final String topic, final int queueId) { // topic@group String key = topic + TOPIC_GROUP_SEPARATOR + group; - ConcurrentHashMap map = this.offsetTable.get(key); + ConcurrentMap map = this.offsetTable.get(key); if (null != map) { Long offset = map.get(queueId); if (offset != null) @@ -173,11 +174,11 @@ public class ConsumerOffsetManager extends ConfigManager { return RemotingSerializable.toJson(this, prettyFormat); } - public ConcurrentHashMap> getOffsetTable() { + public ConcurrentMap> getOffsetTable() { return offsetTable; } - public void setOffsetTable(ConcurrentHashMap> offsetTable) { + public void setOffsetTable(ConcurrentHashMap> offsetTable) { this.offsetTable = offsetTable; } @@ -196,7 +197,7 @@ public class ConsumerOffsetManager extends ConfigManager { } } - for (Map.Entry> offSetEntry : this.offsetTable.entrySet()) { + for (Map.Entry> offSetEntry : this.offsetTable.entrySet()) { String topicGroup = offSetEntry.getKey(); String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR); if (topic.equals(topicGroupArr[0])) { @@ -224,7 +225,7 @@ public class ConsumerOffsetManager extends ConfigManager { } public void cloneOffset(final String srcGroup, final String destGroup, final String topic) { - ConcurrentHashMap offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup); + ConcurrentMap offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup); if (offsets != null) { this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap(offsets)); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index f59d2952e1b1468ad2d1d588d3d1ab86347d2b56..71fdda931ff29fbec56b9f28ddbfbc84c3d07a52 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -29,7 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; @@ -1084,7 +1084,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { GetConsumeStatsInBrokerHeader requestHeader = (GetConsumeStatsInBrokerHeader) request.decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class); boolean isOrder = requestHeader.isOrder(); - ConcurrentHashMap subscriptionGroups = + ConcurrentMap subscriptionGroups = brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable(); List>> brokerConsumeStatsList = diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index bdf2a01efea6b426e0e16370ddd8e6c664d58fe0..bd4a26ed7dd65ff7252d09987f7cc3b3c4b1c76a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.subscription; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; @@ -33,7 +34,7 @@ import org.slf4j.LoggerFactory; public class SubscriptionGroupManager extends ConfigManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private final ConcurrentHashMap subscriptionGroupTable = + private final ConcurrentMap subscriptionGroupTable = new ConcurrentHashMap(1024); private final DataVersion dataVersion = new DataVersion(); private transient BrokerController brokerController; @@ -169,7 +170,7 @@ public class SubscriptionGroupManager extends ConfigManager { } } - public ConcurrentHashMap getSubscriptionGroupTable() { + public ConcurrentMap getSubscriptionGroupTable() { return subscriptionGroupTable; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 93a631ac360e7bdecaa9de5c876f29a1111f2d93..3bcafc0ed00cb4a7d4fb7fa61eb9af9be421b08b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -44,7 +45,7 @@ public class TopicConfigManager extends ConfigManager { private static final long LOCK_TIMEOUT_MILLIS = 3000; private transient final Lock lockTopicConfigTable = new ReentrantLock(); - private final ConcurrentHashMap topicConfigTable = + private final ConcurrentMap topicConfigTable = new ConcurrentHashMap(1024); private final DataVersion dataVersion = new DataVersion(); private final Set systemTopicList = new HashSet(); @@ -416,7 +417,7 @@ public class TopicConfigManager extends ConfigManager { return dataVersion; } - public ConcurrentHashMap getTopicConfigTable() { + public ConcurrentMap getTopicConfigTable() { return topicConfigTable; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java index 6bae85a6f206cc9109c0c727e14eaf1ca42edf6b..e0b546d247e4911a37a6c7559cd9e9ad9f2a0cf3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java @@ -20,6 +20,7 @@ import java.util.Iterator; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.exception.MQClientException; @@ -35,11 +36,11 @@ import org.slf4j.Logger; public class MQPullConsumerScheduleService { private final Logger log = ClientLogger.getLog(); private final MessageQueueListener messageQueueListener = new MessageQueueListenerImpl(); - private final ConcurrentHashMap taskTable = + private final ConcurrentMap taskTable = new ConcurrentHashMap(); private DefaultMQPullConsumer defaultMQPullConsumer; private int pullThreadNums = 20; - private ConcurrentHashMap callbackTable = + private ConcurrentMap callbackTable = new ConcurrentHashMap(); private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; @@ -100,7 +101,7 @@ public class MQPullConsumerScheduleService { } } - public ConcurrentHashMap getCallbackTable() { + public ConcurrentMap getCallbackTable() { return callbackTable; } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java index 6c815167bbb9059e9997cbade4fb1800329e960c..d4b19b234b3074c8d582f02879077d2f2194a6cb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -45,7 +46,7 @@ public class LocalFileOffsetStore implements OffsetStore { private final MQClientInstance mQClientFactory; private final String groupName; private final String storePath; - private ConcurrentHashMap offsetTable = + private ConcurrentMap offsetTable = new ConcurrentHashMap(); public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) { diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java index 32bcc9f98d55f2e46c2a4945f271ef3812c4c9bb..7dfd97af6af71b14cb441fd9c3502f5df08a8d80 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.client.consumer.store; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; @@ -25,14 +26,14 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; * Wrapper class for offset serialization */ public class OffsetSerializeWrapper extends RemotingSerializable { - private ConcurrentHashMap offsetTable = + private ConcurrentMap offsetTable = new ConcurrentHashMap(); - public ConcurrentHashMap getOffsetTable() { + public ConcurrentMap getOffsetTable() { return offsetTable; } - public void setOffsetTable(ConcurrentHashMap offsetTable) { + public void setOffsetTable(ConcurrentMap offsetTable) { this.offsetTable = offsetTable; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index 60ad101b9999e09a55cac603c91029f8634ab5c8..5bd5749ea093d2fde9c7f78dbc3321b8a4e4d642 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -21,6 +21,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -42,7 +43,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { private final static Logger log = ClientLogger.getLog(); private final MQClientInstance mQClientFactory; private final String groupName; - private ConcurrentHashMap offsetTable = + private ConcurrentMap offsetTable = new ConcurrentHashMap(); public RemoteBrokerOffsetStore(MQClientInstance mQClientFactory, String groupName) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java index f596b8365e758e1b72d66ab6ba74b050927bb365..25877d7386d205ee652015099424ee66c48c3b4f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.client.impl; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.impl.factory.MQClientInstance; @@ -28,7 +29,7 @@ public class MQClientManager { private final static Logger log = ClientLogger.getLog(); private static MQClientManager instance = new MQClientManager(); private AtomicInteger factoryIndexGenerator = new AtomicInteger(); - private ConcurrentHashMap factoryTable = + private ConcurrentMap factoryTable = new ConcurrentHashMap(); private MQClientManager() { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 7d43b372e414c8f06eb171911e4a6204f9a2f5ff..35ee16fe3b3a36f8cb02465ccf0a7cb85574d360 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -22,7 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; @@ -115,7 +115,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { throw new IllegalArgumentException("topic is null"); } - ConcurrentHashMap mqTable = this.rebalanceImpl.getProcessQueueTable(); + ConcurrentMap mqTable = this.rebalanceImpl.getProcessQueueTable(); Set mqResult = new HashSet(); for (MessageQueue mq : mqTable.keySet()) { if (mq.getTopic().equals(topic)) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 8767964244bb4f378dcd16ab55bd28bb1d4aa542..9bf34be8cdf3e9db713982f2edf40fd7c9b8e3b2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -26,7 +26,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; @@ -805,7 +805,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } } - public ConcurrentHashMap getSubscriptionInner() { + public ConcurrentMap getSubscriptionInner() { return this.rebalanceImpl.getSubscriptionInner(); } @@ -1060,7 +1060,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { private long computeAccumulationTotal() { long msgAccTotal = 0; - ConcurrentHashMap processQueueTable = this.rebalanceImpl.getProcessQueueTable(); + ConcurrentMap processQueueTable = this.rebalanceImpl.getProcessQueueTable(); Iterator> it = processQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry next = it.next(); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java index c25e41bbf73d2a76e7aa26f1cef5911462817fb1..a02f1b6ef6e7bc9571ba728cf1509056ad78d5de 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java @@ -17,13 +17,14 @@ package org.apache.rocketmq.client.impl.consumer; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.message.MessageQueue; /** * Message lock,strictly ensure the single queue only one thread at a time consuming */ public class MessageQueueLock { - private ConcurrentHashMap mqLockTable = + private ConcurrentMap mqLockTable = new ConcurrentHashMap(); public Object fetchLockObject(final MessageQueue mq) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java index 304a44a6d9a4a538fb437372f441fe33ae24f387..bbdf27d7a55996ff6bf035d45fd717c90307b713 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullResult; @@ -53,7 +54,7 @@ public class PullAPIWrapper { private final MQClientInstance mQClientFactory; private final String consumerGroup; private final boolean unitMode; - private ConcurrentHashMap pullFromWhichNodeTable = + private ConcurrentMap pullFromWhichNodeTable = new ConcurrentHashMap(32); private volatile boolean connectBrokerByUser = false; private volatile long defaultBrokerId = MixAll.MASTER_ID; @@ -247,7 +248,7 @@ public class PullAPIWrapper { private String computPullFromWhichFilterServer(final String topic, final String brokerAddr) throws MQClientException { - ConcurrentHashMap topicRouteTable = this.mQClientFactory.getTopicRouteTable(); + ConcurrentMap topicRouteTable = this.mQClientFactory.getTopicRouteTable(); if (topicRouteTable != null) { TopicRouteData topicRouteData = topicRouteTable.get(topic); List list = topicRouteData.getFilterServerTable().get(brokerAddr); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index 6b12221fa2bab167e35bdb11678340ca578a6253..634e0f0e51b61b5b23e98fcfdfb8286ae857d334 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.factory.MQClientInstance; @@ -44,10 +45,10 @@ import org.slf4j.Logger; */ public abstract class RebalanceImpl { protected static final Logger log = ClientLogger.getLog(); - protected final ConcurrentHashMap processQueueTable = new ConcurrentHashMap(64); - protected final ConcurrentHashMap> topicSubscribeInfoTable = + protected final ConcurrentMap processQueueTable = new ConcurrentHashMap(64); + protected final ConcurrentMap> topicSubscribeInfoTable = new ConcurrentHashMap>(); - protected final ConcurrentHashMap subscriptionInner = + protected final ConcurrentMap subscriptionInner = new ConcurrentHashMap(); protected String consumerGroup; protected MessageModel messageModel; @@ -232,7 +233,7 @@ public abstract class RebalanceImpl { this.truncateMessageQueueNotMyTopic(); } - public ConcurrentHashMap getSubscriptionInner() { + public ConcurrentMap getSubscriptionInner() { return subscriptionInner; } @@ -421,11 +422,11 @@ public abstract class RebalanceImpl { } } - public ConcurrentHashMap getProcessQueueTable() { + public ConcurrentMap getProcessQueueTable() { return processQueueTable; } - public ConcurrentHashMap> getTopicSubscribeInfoTable() { + public ConcurrentMap> getTopicSubscribeInfoTable() { return topicSubscribeInfoTable; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 1b075ee114cdacda49de1dbe75c71ac515f838ea..f146be9bb5cfad6380b8e6544496a669ea2fa4d2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -28,6 +28,7 @@ import java.util.Map.Entry; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; @@ -88,18 +89,18 @@ public class MQClientInstance { private final int instanceIndex; private final String clientId; private final long bootTimestamp = System.currentTimeMillis(); - private final ConcurrentHashMap producerTable = new ConcurrentHashMap(); - private final ConcurrentHashMap consumerTable = new ConcurrentHashMap(); - private final ConcurrentHashMap adminExtTable = new ConcurrentHashMap(); + private final ConcurrentMap producerTable = new ConcurrentHashMap(); + private final ConcurrentMap consumerTable = new ConcurrentHashMap(); + private final ConcurrentMap adminExtTable = new ConcurrentHashMap(); private final NettyClientConfig nettyClientConfig; private final MQClientAPIImpl mQClientAPIImpl; private final MQAdminImpl mQAdminImpl; - private final ConcurrentHashMap topicRouteTable = new ConcurrentHashMap(); + private final ConcurrentMap topicRouteTable = new ConcurrentHashMap(); private final Lock lockNamesrv = new ReentrantLock(); private final Lock lockHeartbeat = new ReentrantLock(); - private final ConcurrentHashMap> brokerAddrTable = + private final ConcurrentMap> brokerAddrTable = new ConcurrentHashMap>(); - private final ConcurrentHashMap> brokerVersionTable = + private final ConcurrentMap> brokerVersionTable = new ConcurrentHashMap>(); private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override @@ -1088,7 +1089,7 @@ public class MQClientInstance { } consumer.suspend(); - ConcurrentHashMap processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable(); + ConcurrentMap processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable(); for (Map.Entry entry : processQueueTable.entrySet()) { MessageQueue mq = entry.getKey(); if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) { @@ -1166,7 +1167,7 @@ public class MQClientInstance { return defaultMQProducer; } - public ConcurrentHashMap getTopicRouteTable() { + public ConcurrentMap getTopicRouteTable() { return topicRouteTable; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index d828875d3f5ad6d1a3fbfbc335a4e7087c8a2929..12f8a3679e50457f353f21c4bdda3674ad9729f3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -26,6 +26,7 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -84,7 +85,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { private final Logger log = ClientLogger.getLog(); private final Random random = new Random(); private final DefaultMQProducer defaultMQProducer; - private final ConcurrentHashMap topicPublishInfoTable = + private final ConcurrentMap topicPublishInfoTable = new ConcurrentHashMap(); private final ArrayList sendMessageHookList = new ArrayList(); private final RPCHook rpcHook; @@ -1057,7 +1058,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); } - public ConcurrentHashMap getTopicPublishInfoTable() { + public ConcurrentMap getTopicPublishInfoTable() { return topicPublishInfoTable; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java index 7478dd26e83a3da0c20c34cb69bb6dc0f0f9f5f6..3a0356c7cc94f471de17a3ced3b4b7f5946abcd0 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.common.protocol.body; import java.util.HashSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; @@ -27,7 +28,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class ConsumerConnection extends RemotingSerializable { private HashSet connectionSet = new HashSet(); - private ConcurrentHashMap subscriptionTable = + private ConcurrentMap subscriptionTable = new ConcurrentHashMap(); private ConsumeType consumeType; private MessageModel messageModel; @@ -52,7 +53,7 @@ public class ConsumerConnection extends RemotingSerializable { this.connectionSet = connectionSet; } - public ConcurrentHashMap getSubscriptionTable() { + public ConcurrentMap getSubscriptionTable() { return subscriptionTable; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java index 02bf8117a68d6ef93860d2b4fabc825bef96ea63..5b08d788d9e6718cf8df4ea78263a259337ce20f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java @@ -18,17 +18,18 @@ package org.apache.rocketmq.common.protocol.body; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class ConsumerOffsetSerializeWrapper extends RemotingSerializable { - private ConcurrentHashMap> offsetTable = - new ConcurrentHashMap>(512); + private ConcurrentMap> offsetTable = + new ConcurrentHashMap>(512); - public ConcurrentHashMap> getOffsetTable() { + public ConcurrentMap> getOffsetTable() { return offsetTable; } - public void setOffsetTable(ConcurrentHashMap> offsetTable) { + public void setOffsetTable(ConcurrentMap> offsetTable) { this.offsetTable = offsetTable; } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java index 92c15eb978f44e5ce3b323f0d1d29cfb398c0c8d..e05f75961ff349727d53589e3997e9351607f014 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java @@ -18,21 +18,22 @@ package org.apache.rocketmq.common.protocol.body; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class SubscriptionGroupWrapper extends RemotingSerializable { - private ConcurrentHashMap subscriptionGroupTable = + private ConcurrentMap subscriptionGroupTable = new ConcurrentHashMap(1024); private DataVersion dataVersion = new DataVersion(); - public ConcurrentHashMap getSubscriptionGroupTable() { + public ConcurrentMap getSubscriptionGroupTable() { return subscriptionGroupTable; } public void setSubscriptionGroupTable( - ConcurrentHashMap subscriptionGroupTable) { + ConcurrentMap subscriptionGroupTable) { this.subscriptionGroupTable = subscriptionGroupTable; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java index c471d1a995701d98378ec3868422b7a0ecebe3a5..ce123021d459b69b8f26ccf43f2b22ebe6216082 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java @@ -18,20 +18,21 @@ package org.apache.rocketmq.common.protocol.body; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class TopicConfigSerializeWrapper extends RemotingSerializable { - private ConcurrentHashMap topicConfigTable = + private ConcurrentMap topicConfigTable = new ConcurrentHashMap(); private DataVersion dataVersion = new DataVersion(); - public ConcurrentHashMap getTopicConfigTable() { + public ConcurrentMap getTopicConfigTable() { return topicConfigTable; } - public void setTopicConfigTable(ConcurrentHashMap topicConfigTable) { + public void setTopicConfigTable(ConcurrentMap topicConfigTable) { this.topicConfigTable = topicConfigTable; } diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java index 5498d34cde24bbbb101540e1bd95bce7fa9fc8bb..57dfc386f4fd3a02a3a7fd048f26e4b49125ea4d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java @@ -20,13 +20,14 @@ package org.apache.rocketmq.common.stats; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.UtilAll; import org.slf4j.Logger; public class MomentStatsItemSet { - private final ConcurrentHashMap statsItemTable = + private final ConcurrentMap statsItemTable = new ConcurrentHashMap(128); private final String statsName; private final ScheduledExecutorService scheduledExecutorService; @@ -39,7 +40,7 @@ public class MomentStatsItemSet { this.init(); } - public ConcurrentHashMap getStatsItemTable() { + public ConcurrentMap getStatsItemTable() { return statsItemTable; } diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java index 8633d682865c682e4ea0d205c56e1d39c872ca09..17dbf0d2a834ca67465b00fcdb259f851ca6f57c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java @@ -20,13 +20,14 @@ package org.apache.rocketmq.common.stats; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.UtilAll; import org.slf4j.Logger; public class StatsItemSet { - private final ConcurrentHashMap statsItemTable = + private final ConcurrentMap statsItemTable = new ConcurrentHashMap(128); private final String statsName; diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java index 2c31538ff152391e9c15e95ae593dfbbb3477146..490c5821ab42983c5f39ee64a46dfebe84a80f7e 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java @@ -20,6 +20,7 @@ package org.apache.rocketmq.filtersrv.filter; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -40,7 +41,7 @@ public class FilterClassManager { private final ScheduledExecutorService scheduledExecutorService = Executors .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSGetClassScheduledThread")); - private ConcurrentHashMap filterClassTable = + private ConcurrentMap filterClassTable = new ConcurrentHashMap(128); private FilterClassFetchMethod filterClassFetchMethod; diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index 5a953a9e20e8bcbba1fee7ddf6761f99548786fc..7479fcc5f793abefa805956a46e30661dbc9aa2c 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -25,7 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.rocketmq.common.DataVersion; @@ -135,7 +135,7 @@ public class RouteInfoManager { && MixAll.MASTER_ID == brokerId) { if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())// || registerFirst) { - ConcurrentHashMap tcTable = + ConcurrentMap tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { for (Map.Entry entry : tcTable.entrySet()) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 15586cbf9b6234085873f1b106c861ee5719872b..0ba714a79801aea25c01a838579ed26c76a4e843 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -27,6 +27,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -67,7 +68,7 @@ public abstract class NettyRemotingAbstract { /** * This map caches all on-going requests. */ - protected final ConcurrentHashMap responseTable = + protected final ConcurrentMap responseTable = new ConcurrentHashMap(256); /** diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 52ca47e6c2e5fccc1d7ffb9cc7a39ad12a23f60c..1c3da9adca2d9cdf250267603afc9d52bbd9bcb7 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -41,6 +41,7 @@ import java.util.Random; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -73,7 +74,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti private final Bootstrap bootstrap = new Bootstrap(); private final EventLoopGroup eventLoopGroupWorker; private final Lock lockChannelTables = new ReentrantLock(); - private final ConcurrentHashMap channelTables = new ConcurrentHashMap(); + private final ConcurrentMap channelTables = new ConcurrentHashMap(); private final Timer timer = new Timer("ClientHouseKeepingService", true); diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java index 0993a5fae13a760365ceba8b57f9532eab1f3b9a..abb8385642e23c93119fe2fe45b54ea5ef21debd 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; import java.util.ServiceLoader; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; @@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory; public class AllocateMappedFileService extends ServiceThread { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static int waitTimeOut = 1000 * 5; - private ConcurrentHashMap requestTable = + private ConcurrentMap requestTable = new ConcurrentHashMap(); private PriorityBlockingQueue requestQueue = new PriorityBlockingQueue(); diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 931edc7656767e993e01ee03c3bb3b97bef2dba5..4549f1ea32bcad84bd39b618e933f06e695e3182 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -64,7 +65,7 @@ public class DefaultMessageStore implements MessageStore { // CommitLog private final CommitLog commitLog; - private final ConcurrentHashMap> consumeQueueTable; + private final ConcurrentMap> consumeQueueTable; private final FlushConsumeQueueService flushConsumeQueueService; @@ -140,9 +141,9 @@ public class DefaultMessageStore implements MessageStore { } public void truncateDirtyLogicFiles(long phyOffset) { - ConcurrentHashMap> tables = DefaultMessageStore.this.consumeQueueTable; + ConcurrentMap> tables = DefaultMessageStore.this.consumeQueueTable; - for (ConcurrentHashMap maps : tables.values()) { + for (ConcurrentMap maps : tables.values()) { for (ConsumeQueue logic : maps.values()) { logic.truncateDirtyLogicFiles(phyOffset); } @@ -267,7 +268,7 @@ public class DefaultMessageStore implements MessageStore { } public void destroyLogics() { - for (ConcurrentHashMap maps : this.consumeQueueTable.values()) { + for (ConcurrentMap maps : this.consumeQueueTable.values()) { for (ConsumeQueue logic : maps.values()) { logic.destroy(); } @@ -885,13 +886,13 @@ public class DefaultMessageStore implements MessageStore { @Override public int cleanUnusedTopic(Set topics) { - Iterator>> it = this.consumeQueueTable.entrySet().iterator(); + Iterator>> it = this.consumeQueueTable.entrySet().iterator(); while (it.hasNext()) { - Entry> next = it.next(); + Entry> next = it.next(); String topic = next.getKey(); if (!topics.contains(topic) && !topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) { - ConcurrentHashMap queueTable = next.getValue(); + ConcurrentMap queueTable = next.getValue(); for (ConsumeQueue cq : queueTable.values()) { cq.destroy(); log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", // @@ -913,12 +914,12 @@ public class DefaultMessageStore implements MessageStore { public void cleanExpiredConsumerQueue() { long minCommitLogOffset = this.commitLog.getMinOffset(); - Iterator>> it = this.consumeQueueTable.entrySet().iterator(); + Iterator>> it = this.consumeQueueTable.entrySet().iterator(); while (it.hasNext()) { - Entry> next = it.next(); + Entry> next = it.next(); String topic = next.getKey(); if (!topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) { - ConcurrentHashMap queueTable = next.getValue(); + ConcurrentMap queueTable = next.getValue(); Iterator> itQT = queueTable.entrySet().iterator(); while (itQT.hasNext()) { Entry nextQT = itQT.next(); @@ -1061,10 +1062,10 @@ public class DefaultMessageStore implements MessageStore { } public ConsumeQueue findConsumeQueue(String topic, int queueId) { - ConcurrentHashMap map = consumeQueueTable.get(topic); + ConcurrentMap map = consumeQueueTable.get(topic); if (null == map) { - ConcurrentHashMap newMap = new ConcurrentHashMap(128); - ConcurrentHashMap oldMap = consumeQueueTable.putIfAbsent(topic, newMap); + ConcurrentMap newMap = new ConcurrentHashMap(128); + ConcurrentMap oldMap = consumeQueueTable.putIfAbsent(topic, newMap); if (oldMap != null) { map = oldMap; } else { @@ -1205,9 +1206,9 @@ public class DefaultMessageStore implements MessageStore { private void checkSelf() { this.commitLog.checkSelf(); - Iterator>> it = this.consumeQueueTable.entrySet().iterator(); + Iterator>> it = this.consumeQueueTable.entrySet().iterator(); while (it.hasNext()) { - Entry> next = it.next(); + Entry> next = it.next(); Iterator> itNext = next.getValue().entrySet().iterator(); while (itNext.hasNext()) { Entry cq = itNext.next(); @@ -1280,7 +1281,7 @@ public class DefaultMessageStore implements MessageStore { } private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueue consumeQueue) { - ConcurrentHashMap map = this.consumeQueueTable.get(topic); + ConcurrentMap map = this.consumeQueueTable.get(topic); if (null == map) { map = new ConcurrentHashMap(); map.put(queueId, consumeQueue); @@ -1291,7 +1292,7 @@ public class DefaultMessageStore implements MessageStore { } private void recoverConsumeQueue() { - for (ConcurrentHashMap maps : this.consumeQueueTable.values()) { + for (ConcurrentMap maps : this.consumeQueueTable.values()) { for (ConsumeQueue logic : maps.values()) { logic.recover(); } @@ -1301,7 +1302,7 @@ public class DefaultMessageStore implements MessageStore { private void recoverTopicQueueTable() { HashMap table = new HashMap(1024); long minPhyOffset = this.commitLog.getMinOffset(); - for (ConcurrentHashMap maps : this.consumeQueueTable.values()) { + for (ConcurrentMap maps : this.consumeQueueTable.values()) { for (ConsumeQueue logic : maps.values()) { String key = logic.getTopic() + "-" + logic.getQueueId(); table.put(key, logic.getMaxOffsetInQueue()); @@ -1324,7 +1325,7 @@ public class DefaultMessageStore implements MessageStore { return runningFlags; } - public ConcurrentHashMap> getConsumeQueueTable() { + public ConcurrentMap> getConsumeQueueTable() { return consumeQueueTable; } @@ -1375,7 +1376,7 @@ public class DefaultMessageStore implements MessageStore { @Override public ConsumeQueue getConsumeQueue(String topic, int queueId) { - ConcurrentHashMap map = consumeQueueTable.get(topic); + ConcurrentMap map = consumeQueueTable.get(topic); if (map == null) { return null; } @@ -1594,9 +1595,9 @@ public class DefaultMessageStore implements MessageStore { if (minOffset > this.lastPhysicalMinOffset) { this.lastPhysicalMinOffset = minOffset; - ConcurrentHashMap> tables = DefaultMessageStore.this.consumeQueueTable; + ConcurrentMap> tables = DefaultMessageStore.this.consumeQueueTable; - for (ConcurrentHashMap maps : tables.values()) { + for (ConcurrentMap maps : tables.values()) { for (ConsumeQueue logic : maps.values()) { int deleteCount = logic.deleteExpiredFile(minOffset); @@ -1639,9 +1640,9 @@ public class DefaultMessageStore implements MessageStore { logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp(); } - ConcurrentHashMap> tables = DefaultMessageStore.this.consumeQueueTable; + ConcurrentMap> tables = DefaultMessageStore.this.consumeQueueTable; - for (ConcurrentHashMap maps : tables.values()) { + for (ConcurrentMap maps : tables.values()) { for (ConsumeQueue cq : maps.values()) { boolean result = false; for (int i = 0; i < retryTimes && !result; i++) { diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java b/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java index efb6aa814875999a7190e744face8fec61f46ce0..7021992c598cfe0635851513059452a375ce2918 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java @@ -17,17 +17,18 @@ package org.apache.rocketmq.store.schedule; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class DelayOffsetSerializeWrapper extends RemotingSerializable { - private ConcurrentHashMap offsetTable = + private ConcurrentMap offsetTable = new ConcurrentHashMap(32); - public ConcurrentHashMap getOffsetTable() { + public ConcurrentMap getOffsetTable() { return offsetTable; } - public void setOffsetTable(ConcurrentHashMap offsetTable) { + public void setOffsetTable(ConcurrentMap offsetTable) { this.offsetTable = offsetTable; } } diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index 501876ed296b5128b25693ae4c7bfedd6deca116..172954deea140365fa968cebe264558e4261afd1 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -23,6 +23,7 @@ import java.util.Map.Entry; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.constant.LoggerName; @@ -49,10 +50,10 @@ public class ScheduleMessageService extends ConfigManager { private static final long DELAY_FOR_A_WHILE = 100L; private static final long DELAY_FOR_A_PERIOD = 10000L; - private final ConcurrentHashMap delayLevelTable = + private final ConcurrentMap delayLevelTable = new ConcurrentHashMap(32); - private final ConcurrentHashMap offsetTable = + private final ConcurrentMap offsetTable = new ConcurrentHashMap(32); private final Timer timer = new Timer("ScheduleMessageTimerThread", true);