From 167cce03480eb3abc05048ebbb023cbd6d243b62 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Thu, 29 Dec 2016 15:25:05 +0800 Subject: [PATCH] ROCKETMQ-18 Clean code closes apache/incubator-rocketmq#21 --- .../rocketmq/broker/BrokerController.java | 2 +- .../apache/rocketmq/broker/BrokerStartup.java | 10 ++-- .../broker/client/ConsumerGroupInfo.java | 4 +- .../broker/client/ConsumerManager.java | 2 +- .../broker/client/ProducerManager.java | 2 +- .../broker/client/net/Broker2Client.java | 2 +- .../rebalance/RebalanceLockManager.java | 4 +- .../broker/filtersrv/FilterServerManager.java | 2 +- .../broker/filtersrv/FilterServerUtil.java | 4 +- .../broker/latency/BrokerFastFailure.java | 1 - .../broker/longpolling/ManyPullRequest.java | 2 +- .../longpolling/PullRequestHoldService.java | 9 ++-- .../rocketmq/broker/out/BrokerOuterAPI.java | 6 +-- .../AbstractSendMessageProcessor.java | 8 ++-- .../processor/AdminBrokerProcessor.java | 10 ++-- .../processor/PullMessageProcessor.java | 46 +++++++++---------- .../processor/QueryMessageProcessor.java | 2 +- .../processor/SendMessageProcessor.java | 2 +- .../broker/slave/SlaveSynchronize.java | 18 ++++---- .../SubscriptionGroupManager.java | 8 ++-- .../broker/topic/TopicConfigManager.java | 19 ++++---- .../client/impl/ClientRemotingProcessor.java | 3 +- .../rocketmq/client/impl/MQClientAPIImpl.java | 21 ++++----- .../rocketmq/client/impl/MQClientManager.java | 5 +- .../consumer/DefaultMQPullConsumerImpl.java | 10 ++-- .../consumer/DefaultMQPushConsumerImpl.java | 6 +-- .../client/impl/consumer/ProcessQueue.java | 6 +-- .../client/impl/factory/MQClientInstance.java | 21 ++++----- .../client/latency/MQFaultStrategy.java | 4 ++ .../org/apache/rocketmq/common/MixAll.java | 34 ++++++-------- .../org/apache/rocketmq/common/UtilAll.java | 15 +++--- .../rocketmq/common/filter/FilterAPI.java | 2 +- .../common/message/MessageDecoder.java | 20 +++----- .../common/protocol/MQProtosHelper.java | 11 +---- .../common/stats/MomentStatsItemSet.java | 2 +- .../rocketmq/common/stats/StatsItem.java | 6 +-- .../rocketmq/common/stats/StatsItemSet.java | 12 ++--- .../apache/rocketmq/common/MixAllTest.java | 5 +- .../rocketmq/common/RemotingUtilTest.java | 3 +- .../common/protocol/ConsumeStatusTest.java | 11 +++-- .../rocketmq/example/benchmark/Producer.java | 6 +-- .../benchmark/TransactionProducer.java | 4 +- .../example/ordermessage/Producer.java | 8 +--- .../transaction/TransactionProducer.java | 4 +- .../namesrv/routeinfo/RouteInfoManager.java | 6 +-- .../remoting/common/RemotingUtil.java | 4 +- .../store/AllocateMappedFileService.java | 2 +- .../org/apache/rocketmq/store/CommitLog.java | 6 +-- .../rocketmq/store/DefaultMessageStore.java | 6 +-- .../org/apache/rocketmq/store/StoreUtil.java | 2 +- .../rocketmq/store/index/IndexService.java | 4 +- .../broker/BrokerConsumeStatsSubCommad.java | 2 +- .../cluster/CLusterSendMsgRTCommand.java | 3 +- .../cluster/ClusterListSubCommand.java | 25 +++++----- .../DeleteSubscriptionGroupCommand.java | 1 + .../consumer/StartMonitoringSubCommand.java | 1 - .../message/PrintMessageByQueueCommand.java | 7 +-- .../message/PrintMessageSubCommand.java | 3 +- .../rocketmq/tools/command/message/Store.java | 2 - .../namesrv/DeleteKvConfigCommand.java | 1 - .../namesrv/GetNamesrvConfigCommand.java | 3 +- .../namesrv/UpdateKvConfigCommand.java | 1 - .../namesrv/UpdateNamesrvConfigCommand.java | 3 +- .../offset/CloneGroupOffsetCommand.java | 2 +- 64 files changed, 208 insertions(+), 258 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 4fa3e213..9b89c85c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -497,7 +497,7 @@ public class BrokerController { long diff = this.messageStore.slaveFallBehindMuch(); // XXX: warn and notify me - log.info("slave fall behind master, how much, {} bytes", diff); + log.info("Slave fall behind master: {} bytes", diff); } public Broker2Client getBroker2Client() { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java index e5d0431f..5b15d795 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java @@ -148,10 +148,8 @@ public class BrokerStartup { if (null != namesrvAddr) { try { String[] addrArray = namesrvAddr.split(";"); - if (addrArray != null) { - for (String addr : addrArray) { - RemotingUtil.string2SocketAddress(addr); - } + for (String addr : addrArray) { + RemotingUtil.string2SocketAddress(addr); } } catch (Exception e) { System.out.printf( @@ -211,13 +209,13 @@ public class BrokerStartup { @Override public void run() { synchronized (this) { - log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet()); + log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet()); if (!this.hasShutdown) { this.hasShutdown = true; long begineTime = System.currentTimeMillis(); controller.shutdown(); long consumingTimeTotal = System.currentTimeMillis() - begineTime; - log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal); + log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal); } } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java index 7e9c4968..6ce542a5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java @@ -72,7 +72,7 @@ public class ConsumerGroupInfo { } public List getAllChannel() { - List result = new ArrayList(); + List result = new ArrayList<>(); result.addAll(this.channelInfoTable.keySet()); @@ -80,7 +80,7 @@ public class ConsumerGroupInfo { } public List getAllClientId() { - List result = new ArrayList(); + List result = new ArrayList<>(); Iterator> it = this.channelInfoTable.entrySet().iterator(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index 10d43b3d..a2d88d5d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -167,7 +167,7 @@ public class ConsumerManager { } public HashSet queryTopicConsumeByWho(final String topic) { - HashSet groups = new HashSet(); + HashSet groups = new HashSet<>(); Iterator> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry entry = it.next(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java index b4dc305e..010c1aef 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java @@ -133,7 +133,7 @@ public class ProducerManager { try { HashMap channelTable = this.groupChannelTable.get(group); if (null == channelTable) { - channelTable = new HashMap(); + channelTable = new HashMap<>(); this.groupChannelTable.put(group, channelTable); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java index 1fdf3db0..c00898c3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java @@ -230,7 +230,7 @@ public class Broker2Client { } private List convertOffsetTable2OffsetList(Map table) { - List list = new ArrayList(); + List list = new ArrayList<>(); for (Entry entry : table.entrySet()) { MessageQueue mq = entry.getKey(); MessageQueueForC tmp = diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java index 426fcf21..98aceb63 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java @@ -42,7 +42,7 @@ public class RebalanceLockManager { try { ConcurrentHashMap groupValue = this.mqLockTable.get(group); if (null == groupValue) { - groupValue = new ConcurrentHashMap(32); + groupValue = new ConcurrentHashMap<>(32); this.mqLockTable.put(group, groupValue); } @@ -132,7 +132,7 @@ public class RebalanceLockManager { try { ConcurrentHashMap groupValue = this.mqLockTable.get(group); if (null == groupValue) { - groupValue = new ConcurrentHashMap(32); + groupValue = new ConcurrentHashMap<>(32); this.mqLockTable.put(group, groupValue); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java index 35b6dc49..b935bc8b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java @@ -137,7 +137,7 @@ public class FilterServerManager { } public List buildNewFilterServerList() { - List addr = new ArrayList(); + List addr = new ArrayList<>(); Iterator> it = this.filterServerTable.entrySet().iterator(); while (it.hasNext()) { Entry next = it.next(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java index 818b2384..5b142c11 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java @@ -26,9 +26,9 @@ public class FilterServerUtil { String[] cmdArray = splitShellString(shellString); process = Runtime.getRuntime().exec(cmdArray); process.waitFor(); - log.info("callShell: <{}> OK", shellString); + log.info("CallShell: <{}> OK", shellString); } catch (Throwable e) { - log.error("callShell: readLine IOException, " + shellString, e); + log.error("CallShell: readLine IOException, {}", shellString, e); } finally { if (null != process) process.destroy(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java index c004d1b6..d7d12769 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java @@ -103,7 +103,6 @@ public class BrokerFastFailure { } } } - public void shutdown() { this.scheduledExecutorService.shutdown(); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java index d7c6e6e6..d956c223 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java @@ -20,7 +20,7 @@ import java.util.ArrayList; import java.util.List; public class ManyPullRequest { - private final ArrayList pullRequestList = new ArrayList(); + private final ArrayList pullRequestList = new ArrayList<>(); public synchronized void addPullRequest(final PullRequest pullRequest) { this.pullRequestList.add(pullRequest); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java index 25796528..ff068d26 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java @@ -65,7 +65,7 @@ public class PullRequestHoldService extends ServiceThread { @Override public void run() { - log.info(this.getServiceName() + " service started"); + log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { @@ -85,7 +85,7 @@ public class PullRequestHoldService extends ServiceThread { } } - log.info(this.getServiceName() + " service end"); + log.info("{} service end", this.getServiceName()); } @Override @@ -96,7 +96,7 @@ public class PullRequestHoldService extends ServiceThread { private void checkHoldRequest() { for (String key : this.pullRequestTable.keySet()) { String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR); - if (kArray != null && 2 == kArray.length) { + if (2 == kArray.length) { String topic = kArray[0]; int queueId = Integer.parseInt(kArray[1]); final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId); @@ -127,9 +127,8 @@ public class PullRequestHoldService extends ServiceThread { newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId); } - Long tmp = tagsCode; if (newestOffset > request.getPullFromThisOffset()) { - if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tmp)) { + if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) { try { this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 3fc46053..8726c697 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -75,7 +75,7 @@ public class BrokerOuterAPI { String addrs = this.topAddressing.fetchNSAddr(); if (addrs != null) { if (!addrs.equals(this.nameSrvAddr)) { - log.info("name server address changed, old: " + this.nameSrvAddr + " new: " + addrs); + log.info("name server address changed, old: {} new: {}", this.nameSrvAddr, addrs); this.updateNameServerAddressList(addrs); this.nameSrvAddr = addrs; return nameSrvAddr; @@ -121,7 +121,7 @@ public class BrokerOuterAPI { log.info("register broker to name server {} OK", namesrvAddr); } catch (Exception e) { - log.warn("registerBroker Exception, " + namesrvAddr, e); + log.warn("registerBroker Exception, {}", namesrvAddr, e); } } } @@ -199,7 +199,7 @@ public class BrokerOuterAPI { this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId); log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr); } catch (Exception e) { - log.warn("unregisterBroker Exception, " + namesrvAddr, e); + log.warn("unregisterBroker Exception, {}", namesrvAddr, e); } } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index fc6e132e..f5dc1f90 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -139,13 +139,12 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces final SendMessageRequestHeader requestHeader, RemotingCommand request, final RemotingCommand response) { if (requestHeader.getTopic().length() > Byte.MAX_VALUE) { - log.warn("putMessage message topic length too long " + requestHeader.getTopic().length()); + log.warn("putMessage message topic length too long {}", requestHeader.getTopic().length()); response.setCode(ResponseCode.MESSAGE_ILLEGAL); return response; } if (requestHeader.getProperties() != null && requestHeader.getProperties().length() > Short.MAX_VALUE) { - log.warn("putMessage message properties length too long " - + requestHeader.getProperties().length()); + log.warn("putMessage message properties length too long {}", requestHeader.getProperties().length()); response.setCode(ResponseCode.MESSAGE_ILLEGAL); return response; } @@ -188,8 +187,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces } } - log.warn("the topic " + requestHeader.getTopic() + " not exist, producer: " - + ctx.channel().remoteAddress()); + log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress()); topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(// requestHeader.getTopic(), // requestHeader.getDefaultTopic(), // diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 286ecbe4..8bf48acc 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -267,7 +267,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } } else { - log.error("No topic in this broker, client: " + ctx.channel().remoteAddress()); + log.error("No topic in this broker, client: {}", ctx.channel().remoteAddress()); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("No topic in this broker"); return response; @@ -290,7 +290,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { String bodyStr = new String(body, MixAll.DEFAULT_CHARSET); Properties properties = MixAll.string2Properties(bodyStr); if (properties != null) { - log.info("updateBrokerConfig, new config: " + properties + " client: " + ctx.channel().remoteAddress()); + log.info("updateBrokerConfig, new config: [{}] client: {} ", properties, ctx.channel().remoteAddress()); this.brokerController.getConfiguration().update(properties); if (properties.containsKey("brokerPermission")) { this.brokerController.registerBrokerAll(false, false); @@ -476,7 +476,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } } else { - log.error("No subscription group in this broker, client: " + ctx.channel().remoteAddress()); + log.error("No subscription group in this broker, client:{} ", ctx.channel().remoteAddress()); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("No subscription group in this broker"); return response; @@ -718,7 +718,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } } else { - log.error("No consumer offset in this broker, client: " + ctx.channel().remoteAddress()); + log.error("No consumer offset in this broker, client: {} ", ctx.channel().remoteAddress()); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("No consumer offset in this broker"); return response; @@ -745,7 +745,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } } else { - log.error("No delay offset in this broker, client: " + ctx.channel().remoteAddress()); + log.error("No delay offset in this broker, client: {} ", ctx.channel().remoteAddress()); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("No delay offset in this broker"); return response; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index be1199ac..382030be 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -88,12 +88,12 @@ public class PullMessageProcessor implements NettyRequestProcessor { response.setOpaque(request.getOpaque()); if (LOG.isDebugEnabled()) { - LOG.debug("receive PullMessage request command, " + request); + LOG.debug("receive PullMessage request command, {}", request); } if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) { response.setCode(ResponseCode.NO_PERMISSION); - response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] pulling message is forbidden"); + response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1())); return response; } @@ -101,8 +101,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup()); if (null == subscriptionGroupConfig) { response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); - response.setRemark("subscription group not exist, " + requestHeader.getConsumerGroup() + " " - + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); + response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST))); return response; } @@ -120,10 +119,9 @@ public class PullMessageProcessor implements NettyRequestProcessor { TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (null == topicConfig) { - LOG.error("the topic " + requestHeader.getTopic() + " not exist, consumer: " + RemotingHelper.parseChannelRemoteAddr(channel)); + LOG.error("The topic {} not exist, consumer: {} ", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel)); response.setCode(ResponseCode.TOPIC_NOT_EXIST); - response.setRemark( - "topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); + response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL))); return response; } @@ -134,8 +132,8 @@ public class PullMessageProcessor implements NettyRequestProcessor { } if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) { - String errorInfo = "queueId[" + requestHeader.getQueueId() + "] is illagal,Topic :" + requestHeader.getTopic() - + " topicConfig.readQueueNums: " + topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress(); + String errorInfo = String.format("queueId[%d] is illagal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", + requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress()); LOG.warn(errorInfo); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(errorInfo); @@ -148,8 +146,8 @@ public class PullMessageProcessor implements NettyRequestProcessor { subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getSubscription()); } catch (Exception e) { - LOG.warn("parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), // - requestHeader.getConsumerGroup()); + LOG.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), // + requestHeader.getConsumerGroup()); response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); response.setRemark("parse the consumer's subscription failed"); return response; @@ -158,7 +156,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup()); if (null == consumerGroupInfo) { - LOG.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup()); + LOG.warn("The consumer's group info not exist, group: {}", requestHeader.getConsumerGroup()); response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST); response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC)); return response; @@ -173,15 +171,15 @@ public class PullMessageProcessor implements NettyRequestProcessor { subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic()); if (null == subscriptionData) { - LOG.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic()); + LOG.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic()); response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST); response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC)); return response; } if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) { - LOG.warn("the broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(), - subscriptionData.getSubString()); + LOG.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(), + subscriptionData.getSubString()); response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST); response.setRemark("the consumer's subscription not latest"); return response; @@ -261,15 +259,14 @@ public class PullMessageProcessor implements NettyRequestProcessor { case OFFSET_OVERFLOW_BADLY: response.setCode(ResponseCode.PULL_OFFSET_MOVED); // XXX: warn and notify me - LOG.info("the request offset: " + requestHeader.getQueueOffset() + " over flow badly, broker max offset: " - + getMessageResult.getMaxOffset() + ", consumer: " + channel.remoteAddress()); + LOG.info("The request offset:{} over flow badly, broker max offset:{} , consumer: {}", requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress()); break; case OFFSET_OVERFLOW_ONE: response.setCode(ResponseCode.PULL_NOT_FOUND); break; case OFFSET_TOO_SMALL: response.setCode(ResponseCode.PULL_OFFSET_MOVED); - LOG.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}", + LOG.info("The request offset is too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}", requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(), getMessageResult.getMinOffset(), channel.remoteAddress()); break; @@ -346,12 +343,12 @@ public class PullMessageProcessor implements NettyRequestProcessor { public void operationComplete(ChannelFuture future) throws Exception { getMessageResult.release(); if (!future.isSuccess()) { - LOG.error("transfer many message by pagecache failed, " + channel.remoteAddress(), future.cause()); + LOG.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), future.cause()); } } }); } catch (Throwable e) { - LOG.error("transfer many message by pagecache exception", e); + LOG.error("Error occurred when transferring messages from page cache", e); getMessageResult.release(); } @@ -480,7 +477,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); } catch (Exception e) { - LOG.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), e); + LOG.warn(String.format("GenerateOffsetMovedEvent Exception, %s", event.toString()), e); } } @@ -499,21 +496,20 @@ public class PullMessageProcessor implements NettyRequestProcessor { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { - LOG.error("processRequestWrapper response to " + future.channel().remoteAddress() + " failed", - future.cause()); + LOG.error("ProcessRequestWrapper response to {} failed", future.channel().remoteAddress(), future.cause()); LOG.error(request.toString()); LOG.error(response.toString()); } } }); } catch (Throwable e) { - LOG.error("processRequestWrapper process request over, but response failed", e); + LOG.error("ProcessRequestWrapper process request over, but response failed", e); LOG.error(request.toString()); LOG.error(response.toString()); } } } catch (RemotingCommandException e1) { - LOG.error("executeRequestWhenWakeup run", e1); + LOG.error("ExecuteRequestWhenWakeup run", e1); } } }; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java index 6a20774e..e8f97d0a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java @@ -146,7 +146,7 @@ public class QueryMessageProcessor implements NettyRequestProcessor { public void operationComplete(ChannelFuture future) throws Exception { selectMappedBufferResult.release(); if (!future.isSuccess()) { - log.error("transfer one message by page cache failed, ", future.cause()); + log.error("Transfer one message from page cache failed, ", future.cause()); } } }); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index b1d24dba..a4404621 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -252,7 +252,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); if (log.isDebugEnabled()) { - log.debug("receive SendMessage request command, " + request); + log.debug("receive SendMessage request command, {}", request); } final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java index ecf84243..44c8264f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java @@ -68,10 +68,10 @@ public class SlaveSynchronize { .putAll(topicWrapper.getTopicConfigTable()); this.brokerController.getTopicConfigManager().persist(); - log.info("update slave topic config from master, {}", masterAddrBak); + log.info("Update slave topic config from master, {}", masterAddrBak); } } catch (Exception e) { - log.error("syncTopicConfig Exception, " + masterAddrBak, e); + log.error("SyncTopicConfig Exception, {}", masterAddrBak, e); } } } @@ -85,9 +85,9 @@ public class SlaveSynchronize { this.brokerController.getConsumerOffsetManager().getOffsetTable() .putAll(offsetWrapper.getOffsetTable()); this.brokerController.getConsumerOffsetManager().persist(); - log.info("update slave consumer offset from master, {}", masterAddrBak); + log.info("Update slave consumer offset from master, {}", masterAddrBak); } catch (Exception e) { - log.error("syncConsumerOffset Exception, " + masterAddrBak, e); + log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e); } } } @@ -106,12 +106,12 @@ public class SlaveSynchronize { try { MixAll.string2File(delayOffset, fileName); } catch (IOException e) { - log.error("persist file Exception, " + fileName, e); + log.error("Persist file Exception, {}", fileName, e); } } - log.info("update slave delay offset from master, {}", masterAddrBak); + log.info("Update slave delay offset from master, {}", masterAddrBak); } catch (Exception e) { - log.error("syncDelayOffset Exception, " + masterAddrBak, e); + log.error("SyncDelayOffset Exception, {}", masterAddrBak, e); } } } @@ -134,10 +134,10 @@ public class SlaveSynchronize { subscriptionGroupManager.getSubscriptionGroupTable().putAll( subscriptionWrapper.getSubscriptionGroupTable()); subscriptionGroupManager.persist(); - log.info("update slave Subscription Group from master, {}", masterAddrBak); + log.info("Update slave Subscription Group from master, {}", masterAddrBak); } } catch (Exception e) { - log.error("syncSubscriptionGroup Exception, " + masterAddrBak, e); + log.error("SyncSubscriptionGroup Exception, {}", masterAddrBak, e); } } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index b6613399..4b6072c9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -99,9 +99,9 @@ public class SubscriptionGroupManager extends ConfigManager { public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) { SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config); if (old != null) { - log.info("update subscription group config, old: " + old + " new: " + config); + log.info("update subscription group config, old: {} new: {}", old, config); } else { - log.info("create new subscription group, " + config); + log.info("create new subscription group, {}", config); } this.dataVersion.nextVersion(); @@ -181,11 +181,11 @@ public class SubscriptionGroupManager extends ConfigManager { public void deleteSubscriptionGroupConfig(final String groupName) { SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName); if (old != null) { - log.info("delete subscription group OK, subscription group: " + old); + log.info("delete subscription group OK, subscription group:{}", old); this.dataVersion.nextVersion(); this.persist(); } else { - log.warn("delete subscription group failed, subscription group: " + old + " not exist"); + log.warn("delete subscription group failed, subscription group: {} not exist", old); } } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index df2231d1..d31ad4b8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -181,18 +181,15 @@ public class TopicConfigManager extends ConfigManager { topicConfig.setTopicSysFlag(topicSysFlag); topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType()); } else { - LOG.warn("create new topic failed, because the default topic[" + defaultTopic - + "] no perm, " + defaultTopicConfig.getPerm() + " producer: " - + remoteAddress); + LOG.warn("Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]", + defaultTopic, defaultTopicConfig.getPerm(), remoteAddress); } } else { - LOG.warn("create new topic failed, because the default topic[" + defaultTopic - + "] not exist." + " producer: " + remoteAddress); + LOG.warn("Create new topic failed, because the default topic[{}] not exist. producer:[{}]", defaultTopic, remoteAddress); } if (topicConfig != null) { - LOG.info("create new topic by default topic[" + defaultTopic + "], " + topicConfig - + " producer: " + remoteAddress); + LOG.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]", defaultTopic, topicConfig, remoteAddress); this.topicConfigTable.put(topic, topicConfig); @@ -307,9 +304,9 @@ public class TopicConfigManager extends ConfigManager { public void updateTopicConfig(final TopicConfig topicConfig) { TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); if (old != null) { - LOG.info("update topic config, old: " + old + " new: " + topicConfig); + LOG.info("update topic config, old:[{}] new:[{}]", old, topicConfig); } else { - LOG.info("create new topic, " + topicConfig); + LOG.info("create new topic [{}]", topicConfig); } this.dataVersion.nextVersion(); @@ -362,11 +359,11 @@ public class TopicConfigManager extends ConfigManager { public void deleteTopicConfig(final String topic) { TopicConfig old = this.topicConfigTable.remove(topic); if (old != null) { - LOG.info("delete topic config OK, topic: " + old); + LOG.info("Delete topic config OK, topic:{}", old); this.dataVersion.nextVersion(); this.persist(); } else { - LOG.warn("delete topic config failed, topic: " + topic + " not exist"); + LOG.warn("Delete topic config failed, topic:{} not exist", topic); } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java index ebf0734e..2aadc896 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java @@ -125,9 +125,8 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { final ResetOffsetRequestHeader requestHeader = (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class); log.info("invoke reset offset operation from broker. brokerAddr={}, topic={}, group={}, timestamp={}", - new Object[] { RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(), - requestHeader.getTimestamp()}); + requestHeader.getTimestamp()); Map offsetTable = new HashMap(); if (request.getBody() != null) { ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 3bacd5df..12580c14 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -212,13 +212,11 @@ public class MQClientAPIImpl { public void updateNameServerAddressList(final String addrs) { List lst = new ArrayList(); String[] addrArray = addrs.split(";"); - if (addrArray != null) { - for (String addr : addrArray) { - lst.add(addr); - } - - this.remotingClient.updateNameServerAddressList(lst); + for (String addr : addrArray) { + lst.add(addr); } + + this.remotingClient.updateNameServerAddressList(lst); } public void start() { @@ -468,7 +466,7 @@ public class MQClientAPIImpl { } try { sendCallback.onException(e); - } catch (Exception e2) { + } catch (Exception ignored) { } } } @@ -1074,8 +1072,7 @@ public class MQClientAPIImpl { request, timeoutMillis); switch (response.getCode()) { case ResponseCode.SUCCESS: { - ConsumerConnection consumerConnection = ConsumerConnection.decode(response.getBody(), ConsumerConnection.class); - return consumerConnection; + return ConsumerConnection.decode(response.getBody(), ConsumerConnection.class); } default: break; @@ -1151,8 +1148,7 @@ public class MQClientAPIImpl { assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { - ClusterInfo responseBody = ClusterInfo.decode(response.getBody(), ClusterInfo.class); - return responseBody; + return ClusterInfo.decode(response.getBody(), ClusterInfo.class); } default: break; @@ -1226,8 +1222,7 @@ public class MQClientAPIImpl { case ResponseCode.SUCCESS: { byte[] body = response.getBody(); if (body != null) { - TopicList topicList = TopicList.decode(body, TopicList.class); - return topicList; + return TopicList.decode(body, TopicList.class); } } default: diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java index ee55d92e..6f2c9a38 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java @@ -20,9 +20,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.remoting.RPCHook; +import org.slf4j.Logger; public class MQClientManager { + private final static Logger log = ClientLogger.getLog(); private static MQClientManager instance = new MQClientManager(); private AtomicInteger factoryIndexGenerator = new AtomicInteger(); private ConcurrentHashMap factoryTable = @@ -51,7 +54,7 @@ public class MQClientManager { if (prev != null) { instance = prev; } else { - // TODO log + log.warn("Previous MQClientInstance has created for clientId:[{}]", clientId); } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index bbc705e4..b26d062b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -227,7 +227,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), // topic, SubscriptionData.SUB_ALL); this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData); - } catch (Exception e) { + } catch (Exception ignore) { } } } @@ -246,7 +246,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { for (ConsumeMessageHook hook : this.consumeMessageHookList) { try { hook.consumeMessageBefore(context); - } catch (Throwable e) { + } catch (Throwable ignored) { } } } @@ -257,7 +257,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { for (ConsumeMessageHook hook : this.consumeMessageHookList) { try { hook.consumeMessageAfter(context); - } catch (Throwable e) { + } catch (Throwable ignored) { } } } @@ -314,9 +314,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { this.makeSureStateOK(); Set mqs = new HashSet(); Set allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); - if (allocateMq != null) { - mqs.addAll(allocateMq); - } + mqs.addAll(allocateMq); this.offsetStore.persistAll(mqs); } catch (Exception e) { log.error("group: " + this.defaultMQPullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 1a39998b..4f33732d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -921,7 +921,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { @Override public void doRebalance() { - if (this.rebalanceImpl != null && !this.pause) { + if (!this.pause) { this.rebalanceImpl.doRebalance(this.isConsumeOrderly()); } } @@ -932,9 +932,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { this.makeSureStateOK(); Set mqs = new HashSet(); Set allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); - if (allocateMq != null) { - mqs.addAll(allocateMq); - } + mqs.addAll(allocateMq); this.offsetStore.persistAll(mqs); } catch (Exception e) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java index 93d4cf97..38b80738 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java @@ -58,13 +58,11 @@ public class ProcessQueue { private volatile long msgAccCnt = 0; public boolean isLockExpired() { - boolean result = (System.currentTimeMillis() - this.lastLockTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME; - return result; + return (System.currentTimeMillis() - this.lastLockTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME; } public boolean isPullExpired() { - boolean result = (System.currentTimeMillis() - this.lastPullTimestamp) > PULL_MAX_IDLE_TIME; - return result; + return (System.currentTimeMillis() - this.lastPullTimestamp) > PULL_MAX_IDLE_TIME; } /** diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index c1944d87..1343e763 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -472,7 +472,7 @@ public class MQClientInstance { final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty(); final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty(); if (producerEmpty && consumerEmpty) { - log.warn("sending hearbeat, but no consumer and no producer"); + log.warn("sending heartbeat, but no consumer and no producer"); return; } @@ -841,13 +841,8 @@ public class MQClientInstance { if (addr != null) { try { this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000); - log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, - consumerGroup, brokerName, entry1.getKey(), addr); - } catch (RemotingException e) { - log.error("unregister client exception from broker: " + addr, e); - } catch (MQBrokerException e) { - log.error("unregister client exception from broker: " + addr, e); - } catch (InterruptedException e) { + log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr); + } catch (RemotingException | InterruptedException | MQBrokerException e) { log.error("unregister client exception from broker: " + addr, e); } } @@ -1064,7 +1059,9 @@ public class MQClientInstance { } } } finally { - consumer.resume(); + if (consumer != null) { + consumer.resume(); + } } } @@ -1134,14 +1131,14 @@ public class MQClientInstance { List nsList = this.mQClientAPIImpl.getRemotingClient().getNameServerAddressList(); - StringBuffer strBuffer = new StringBuffer(); + StringBuilder strBuilder = new StringBuilder(); if (nsList != null) { for (String addr : nsList) { - strBuffer.append(addr + ";"); + strBuilder.append(addr).append(";"); } } - String nsAddr = strBuffer.toString(); + String nsAddr = strBuilder.toString(); consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_NAMESERVER_ADDR, nsAddr); consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_CONSUME_TYPE, mqConsumerInner.consumeType().name()); consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_CLIENT_VERSION, diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java index 206b0a3d..235aa20a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java +++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java @@ -18,9 +18,12 @@ package org.apache.rocketmq.client.latency; import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; +import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.message.MessageQueue; +import org.slf4j.Logger; public class MQFaultStrategy { + private final static Logger log = ClientLogger.getLog(); private final LatencyFaultTolerance latencyFaultTolerance = new LatencyFaultToleranceImpl(); private boolean sendLatencyFaultEnable = false; @@ -80,6 +83,7 @@ public class MQFaultStrategy { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { + log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index e9e19b6e..22ed96ac 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -137,7 +137,7 @@ public class MixAll { return Math.abs(value); } - public static final void string2File(final String str, final String fileName) throws IOException { + public static void string2File(final String str, final String fileName) throws IOException { String tmpFile = fileName + ".tmp"; string2FileNotSafe(str, tmpFile); @@ -155,7 +155,8 @@ public class MixAll { file.renameTo(new File(fileName)); } - public static final void string2FileNotSafe(final String str, final String fileName) throws IOException { + + public static void string2FileNotSafe(final String str, final String fileName) throws IOException { File file = new File(fileName); File fileParent = file.getParentFile(); if (fileParent != null) { @@ -170,21 +171,17 @@ public class MixAll { throw e; } finally { if (fileWriter != null) { - try { - fileWriter.close(); - } catch (IOException e) { - throw e; - } + fileWriter.close(); } } } - public static final String file2String(final String fileName) { + public static String file2String(final String fileName) { File file = new File(fileName); return file2String(file); } - public static final String file2String(final File file) { + public static String file2String(final File file) { if (file.exists()) { char[] data = new char[(int) file.length()]; boolean result = false; @@ -213,7 +210,7 @@ public class MixAll { return null; } - public static final String file2String(final URL url) { + public static String file2String(final URL url) { InputStream in = null; try { URLConnection urlConnection = url.openConnection(); @@ -223,12 +220,12 @@ public class MixAll { byte[] data = new byte[len]; in.read(data, 0, len); return new String(data, "UTF-8"); - } catch (Exception e) { + } catch (Exception ignored) { } finally { if (null != in) { try { in.close(); - } catch (IOException e) { + } catch (IOException ignored) { } } } @@ -258,9 +255,7 @@ public class MixAll { if (null == value) { value = ""; } - } catch (IllegalArgumentException e) { - e.printStackTrace(); - } catch (IllegalAccessException e) { + } catch (IllegalArgumentException | IllegalAccessException e) { e.printStackTrace(); } @@ -273,7 +268,6 @@ public class MixAll { if (log != null) { log.info(name + "=" + value); - } else { } } } @@ -318,9 +312,7 @@ public class MixAll { try { field.setAccessible(true); value = field.get(object); - } catch (IllegalArgumentException e) { - e.printStackTrace(); - } catch (IllegalAccessException e) { + } catch (IllegalArgumentException | IllegalAccessException e) { e.printStackTrace(); } @@ -365,10 +357,10 @@ public class MixAll { } else { continue; } - method.invoke(object, new Object[] {arg}); + method.invoke(object, arg); } } - } catch (Throwable e) { + } catch (Throwable ignored) { } } } diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java index 5a81b1b4..56015b36 100644 --- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java @@ -79,7 +79,7 @@ public class UtilAll { public static boolean isItTimeToDo(final String when) { String[] whiles = when.split(";"); - if (whiles != null && whiles.length > 0) { + if (whiles.length > 0) { Calendar now = Calendar.getInstance(); for (String w : whiles) { int nowHour = Integer.parseInt(w); @@ -186,6 +186,7 @@ public class UtilAll { if (!file.exists()) { boolean result = file.mkdirs(); if (!result) { + //TO DO } } @@ -202,7 +203,8 @@ public class UtilAll { return -1; } - public static final int crc32(byte[] array) { + + public static int crc32(byte[] array) { if (array != null) { return crc32(array, 0, array.length); } @@ -210,7 +212,8 @@ public class UtilAll { return 0; } - public static final int crc32(byte[] array, int offset, int length) { + + public static int crc32(byte[] array, int offset, int length) { CRC32 crc32 = new CRC32(); crc32.update(array, offset, length); return (int) (crc32.getValue() & 0x7FFFFFFF); @@ -267,15 +270,15 @@ public class UtilAll { } finally { try { byteArrayInputStream.close(); - } catch (IOException e) { + } catch (IOException ignored) { } try { inflaterInputStream.close(); - } catch (IOException e) { + } catch (IOException ignored) { } try { byteArrayOutputStream.close(); - } catch (IOException e) { + } catch (IOException ignored) { } } diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java b/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java index 2097bfa9..e9bf3fa2 100644 --- a/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java +++ b/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java @@ -46,7 +46,7 @@ public class FilterAPI { subscriptionData.setSubString(SubscriptionData.SUB_ALL); } else { String[] tags = subString.split("\\|\\|"); - if (tags != null && tags.length > 0) { + if (tags.length > 0) { for (String tag : tags) { if (tag.length() > 0) { String trimString = tag.trim(); diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java index efa952ef..4f4e1589 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java @@ -16,19 +16,19 @@ */ package org.apache.rocketmq.common.message; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.sysflag.MessageSysFlag; + import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; -import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.sysflag.MessageSysFlag; public class MessageDecoder { public final static int MSG_ID_LENGTH = 8 + 8; @@ -318,10 +318,6 @@ public class MessageDecoder { } return msgExt; - } catch (UnknownHostException e) { - byteBuffer.position(byteBuffer.limit()); - } catch (BufferUnderflowException e) { - byteBuffer.position(byteBuffer.limit()); } catch (Exception e) { byteBuffer.position(byteBuffer.limit()); } @@ -366,12 +362,10 @@ public class MessageDecoder { Map map = new HashMap(); if (properties != null) { String[] items = properties.split(String.valueOf(PROPERTY_SEPARATOR)); - if (items != null) { - for (String i : items) { - String[] nv = i.split(String.valueOf(NAME_VALUE_SEPARATOR)); - if (nv != null && 2 == nv.length) { - map.put(nv[0], nv[1]); - } + for (String i : items) { + String[] nv = i.split(String.valueOf(NAME_VALUE_SEPARATOR)); + if (2 == nv.length) { + map.put(nv[0], nv[1]); } } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java index 7b875d07..bff73331 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java @@ -19,9 +19,6 @@ package org.apache.rocketmq.common.protocol; import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader; import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.exception.RemotingConnectException; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class MQProtosHelper { @@ -38,13 +35,7 @@ public class MQProtosHelper { if (response != null) { return ResponseCode.SUCCESS == response.getCode(); } - } catch (RemotingConnectException e) { - e.printStackTrace(); - } catch (RemotingSendRequestException e) { - e.printStackTrace(); - } catch (RemotingTimeoutException e) { - e.printStackTrace(); - } catch (InterruptedException e) { + } catch (Exception e) { e.printStackTrace(); } diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java index 22dac953..5498d34c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java @@ -54,7 +54,7 @@ public class MomentStatsItemSet { public void run() { try { printAtMinutes(); - } catch (Throwable e) { + } catch (Throwable ignored) { } } }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS); diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java index fd9bc4c9..9b37f800 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java @@ -94,7 +94,7 @@ public class StatsItem { public void run() { try { samplingInSeconds(); - } catch (Throwable e) { + } catch (Throwable ignored) { } } }, 0, 10, TimeUnit.SECONDS); @@ -104,7 +104,7 @@ public class StatsItem { public void run() { try { samplingInMinutes(); - } catch (Throwable e) { + } catch (Throwable ignored) { } } }, 0, 10, TimeUnit.MINUTES); @@ -114,7 +114,7 @@ public class StatsItem { public void run() { try { samplingInHour(); - } catch (Throwable e) { + } catch (Throwable ignored) { } } }, 0, 1, TimeUnit.HOURS); diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java index 33c65aec..8633d682 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java @@ -47,7 +47,7 @@ public class StatsItemSet { public void run() { try { samplingInSeconds(); - } catch (Throwable e) { + } catch (Throwable ignored) { } } }, 0, 10, TimeUnit.SECONDS); @@ -57,7 +57,7 @@ public class StatsItemSet { public void run() { try { samplingInMinutes(); - } catch (Throwable e) { + } catch (Throwable ignored) { } } }, 0, 10, TimeUnit.MINUTES); @@ -67,7 +67,7 @@ public class StatsItemSet { public void run() { try { samplingInHour(); - } catch (Throwable e) { + } catch (Throwable ignored) { } } }, 0, 1, TimeUnit.HOURS); @@ -77,7 +77,7 @@ public class StatsItemSet { public void run() { try { printAtMinutes(); - } catch (Throwable e) { + } catch (Throwable ignored) { } } }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60, TimeUnit.MILLISECONDS); @@ -87,7 +87,7 @@ public class StatsItemSet { public void run() { try { printAtHour(); - } catch (Throwable e) { + } catch (Throwable ignored) { } } }, Math.abs(UtilAll.computNextHourTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60, TimeUnit.MILLISECONDS); @@ -97,7 +97,7 @@ public class StatsItemSet { public void run() { try { printAtDay(); - } catch (Throwable e) { + } catch (Throwable ignored) { } } }, Math.abs(UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS); diff --git a/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java index 693718e5..f5c4fada 100644 --- a/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java @@ -17,10 +17,11 @@ package org.apache.rocketmq.common; +import org.junit.Test; +import org.junit.Assert; + import java.net.InetAddress; import java.util.List; -import junit.framework.Assert; -import org.junit.Test; public class MixAllTest { diff --git a/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java b/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java index e1e82d8d..2c9a2fb2 100644 --- a/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java @@ -17,12 +17,13 @@ package org.apache.rocketmq.common; import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.junit.Assert; import org.junit.Test; public class RemotingUtilTest { @Test public void test() throws Exception { String a = RemotingUtil.getLocalAddress(); - System.out.println(a); + Assert.assertTrue(a.length() > 0); } } diff --git a/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java b/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java index d39a53af..e738ed6a 100644 --- a/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java @@ -19,17 +19,22 @@ package org.apache.rocketmq.common.protocol; import org.apache.rocketmq.common.protocol.body.ConsumeStatus; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import org.junit.Assert; import org.junit.Test; public class ConsumeStatusTest { @Test - public void decode_test() throws Exception { + public void decodeTest() throws Exception { ConsumeStatus cs = new ConsumeStatus(); - cs.setConsumeFailedTPS(0L); + cs.setConsumeFailedTPS(10); + cs.setPullRT(100); + cs.setPullTPS(1000); String json = RemotingSerializable.toJson(cs, true); - System.out.println(json); ConsumeStatus fromJson = RemotingSerializable.fromJson(json, ConsumeStatus.class); + Assert.assertEquals(fromJson.getPullRT(), cs.getPullRT(), 0.0001); + Assert.assertEquals(fromJson.getPullTPS(), cs.getPullTPS(), 0.0001); + Assert.assertEquals(fromJson.getConsumeFailedTPS(), cs.getConsumeFailedTPS(), 0.0001); } } diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java index 65c9bf23..50d750df 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -49,7 +49,7 @@ public class Producer { final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest"; final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64; final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 128; - final boolean keyEnable = commandLine.hasOption('k') ? Boolean.parseBoolean(commandLine.getOptionValue('k')) : false; + final boolean keyEnable = commandLine.hasOption('k') && Boolean.parseBoolean(commandLine.getOptionValue('k')); System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s%n", topic, threadCount, messageSize, keyEnable); @@ -140,7 +140,7 @@ public class Producer { try { Thread.sleep(3000); - } catch (InterruptedException e1) { + } catch (InterruptedException ignored) { } } catch (InterruptedException e) { statsBenchmark.getSendRequestFailedCount().incrementAndGet(); @@ -156,7 +156,7 @@ public class Producer { log.error("[BENCHMARK_PRODUCER] Send Exception", e); try { Thread.sleep(3000); - } catch (InterruptedException e1) { + } catch (InterruptedException ignored) { } } } diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java index f28bffe4..d9fafdd0 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java @@ -43,8 +43,8 @@ public class TransactionProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32; messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2; - ischeck = args.length >= 3 ? Boolean.parseBoolean(args[2]) : false; - ischeckffalse = args.length >= 4 ? Boolean.parseBoolean(args[3]) : false; + ischeck = args.length >= 3 && Boolean.parseBoolean(args[2]); + ischeckffalse = args.length >= 4 && Boolean.parseBoolean(args[3]); final Message msg = buildMessage(messageSize); diff --git a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java index 52b76530..6a6bdc7d 100644 --- a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java @@ -54,13 +54,7 @@ public class Producer { } producer.shutdown(); - } catch (MQClientException e) { - e.printStackTrace(); - } catch (RemotingException e) { - e.printStackTrace(); - } catch (MQBrokerException e) { - e.printStackTrace(); - } catch (InterruptedException e) { + } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { e.printStackTrace(); } } diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java index 78335768..edfad248 100644 --- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java @@ -45,9 +45,7 @@ public class TransactionProducer { System.out.printf("%s%n", sendResult); Thread.sleep(10); - } catch (MQClientException e) { - e.printStackTrace(); - } catch (UnsupportedEncodingException e) { + } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index 34462600..69b64ca7 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -279,12 +279,10 @@ public class RouteInfoManager { try { this.lock.writeLock().lockInterruptibly(); BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr); - if (brokerLiveInfo != null) { - log.info("unregisterBroker, remove from brokerLiveTable {}, {}", + log.info("unregisterBroker, remove from brokerLiveTable {}, {}", brokerLiveInfo != null ? "OK" : "Failed", brokerAddr - ); - } + ); this.filterServerTable.remove(brokerAddr); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java index 2de554dd..f64f9e15 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java @@ -44,11 +44,11 @@ public class RemotingUtil { private static boolean isWindowsPlatform = false; static { - if (OS_NAME != null && OS_NAME.toLowerCase().indexOf("linux") >= 0) { + if (OS_NAME != null && OS_NAME.toLowerCase().contains("linux")) { isLinuxPlatform = true; } - if (OS_NAME != null && OS_NAME.toLowerCase().indexOf("windows") >= 0) { + if (OS_NAME != null && OS_NAME.toLowerCase().contains("windows")) { isWindowsPlatform = true; } } diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java index b4850d07..0993a5fa 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java @@ -213,7 +213,7 @@ public class AllocateMappedFileService extends ServiceThread { requestQueue.offer(req); try { Thread.sleep(1); - } catch (InterruptedException e1) { + } catch (InterruptedException ignored) { } } } finally { diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 51bba68f..5ebab545 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -1150,7 +1150,7 @@ public class CommitLog { final byte[] propertiesData = msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); - final short propertiesLength = propertiesData == null ? 0 : (short) propertiesData.length; + final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; if (propertiesLength > Short.MAX_VALUE) { log.warn("putMessage message properties length too long. length={}", propertiesData.length); @@ -1158,7 +1158,7 @@ public class CommitLog { } final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); - final int topicLength = topicData == null ? 0 : topicData.length; + final int topicLength = topicData.length; final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length; @@ -1229,7 +1229,7 @@ public class CommitLog { this.msgStoreItemMemory.put((byte) topicLength); this.msgStoreItemMemory.put(topicData); // 17 PROPERTIES - this.msgStoreItemMemory.putShort(propertiesLength); + this.msgStoreItemMemory.putShort((short) propertiesLength); if (propertiesLength > 0) this.msgStoreItemMemory.put(propertiesData); diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index dc6812d1..3a43c21b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -666,7 +666,7 @@ public class DefaultMessageStore implements MessageStore { final int size = result.getByteBuffer().getInt(); long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size); return storeTime; - } catch (Exception e) { + } catch (Exception ignored) { } finally { result.release(); } @@ -1491,7 +1491,7 @@ public class DefaultMessageStore implements MessageStore { if (deleteCount > 0 && deleteLogicsFilesInterval > 0) { try { Thread.sleep(deleteLogicsFilesInterval); - } catch (InterruptedException e) { + } catch (InterruptedException ignored) { } } } @@ -1592,7 +1592,7 @@ public class DefaultMessageStore implements MessageStore { for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) { try { Thread.sleep(100); - } catch (InterruptedException e) { + } catch (InterruptedException ignored) { } } diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java b/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java index afcf1c64..f63efd6e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java +++ b/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java @@ -24,7 +24,7 @@ public class StoreUtil { @SuppressWarnings("restriction") public static long getTotalPhysicalMemorySize() { - long physicalTotal = 1024 * 1024 * 1024 * 24; + long physicalTotal = 1024 * 1024 * 1024 * 24L; OperatingSystemMXBean osmxb = ManagementFactory.getOperatingSystemMXBean(); if (osmxb instanceof com.sun.management.OperatingSystemMXBean) { physicalTotal = ((com.sun.management.OperatingSystemMXBean) osmxb).getTotalPhysicalMemorySize(); diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java index d7d53ef3..1ebf52a6 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java @@ -74,10 +74,10 @@ public class IndexService { log.info("load index file OK, " + f.getFileName()); this.indexFileList.add(f); } catch (IOException e) { - log.error("load file " + file + " error", e); + log.error("load file {} error", file, e); return false; } catch (NumberFormatException e) { - continue; + log.error("load file {} error", file, e); } } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java index 86f86eff..bcd4c9cd 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java @@ -114,7 +114,7 @@ public class BrokerConsumeStatsSubCommad implements SubCommand { String lastTime = "-"; try { lastTime = UtilAll.formatDate(new Date(offsetWrapper.getLastTimestamp()), UtilAll.YYYY_MM_DD_HH_MM_SS); - } catch (Exception e) { + } catch (Exception ignored) { } if (offsetWrapper.getLastTimestamp() > 0) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java index 0649e719..72aad508 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java @@ -106,8 +106,7 @@ public class CLusterSendMsgRTCommand implements SubCommand { long interval = !commandLine.hasOption('i') ? 10 : Long.parseLong(commandLine .getOptionValue('i').trim()); - boolean printAsTlog = !commandLine.hasOption('p') ? false : Boolean - .parseBoolean(commandLine.getOptionValue('p').trim()); + boolean printAsTlog = commandLine.hasOption('p') && Boolean.parseBoolean(commandLine.getOptionValue('p').trim()); String machineRoom = !commandLine.hasOption('m') ? "noname" : commandLine .getOptionValue('m').trim(); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java index d70bb1d8..bd79bc7f 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java @@ -224,18 +224,19 @@ public class ClusterListSubCommand implements SubCommand { version = kvTable.getTable().get("brokerVersionDesc"); { String[] tpss = putTps.split(" "); - if (tpss != null && tpss.length > 0) { + if (tpss.length > 0) { in = Double.parseDouble(tpss[0]); } } { String[] tpss = getTransferedTps.split(" "); - if (tpss != null && tpss.length > 0) { + if (tpss.length > 0) { out = Double.parseDouble(tpss[0]); } } } catch (Exception e) { + e.printStackTrace(); } double hour = 0.0; @@ -251,16 +252,16 @@ public class ClusterListSubCommand implements SubCommand { } System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s %5s %6s%n", - clusterName, - brokerName, - next1.getKey().longValue(), - next1.getValue(), - version, - String.format("%9.2f(%s,%sms)", in, sendThreadPoolQueueSize, sendThreadPoolQueueHeadWaitTimeMills), - String.format("%9.2f(%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills), - pageCacheLockTimeMills, - String.format("%2.2f", hour), - String.format("%.4f", space) + clusterName, + brokerName, + next1.getKey(), + next1.getValue(), + version, + String.format("%9.2f(%s,%sms)", in, sendThreadPoolQueueSize, sendThreadPoolQueueHeadWaitTimeMills), + String.format("%9.2f(%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills), + pageCacheLockTimeMills, + String.format("%2.2f", hour), + String.format("%.4f", space) ); } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java index 3f96c61c..35735a1b 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java @@ -91,6 +91,7 @@ public class DeleteSubscriptionGroupCommand implements SubCommand { DeleteTopicSubCommand.deleteTopic(adminExt, clusterName, MixAll.DLQ_GROUP_TOPIC_PREFIX + groupName); } catch (Exception e) { + e.printStackTrace(); } return; } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java index 547eca7f..373c8953 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java @@ -53,7 +53,6 @@ public class StartMonitoringSubCommand implements SubCommand { monitorService.start(); } catch (Exception e) { e.printStackTrace(); - } finally { } } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java index 0171ec8c..adeb138a 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java @@ -93,6 +93,7 @@ public class PrintMessageByQueueCommand implements SubCommand { System.out.printf("MSGID: %s %s BODY: %s%n", msg.getMsgId(), msg.toString(), printBody ? new String(msg.getBody(), charsetName) : "NOT PRINT BODY"); } catch (UnsupportedEncodingException e) { + e.printStackTrace(); } } } @@ -160,11 +161,11 @@ public class PrintMessageByQueueCommand implements SubCommand { String charsetName = !commandLine.hasOption('c') ? "UTF-8" : commandLine.getOptionValue('c').trim(); boolean printMsg = - !commandLine.hasOption('p') ? false : Boolean.parseBoolean(commandLine.getOptionValue('p').trim()); + commandLine.hasOption('p') && Boolean.parseBoolean(commandLine.getOptionValue('p').trim()); boolean printBody = - !commandLine.hasOption('d') ? false : Boolean.parseBoolean(commandLine.getOptionValue('d').trim()); + commandLine.hasOption('d') && Boolean.parseBoolean(commandLine.getOptionValue('d').trim()); boolean calByTag = - !commandLine.hasOption('f') ? false : Boolean.parseBoolean(commandLine.getOptionValue('f').trim()); + commandLine.hasOption('f') && Boolean.parseBoolean(commandLine.getOptionValue('f').trim()); String subExpression = !commandLine.hasOption('s') ? "*" : commandLine.getOptionValue('s').trim(); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java index 23a31b5f..591d27ea 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java @@ -113,8 +113,7 @@ public class PrintMessageSubCommand implements SubCommand { String subExpression = // !commandLine.hasOption('s') ? "*" : commandLine.getOptionValue('s').trim(); - boolean printBody = // - !commandLine.hasOption('d') ? true : Boolean.parseBoolean(commandLine.getOptionValue('d').trim()); + boolean printBody = !commandLine.hasOption('d') || Boolean.parseBoolean(commandLine.getOptionValue('d').trim()); consumer.start(); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/Store.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/Store.java index e8e17742..6db78137 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/Store.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/Store.java @@ -133,7 +133,6 @@ public class Store { // 5 FLAG int flag = byteBuffer.getInt(); - flag = flag + 0; // 6 QUEUEOFFSET long queueOffset = byteBuffer.getLong(); @@ -146,7 +145,6 @@ public class Store { // 9 BORNTIMESTAMP long bornTimeStamp = byteBuffer.getLong(); - bornTimeStamp = bornTimeStamp + 0; // 10 BORNHOST(IP+PORT) byteBuffer.position(byteBuffer.position() + 8); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/DeleteKvConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/DeleteKvConfigCommand.java index eb0629a3..a8ac6a45 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/DeleteKvConfigCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/DeleteKvConfigCommand.java @@ -59,7 +59,6 @@ public class DeleteKvConfigCommand implements SubCommand { defaultMQAdminExt.start(); defaultMQAdminExt.deleteKvConfig(namespace, key); System.out.printf("delete kv config from namespace success.%n"); - return; } catch (Exception e) { e.printStackTrace(); } finally { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java index 29e5f923..c36e3334 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java @@ -55,7 +55,7 @@ public class GetNamesrvConfigCommand implements SubCommand { if (servers != null && servers.length() > 0) { String[] serverArray = servers.trim().split(";"); - if (serverArray != null && serverArray.length > 0) { + if (serverArray.length > 0) { serverList = Arrays.asList(serverArray); } } @@ -71,7 +71,6 @@ public class GetNamesrvConfigCommand implements SubCommand { System.out.printf("%-50s= %s\n", key, nameServerConfigs.get(server).get(key)); } } - return; } catch (Exception e) { e.printStackTrace(); } finally { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommand.java index 976fc4c5..254eaf31 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommand.java @@ -65,7 +65,6 @@ public class UpdateKvConfigCommand implements SubCommand { defaultMQAdminExt.start(); defaultMQAdminExt.createAndUpdateKvConfig(namespace, key, value); System.out.printf("create or update kv config to namespace success.%n"); - return; } catch (Exception e) { e.printStackTrace(); } finally { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java index be389d73..0b33ca22 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java @@ -69,7 +69,7 @@ public class UpdateNamesrvConfigCommand implements SubCommand { if (servers != null && servers.length() > 0) { String[] serverArray = servers.trim().split(";"); - if (serverArray != null && serverArray.length > 0) { + if (serverArray.length > 0) { serverList = Arrays.asList(serverArray); } } @@ -80,7 +80,6 @@ public class UpdateNamesrvConfigCommand implements SubCommand { System.out.printf("update name server config success!%s\n%s : %s\n", serverList == null ? "" : serverList, key, value); - return; } catch (Exception e) { e.printStackTrace(); } finally { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java index 065fa68a..2bc1fd61 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java @@ -76,7 +76,7 @@ public class CloneGroupOffsetCommand implements SubCommand { defaultMQAdminExt.start(); ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(srcGroup); Set mqs = consumeStats.getOffsetTable().keySet(); - if (mqs != null && !mqs.isEmpty()) { + if (!mqs.isEmpty()) { TopicRouteData topicRoute = defaultMQAdminExt.examineTopicRouteInfo(topic); for (MessageQueue mq : mqs) { String addr = null; -- GitLab