diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 8b01ef56a8edaf6ae6fbcd85643e2d92424c7ddd..de7f3fce81c7a70a7e9b0eeb1daa6a2f9dd68c4c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -146,7 +146,7 @@ public class BrokerOuterAPI { @Override public void run() { try { - RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body); + RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body); if (result != null) { registerBrokerResultList.add(result); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 86f606593f3db18fce0c3f470c807e22f3ba1516..99ba1da3dd3b024f92e9fd0a1bec7021339662ec 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -45,7 +45,7 @@ public class TopicConfigManager extends ConfigManager { private static final long LOCK_TIMEOUT_MILLIS = 3000; private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18; - private transient final Lock lockTopicConfigTable = new ReentrantLock(); + private transient final Lock topicConfigTableLock = new ReentrantLock(); private final ConcurrentMap topicConfigTable = new ConcurrentHashMap(1024); @@ -159,7 +159,7 @@ public class TopicConfigManager extends ConfigManager { boolean createNew = false; try { - if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { topicConfig = this.topicConfigTable.get(topic); if (topicConfig != null) @@ -213,7 +213,7 @@ public class TopicConfigManager extends ConfigManager { this.persist(); } } finally { - this.lockTopicConfigTable.unlock(); + this.topicConfigTableLock.unlock(); } } } catch (InterruptedException e) { @@ -239,7 +239,7 @@ public class TopicConfigManager extends ConfigManager { boolean createNew = false; try { - if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { topicConfig = this.topicConfigTable.get(topic); if (topicConfig != null) @@ -257,7 +257,7 @@ public class TopicConfigManager extends ConfigManager { this.dataVersion.nextVersion(); this.persist(); } finally { - this.lockTopicConfigTable.unlock(); + this.topicConfigTableLock.unlock(); } } } catch (InterruptedException e) { @@ -279,7 +279,7 @@ public class TopicConfigManager extends ConfigManager { boolean createNew = false; try { - if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { topicConfig = this.topicConfigTable.get(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC); if (topicConfig != null) @@ -297,7 +297,7 @@ public class TopicConfigManager extends ConfigManager { this.dataVersion.nextVersion(); this.persist(); } finally { - this.lockTopicConfigTable.unlock(); + this.topicConfigTableLock.unlock(); } } } catch (InterruptedException e) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index a17109818151f3620fbeb616b7cb1ab6102dba0f..130effad9e55f08ede275f6870ba9976f11f5f56 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -478,7 +478,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; boolean hasException = false; try { - this.processQueue.getLockConsume().lock(); + this.processQueue.getConsumeLock().lock(); if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); @@ -494,7 +494,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { messageQueue); hasException = true; } finally { - this.processQueue.getLockConsume().unlock(); + this.processQueue.getConsumeLock().unlock(); } if (null == status diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java index 4b9ea62c336743a5af35dbd85dfd31cb9c59fc19..21798d8c6ec03d6bb82d5cf32b5c84327be4dc42 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java @@ -44,11 +44,11 @@ public class ProcessQueue { public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000")); private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000")); private final InternalLogger log = ClientLogger.getLog(); - private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock(); + private final ReadWriteLock treeMapLock = new ReentrantReadWriteLock(); private final TreeMap msgTreeMap = new TreeMap(); private final AtomicLong msgCount = new AtomicLong(); private final AtomicLong msgSize = new AtomicLong(); - private final Lock lockConsume = new ReentrantLock(); + private final Lock consumeLock = new ReentrantLock(); /** * A subset of msgTreeMap, will only be used when orderly consume */ @@ -83,7 +83,7 @@ public class ProcessQueue { for (int i = 0; i < loop; i++) { MessageExt msg = null; try { - this.lockTreeMap.readLock().lockInterruptibly(); + this.treeMapLock.readLock().lockInterruptibly(); try { if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) { msg = msgTreeMap.firstEntry().getValue(); @@ -92,7 +92,7 @@ public class ProcessQueue { break; } } finally { - this.lockTreeMap.readLock().unlock(); + this.treeMapLock.readLock().unlock(); } } catch (InterruptedException e) { log.error("getExpiredMsg exception", e); @@ -103,7 +103,7 @@ public class ProcessQueue { pushConsumer.sendMessageBack(msg, 3); log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset()); try { - this.lockTreeMap.writeLock().lockInterruptibly(); + this.treeMapLock.writeLock().lockInterruptibly(); try { if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) { try { @@ -113,7 +113,7 @@ public class ProcessQueue { } } } finally { - this.lockTreeMap.writeLock().unlock(); + this.treeMapLock.writeLock().unlock(); } } catch (InterruptedException e) { log.error("getExpiredMsg exception", e); @@ -127,7 +127,7 @@ public class ProcessQueue { public boolean putMessage(final List msgs) { boolean dispatchToConsume = false; try { - this.lockTreeMap.writeLock().lockInterruptibly(); + this.treeMapLock.writeLock().lockInterruptibly(); try { int validMsgCnt = 0; for (MessageExt msg : msgs) { @@ -156,7 +156,7 @@ public class ProcessQueue { } } } finally { - this.lockTreeMap.writeLock().unlock(); + this.treeMapLock.writeLock().unlock(); } } catch (InterruptedException e) { log.error("putMessage exception", e); @@ -167,13 +167,13 @@ public class ProcessQueue { public long getMaxSpan() { try { - this.lockTreeMap.readLock().lockInterruptibly(); + this.treeMapLock.readLock().lockInterruptibly(); try { if (!this.msgTreeMap.isEmpty()) { return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey(); } } finally { - this.lockTreeMap.readLock().unlock(); + this.treeMapLock.readLock().unlock(); } } catch (InterruptedException e) { log.error("getMaxSpan exception", e); @@ -186,7 +186,7 @@ public class ProcessQueue { long result = -1; final long now = System.currentTimeMillis(); try { - this.lockTreeMap.writeLock().lockInterruptibly(); + this.treeMapLock.writeLock().lockInterruptibly(); this.lastConsumeTimestamp = now; try { if (!msgTreeMap.isEmpty()) { @@ -206,7 +206,7 @@ public class ProcessQueue { } } } finally { - this.lockTreeMap.writeLock().unlock(); + this.treeMapLock.writeLock().unlock(); } } catch (Throwable t) { log.error("removeMessage exception", t); @@ -245,12 +245,12 @@ public class ProcessQueue { public void rollback() { try { - this.lockTreeMap.writeLock().lockInterruptibly(); + this.treeMapLock.writeLock().lockInterruptibly(); try { this.msgTreeMap.putAll(this.consumingMsgOrderlyTreeMap); this.consumingMsgOrderlyTreeMap.clear(); } finally { - this.lockTreeMap.writeLock().unlock(); + this.treeMapLock.writeLock().unlock(); } } catch (InterruptedException e) { log.error("rollback exception", e); @@ -259,7 +259,7 @@ public class ProcessQueue { public long commit() { try { - this.lockTreeMap.writeLock().lockInterruptibly(); + this.treeMapLock.writeLock().lockInterruptibly(); try { Long offset = this.consumingMsgOrderlyTreeMap.lastKey(); msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size()); @@ -271,7 +271,7 @@ public class ProcessQueue { return offset + 1; } } finally { - this.lockTreeMap.writeLock().unlock(); + this.treeMapLock.writeLock().unlock(); } } catch (InterruptedException e) { log.error("commit exception", e); @@ -282,14 +282,14 @@ public class ProcessQueue { public void makeMessageToConsumeAgain(List msgs) { try { - this.lockTreeMap.writeLock().lockInterruptibly(); + this.treeMapLock.writeLock().lockInterruptibly(); try { for (MessageExt msg : msgs) { this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset()); this.msgTreeMap.put(msg.getQueueOffset(), msg); } } finally { - this.lockTreeMap.writeLock().unlock(); + this.treeMapLock.writeLock().unlock(); } } catch (InterruptedException e) { log.error("makeMessageToCosumeAgain exception", e); @@ -300,7 +300,7 @@ public class ProcessQueue { List result = new ArrayList(batchSize); final long now = System.currentTimeMillis(); try { - this.lockTreeMap.writeLock().lockInterruptibly(); + this.treeMapLock.writeLock().lockInterruptibly(); this.lastConsumeTimestamp = now; try { if (!this.msgTreeMap.isEmpty()) { @@ -319,7 +319,7 @@ public class ProcessQueue { consuming = false; } } finally { - this.lockTreeMap.writeLock().unlock(); + this.treeMapLock.writeLock().unlock(); } } catch (InterruptedException e) { log.error("take Messages exception", e); @@ -330,11 +330,11 @@ public class ProcessQueue { public boolean hasTempMessage() { try { - this.lockTreeMap.readLock().lockInterruptibly(); + this.treeMapLock.readLock().lockInterruptibly(); try { return !this.msgTreeMap.isEmpty(); } finally { - this.lockTreeMap.readLock().unlock(); + this.treeMapLock.readLock().unlock(); } } catch (InterruptedException e) { } @@ -344,7 +344,7 @@ public class ProcessQueue { public void clear() { try { - this.lockTreeMap.writeLock().lockInterruptibly(); + this.treeMapLock.writeLock().lockInterruptibly(); try { this.msgTreeMap.clear(); this.consumingMsgOrderlyTreeMap.clear(); @@ -352,7 +352,7 @@ public class ProcessQueue { this.msgSize.set(0); this.queueOffsetMax = 0L; } finally { - this.lockTreeMap.writeLock().unlock(); + this.treeMapLock.writeLock().unlock(); } } catch (InterruptedException e) { log.error("rollback exception", e); @@ -367,8 +367,8 @@ public class ProcessQueue { this.lastLockTimestamp = lastLockTimestamp; } - public Lock getLockConsume() { - return lockConsume; + public Lock getConsumeLock() { + return consumeLock; } public long getLastPullTimestamp() { @@ -397,7 +397,7 @@ public class ProcessQueue { public void fillProcessQueueInfo(final ProcessQueueInfo info) { try { - this.lockTreeMap.readLock().lockInterruptibly(); + this.treeMapLock.readLock().lockInterruptibly(); if (!this.msgTreeMap.isEmpty()) { info.setCachedMsgMinOffset(this.msgTreeMap.firstKey()); @@ -421,7 +421,7 @@ public class ProcessQueue { info.setLastConsumeTimestamp(this.lastConsumeTimestamp); } catch (Exception e) { } finally { - this.lockTreeMap.readLock().unlock(); + this.treeMapLock.readLock().unlock(); } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index e5166f35b59c003d4512f6f2e64965713ea8d051..9582391622cf6d2c0555e6e0b4d75ca9b9fef48c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -88,11 +88,11 @@ public class RebalancePushImpl extends RebalanceImpl { if (this.defaultMQPushConsumerImpl.isConsumeOrderly() && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) { try { - if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) { + if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) { try { return this.unlockDelay(mq, pq); } finally { - pq.getLockConsume().unlock(); + pq.getConsumeLock().unlock(); } } else { log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index d836c8e0bfba9865215144279ba3ecba040d2b0d..5ba6cfab0a740a46a20d69a104b74094ad81993f 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -84,7 +84,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti private final AtomicReference> namesrvAddrList = new AtomicReference>(); private final AtomicReference namesrvAddrChoosed = new AtomicReference(); private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex()); - private final Lock lockNamesrvChannel = new ReentrantLock(); + private final Lock namesrvChannelLock = new ReentrantLock(); private final ExecutorService publicExecutor; @@ -418,7 +418,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } final List addrList = this.namesrvAddrList.get(); - if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + if (this.namesrvChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { addr = this.namesrvAddrChoosed.get(); if (addr != null) { @@ -445,7 +445,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti throw new RemotingConnectException(addrList.toString()); } } finally { - this.lockNamesrvChannel.unlock(); + this.namesrvChannelLock.unlock(); } } else { log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);