提交 96cd2e4e 编写于 作者: J Jaskey 提交者: dongeforever

[ROCKETMQ-208]incompatibility problem found in enviroment of JDK 1.7 when...

[ROCKETMQ-208]incompatibility problem found in enviroment of JDK 1.7 when running client closes apache/incubator-rocketmq#10
上级 2c28baad
...@@ -23,6 +23,7 @@ import java.util.List; ...@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
...@@ -34,9 +35,9 @@ import org.slf4j.LoggerFactory; ...@@ -34,9 +35,9 @@ import org.slf4j.LoggerFactory;
public class ConsumerGroupInfo { public class ConsumerGroupInfo {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final String groupName; private final String groupName;
private final ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable = private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
new ConcurrentHashMap<String, SubscriptionData>(); new ConcurrentHashMap<String, SubscriptionData>();
private final ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
new ConcurrentHashMap<Channel, ClientChannelInfo>(16); new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
private volatile ConsumeType consumeType; private volatile ConsumeType consumeType;
private volatile MessageModel messageModel; private volatile MessageModel messageModel;
...@@ -63,11 +64,11 @@ public class ConsumerGroupInfo { ...@@ -63,11 +64,11 @@ public class ConsumerGroupInfo {
return null; return null;
} }
public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() { public ConcurrentMap<String, SubscriptionData> getSubscriptionTable() {
return subscriptionTable; return subscriptionTable;
} }
public ConcurrentHashMap<Channel, ClientChannelInfo> getChannelInfoTable() { public ConcurrentMap<Channel, ClientChannelInfo> getChannelInfoTable() {
return channelInfoTable; return channelInfoTable;
} }
......
...@@ -22,6 +22,7 @@ import java.util.Iterator; ...@@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
...@@ -35,7 +36,7 @@ import org.slf4j.LoggerFactory; ...@@ -35,7 +36,7 @@ import org.slf4j.LoggerFactory;
public class ConsumerManager { public class ConsumerManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private final ConcurrentHashMap<String/* Group */, ConsumerGroupInfo> consumerTable = private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
new ConcurrentHashMap<String, ConsumerGroupInfo>(1024); new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
private final ConsumerIdsChangeListener consumerIdsChangeListener; private final ConsumerIdsChangeListener consumerIdsChangeListener;
...@@ -145,7 +146,7 @@ public class ConsumerManager { ...@@ -145,7 +146,7 @@ public class ConsumerManager {
Entry<String, ConsumerGroupInfo> next = it.next(); Entry<String, ConsumerGroupInfo> next = it.next();
String group = next.getKey(); String group = next.getKey();
ConsumerGroupInfo consumerGroupInfo = next.getValue(); ConsumerGroupInfo consumerGroupInfo = next.getValue();
ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
consumerGroupInfo.getChannelInfoTable(); consumerGroupInfo.getChannelInfoTable();
Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator(); Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator();
...@@ -176,7 +177,7 @@ public class ConsumerManager { ...@@ -176,7 +177,7 @@ public class ConsumerManager {
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator(); Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
Entry<String, ConsumerGroupInfo> entry = it.next(); Entry<String, ConsumerGroupInfo> entry = it.next();
ConcurrentHashMap<String, SubscriptionData> subscriptionTable = ConcurrentMap<String, SubscriptionData> subscriptionTable =
entry.getValue().getSubscriptionTable(); entry.getValue().getSubscriptionTable();
if (subscriptionTable.containsKey(topic)) { if (subscriptionTable.containsKey(topic)) {
groups.add(entry.getKey()); groups.add(entry.getKey());
......
...@@ -25,7 +25,7 @@ import java.util.HashMap; ...@@ -25,7 +25,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
...@@ -189,7 +189,7 @@ public class Broker2Client { ...@@ -189,7 +189,7 @@ public class Broker2Client {
this.brokerController.getConsumerManager().getConsumerGroupInfo(group); this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) { if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
consumerGroupInfo.getChannelInfoTable(); consumerGroupInfo.getChannelInfoTable();
for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) { for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
int version = entry.getValue().getVersion(); int version = entry.getValue().getVersion();
...@@ -252,7 +252,7 @@ public class Broker2Client { ...@@ -252,7 +252,7 @@ public class Broker2Client {
Map<String, Map<MessageQueue, Long>> consumerStatusTable = Map<String, Map<MessageQueue, Long>> consumerStatusTable =
new HashMap<String, Map<MessageQueue, Long>>(); new HashMap<String, Map<MessageQueue, Long>>();
ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable(); this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable();
if (null == channelInfoTable || channelInfoTable.isEmpty()) { if (null == channelInfoTable || channelInfoTable.isEmpty()) {
result.setCode(ResponseCode.SYSTEM_ERROR); result.setCode(ResponseCode.SYSTEM_ERROR);
......
...@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.client.rebalance; ...@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.client.rebalance;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
...@@ -31,7 +32,7 @@ public class RebalanceLockManager { ...@@ -31,7 +32,7 @@ public class RebalanceLockManager {
private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty( private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
"rocketmq.broker.rebalance.lockMaxLiveTime", "60000")); "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
private final Lock lock = new ReentrantLock(); private final Lock lock = new ReentrantLock();
private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable = private final ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024); new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);
public boolean tryLock(final String group, final MessageQueue mq, final String clientId) { public boolean tryLock(final String group, final MessageQueue mq, final String clientId) {
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.rocketmq.broker.filter; package org.apache.rocketmq.broker.filter;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.ConfigManager;
...@@ -45,7 +46,7 @@ public class ConsumerFilterManager extends ConfigManager { ...@@ -45,7 +46,7 @@ public class ConsumerFilterManager extends ConfigManager {
private static final long MS_24_HOUR = 24 * 3600 * 1000; private static final long MS_24_HOUR = 24 * 3600 * 1000;
private ConcurrentHashMap<String/*Topic*/, FilterDataMapByTopic> private ConcurrentMap<String/*Topic*/, FilterDataMapByTopic>
filterDataByTopic = new ConcurrentHashMap<String/*consumer group*/, FilterDataMapByTopic>(256); filterDataByTopic = new ConcurrentHashMap<String/*consumer group*/, FilterDataMapByTopic>(256);
private transient BrokerController brokerController; private transient BrokerController brokerController;
...@@ -316,7 +317,7 @@ public class ConsumerFilterManager extends ConfigManager { ...@@ -316,7 +317,7 @@ public class ConsumerFilterManager extends ConfigManager {
} }
} }
public ConcurrentHashMap<String, FilterDataMapByTopic> getFilterDataByTopic() { public ConcurrentMap<String, FilterDataMapByTopic> getFilterDataByTopic() {
return filterDataByTopic; return filterDataByTopic;
} }
...@@ -326,7 +327,7 @@ public class ConsumerFilterManager extends ConfigManager { ...@@ -326,7 +327,7 @@ public class ConsumerFilterManager extends ConfigManager {
public static class FilterDataMapByTopic { public static class FilterDataMapByTopic {
private ConcurrentHashMap<String/*consumer group*/, ConsumerFilterData> private ConcurrentMap<String/*consumer group*/, ConsumerFilterData>
groupFilterData = new ConcurrentHashMap<String, ConsumerFilterData>(); groupFilterData = new ConcurrentHashMap<String, ConsumerFilterData>();
private String topic; private String topic;
...@@ -452,7 +453,7 @@ public class ConsumerFilterManager extends ConfigManager { ...@@ -452,7 +453,7 @@ public class ConsumerFilterManager extends ConfigManager {
return this.groupFilterData.get(consumerGroup); return this.groupFilterData.get(consumerGroup);
} }
public final ConcurrentHashMap<String, ConsumerFilterData> getGroupFilterData() { public final ConcurrentMap<String, ConsumerFilterData> getGroupFilterData() {
return this.groupFilterData; return this.groupFilterData;
} }
......
...@@ -23,6 +23,7 @@ import java.util.Iterator; ...@@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -38,7 +39,7 @@ public class FilterServerManager { ...@@ -38,7 +39,7 @@ public class FilterServerManager {
public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000; public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000;
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ConcurrentHashMap<Channel, FilterServerInfo> filterServerTable = private final ConcurrentMap<Channel, FilterServerInfo> filterServerTable =
new ConcurrentHashMap<Channel, FilterServerInfo>(16); new ConcurrentHashMap<Channel, FilterServerInfo>(16);
private final BrokerController brokerController; private final BrokerController brokerController;
......
...@@ -20,6 +20,7 @@ import java.util.ArrayList; ...@@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.SystemClock; import org.apache.rocketmq.common.SystemClock;
...@@ -33,7 +34,7 @@ public class PullRequestHoldService extends ServiceThread { ...@@ -33,7 +34,7 @@ public class PullRequestHoldService extends ServiceThread {
private static final String TOPIC_QUEUEID_SEPARATOR = "@"; private static final String TOPIC_QUEUEID_SEPARATOR = "@";
private final BrokerController brokerController; private final BrokerController brokerController;
private final SystemClock systemClock = new SystemClock(); private final SystemClock systemClock = new SystemClock();
private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable = private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
new ConcurrentHashMap<String, ManyPullRequest>(1024); new ConcurrentHashMap<String, ManyPullRequest>(1024);
public PullRequestHoldService(final BrokerController brokerController) { public PullRequestHoldService(final BrokerController brokerController) {
......
...@@ -23,6 +23,7 @@ import java.util.Map; ...@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.ConfigManager;
...@@ -36,8 +37,8 @@ public class ConsumerOffsetManager extends ConfigManager { ...@@ -36,8 +37,8 @@ public class ConsumerOffsetManager extends ConfigManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final String TOPIC_GROUP_SEPARATOR = "@"; private static final String TOPIC_GROUP_SEPARATOR = "@";
private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable = private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512); new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
private transient BrokerController brokerController; private transient BrokerController brokerController;
...@@ -49,9 +50,9 @@ public class ConsumerOffsetManager extends ConfigManager { ...@@ -49,9 +50,9 @@ public class ConsumerOffsetManager extends ConfigManager {
} }
public void scanUnsubscribedTopic() { public void scanUnsubscribedTopic() {
Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next(); Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
String topicAtGroup = next.getKey(); String topicAtGroup = next.getKey();
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays.length == 2) { if (arrays.length == 2) {
...@@ -67,7 +68,7 @@ public class ConsumerOffsetManager extends ConfigManager { ...@@ -67,7 +68,7 @@ public class ConsumerOffsetManager extends ConfigManager {
} }
} }
private boolean offsetBehindMuchThanData(final String topic, ConcurrentHashMap<Integer, Long> table) { private boolean offsetBehindMuchThanData(final String topic, ConcurrentMap<Integer, Long> table) {
Iterator<Entry<Integer, Long>> it = table.entrySet().iterator(); Iterator<Entry<Integer, Long>> it = table.entrySet().iterator();
boolean result = !table.isEmpty(); boolean result = !table.isEmpty();
...@@ -84,9 +85,9 @@ public class ConsumerOffsetManager extends ConfigManager { ...@@ -84,9 +85,9 @@ public class ConsumerOffsetManager extends ConfigManager {
public Set<String> whichTopicByConsumer(final String group) { public Set<String> whichTopicByConsumer(final String group) {
Set<String> topics = new HashSet<String>(); Set<String> topics = new HashSet<String>();
Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next(); Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
String topicAtGroup = next.getKey(); String topicAtGroup = next.getKey();
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays.length == 2) { if (arrays.length == 2) {
...@@ -102,9 +103,9 @@ public class ConsumerOffsetManager extends ConfigManager { ...@@ -102,9 +103,9 @@ public class ConsumerOffsetManager extends ConfigManager {
public Set<String> whichGroupByTopic(final String topic) { public Set<String> whichGroupByTopic(final String topic) {
Set<String> groups = new HashSet<String>(); Set<String> groups = new HashSet<String>();
Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next(); Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
String topicAtGroup = next.getKey(); String topicAtGroup = next.getKey();
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays.length == 2) { if (arrays.length == 2) {
...@@ -124,7 +125,7 @@ public class ConsumerOffsetManager extends ConfigManager { ...@@ -124,7 +125,7 @@ public class ConsumerOffsetManager extends ConfigManager {
} }
private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) { private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key); ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
if (null == map) { if (null == map) {
map = new ConcurrentHashMap<Integer, Long>(32); map = new ConcurrentHashMap<Integer, Long>(32);
map.put(queueId, offset); map.put(queueId, offset);
...@@ -140,7 +141,7 @@ public class ConsumerOffsetManager extends ConfigManager { ...@@ -140,7 +141,7 @@ public class ConsumerOffsetManager extends ConfigManager {
public long queryOffset(final String group, final String topic, final int queueId) { public long queryOffset(final String group, final String topic, final int queueId) {
// topic@group // topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group; String key = topic + TOPIC_GROUP_SEPARATOR + group;
ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key); ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
if (null != map) { if (null != map) {
Long offset = map.get(queueId); Long offset = map.get(queueId);
if (offset != null) if (offset != null)
...@@ -173,11 +174,11 @@ public class ConsumerOffsetManager extends ConfigManager { ...@@ -173,11 +174,11 @@ public class ConsumerOffsetManager extends ConfigManager {
return RemotingSerializable.toJson(this, prettyFormat); return RemotingSerializable.toJson(this, prettyFormat);
} }
public ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> getOffsetTable() { public ConcurrentMap<String, ConcurrentMap<Integer, Long>> getOffsetTable() {
return offsetTable; return offsetTable;
} }
public void setOffsetTable(ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable) { public void setOffsetTable(ConcurrentHashMap<String, ConcurrentMap<Integer, Long>> offsetTable) {
this.offsetTable = offsetTable; this.offsetTable = offsetTable;
} }
...@@ -196,7 +197,7 @@ public class ConsumerOffsetManager extends ConfigManager { ...@@ -196,7 +197,7 @@ public class ConsumerOffsetManager extends ConfigManager {
} }
} }
for (Map.Entry<String, ConcurrentHashMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) { for (Map.Entry<String, ConcurrentMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) {
String topicGroup = offSetEntry.getKey(); String topicGroup = offSetEntry.getKey();
String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR); String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR);
if (topic.equals(topicGroupArr[0])) { if (topic.equals(topicGroupArr[0])) {
...@@ -224,7 +225,7 @@ public class ConsumerOffsetManager extends ConfigManager { ...@@ -224,7 +225,7 @@ public class ConsumerOffsetManager extends ConfigManager {
} }
public void cloneOffset(final String srcGroup, final String destGroup, final String topic) { public void cloneOffset(final String srcGroup, final String destGroup, final String topic) {
ConcurrentHashMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup); ConcurrentMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup);
if (offsets != null) { if (offsets != null) {
this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap<Integer, Long>(offsets)); this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap<Integer, Long>(offsets));
} }
......
...@@ -29,7 +29,7 @@ import java.util.List; ...@@ -29,7 +29,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
...@@ -1084,7 +1084,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { ...@@ -1084,7 +1084,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
GetConsumeStatsInBrokerHeader requestHeader = GetConsumeStatsInBrokerHeader requestHeader =
(GetConsumeStatsInBrokerHeader) request.decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class); (GetConsumeStatsInBrokerHeader) request.decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class);
boolean isOrder = requestHeader.isOrder(); boolean isOrder = requestHeader.isOrder();
ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroups = ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroups =
brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable(); brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable();
List<Map<String/* subscriptionGroupName */, List<ConsumeStats>>> brokerConsumeStatsList = List<Map<String/* subscriptionGroupName */, List<ConsumeStats>>> brokerConsumeStatsList =
......
...@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.subscription; ...@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.subscription;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.ConfigManager;
...@@ -33,7 +34,7 @@ import org.slf4j.LoggerFactory; ...@@ -33,7 +34,7 @@ import org.slf4j.LoggerFactory;
public class SubscriptionGroupManager extends ConfigManager { public class SubscriptionGroupManager extends ConfigManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable = private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024); new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
private final DataVersion dataVersion = new DataVersion(); private final DataVersion dataVersion = new DataVersion();
private transient BrokerController brokerController; private transient BrokerController brokerController;
...@@ -169,7 +170,7 @@ public class SubscriptionGroupManager extends ConfigManager { ...@@ -169,7 +170,7 @@ public class SubscriptionGroupManager extends ConfigManager {
} }
} }
public ConcurrentHashMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() { public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
return subscriptionGroupTable; return subscriptionGroupTable;
} }
......
...@@ -22,6 +22,7 @@ import java.util.Map; ...@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
...@@ -44,7 +45,7 @@ public class TopicConfigManager extends ConfigManager { ...@@ -44,7 +45,7 @@ public class TopicConfigManager extends ConfigManager {
private static final long LOCK_TIMEOUT_MILLIS = 3000; private static final long LOCK_TIMEOUT_MILLIS = 3000;
private transient final Lock lockTopicConfigTable = new ReentrantLock(); private transient final Lock lockTopicConfigTable = new ReentrantLock();
private final ConcurrentHashMap<String, TopicConfig> topicConfigTable = private final ConcurrentMap<String, TopicConfig> topicConfigTable =
new ConcurrentHashMap<String, TopicConfig>(1024); new ConcurrentHashMap<String, TopicConfig>(1024);
private final DataVersion dataVersion = new DataVersion(); private final DataVersion dataVersion = new DataVersion();
private final Set<String> systemTopicList = new HashSet<String>(); private final Set<String> systemTopicList = new HashSet<String>();
...@@ -416,7 +417,7 @@ public class TopicConfigManager extends ConfigManager { ...@@ -416,7 +417,7 @@ public class TopicConfigManager extends ConfigManager {
return dataVersion; return dataVersion;
} }
public ConcurrentHashMap<String, TopicConfig> getTopicConfigTable() { public ConcurrentMap<String, TopicConfig> getTopicConfigTable() {
return topicConfigTable; return topicConfigTable;
} }
} }
...@@ -20,6 +20,7 @@ import java.util.Iterator; ...@@ -20,6 +20,7 @@ import java.util.Iterator;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
...@@ -35,11 +36,11 @@ import org.slf4j.Logger; ...@@ -35,11 +36,11 @@ import org.slf4j.Logger;
public class MQPullConsumerScheduleService { public class MQPullConsumerScheduleService {
private final Logger log = ClientLogger.getLog(); private final Logger log = ClientLogger.getLog();
private final MessageQueueListener messageQueueListener = new MessageQueueListenerImpl(); private final MessageQueueListener messageQueueListener = new MessageQueueListenerImpl();
private final ConcurrentHashMap<MessageQueue, PullTaskImpl> taskTable = private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
new ConcurrentHashMap<MessageQueue, PullTaskImpl>(); new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
private DefaultMQPullConsumer defaultMQPullConsumer; private DefaultMQPullConsumer defaultMQPullConsumer;
private int pullThreadNums = 20; private int pullThreadNums = 20;
private ConcurrentHashMap<String /* topic */, PullTaskCallback> callbackTable = private ConcurrentMap<String /* topic */, PullTaskCallback> callbackTable =
new ConcurrentHashMap<String, PullTaskCallback>(); new ConcurrentHashMap<String, PullTaskCallback>();
private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
...@@ -100,7 +101,7 @@ public class MQPullConsumerScheduleService { ...@@ -100,7 +101,7 @@ public class MQPullConsumerScheduleService {
} }
} }
public ConcurrentHashMap<String, PullTaskCallback> getCallbackTable() { public ConcurrentMap<String, PullTaskCallback> getCallbackTable() {
return callbackTable; return callbackTable;
} }
......
...@@ -22,6 +22,7 @@ import java.util.HashMap; ...@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
...@@ -45,7 +46,7 @@ public class LocalFileOffsetStore implements OffsetStore { ...@@ -45,7 +46,7 @@ public class LocalFileOffsetStore implements OffsetStore {
private final MQClientInstance mQClientFactory; private final MQClientInstance mQClientFactory;
private final String groupName; private final String groupName;
private final String storePath; private final String storePath;
private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable = private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>(); new ConcurrentHashMap<MessageQueue, AtomicLong>();
public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) { public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.consumer.store; package org.apache.rocketmq.client.consumer.store;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
...@@ -25,14 +26,14 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; ...@@ -25,14 +26,14 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
* Wrapper class for offset serialization * Wrapper class for offset serialization
*/ */
public class OffsetSerializeWrapper extends RemotingSerializable { public class OffsetSerializeWrapper extends RemotingSerializable {
private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable = private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>(); new ConcurrentHashMap<MessageQueue, AtomicLong>();
public ConcurrentHashMap<MessageQueue, AtomicLong> getOffsetTable() { public ConcurrentMap<MessageQueue, AtomicLong> getOffsetTable() {
return offsetTable; return offsetTable;
} }
public void setOffsetTable(ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable) { public void setOffsetTable(ConcurrentMap<MessageQueue, AtomicLong> offsetTable) {
this.offsetTable = offsetTable; this.offsetTable = offsetTable;
} }
} }
...@@ -21,6 +21,7 @@ import java.util.HashSet; ...@@ -21,6 +21,7 @@ import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
...@@ -42,7 +43,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -42,7 +43,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
private final static Logger log = ClientLogger.getLog(); private final static Logger log = ClientLogger.getLog();
private final MQClientInstance mQClientFactory; private final MQClientInstance mQClientFactory;
private final String groupName; private final String groupName;
private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable = private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>(); new ConcurrentHashMap<MessageQueue, AtomicLong>();
public RemoteBrokerOffsetStore(MQClientInstance mQClientFactory, String groupName) { public RemoteBrokerOffsetStore(MQClientInstance mQClientFactory, String groupName) {
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.impl; package org.apache.rocketmq.client.impl;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.factory.MQClientInstance;
...@@ -28,7 +29,7 @@ public class MQClientManager { ...@@ -28,7 +29,7 @@ public class MQClientManager {
private final static Logger log = ClientLogger.getLog(); private final static Logger log = ClientLogger.getLog();
private static MQClientManager instance = new MQClientManager(); private static MQClientManager instance = new MQClientManager();
private AtomicInteger factoryIndexGenerator = new AtomicInteger(); private AtomicInteger factoryIndexGenerator = new AtomicInteger();
private ConcurrentHashMap<String/* clientId */, MQClientInstance> factoryTable = private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
new ConcurrentHashMap<String, MQClientInstance>(); new ConcurrentHashMap<String, MQClientInstance>();
private MQClientManager() { private MQClientManager() {
......
...@@ -22,7 +22,7 @@ import java.util.List; ...@@ -22,7 +22,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
...@@ -115,7 +115,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -115,7 +115,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
throw new IllegalArgumentException("topic is null"); throw new IllegalArgumentException("topic is null");
} }
ConcurrentHashMap<MessageQueue, ProcessQueue> mqTable = this.rebalanceImpl.getProcessQueueTable(); ConcurrentMap<MessageQueue, ProcessQueue> mqTable = this.rebalanceImpl.getProcessQueueTable();
Set<MessageQueue> mqResult = new HashSet<MessageQueue>(); Set<MessageQueue> mqResult = new HashSet<MessageQueue>();
for (MessageQueue mq : mqTable.keySet()) { for (MessageQueue mq : mqTable.keySet()) {
if (mq.getTopic().equals(topic)) { if (mq.getTopic().equals(topic)) {
......
...@@ -26,7 +26,7 @@ import java.util.Map; ...@@ -26,7 +26,7 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
...@@ -805,7 +805,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -805,7 +805,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
} }
} }
public ConcurrentHashMap<String, SubscriptionData> getSubscriptionInner() { public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
return this.rebalanceImpl.getSubscriptionInner(); return this.rebalanceImpl.getSubscriptionInner();
} }
...@@ -1060,7 +1060,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -1060,7 +1060,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private long computeAccumulationTotal() { private long computeAccumulationTotal() {
long msgAccTotal = 0; long msgAccTotal = 0;
ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = this.rebalanceImpl.getProcessQueueTable(); ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = this.rebalanceImpl.getProcessQueueTable();
Iterator<Entry<MessageQueue, ProcessQueue>> it = processQueueTable.entrySet().iterator(); Iterator<Entry<MessageQueue, ProcessQueue>> it = processQueueTable.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next(); Entry<MessageQueue, ProcessQueue> next = it.next();
......
...@@ -17,13 +17,14 @@ ...@@ -17,13 +17,14 @@
package org.apache.rocketmq.client.impl.consumer; package org.apache.rocketmq.client.impl.consumer;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
/** /**
* Message lock,strictly ensure the single queue only one thread at a time consuming * Message lock,strictly ensure the single queue only one thread at a time consuming
*/ */
public class MessageQueueLock { public class MessageQueueLock {
private ConcurrentHashMap<MessageQueue, Object> mqLockTable = private ConcurrentMap<MessageQueue, Object> mqLockTable =
new ConcurrentHashMap<MessageQueue, Object>(); new ConcurrentHashMap<MessageQueue, Object>();
public Object fetchLockObject(final MessageQueue mq) { public Object fetchLockObject(final MessageQueue mq) {
......
...@@ -21,6 +21,7 @@ import java.util.ArrayList; ...@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullResult;
...@@ -53,7 +54,7 @@ public class PullAPIWrapper { ...@@ -53,7 +54,7 @@ public class PullAPIWrapper {
private final MQClientInstance mQClientFactory; private final MQClientInstance mQClientFactory;
private final String consumerGroup; private final String consumerGroup;
private final boolean unitMode; private final boolean unitMode;
private ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable = private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>(32); new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
private volatile boolean connectBrokerByUser = false; private volatile boolean connectBrokerByUser = false;
private volatile long defaultBrokerId = MixAll.MASTER_ID; private volatile long defaultBrokerId = MixAll.MASTER_ID;
...@@ -247,7 +248,7 @@ public class PullAPIWrapper { ...@@ -247,7 +248,7 @@ public class PullAPIWrapper {
private String computPullFromWhichFilterServer(final String topic, final String brokerAddr) private String computPullFromWhichFilterServer(final String topic, final String brokerAddr)
throws MQClientException { throws MQClientException {
ConcurrentHashMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable(); ConcurrentMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();
if (topicRouteTable != null) { if (topicRouteTable != null) {
TopicRouteData topicRouteData = topicRouteTable.get(topic); TopicRouteData topicRouteData = topicRouteTable.get(topic);
List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr); List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr);
......
...@@ -26,6 +26,7 @@ import java.util.Map; ...@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.factory.MQClientInstance;
...@@ -44,10 +45,10 @@ import org.slf4j.Logger; ...@@ -44,10 +45,10 @@ import org.slf4j.Logger;
*/ */
public abstract class RebalanceImpl { public abstract class RebalanceImpl {
protected static final Logger log = ClientLogger.getLog(); protected static final Logger log = ClientLogger.getLog();
protected final ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64); protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
protected final ConcurrentHashMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable = protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
new ConcurrentHashMap<String, Set<MessageQueue>>(); new ConcurrentHashMap<String, Set<MessageQueue>>();
protected final ConcurrentHashMap<String /* topic */, SubscriptionData> subscriptionInner = protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
new ConcurrentHashMap<String, SubscriptionData>(); new ConcurrentHashMap<String, SubscriptionData>();
protected String consumerGroup; protected String consumerGroup;
protected MessageModel messageModel; protected MessageModel messageModel;
...@@ -232,7 +233,7 @@ public abstract class RebalanceImpl { ...@@ -232,7 +233,7 @@ public abstract class RebalanceImpl {
this.truncateMessageQueueNotMyTopic(); this.truncateMessageQueueNotMyTopic();
} }
public ConcurrentHashMap<String, SubscriptionData> getSubscriptionInner() { public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
return subscriptionInner; return subscriptionInner;
} }
...@@ -421,11 +422,11 @@ public abstract class RebalanceImpl { ...@@ -421,11 +422,11 @@ public abstract class RebalanceImpl {
} }
} }
public ConcurrentHashMap<MessageQueue, ProcessQueue> getProcessQueueTable() { public ConcurrentMap<MessageQueue, ProcessQueue> getProcessQueueTable() {
return processQueueTable; return processQueueTable;
} }
public ConcurrentHashMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() { public ConcurrentMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() {
return topicSubscribeInfoTable; return topicSubscribeInfoTable;
} }
......
...@@ -28,6 +28,7 @@ import java.util.Map.Entry; ...@@ -28,6 +28,7 @@ import java.util.Map.Entry;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
...@@ -88,18 +89,18 @@ public class MQClientInstance { ...@@ -88,18 +89,18 @@ public class MQClientInstance {
private final int instanceIndex; private final int instanceIndex;
private final String clientId; private final String clientId;
private final long bootTimestamp = System.currentTimeMillis(); private final long bootTimestamp = System.currentTimeMillis();
private final ConcurrentHashMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>(); private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
private final ConcurrentHashMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>(); private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
private final ConcurrentHashMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>(); private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
private final NettyClientConfig nettyClientConfig; private final NettyClientConfig nettyClientConfig;
private final MQClientAPIImpl mQClientAPIImpl; private final MQClientAPIImpl mQClientAPIImpl;
private final MQAdminImpl mQAdminImpl; private final MQAdminImpl mQAdminImpl;
private final ConcurrentHashMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>(); private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
private final Lock lockNamesrv = new ReentrantLock(); private final Lock lockNamesrv = new ReentrantLock();
private final Lock lockHeartbeat = new ReentrantLock(); private final Lock lockHeartbeat = new ReentrantLock();
private final ConcurrentHashMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable = private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
new ConcurrentHashMap<String, HashMap<Long, String>>(); new ConcurrentHashMap<String, HashMap<Long, String>>();
private final ConcurrentHashMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable = private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
new ConcurrentHashMap<String, HashMap<String, Integer>>(); new ConcurrentHashMap<String, HashMap<String, Integer>>();
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override @Override
...@@ -1088,7 +1089,7 @@ public class MQClientInstance { ...@@ -1088,7 +1089,7 @@ public class MQClientInstance {
} }
consumer.suspend(); consumer.suspend();
ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable(); ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) { for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
MessageQueue mq = entry.getKey(); MessageQueue mq = entry.getKey();
if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) { if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
...@@ -1166,7 +1167,7 @@ public class MQClientInstance { ...@@ -1166,7 +1167,7 @@ public class MQClientInstance {
return defaultMQProducer; return defaultMQProducer;
} }
public ConcurrentHashMap<String, TopicRouteData> getTopicRouteTable() { public ConcurrentMap<String, TopicRouteData> getTopicRouteTable() {
return topicRouteTable; return topicRouteTable;
} }
......
...@@ -26,6 +26,7 @@ import java.util.Random; ...@@ -26,6 +26,7 @@ import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
...@@ -84,7 +85,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -84,7 +85,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private final Logger log = ClientLogger.getLog(); private final Logger log = ClientLogger.getLog();
private final Random random = new Random(); private final Random random = new Random();
private final DefaultMQProducer defaultMQProducer; private final DefaultMQProducer defaultMQProducer;
private final ConcurrentHashMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
new ConcurrentHashMap<String, TopicPublishInfo>(); new ConcurrentHashMap<String, TopicPublishInfo>();
private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>(); private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final RPCHook rpcHook; private final RPCHook rpcHook;
...@@ -1057,7 +1058,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1057,7 +1058,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
} }
public ConcurrentHashMap<String, TopicPublishInfo> getTopicPublishInfoTable() { public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
return topicPublishInfoTable; return topicPublishInfoTable;
} }
......
...@@ -19,6 +19,7 @@ package org.apache.rocketmq.common.protocol.body; ...@@ -19,6 +19,7 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.HashSet; import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
...@@ -27,7 +28,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; ...@@ -27,7 +28,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class ConsumerConnection extends RemotingSerializable { public class ConsumerConnection extends RemotingSerializable {
private HashSet<Connection> connectionSet = new HashSet<Connection>(); private HashSet<Connection> connectionSet = new HashSet<Connection>();
private ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable = private ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
new ConcurrentHashMap<String, SubscriptionData>(); new ConcurrentHashMap<String, SubscriptionData>();
private ConsumeType consumeType; private ConsumeType consumeType;
private MessageModel messageModel; private MessageModel messageModel;
...@@ -52,7 +53,7 @@ public class ConsumerConnection extends RemotingSerializable { ...@@ -52,7 +53,7 @@ public class ConsumerConnection extends RemotingSerializable {
this.connectionSet = connectionSet; this.connectionSet = connectionSet;
} }
public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() { public ConcurrentMap<String, SubscriptionData> getSubscriptionTable() {
return subscriptionTable; return subscriptionTable;
} }
......
...@@ -18,17 +18,18 @@ ...@@ -18,17 +18,18 @@
package org.apache.rocketmq.common.protocol.body; package org.apache.rocketmq.common.protocol.body;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class ConsumerOffsetSerializeWrapper extends RemotingSerializable { public class ConsumerOffsetSerializeWrapper extends RemotingSerializable {
private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable = private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512); new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
public ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> getOffsetTable() { public ConcurrentMap<String, ConcurrentMap<Integer, Long>> getOffsetTable() {
return offsetTable; return offsetTable;
} }
public void setOffsetTable(ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable) { public void setOffsetTable(ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable) {
this.offsetTable = offsetTable; this.offsetTable = offsetTable;
} }
} }
...@@ -18,21 +18,22 @@ ...@@ -18,21 +18,22 @@
package org.apache.rocketmq.common.protocol.body; package org.apache.rocketmq.common.protocol.body;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class SubscriptionGroupWrapper extends RemotingSerializable { public class SubscriptionGroupWrapper extends RemotingSerializable {
private ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable = private ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024); new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
private DataVersion dataVersion = new DataVersion(); private DataVersion dataVersion = new DataVersion();
public ConcurrentHashMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() { public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
return subscriptionGroupTable; return subscriptionGroupTable;
} }
public void setSubscriptionGroupTable( public void setSubscriptionGroupTable(
ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable) { ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable) {
this.subscriptionGroupTable = subscriptionGroupTable; this.subscriptionGroupTable = subscriptionGroupTable;
} }
......
...@@ -18,20 +18,21 @@ ...@@ -18,20 +18,21 @@
package org.apache.rocketmq.common.protocol.body; package org.apache.rocketmq.common.protocol.body;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class TopicConfigSerializeWrapper extends RemotingSerializable { public class TopicConfigSerializeWrapper extends RemotingSerializable {
private ConcurrentHashMap<String, TopicConfig> topicConfigTable = private ConcurrentMap<String, TopicConfig> topicConfigTable =
new ConcurrentHashMap<String, TopicConfig>(); new ConcurrentHashMap<String, TopicConfig>();
private DataVersion dataVersion = new DataVersion(); private DataVersion dataVersion = new DataVersion();
public ConcurrentHashMap<String, TopicConfig> getTopicConfigTable() { public ConcurrentMap<String, TopicConfig> getTopicConfigTable() {
return topicConfigTable; return topicConfigTable;
} }
public void setTopicConfigTable(ConcurrentHashMap<String, TopicConfig> topicConfigTable) { public void setTopicConfigTable(ConcurrentMap<String, TopicConfig> topicConfigTable) {
this.topicConfigTable = topicConfigTable; this.topicConfigTable = topicConfigTable;
} }
......
...@@ -20,13 +20,14 @@ package org.apache.rocketmq.common.stats; ...@@ -20,13 +20,14 @@ package org.apache.rocketmq.common.stats;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.slf4j.Logger; import org.slf4j.Logger;
public class MomentStatsItemSet { public class MomentStatsItemSet {
private final ConcurrentHashMap<String/* key */, MomentStatsItem> statsItemTable = private final ConcurrentMap<String/* key */, MomentStatsItem> statsItemTable =
new ConcurrentHashMap<String, MomentStatsItem>(128); new ConcurrentHashMap<String, MomentStatsItem>(128);
private final String statsName; private final String statsName;
private final ScheduledExecutorService scheduledExecutorService; private final ScheduledExecutorService scheduledExecutorService;
...@@ -39,7 +40,7 @@ public class MomentStatsItemSet { ...@@ -39,7 +40,7 @@ public class MomentStatsItemSet {
this.init(); this.init();
} }
public ConcurrentHashMap<String, MomentStatsItem> getStatsItemTable() { public ConcurrentMap<String, MomentStatsItem> getStatsItemTable() {
return statsItemTable; return statsItemTable;
} }
......
...@@ -20,13 +20,14 @@ package org.apache.rocketmq.common.stats; ...@@ -20,13 +20,14 @@ package org.apache.rocketmq.common.stats;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.slf4j.Logger; import org.slf4j.Logger;
public class StatsItemSet { public class StatsItemSet {
private final ConcurrentHashMap<String/* key */, StatsItem> statsItemTable = private final ConcurrentMap<String/* key */, StatsItem> statsItemTable =
new ConcurrentHashMap<String, StatsItem>(128); new ConcurrentHashMap<String, StatsItem>(128);
private final String statsName; private final String statsName;
......
...@@ -20,6 +20,7 @@ package org.apache.rocketmq.filtersrv.filter; ...@@ -20,6 +20,7 @@ package org.apache.rocketmq.filtersrv.filter;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -40,7 +41,7 @@ public class FilterClassManager { ...@@ -40,7 +41,7 @@ public class FilterClassManager {
private final ScheduledExecutorService scheduledExecutorService = Executors private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSGetClassScheduledThread")); .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSGetClassScheduledThread"));
private ConcurrentHashMap<String/* topic@consumerGroup */, FilterClassInfo> filterClassTable = private ConcurrentMap<String/* topic@consumerGroup */, FilterClassInfo> filterClassTable =
new ConcurrentHashMap<String, FilterClassInfo>(128); new ConcurrentHashMap<String, FilterClassInfo>(128);
private FilterClassFetchMethod filterClassFetchMethod; private FilterClassFetchMethod filterClassFetchMethod;
......
...@@ -25,7 +25,7 @@ import java.util.List; ...@@ -25,7 +25,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.DataVersion;
...@@ -135,7 +135,7 @@ public class RouteInfoManager { ...@@ -135,7 +135,7 @@ public class RouteInfoManager {
&& MixAll.MASTER_ID == brokerId) { && MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())// if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())//
|| registerFirst) { || registerFirst) {
ConcurrentHashMap<String, TopicConfig> tcTable = ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable(); topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) { if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
......
...@@ -27,6 +27,7 @@ import java.util.LinkedList; ...@@ -27,6 +27,7 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
...@@ -67,7 +68,7 @@ public abstract class NettyRemotingAbstract { ...@@ -67,7 +68,7 @@ public abstract class NettyRemotingAbstract {
/** /**
* This map caches all on-going requests. * This map caches all on-going requests.
*/ */
protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable = protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256); new ConcurrentHashMap<Integer, ResponseFuture>(256);
/** /**
......
...@@ -41,6 +41,7 @@ import java.util.Random; ...@@ -41,6 +41,7 @@ import java.util.Random;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
...@@ -73,7 +74,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -73,7 +74,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private final Bootstrap bootstrap = new Bootstrap(); private final Bootstrap bootstrap = new Bootstrap();
private final EventLoopGroup eventLoopGroupWorker; private final EventLoopGroup eventLoopGroupWorker;
private final Lock lockChannelTables = new ReentrantLock(); private final Lock lockChannelTables = new ReentrantLock();
private final ConcurrentHashMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>(); private final ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();
private final Timer timer = new Timer("ClientHouseKeepingService", true); private final Timer timer = new Timer("ClientHouseKeepingService", true);
......
...@@ -20,6 +20,7 @@ import java.io.File; ...@@ -20,6 +20,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ServiceLoader; import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory; ...@@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory;
public class AllocateMappedFileService extends ServiceThread { public class AllocateMappedFileService extends ServiceThread {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static int waitTimeOut = 1000 * 5; private static int waitTimeOut = 1000 * 5;
private ConcurrentHashMap<String, AllocateRequest> requestTable = private ConcurrentMap<String, AllocateRequest> requestTable =
new ConcurrentHashMap<String, AllocateRequest>(); new ConcurrentHashMap<String, AllocateRequest>();
private PriorityBlockingQueue<AllocateRequest> requestQueue = private PriorityBlockingQueue<AllocateRequest> requestQueue =
new PriorityBlockingQueue<AllocateRequest>(); new PriorityBlockingQueue<AllocateRequest>();
......
...@@ -28,6 +28,7 @@ import java.util.Map; ...@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -64,7 +65,7 @@ public class DefaultMessageStore implements MessageStore { ...@@ -64,7 +65,7 @@ public class DefaultMessageStore implements MessageStore {
// CommitLog // CommitLog
private final CommitLog commitLog; private final CommitLog commitLog;
private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable; private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
private final FlushConsumeQueueService flushConsumeQueueService; private final FlushConsumeQueueService flushConsumeQueueService;
...@@ -140,9 +141,9 @@ public class DefaultMessageStore implements MessageStore { ...@@ -140,9 +141,9 @@ public class DefaultMessageStore implements MessageStore {
} }
public void truncateDirtyLogicFiles(long phyOffset) { public void truncateDirtyLogicFiles(long phyOffset) {
ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) { for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
for (ConsumeQueue logic : maps.values()) { for (ConsumeQueue logic : maps.values()) {
logic.truncateDirtyLogicFiles(phyOffset); logic.truncateDirtyLogicFiles(phyOffset);
} }
...@@ -267,7 +268,7 @@ public class DefaultMessageStore implements MessageStore { ...@@ -267,7 +268,7 @@ public class DefaultMessageStore implements MessageStore {
} }
public void destroyLogics() { public void destroyLogics() {
for (ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) { for (ConsumeQueue logic : maps.values()) {
logic.destroy(); logic.destroy();
} }
...@@ -885,13 +886,13 @@ public class DefaultMessageStore implements MessageStore { ...@@ -885,13 +886,13 @@ public class DefaultMessageStore implements MessageStore {
@Override @Override
public int cleanUnusedTopic(Set<String> topics) { public int cleanUnusedTopic(Set<String> topics) {
Iterator<Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator(); Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>> next = it.next(); Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next();
String topic = next.getKey(); String topic = next.getKey();
if (!topics.contains(topic) && !topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) { if (!topics.contains(topic) && !topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) {
ConcurrentHashMap<Integer, ConsumeQueue> queueTable = next.getValue(); ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue();
for (ConsumeQueue cq : queueTable.values()) { for (ConsumeQueue cq : queueTable.values()) {
cq.destroy(); cq.destroy();
log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", // log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", //
...@@ -913,12 +914,12 @@ public class DefaultMessageStore implements MessageStore { ...@@ -913,12 +914,12 @@ public class DefaultMessageStore implements MessageStore {
public void cleanExpiredConsumerQueue() { public void cleanExpiredConsumerQueue() {
long minCommitLogOffset = this.commitLog.getMinOffset(); long minCommitLogOffset = this.commitLog.getMinOffset();
Iterator<Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator(); Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>> next = it.next(); Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next();
String topic = next.getKey(); String topic = next.getKey();
if (!topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) { if (!topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) {
ConcurrentHashMap<Integer, ConsumeQueue> queueTable = next.getValue(); ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue();
Iterator<Entry<Integer, ConsumeQueue>> itQT = queueTable.entrySet().iterator(); Iterator<Entry<Integer, ConsumeQueue>> itQT = queueTable.entrySet().iterator();
while (itQT.hasNext()) { while (itQT.hasNext()) {
Entry<Integer, ConsumeQueue> nextQT = itQT.next(); Entry<Integer, ConsumeQueue> nextQT = itQT.next();
...@@ -1061,10 +1062,10 @@ public class DefaultMessageStore implements MessageStore { ...@@ -1061,10 +1062,10 @@ public class DefaultMessageStore implements MessageStore {
} }
public ConsumeQueue findConsumeQueue(String topic, int queueId) { public ConsumeQueue findConsumeQueue(String topic, int queueId) {
ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic); ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) { if (null == map) {
ConcurrentHashMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128); ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap); ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) { if (oldMap != null) {
map = oldMap; map = oldMap;
} else { } else {
...@@ -1205,9 +1206,9 @@ public class DefaultMessageStore implements MessageStore { ...@@ -1205,9 +1206,9 @@ public class DefaultMessageStore implements MessageStore {
private void checkSelf() { private void checkSelf() {
this.commitLog.checkSelf(); this.commitLog.checkSelf();
Iterator<Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator(); Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>> next = it.next(); Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next();
Iterator<Entry<Integer, ConsumeQueue>> itNext = next.getValue().entrySet().iterator(); Iterator<Entry<Integer, ConsumeQueue>> itNext = next.getValue().entrySet().iterator();
while (itNext.hasNext()) { while (itNext.hasNext()) {
Entry<Integer, ConsumeQueue> cq = itNext.next(); Entry<Integer, ConsumeQueue> cq = itNext.next();
...@@ -1280,7 +1281,7 @@ public class DefaultMessageStore implements MessageStore { ...@@ -1280,7 +1281,7 @@ public class DefaultMessageStore implements MessageStore {
} }
private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueue consumeQueue) { private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueue consumeQueue) {
ConcurrentHashMap<Integer/* queueId */, ConsumeQueue> map = this.consumeQueueTable.get(topic); ConcurrentMap<Integer/* queueId */, ConsumeQueue> map = this.consumeQueueTable.get(topic);
if (null == map) { if (null == map) {
map = new ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>(); map = new ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>();
map.put(queueId, consumeQueue); map.put(queueId, consumeQueue);
...@@ -1291,7 +1292,7 @@ public class DefaultMessageStore implements MessageStore { ...@@ -1291,7 +1292,7 @@ public class DefaultMessageStore implements MessageStore {
} }
private void recoverConsumeQueue() { private void recoverConsumeQueue() {
for (ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) { for (ConsumeQueue logic : maps.values()) {
logic.recover(); logic.recover();
} }
...@@ -1301,7 +1302,7 @@ public class DefaultMessageStore implements MessageStore { ...@@ -1301,7 +1302,7 @@ public class DefaultMessageStore implements MessageStore {
private void recoverTopicQueueTable() { private void recoverTopicQueueTable() {
HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024); HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
long minPhyOffset = this.commitLog.getMinOffset(); long minPhyOffset = this.commitLog.getMinOffset();
for (ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) { for (ConsumeQueue logic : maps.values()) {
String key = logic.getTopic() + "-" + logic.getQueueId(); String key = logic.getTopic() + "-" + logic.getQueueId();
table.put(key, logic.getMaxOffsetInQueue()); table.put(key, logic.getMaxOffsetInQueue());
...@@ -1324,7 +1325,7 @@ public class DefaultMessageStore implements MessageStore { ...@@ -1324,7 +1325,7 @@ public class DefaultMessageStore implements MessageStore {
return runningFlags; return runningFlags;
} }
public ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> getConsumeQueueTable() { public ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> getConsumeQueueTable() {
return consumeQueueTable; return consumeQueueTable;
} }
...@@ -1375,7 +1376,7 @@ public class DefaultMessageStore implements MessageStore { ...@@ -1375,7 +1376,7 @@ public class DefaultMessageStore implements MessageStore {
@Override @Override
public ConsumeQueue getConsumeQueue(String topic, int queueId) { public ConsumeQueue getConsumeQueue(String topic, int queueId) {
ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic); ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (map == null) { if (map == null) {
return null; return null;
} }
...@@ -1594,9 +1595,9 @@ public class DefaultMessageStore implements MessageStore { ...@@ -1594,9 +1595,9 @@ public class DefaultMessageStore implements MessageStore {
if (minOffset > this.lastPhysicalMinOffset) { if (minOffset > this.lastPhysicalMinOffset) {
this.lastPhysicalMinOffset = minOffset; this.lastPhysicalMinOffset = minOffset;
ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) { for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
for (ConsumeQueue logic : maps.values()) { for (ConsumeQueue logic : maps.values()) {
int deleteCount = logic.deleteExpiredFile(minOffset); int deleteCount = logic.deleteExpiredFile(minOffset);
...@@ -1639,9 +1640,9 @@ public class DefaultMessageStore implements MessageStore { ...@@ -1639,9 +1640,9 @@ public class DefaultMessageStore implements MessageStore {
logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp(); logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
} }
ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) { for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
for (ConsumeQueue cq : maps.values()) { for (ConsumeQueue cq : maps.values()) {
boolean result = false; boolean result = false;
for (int i = 0; i < retryTimes && !result; i++) { for (int i = 0; i < retryTimes && !result; i++) {
......
...@@ -17,17 +17,18 @@ ...@@ -17,17 +17,18 @@
package org.apache.rocketmq.store.schedule; package org.apache.rocketmq.store.schedule;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class DelayOffsetSerializeWrapper extends RemotingSerializable { public class DelayOffsetSerializeWrapper extends RemotingSerializable {
private ConcurrentHashMap<Integer /* level */, Long/* offset */> offsetTable = private ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
new ConcurrentHashMap<Integer, Long>(32); new ConcurrentHashMap<Integer, Long>(32);
public ConcurrentHashMap<Integer, Long> getOffsetTable() { public ConcurrentMap<Integer, Long> getOffsetTable() {
return offsetTable; return offsetTable;
} }
public void setOffsetTable(ConcurrentHashMap<Integer, Long> offsetTable) { public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) {
this.offsetTable = offsetTable; this.offsetTable = offsetTable;
} }
} }
...@@ -23,6 +23,7 @@ import java.util.Map.Entry; ...@@ -23,6 +23,7 @@ import java.util.Map.Entry;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
...@@ -49,10 +50,10 @@ public class ScheduleMessageService extends ConfigManager { ...@@ -49,10 +50,10 @@ public class ScheduleMessageService extends ConfigManager {
private static final long DELAY_FOR_A_WHILE = 100L; private static final long DELAY_FOR_A_WHILE = 100L;
private static final long DELAY_FOR_A_PERIOD = 10000L; private static final long DELAY_FOR_A_PERIOD = 10000L;
private final ConcurrentHashMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
new ConcurrentHashMap<Integer, Long>(32); new ConcurrentHashMap<Integer, Long>(32);
private final ConcurrentHashMap<Integer /* level */, Long/* offset */> offsetTable = private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
new ConcurrentHashMap<Integer, Long>(32); new ConcurrentHashMap<Integer, Long>(32);
private final Timer timer = new Timer("ScheduleMessageTimerThread", true); private final Timer timer = new Timer("ScheduleMessageTimerThread", true);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册