From 24d6eefba65845b91513188f66d230af021a3cc9 Mon Sep 17 00:00:00 2001 From: Jaskey Date: Tue, 6 Jun 2017 16:06:46 +0800 Subject: [PATCH] [ROCKETMQ-208]incompatibility problem found in enviroment of JDK 1.7 when running client closes apache/incubator-rocketmq#10 --- .../broker/client/ConsumerGroupInfo.java | 9 ++-- .../broker/client/ConsumerManager.java | 7 +-- .../broker/client/net/Broker2Client.java | 6 +-- .../rebalance/RebalanceLockManager.java | 3 +- .../broker/filter/ConsumerFilterManager.java | 9 ++-- .../broker/filtersrv/FilterServerManager.java | 3 +- .../longpolling/PullRequestHoldService.java | 3 +- .../broker/offset/ConsumerOffsetManager.java | 31 ++++++------ .../processor/AdminBrokerProcessor.java | 4 +- .../SubscriptionGroupManager.java | 5 +- .../broker/topic/TopicConfigManager.java | 5 +- .../MQPullConsumerScheduleService.java | 7 +-- .../consumer/store/LocalFileOffsetStore.java | 3 +- .../store/OffsetSerializeWrapper.java | 7 +-- .../store/RemoteBrokerOffsetStore.java | 3 +- .../rocketmq/client/impl/MQClientManager.java | 3 +- .../consumer/DefaultMQPullConsumerImpl.java | 4 +- .../consumer/DefaultMQPushConsumerImpl.java | 6 +-- .../impl/consumer/MessageQueueLock.java | 3 +- .../client/impl/consumer/PullAPIWrapper.java | 5 +- .../client/impl/consumer/RebalanceImpl.java | 13 ++--- .../client/impl/factory/MQClientInstance.java | 17 ++++--- .../impl/producer/DefaultMQProducerImpl.java | 5 +- .../protocol/body/ConsumerConnection.java | 5 +- .../body/ConsumerOffsetSerializeWrapper.java | 9 ++-- .../body/SubscriptionGroupWrapper.java | 7 +-- .../body/TopicConfigSerializeWrapper.java | 7 +-- .../common/stats/MomentStatsItemSet.java | 5 +- .../rocketmq/common/stats/StatsItemSet.java | 3 +- .../filtersrv/filter/FilterClassManager.java | 3 +- .../namesrv/routeinfo/RouteInfoManager.java | 4 +- .../remoting/netty/NettyRemotingAbstract.java | 3 +- .../remoting/netty/NettyRemotingClient.java | 3 +- .../store/AllocateMappedFileService.java | 3 +- .../rocketmq/store/DefaultMessageStore.java | 49 ++++++++++--------- .../schedule/DelayOffsetSerializeWrapper.java | 7 +-- .../schedule/ScheduleMessageService.java | 5 +- 37 files changed, 153 insertions(+), 121 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java index 6ce542a5..91b6c818 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; @@ -34,9 +35,9 @@ import org.slf4j.LoggerFactory; public class ConsumerGroupInfo { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final String groupName; - private final ConcurrentHashMap subscriptionTable = + private final ConcurrentMap subscriptionTable = new ConcurrentHashMap(); - private final ConcurrentHashMap channelInfoTable = + private final ConcurrentMap channelInfoTable = new ConcurrentHashMap(16); private volatile ConsumeType consumeType; private volatile MessageModel messageModel; @@ -63,11 +64,11 @@ public class ConsumerGroupInfo { return null; } - public ConcurrentHashMap getSubscriptionTable() { + public ConcurrentMap getSubscriptionTable() { return subscriptionTable; } - public ConcurrentHashMap getChannelInfoTable() { + public ConcurrentMap getChannelInfoTable() { return channelInfoTable; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index a5ddec8c..4a262e52 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; @@ -35,7 +36,7 @@ import org.slf4j.LoggerFactory; public class ConsumerManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; - private final ConcurrentHashMap consumerTable = + private final ConcurrentMap consumerTable = new ConcurrentHashMap(1024); private final ConsumerIdsChangeListener consumerIdsChangeListener; @@ -145,7 +146,7 @@ public class ConsumerManager { Entry next = it.next(); String group = next.getKey(); ConsumerGroupInfo consumerGroupInfo = next.getValue(); - ConcurrentHashMap channelInfoTable = + ConcurrentMap channelInfoTable = consumerGroupInfo.getChannelInfoTable(); Iterator> itChannel = channelInfoTable.entrySet().iterator(); @@ -176,7 +177,7 @@ public class ConsumerManager { Iterator> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry entry = it.next(); - ConcurrentHashMap subscriptionTable = + ConcurrentMap subscriptionTable = entry.getValue().getSubscriptionTable(); if (subscriptionTable.containsKey(topic)) { groups.add(entry.getKey()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java index 863da627..65b444e6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java @@ -25,7 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; @@ -189,7 +189,7 @@ public class Broker2Client { this.brokerController.getConsumerManager().getConsumerGroupInfo(group); if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) { - ConcurrentHashMap channelInfoTable = + ConcurrentMap channelInfoTable = consumerGroupInfo.getChannelInfoTable(); for (Map.Entry entry : channelInfoTable.entrySet()) { int version = entry.getValue().getVersion(); @@ -252,7 +252,7 @@ public class Broker2Client { Map> consumerStatusTable = new HashMap>(); - ConcurrentHashMap channelInfoTable = + ConcurrentMap channelInfoTable = this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable(); if (null == channelInfoTable || channelInfoTable.isEmpty()) { result.setCode(ResponseCode.SYSTEM_ERROR); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java index 98aceb63..ed5a8758 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.client.rebalance; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.common.constant.LoggerName; @@ -31,7 +32,7 @@ public class RebalanceLockManager { private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty( "rocketmq.broker.rebalance.lockMaxLiveTime", "60000")); private final Lock lock = new ReentrantLock(); - private final ConcurrentHashMap> mqLockTable = + private final ConcurrentMap> mqLockTable = new ConcurrentHashMap>(1024); public boolean tryLock(final String group, final MessageQueue mq, final String clientId) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java index 7f790af6..f50db863 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.broker.filter; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; @@ -45,7 +46,7 @@ public class ConsumerFilterManager extends ConfigManager { private static final long MS_24_HOUR = 24 * 3600 * 1000; - private ConcurrentHashMap + private ConcurrentMap filterDataByTopic = new ConcurrentHashMap(256); private transient BrokerController brokerController; @@ -316,7 +317,7 @@ public class ConsumerFilterManager extends ConfigManager { } } - public ConcurrentHashMap getFilterDataByTopic() { + public ConcurrentMap getFilterDataByTopic() { return filterDataByTopic; } @@ -326,7 +327,7 @@ public class ConsumerFilterManager extends ConfigManager { public static class FilterDataMapByTopic { - private ConcurrentHashMap + private ConcurrentMap groupFilterData = new ConcurrentHashMap(); private String topic; @@ -452,7 +453,7 @@ public class ConsumerFilterManager extends ConfigManager { return this.groupFilterData.get(consumerGroup); } - public final ConcurrentHashMap getGroupFilterData() { + public final ConcurrentMap getGroupFilterData() { return this.groupFilterData; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java index b935bc8b..52cb9199 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -38,7 +39,7 @@ public class FilterServerManager { public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000; private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private final ConcurrentHashMap filterServerTable = + private final ConcurrentMap filterServerTable = new ConcurrentHashMap(16); private final BrokerController brokerController; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java index 71f56a4b..b1bd86f5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.SystemClock; @@ -33,7 +34,7 @@ public class PullRequestHoldService extends ServiceThread { private static final String TOPIC_QUEUEID_SEPARATOR = "@"; private final BrokerController brokerController; private final SystemClock systemClock = new SystemClock(); - private ConcurrentHashMap pullRequestTable = + private ConcurrentMap pullRequestTable = new ConcurrentHashMap(1024); public PullRequestHoldService(final BrokerController brokerController) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index 769c4ad0..57565a64 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; @@ -36,8 +37,8 @@ public class ConsumerOffsetManager extends ConfigManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final String TOPIC_GROUP_SEPARATOR = "@"; - private ConcurrentHashMap> offsetTable = - new ConcurrentHashMap>(512); + private ConcurrentMap> offsetTable = + new ConcurrentHashMap>(512); private transient BrokerController brokerController; @@ -49,9 +50,9 @@ public class ConsumerOffsetManager extends ConfigManager { } public void scanUnsubscribedTopic() { - Iterator>> it = this.offsetTable.entrySet().iterator(); + Iterator>> it = this.offsetTable.entrySet().iterator(); while (it.hasNext()) { - Entry> next = it.next(); + Entry> next = it.next(); String topicAtGroup = next.getKey(); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); if (arrays.length == 2) { @@ -67,7 +68,7 @@ public class ConsumerOffsetManager extends ConfigManager { } } - private boolean offsetBehindMuchThanData(final String topic, ConcurrentHashMap table) { + private boolean offsetBehindMuchThanData(final String topic, ConcurrentMap table) { Iterator> it = table.entrySet().iterator(); boolean result = !table.isEmpty(); @@ -84,9 +85,9 @@ public class ConsumerOffsetManager extends ConfigManager { public Set whichTopicByConsumer(final String group) { Set topics = new HashSet(); - Iterator>> it = this.offsetTable.entrySet().iterator(); + Iterator>> it = this.offsetTable.entrySet().iterator(); while (it.hasNext()) { - Entry> next = it.next(); + Entry> next = it.next(); String topicAtGroup = next.getKey(); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); if (arrays.length == 2) { @@ -102,9 +103,9 @@ public class ConsumerOffsetManager extends ConfigManager { public Set whichGroupByTopic(final String topic) { Set groups = new HashSet(); - Iterator>> it = this.offsetTable.entrySet().iterator(); + Iterator>> it = this.offsetTable.entrySet().iterator(); while (it.hasNext()) { - Entry> next = it.next(); + Entry> next = it.next(); String topicAtGroup = next.getKey(); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); if (arrays.length == 2) { @@ -124,7 +125,7 @@ public class ConsumerOffsetManager extends ConfigManager { } private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) { - ConcurrentHashMap map = this.offsetTable.get(key); + ConcurrentMap map = this.offsetTable.get(key); if (null == map) { map = new ConcurrentHashMap(32); map.put(queueId, offset); @@ -140,7 +141,7 @@ public class ConsumerOffsetManager extends ConfigManager { public long queryOffset(final String group, final String topic, final int queueId) { // topic@group String key = topic + TOPIC_GROUP_SEPARATOR + group; - ConcurrentHashMap map = this.offsetTable.get(key); + ConcurrentMap map = this.offsetTable.get(key); if (null != map) { Long offset = map.get(queueId); if (offset != null) @@ -173,11 +174,11 @@ public class ConsumerOffsetManager extends ConfigManager { return RemotingSerializable.toJson(this, prettyFormat); } - public ConcurrentHashMap> getOffsetTable() { + public ConcurrentMap> getOffsetTable() { return offsetTable; } - public void setOffsetTable(ConcurrentHashMap> offsetTable) { + public void setOffsetTable(ConcurrentHashMap> offsetTable) { this.offsetTable = offsetTable; } @@ -196,7 +197,7 @@ public class ConsumerOffsetManager extends ConfigManager { } } - for (Map.Entry> offSetEntry : this.offsetTable.entrySet()) { + for (Map.Entry> offSetEntry : this.offsetTable.entrySet()) { String topicGroup = offSetEntry.getKey(); String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR); if (topic.equals(topicGroupArr[0])) { @@ -224,7 +225,7 @@ public class ConsumerOffsetManager extends ConfigManager { } public void cloneOffset(final String srcGroup, final String destGroup, final String topic) { - ConcurrentHashMap offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup); + ConcurrentMap offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup); if (offsets != null) { this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap(offsets)); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index f59d2952..71fdda93 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -29,7 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; @@ -1084,7 +1084,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { GetConsumeStatsInBrokerHeader requestHeader = (GetConsumeStatsInBrokerHeader) request.decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class); boolean isOrder = requestHeader.isOrder(); - ConcurrentHashMap subscriptionGroups = + ConcurrentMap subscriptionGroups = brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable(); List>> brokerConsumeStatsList = diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index bdf2a01e..bd4a26ed 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.subscription; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; @@ -33,7 +34,7 @@ import org.slf4j.LoggerFactory; public class SubscriptionGroupManager extends ConfigManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private final ConcurrentHashMap subscriptionGroupTable = + private final ConcurrentMap subscriptionGroupTable = new ConcurrentHashMap(1024); private final DataVersion dataVersion = new DataVersion(); private transient BrokerController brokerController; @@ -169,7 +170,7 @@ public class SubscriptionGroupManager extends ConfigManager { } } - public ConcurrentHashMap getSubscriptionGroupTable() { + public ConcurrentMap getSubscriptionGroupTable() { return subscriptionGroupTable; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 93a631ac..3bcafc0e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -44,7 +45,7 @@ public class TopicConfigManager extends ConfigManager { private static final long LOCK_TIMEOUT_MILLIS = 3000; private transient final Lock lockTopicConfigTable = new ReentrantLock(); - private final ConcurrentHashMap topicConfigTable = + private final ConcurrentMap topicConfigTable = new ConcurrentHashMap(1024); private final DataVersion dataVersion = new DataVersion(); private final Set systemTopicList = new HashSet(); @@ -416,7 +417,7 @@ public class TopicConfigManager extends ConfigManager { return dataVersion; } - public ConcurrentHashMap getTopicConfigTable() { + public ConcurrentMap getTopicConfigTable() { return topicConfigTable; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java index 6bae85a6..e0b546d2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java @@ -20,6 +20,7 @@ import java.util.Iterator; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.exception.MQClientException; @@ -35,11 +36,11 @@ import org.slf4j.Logger; public class MQPullConsumerScheduleService { private final Logger log = ClientLogger.getLog(); private final MessageQueueListener messageQueueListener = new MessageQueueListenerImpl(); - private final ConcurrentHashMap taskTable = + private final ConcurrentMap taskTable = new ConcurrentHashMap(); private DefaultMQPullConsumer defaultMQPullConsumer; private int pullThreadNums = 20; - private ConcurrentHashMap callbackTable = + private ConcurrentMap callbackTable = new ConcurrentHashMap(); private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; @@ -100,7 +101,7 @@ public class MQPullConsumerScheduleService { } } - public ConcurrentHashMap getCallbackTable() { + public ConcurrentMap getCallbackTable() { return callbackTable; } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java index 6c815167..d4b19b23 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -45,7 +46,7 @@ public class LocalFileOffsetStore implements OffsetStore { private final MQClientInstance mQClientFactory; private final String groupName; private final String storePath; - private ConcurrentHashMap offsetTable = + private ConcurrentMap offsetTable = new ConcurrentHashMap(); public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) { diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java index 32bcc9f9..7dfd97af 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.client.consumer.store; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; @@ -25,14 +26,14 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; * Wrapper class for offset serialization */ public class OffsetSerializeWrapper extends RemotingSerializable { - private ConcurrentHashMap offsetTable = + private ConcurrentMap offsetTable = new ConcurrentHashMap(); - public ConcurrentHashMap getOffsetTable() { + public ConcurrentMap getOffsetTable() { return offsetTable; } - public void setOffsetTable(ConcurrentHashMap offsetTable) { + public void setOffsetTable(ConcurrentMap offsetTable) { this.offsetTable = offsetTable; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index 60ad101b..5bd5749e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -21,6 +21,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -42,7 +43,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { private final static Logger log = ClientLogger.getLog(); private final MQClientInstance mQClientFactory; private final String groupName; - private ConcurrentHashMap offsetTable = + private ConcurrentMap offsetTable = new ConcurrentHashMap(); public RemoteBrokerOffsetStore(MQClientInstance mQClientFactory, String groupName) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java index f596b836..25877d73 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.client.impl; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.impl.factory.MQClientInstance; @@ -28,7 +29,7 @@ public class MQClientManager { private final static Logger log = ClientLogger.getLog(); private static MQClientManager instance = new MQClientManager(); private AtomicInteger factoryIndexGenerator = new AtomicInteger(); - private ConcurrentHashMap factoryTable = + private ConcurrentMap factoryTable = new ConcurrentHashMap(); private MQClientManager() { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 7d43b372..35ee16fe 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -22,7 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; @@ -115,7 +115,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { throw new IllegalArgumentException("topic is null"); } - ConcurrentHashMap mqTable = this.rebalanceImpl.getProcessQueueTable(); + ConcurrentMap mqTable = this.rebalanceImpl.getProcessQueueTable(); Set mqResult = new HashSet(); for (MessageQueue mq : mqTable.keySet()) { if (mq.getTopic().equals(topic)) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 87679642..9bf34be8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -26,7 +26,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; @@ -805,7 +805,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } } - public ConcurrentHashMap getSubscriptionInner() { + public ConcurrentMap getSubscriptionInner() { return this.rebalanceImpl.getSubscriptionInner(); } @@ -1060,7 +1060,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { private long computeAccumulationTotal() { long msgAccTotal = 0; - ConcurrentHashMap processQueueTable = this.rebalanceImpl.getProcessQueueTable(); + ConcurrentMap processQueueTable = this.rebalanceImpl.getProcessQueueTable(); Iterator> it = processQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry next = it.next(); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java index c25e41bb..a02f1b6e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java @@ -17,13 +17,14 @@ package org.apache.rocketmq.client.impl.consumer; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.message.MessageQueue; /** * Message lock,strictly ensure the single queue only one thread at a time consuming */ public class MessageQueueLock { - private ConcurrentHashMap mqLockTable = + private ConcurrentMap mqLockTable = new ConcurrentHashMap(); public Object fetchLockObject(final MessageQueue mq) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java index 304a44a6..bbdf27d7 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullResult; @@ -53,7 +54,7 @@ public class PullAPIWrapper { private final MQClientInstance mQClientFactory; private final String consumerGroup; private final boolean unitMode; - private ConcurrentHashMap pullFromWhichNodeTable = + private ConcurrentMap pullFromWhichNodeTable = new ConcurrentHashMap(32); private volatile boolean connectBrokerByUser = false; private volatile long defaultBrokerId = MixAll.MASTER_ID; @@ -247,7 +248,7 @@ public class PullAPIWrapper { private String computPullFromWhichFilterServer(final String topic, final String brokerAddr) throws MQClientException { - ConcurrentHashMap topicRouteTable = this.mQClientFactory.getTopicRouteTable(); + ConcurrentMap topicRouteTable = this.mQClientFactory.getTopicRouteTable(); if (topicRouteTable != null) { TopicRouteData topicRouteData = topicRouteTable.get(topic); List list = topicRouteData.getFilterServerTable().get(brokerAddr); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index 6b12221f..634e0f0e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.factory.MQClientInstance; @@ -44,10 +45,10 @@ import org.slf4j.Logger; */ public abstract class RebalanceImpl { protected static final Logger log = ClientLogger.getLog(); - protected final ConcurrentHashMap processQueueTable = new ConcurrentHashMap(64); - protected final ConcurrentHashMap> topicSubscribeInfoTable = + protected final ConcurrentMap processQueueTable = new ConcurrentHashMap(64); + protected final ConcurrentMap> topicSubscribeInfoTable = new ConcurrentHashMap>(); - protected final ConcurrentHashMap subscriptionInner = + protected final ConcurrentMap subscriptionInner = new ConcurrentHashMap(); protected String consumerGroup; protected MessageModel messageModel; @@ -232,7 +233,7 @@ public abstract class RebalanceImpl { this.truncateMessageQueueNotMyTopic(); } - public ConcurrentHashMap getSubscriptionInner() { + public ConcurrentMap getSubscriptionInner() { return subscriptionInner; } @@ -421,11 +422,11 @@ public abstract class RebalanceImpl { } } - public ConcurrentHashMap getProcessQueueTable() { + public ConcurrentMap getProcessQueueTable() { return processQueueTable; } - public ConcurrentHashMap> getTopicSubscribeInfoTable() { + public ConcurrentMap> getTopicSubscribeInfoTable() { return topicSubscribeInfoTable; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 1b075ee1..f146be9b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -28,6 +28,7 @@ import java.util.Map.Entry; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; @@ -88,18 +89,18 @@ public class MQClientInstance { private final int instanceIndex; private final String clientId; private final long bootTimestamp = System.currentTimeMillis(); - private final ConcurrentHashMap producerTable = new ConcurrentHashMap(); - private final ConcurrentHashMap consumerTable = new ConcurrentHashMap(); - private final ConcurrentHashMap adminExtTable = new ConcurrentHashMap(); + private final ConcurrentMap producerTable = new ConcurrentHashMap(); + private final ConcurrentMap consumerTable = new ConcurrentHashMap(); + private final ConcurrentMap adminExtTable = new ConcurrentHashMap(); private final NettyClientConfig nettyClientConfig; private final MQClientAPIImpl mQClientAPIImpl; private final MQAdminImpl mQAdminImpl; - private final ConcurrentHashMap topicRouteTable = new ConcurrentHashMap(); + private final ConcurrentMap topicRouteTable = new ConcurrentHashMap(); private final Lock lockNamesrv = new ReentrantLock(); private final Lock lockHeartbeat = new ReentrantLock(); - private final ConcurrentHashMap> brokerAddrTable = + private final ConcurrentMap> brokerAddrTable = new ConcurrentHashMap>(); - private final ConcurrentHashMap> brokerVersionTable = + private final ConcurrentMap> brokerVersionTable = new ConcurrentHashMap>(); private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override @@ -1088,7 +1089,7 @@ public class MQClientInstance { } consumer.suspend(); - ConcurrentHashMap processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable(); + ConcurrentMap processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable(); for (Map.Entry entry : processQueueTable.entrySet()) { MessageQueue mq = entry.getKey(); if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) { @@ -1166,7 +1167,7 @@ public class MQClientInstance { return defaultMQProducer; } - public ConcurrentHashMap getTopicRouteTable() { + public ConcurrentMap getTopicRouteTable() { return topicRouteTable; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index d828875d..12f8a367 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -26,6 +26,7 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -84,7 +85,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { private final Logger log = ClientLogger.getLog(); private final Random random = new Random(); private final DefaultMQProducer defaultMQProducer; - private final ConcurrentHashMap topicPublishInfoTable = + private final ConcurrentMap topicPublishInfoTable = new ConcurrentHashMap(); private final ArrayList sendMessageHookList = new ArrayList(); private final RPCHook rpcHook; @@ -1057,7 +1058,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); } - public ConcurrentHashMap getTopicPublishInfoTable() { + public ConcurrentMap getTopicPublishInfoTable() { return topicPublishInfoTable; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java index 7478dd26..3a0356c7 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.common.protocol.body; import java.util.HashSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; @@ -27,7 +28,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class ConsumerConnection extends RemotingSerializable { private HashSet connectionSet = new HashSet(); - private ConcurrentHashMap subscriptionTable = + private ConcurrentMap subscriptionTable = new ConcurrentHashMap(); private ConsumeType consumeType; private MessageModel messageModel; @@ -52,7 +53,7 @@ public class ConsumerConnection extends RemotingSerializable { this.connectionSet = connectionSet; } - public ConcurrentHashMap getSubscriptionTable() { + public ConcurrentMap getSubscriptionTable() { return subscriptionTable; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java index 02bf8117..5b08d788 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java @@ -18,17 +18,18 @@ package org.apache.rocketmq.common.protocol.body; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class ConsumerOffsetSerializeWrapper extends RemotingSerializable { - private ConcurrentHashMap> offsetTable = - new ConcurrentHashMap>(512); + private ConcurrentMap> offsetTable = + new ConcurrentHashMap>(512); - public ConcurrentHashMap> getOffsetTable() { + public ConcurrentMap> getOffsetTable() { return offsetTable; } - public void setOffsetTable(ConcurrentHashMap> offsetTable) { + public void setOffsetTable(ConcurrentMap> offsetTable) { this.offsetTable = offsetTable; } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java index 92c15eb9..e05f7596 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java @@ -18,21 +18,22 @@ package org.apache.rocketmq.common.protocol.body; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class SubscriptionGroupWrapper extends RemotingSerializable { - private ConcurrentHashMap subscriptionGroupTable = + private ConcurrentMap subscriptionGroupTable = new ConcurrentHashMap(1024); private DataVersion dataVersion = new DataVersion(); - public ConcurrentHashMap getSubscriptionGroupTable() { + public ConcurrentMap getSubscriptionGroupTable() { return subscriptionGroupTable; } public void setSubscriptionGroupTable( - ConcurrentHashMap subscriptionGroupTable) { + ConcurrentMap subscriptionGroupTable) { this.subscriptionGroupTable = subscriptionGroupTable; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java index c471d1a9..ce123021 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java @@ -18,20 +18,21 @@ package org.apache.rocketmq.common.protocol.body; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class TopicConfigSerializeWrapper extends RemotingSerializable { - private ConcurrentHashMap topicConfigTable = + private ConcurrentMap topicConfigTable = new ConcurrentHashMap(); private DataVersion dataVersion = new DataVersion(); - public ConcurrentHashMap getTopicConfigTable() { + public ConcurrentMap getTopicConfigTable() { return topicConfigTable; } - public void setTopicConfigTable(ConcurrentHashMap topicConfigTable) { + public void setTopicConfigTable(ConcurrentMap topicConfigTable) { this.topicConfigTable = topicConfigTable; } diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java index 5498d34c..57dfc386 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java @@ -20,13 +20,14 @@ package org.apache.rocketmq.common.stats; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.UtilAll; import org.slf4j.Logger; public class MomentStatsItemSet { - private final ConcurrentHashMap statsItemTable = + private final ConcurrentMap statsItemTable = new ConcurrentHashMap(128); private final String statsName; private final ScheduledExecutorService scheduledExecutorService; @@ -39,7 +40,7 @@ public class MomentStatsItemSet { this.init(); } - public ConcurrentHashMap getStatsItemTable() { + public ConcurrentMap getStatsItemTable() { return statsItemTable; } diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java index 8633d682..17dbf0d2 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java @@ -20,13 +20,14 @@ package org.apache.rocketmq.common.stats; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.UtilAll; import org.slf4j.Logger; public class StatsItemSet { - private final ConcurrentHashMap statsItemTable = + private final ConcurrentMap statsItemTable = new ConcurrentHashMap(128); private final String statsName; diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java index 2c31538f..490c5821 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java @@ -20,6 +20,7 @@ package org.apache.rocketmq.filtersrv.filter; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -40,7 +41,7 @@ public class FilterClassManager { private final ScheduledExecutorService scheduledExecutorService = Executors .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSGetClassScheduledThread")); - private ConcurrentHashMap filterClassTable = + private ConcurrentMap filterClassTable = new ConcurrentHashMap(128); private FilterClassFetchMethod filterClassFetchMethod; diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index 5a953a9e..7479fcc5 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -25,7 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.rocketmq.common.DataVersion; @@ -135,7 +135,7 @@ public class RouteInfoManager { && MixAll.MASTER_ID == brokerId) { if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())// || registerFirst) { - ConcurrentHashMap tcTable = + ConcurrentMap tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { for (Map.Entry entry : tcTable.entrySet()) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 15586cbf..0ba714a7 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -27,6 +27,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -67,7 +68,7 @@ public abstract class NettyRemotingAbstract { /** * This map caches all on-going requests. */ - protected final ConcurrentHashMap responseTable = + protected final ConcurrentMap responseTable = new ConcurrentHashMap(256); /** diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 52ca47e6..1c3da9ad 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -41,6 +41,7 @@ import java.util.Random; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -73,7 +74,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti private final Bootstrap bootstrap = new Bootstrap(); private final EventLoopGroup eventLoopGroupWorker; private final Lock lockChannelTables = new ReentrantLock(); - private final ConcurrentHashMap channelTables = new ConcurrentHashMap(); + private final ConcurrentMap channelTables = new ConcurrentHashMap(); private final Timer timer = new Timer("ClientHouseKeepingService", true); diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java index 0993a5fa..abb83856 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; import java.util.ServiceLoader; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; @@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory; public class AllocateMappedFileService extends ServiceThread { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static int waitTimeOut = 1000 * 5; - private ConcurrentHashMap requestTable = + private ConcurrentMap requestTable = new ConcurrentHashMap(); private PriorityBlockingQueue requestQueue = new PriorityBlockingQueue(); diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 931edc76..4549f1ea 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -64,7 +65,7 @@ public class DefaultMessageStore implements MessageStore { // CommitLog private final CommitLog commitLog; - private final ConcurrentHashMap> consumeQueueTable; + private final ConcurrentMap> consumeQueueTable; private final FlushConsumeQueueService flushConsumeQueueService; @@ -140,9 +141,9 @@ public class DefaultMessageStore implements MessageStore { } public void truncateDirtyLogicFiles(long phyOffset) { - ConcurrentHashMap> tables = DefaultMessageStore.this.consumeQueueTable; + ConcurrentMap> tables = DefaultMessageStore.this.consumeQueueTable; - for (ConcurrentHashMap maps : tables.values()) { + for (ConcurrentMap maps : tables.values()) { for (ConsumeQueue logic : maps.values()) { logic.truncateDirtyLogicFiles(phyOffset); } @@ -267,7 +268,7 @@ public class DefaultMessageStore implements MessageStore { } public void destroyLogics() { - for (ConcurrentHashMap maps : this.consumeQueueTable.values()) { + for (ConcurrentMap maps : this.consumeQueueTable.values()) { for (ConsumeQueue logic : maps.values()) { logic.destroy(); } @@ -885,13 +886,13 @@ public class DefaultMessageStore implements MessageStore { @Override public int cleanUnusedTopic(Set topics) { - Iterator>> it = this.consumeQueueTable.entrySet().iterator(); + Iterator>> it = this.consumeQueueTable.entrySet().iterator(); while (it.hasNext()) { - Entry> next = it.next(); + Entry> next = it.next(); String topic = next.getKey(); if (!topics.contains(topic) && !topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) { - ConcurrentHashMap queueTable = next.getValue(); + ConcurrentMap queueTable = next.getValue(); for (ConsumeQueue cq : queueTable.values()) { cq.destroy(); log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", // @@ -913,12 +914,12 @@ public class DefaultMessageStore implements MessageStore { public void cleanExpiredConsumerQueue() { long minCommitLogOffset = this.commitLog.getMinOffset(); - Iterator>> it = this.consumeQueueTable.entrySet().iterator(); + Iterator>> it = this.consumeQueueTable.entrySet().iterator(); while (it.hasNext()) { - Entry> next = it.next(); + Entry> next = it.next(); String topic = next.getKey(); if (!topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) { - ConcurrentHashMap queueTable = next.getValue(); + ConcurrentMap queueTable = next.getValue(); Iterator> itQT = queueTable.entrySet().iterator(); while (itQT.hasNext()) { Entry nextQT = itQT.next(); @@ -1061,10 +1062,10 @@ public class DefaultMessageStore implements MessageStore { } public ConsumeQueue findConsumeQueue(String topic, int queueId) { - ConcurrentHashMap map = consumeQueueTable.get(topic); + ConcurrentMap map = consumeQueueTable.get(topic); if (null == map) { - ConcurrentHashMap newMap = new ConcurrentHashMap(128); - ConcurrentHashMap oldMap = consumeQueueTable.putIfAbsent(topic, newMap); + ConcurrentMap newMap = new ConcurrentHashMap(128); + ConcurrentMap oldMap = consumeQueueTable.putIfAbsent(topic, newMap); if (oldMap != null) { map = oldMap; } else { @@ -1205,9 +1206,9 @@ public class DefaultMessageStore implements MessageStore { private void checkSelf() { this.commitLog.checkSelf(); - Iterator>> it = this.consumeQueueTable.entrySet().iterator(); + Iterator>> it = this.consumeQueueTable.entrySet().iterator(); while (it.hasNext()) { - Entry> next = it.next(); + Entry> next = it.next(); Iterator> itNext = next.getValue().entrySet().iterator(); while (itNext.hasNext()) { Entry cq = itNext.next(); @@ -1280,7 +1281,7 @@ public class DefaultMessageStore implements MessageStore { } private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueue consumeQueue) { - ConcurrentHashMap map = this.consumeQueueTable.get(topic); + ConcurrentMap map = this.consumeQueueTable.get(topic); if (null == map) { map = new ConcurrentHashMap(); map.put(queueId, consumeQueue); @@ -1291,7 +1292,7 @@ public class DefaultMessageStore implements MessageStore { } private void recoverConsumeQueue() { - for (ConcurrentHashMap maps : this.consumeQueueTable.values()) { + for (ConcurrentMap maps : this.consumeQueueTable.values()) { for (ConsumeQueue logic : maps.values()) { logic.recover(); } @@ -1301,7 +1302,7 @@ public class DefaultMessageStore implements MessageStore { private void recoverTopicQueueTable() { HashMap table = new HashMap(1024); long minPhyOffset = this.commitLog.getMinOffset(); - for (ConcurrentHashMap maps : this.consumeQueueTable.values()) { + for (ConcurrentMap maps : this.consumeQueueTable.values()) { for (ConsumeQueue logic : maps.values()) { String key = logic.getTopic() + "-" + logic.getQueueId(); table.put(key, logic.getMaxOffsetInQueue()); @@ -1324,7 +1325,7 @@ public class DefaultMessageStore implements MessageStore { return runningFlags; } - public ConcurrentHashMap> getConsumeQueueTable() { + public ConcurrentMap> getConsumeQueueTable() { return consumeQueueTable; } @@ -1375,7 +1376,7 @@ public class DefaultMessageStore implements MessageStore { @Override public ConsumeQueue getConsumeQueue(String topic, int queueId) { - ConcurrentHashMap map = consumeQueueTable.get(topic); + ConcurrentMap map = consumeQueueTable.get(topic); if (map == null) { return null; } @@ -1594,9 +1595,9 @@ public class DefaultMessageStore implements MessageStore { if (minOffset > this.lastPhysicalMinOffset) { this.lastPhysicalMinOffset = minOffset; - ConcurrentHashMap> tables = DefaultMessageStore.this.consumeQueueTable; + ConcurrentMap> tables = DefaultMessageStore.this.consumeQueueTable; - for (ConcurrentHashMap maps : tables.values()) { + for (ConcurrentMap maps : tables.values()) { for (ConsumeQueue logic : maps.values()) { int deleteCount = logic.deleteExpiredFile(minOffset); @@ -1639,9 +1640,9 @@ public class DefaultMessageStore implements MessageStore { logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp(); } - ConcurrentHashMap> tables = DefaultMessageStore.this.consumeQueueTable; + ConcurrentMap> tables = DefaultMessageStore.this.consumeQueueTable; - for (ConcurrentHashMap maps : tables.values()) { + for (ConcurrentMap maps : tables.values()) { for (ConsumeQueue cq : maps.values()) { boolean result = false; for (int i = 0; i < retryTimes && !result; i++) { diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java b/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java index efb6aa81..7021992c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java @@ -17,17 +17,18 @@ package org.apache.rocketmq.store.schedule; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class DelayOffsetSerializeWrapper extends RemotingSerializable { - private ConcurrentHashMap offsetTable = + private ConcurrentMap offsetTable = new ConcurrentHashMap(32); - public ConcurrentHashMap getOffsetTable() { + public ConcurrentMap getOffsetTable() { return offsetTable; } - public void setOffsetTable(ConcurrentHashMap offsetTable) { + public void setOffsetTable(ConcurrentMap offsetTable) { this.offsetTable = offsetTable; } } diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index 501876ed..172954de 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -23,6 +23,7 @@ import java.util.Map.Entry; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.constant.LoggerName; @@ -49,10 +50,10 @@ public class ScheduleMessageService extends ConfigManager { private static final long DELAY_FOR_A_WHILE = 100L; private static final long DELAY_FOR_A_PERIOD = 10000L; - private final ConcurrentHashMap delayLevelTable = + private final ConcurrentMap delayLevelTable = new ConcurrentHashMap(32); - private final ConcurrentHashMap offsetTable = + private final ConcurrentMap offsetTable = new ConcurrentHashMap(32); private final Timer timer = new Timer("ScheduleMessageTimerThread", true); -- GitLab