From 7f96008c8b6f3ce5ac38cd168bd12252799973e3 Mon Sep 17 00:00:00 2001 From: yukon Date: Fri, 11 Aug 2017 20:28:13 +0800 Subject: [PATCH] Remove unused class GetRouteInfoResponseHeader and meaningless comments --- .../rocketmq/broker/BrokerController.java | 11 +- .../apache/rocketmq/broker/BrokerStartup.java | 8 +- .../rebalance/RebalanceLockManager.java | 44 ++-- .../broker/filtersrv/FilterServerManager.java | 2 - .../AbstractSendMessageProcessor.java | 8 +- .../processor/AdminBrokerProcessor.java | 49 ++-- .../processor/PullMessageProcessor.java | 16 +- .../processor/SendMessageProcessor.java | 28 +- .../rocketmq/broker/BrokerControllerTest.java | 8 +- .../consumer/store/LocalFileOffsetStore.java | 10 +- .../store/RemoteBrokerOffsetStore.java | 4 +- .../rocketmq/client/impl/MQAdminImpl.java | 2 +- .../rocketmq/client/impl/MQClientAPIImpl.java | 247 +++++++++--------- .../ConsumeMessageConcurrentlyService.java | 66 +++-- .../ConsumeMessageOrderlyService.java | 86 +++--- .../impl/consumer/ConsumeMessageService.java | 8 +- .../consumer/DefaultMQPullConsumerImpl.java | 104 ++++---- .../consumer/DefaultMQPushConsumerImpl.java | 62 ++--- .../client/impl/consumer/RebalanceImpl.java | 22 +- .../impl/consumer/RebalancePushImpl.java | 4 +- .../client/impl/factory/MQClientInstance.java | 27 +- .../impl/producer/DefaultMQProducerImpl.java | 92 +++---- .../client/impl/producer/MQProducerInner.java | 6 +- .../apache/rocketmq/common/TopicConfig.java | 10 - .../apache/rocketmq/common/help/FAQUrl.java | 26 +- .../common/message/MessageClientIDSetter.java | 2 +- .../protocol/body/ConsumerRunningInfo.java | 94 +++---- .../header/GetConsumeStatsRequestHeader.java | 2 - .../GetConsumerStatusRequestHeader.java | 1 - .../GetEarliestMsgStoretimeRequestHeader.java | 1 - .../header/QueryCorrectionOffsetHeader.java | 2 +- .../header/SearchOffsetRequestHeader.java | 2 +- .../header/UnregisterClientRequestHeader.java | 2 +- .../UnregisterClientResponseHeader.java | 2 +- .../namesrv/GetRouteInfoResponseHeader.java | 33 --- .../RegisterOrderTopicRequestHeader.java | 2 +- .../rocketmq/common/sysflag/TopicSysFlag.java | 4 - .../rocketmq/common/utils/IOTinyUtils.java | 2 - .../rocketmq/example/simple/PushConsumer.java | 2 - .../src/main/resources/MessageFilterImpl.java | 2 +- .../filter/parser/SelectorParser.java | 1 - .../rocketmq/filter/parser/SelectorParser.jj | 1 - .../namesrv/kvconfig/KVConfigManager.java | 6 +- .../namesrv/routeinfo/RouteInfoManager.java | 4 +- .../rocketmq/remoting/netty/NettyDecoder.java | 2 +- .../remoting/netty/NettyRemotingAbstract.java | 24 +- .../remoting/netty/NettyRemotingClient.java | 8 +- .../remoting/netty/NettyServerConfig.java | 4 - .../remoting/netty/NettySystemConfig.java | 18 +- .../org/apache/rocketmq/store/CommitLog.java | 101 +++---- .../rocketmq/store/DefaultMessageStore.java | 54 ++-- .../rocketmq/store/DispatchRequest.java | 18 -- .../org/apache/rocketmq/store/MappedFile.java | 2 - .../rocketmq/store/MappedFileQueue.java | 1 - .../rocketmq/store/ha/HAConnection.java | 8 - .../apache/rocketmq/store/ha/HAService.java | 18 -- .../rocketmq/store/index/IndexFile.java | 1 - .../rocketmq/store/index/IndexHeader.java | 4 - .../schedule/ScheduleMessageService.java | 3 - .../rocketmq/store/stats/BrokerStats.java | 2 - .../tools/admin/DefaultMQAdminExtImpl.java | 10 +- .../message/PrintMessageSubCommand.java | 5 +- 62 files changed, 617 insertions(+), 781 deletions(-) delete mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoResponseHeader.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index c8624c4f..cd68552b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -135,11 +135,11 @@ public class BrokerController { private BrokerFastFailure brokerFastFailure; private Configuration configuration; - public BrokerController(// - final BrokerConfig brokerConfig, // - final NettyServerConfig nettyServerConfig, // - final NettyClientConfig nettyClientConfig, // - final MessageStoreConfig messageStoreConfig // + public BrokerController( + final BrokerConfig brokerConfig, + final NettyServerConfig nettyServerConfig, + final NettyClientConfig nettyClientConfig, + final MessageStoreConfig messageStoreConfig ) { this.brokerConfig = brokerConfig; this.nettyServerConfig = nettyServerConfig; @@ -255,7 +255,6 @@ public class BrokerController { this.registerProcessor(); - // TODO remove in future final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis(); final long period = 1000 * 60 * 60 * 24; this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java index 85d2e3af..e0a3b699 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java @@ -190,10 +190,10 @@ public class BrokerStartup { MixAll.printObjectProperties(log, nettyClientConfig); MixAll.printObjectProperties(log, messageStoreConfig); - final BrokerController controller = new BrokerController(// - brokerConfig, // - nettyServerConfig, // - nettyClientConfig, // + final BrokerController controller = new BrokerController( + brokerConfig, + nettyServerConfig, + nettyClientConfig, messageStoreConfig); // remember all configs to prevent discard controller.getConfiguration().registerConfig(properties); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java index ed5a8758..519745e0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java @@ -52,9 +52,9 @@ public class RebalanceLockManager { lockEntry = new LockEntry(); lockEntry.setClientId(clientId); groupValue.put(mq, lockEntry); - log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}", // - group, // - clientId, // + log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}", + group, + clientId, mq); } @@ -69,19 +69,19 @@ public class RebalanceLockManager { lockEntry.setClientId(clientId); lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); log.warn( - "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", // - group, // - oldClientId, // - clientId, // + "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", + group, + oldClientId, + clientId, mq); return true; } log.warn( - "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", // - group, // - oldClientId, // - clientId, // + "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", + group, + oldClientId, + clientId, mq); return false; } finally { @@ -144,9 +144,9 @@ public class RebalanceLockManager { lockEntry.setClientId(clientId); groupValue.put(mq, lockEntry); log.info( - "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", // - group, // - clientId, // + "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", + group, + clientId, mq); } @@ -162,20 +162,20 @@ public class RebalanceLockManager { lockEntry.setClientId(clientId); lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); log.warn( - "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", // - group, // - oldClientId, // - clientId, // + "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", + group, + oldClientId, + clientId, mq); lockedMqs.add(mq); continue; } log.warn( - "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", // - group, // - oldClientId, // - clientId, // + "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", + group, + oldClientId, + clientId, mq); } } finally { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java index 52cb9199..ff63127f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java @@ -111,9 +111,7 @@ public class FilterServerManager { } } - /** - */ public void scanNotActiveChannel() { Iterator> it = this.filterServerTable.entrySet().iterator(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index 3faa7ae3..410192f3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -189,10 +189,10 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces } log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress()); - topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(// - requestHeader.getTopic(), // - requestHeader.getDefaultTopic(), // - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), // + topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod( + requestHeader.getTopic(), + requestHeader.getDefaultTopic(), + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getDefaultTopicQueueNums(), topicSysFlag); if (null == topicConfig) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 71fdda93..937f5756 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -116,6 +116,7 @@ import org.apache.rocketmq.store.SelectMappedBufferResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + public class AdminBrokerProcessor implements NettyRequestProcessor { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; @@ -432,9 +433,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { final RemotingCommand response = RemotingCommand.createResponseCommand(null); LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class); - Set lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(// - requestBody.getConsumerGroup(), // - requestBody.getMqSet(), // + Set lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch( + requestBody.getConsumerGroup(), + requestBody.getMqSet(), requestBody.getClientId()); LockBatchResponseBody responseBody = new LockBatchResponseBody(); @@ -450,9 +451,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { final RemotingCommand response = RemotingCommand.createResponseCommand(null); UnlockBatchRequestBody requestBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class); - this.brokerController.getRebalanceLockManager().unlockBatch(// - requestBody.getConsumerGroup(), // - requestBody.getMqSet(), // + this.brokerController.getRebalanceLockManager().unlockBatch( + requestBody.getConsumerGroup(), + requestBody.getMqSet(), requestBody.getClientId()); response.setCode(ResponseCode.SUCCESS); @@ -657,14 +658,12 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { continue; } - /** - */ { SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic); - if (null == findSubscriptionData // + if (null == findSubscriptionData && this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) { log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", requestHeader.getConsumerGroup(), topic); continue; @@ -683,9 +682,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { if (brokerOffset < 0) brokerOffset = 0; - long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(// - requestHeader.getConsumerGroup(), // - topic, // + long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset( + requestHeader.getConsumerGroup(), + topic, i); if (consumerOffset < 0) consumerOffset = 0; @@ -925,9 +924,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - /** - */ private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final GetConsumerRunningInfoRequestHeader requestHeader = (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class); @@ -1007,9 +1004,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { continue; } - /** - */ if (!requestHeader.isOffline()) { SubscriptionData findSubscriptionData = @@ -1107,13 +1102,11 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { if (isOrder && !topicConfig.isOrder()) { continue; } - /** - */ { SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic); - if (null == findSubscriptionData // + if (null == findSubscriptionData && this.brokerController.getConsumerManager().findSubscriptionDataCount(group) > 0) { log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", group, topic); continue; @@ -1129,9 +1122,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i); if (brokerOffset < 0) brokerOffset = 0; - long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(// - group, // - topic, // + long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset( + group, + topic, i); if (consumerOffset < 0) consumerOffset = 0; @@ -1215,10 +1208,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return runtimeInfo; } - private RemotingCommand callConsumer(// - final int requestCode, // - final RemotingCommand request, // - final String consumerGroup, // + private RemotingCommand callConsumer( + final int requestCode, + final RemotingCommand request, + final String consumerGroup, final String clientId) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId); @@ -1231,8 +1224,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { if (clientChannelInfo.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) { response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT", // - clientId, // + response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT", + clientId, MQVersion.getVersionDesc(clientChannelInfo.getVersion()))); return response; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index fb7ea203..fe2fcfe3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -160,7 +160,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { assert consumerFilterData != null; } } catch (Exception e) { - log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), // + log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), requestHeader.getConsumerGroup()); response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); response.setRemark("parse the consumer's subscription failed"); @@ -176,7 +176,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { return response; } - if (!subscriptionGroupConfig.isConsumeBroadcastEnable() // + if (!subscriptionGroupConfig.isConsumeBroadcastEnable() && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way"); @@ -285,12 +285,12 @@ public class PullMessageProcessor implements NettyRequestProcessor { response.setCode(ResponseCode.PULL_OFFSET_MOVED); // XXX: warn and notify me - log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", // - requestHeader.getQueueOffset(), // - getMessageResult.getNextBeginOffset(), // - requestHeader.getTopic(), // - requestHeader.getQueueId(), // - requestHeader.getConsumerGroup()// + log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", + requestHeader.getQueueOffset(), + getMessageResult.getNextBeginOffset(), + requestHeader.getTopic(), + requestHeader.getQueueId(), + requestHeader.getConsumerGroup() ); } else { response.setCode(ResponseCode.PULL_NOT_FOUND); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 5c716cc8..cd60c442 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -139,9 +139,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement topicSysFlag = TopicSysFlag.buildSysFlag(false, true); } - TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(// - newTopic, // - subscriptionGroupConfig.getRetryQueueNums(), // + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod( + newTopic, + subscriptionGroupConfig.getRetryQueueNums(), PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag); if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); @@ -175,13 +175,13 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); } - if (msgExt.getReconsumeTimes() >= maxReconsumeTimes// + if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) { newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; - topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, // - DLQ_NUMS_PER_GROUP, // + topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, + DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE, 0 ); if (null == topicConfig) { @@ -268,8 +268,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement if (reconsumeTimes >= maxReconsumeTimes) { newTopic = MixAll.getDLQTopic(groupName); int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; - topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, // - DLQ_NUMS_PER_GROUP, // + topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, + DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE, 0 ); msg.setTopic(newTopic); @@ -289,9 +289,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement return true; } - private RemotingCommand sendMessage(final ChannelHandlerContext ctx, // - final RemotingCommand request, // - final SendMessageContext sendMessageContext, // + private RemotingCommand sendMessage(final ChannelHandlerContext ctx, + final RemotingCommand request, + final SendMessageContext sendMessageContext, final SendMessageRequestHeader requestHeader) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); @@ -464,9 +464,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } return response; } - private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, // - final RemotingCommand request, // - final SendMessageContext sendMessageContext, // + private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, + final RemotingCommand request, + final SendMessageContext sendMessageContext, final SendMessageRequestHeader requestHeader) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java index fe30d8f2..d4edd9a7 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java @@ -38,10 +38,10 @@ public class BrokerControllerTest { @Test public void testBrokerRestart() throws Exception { for (int i = 0; i < 2; i++) { - BrokerController brokerController = new BrokerController(// - new BrokerConfig(), // - new NettyServerConfig(), // - new NettyClientConfig(), // + BrokerController brokerController = new BrokerController( + new BrokerConfig(), + new NettyServerConfig(), + new NettyClientConfig(), new MessageStoreConfig()); assertThat(brokerController.initialize()); brokerController.start(); diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java index d4b19b23..22ec6742 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java @@ -52,9 +52,9 @@ public class LocalFileOffsetStore implements OffsetStore { public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) { this.mQClientFactory = mQClientFactory; this.groupName = groupName; - this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + // - this.mQClientFactory.getClientId() + File.separator + // - this.groupName + File.separator + // + this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + + this.mQClientFactory.getClientId() + File.separator + + this.groupName + File.separator + "offsets.json"; } @@ -217,8 +217,8 @@ public class LocalFileOffsetStore implements OffsetStore { OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class); } catch (Exception e) { log.warn("readLocalOffset Exception", e); - throw new MQClientException("readLocalOffset Exception, maybe fastjson version too low" // - + FAQUrl.suggestTodo(FAQUrl.LOAD_JSON_EXCEPTION), // + throw new MQClientException("readLocalOffset Exception, maybe fastjson version too low" + + FAQUrl.suggestTodo(FAQUrl.LOAD_JSON_EXCEPTION), e); } return offsetSerializeWrapper; diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index 5bd5749e..b82e9928 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -204,7 +204,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { MQBrokerException, InterruptedException, MQClientException { FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); if (null == findBrokerResult) { - // TODO Here may be heavily overhead for Name Server,need tuning + this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); } @@ -232,7 +232,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { InterruptedException, MQClientException { FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); if (null == findBrokerResult) { - // TODO Here may be heavily overhead for Name Server,need tuning + this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index 983e5157..92d8513f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -159,7 +159,7 @@ public class MQAdminImpl { } } catch (Exception e) { throw new MQClientException( - "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), // + "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), e); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index ae9ed6c3..c5abc36b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -285,32 +285,32 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public SendResult sendMessage(// - final String addr, // 1 - final String brokerName, // 2 - final Message msg, // 3 - final SendMessageRequestHeader requestHeader, // 4 - final long timeoutMillis, // 5 - final CommunicationMode communicationMode, // 6 - final SendMessageContext context, // 7 - final DefaultMQProducerImpl producer // 8 + public SendResult sendMessage( + final String addr, + final String brokerName, + final Message msg, + final SendMessageRequestHeader requestHeader, + final long timeoutMillis, + final CommunicationMode communicationMode, + final SendMessageContext context, + final DefaultMQProducerImpl producer ) throws RemotingException, MQBrokerException, InterruptedException { return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer); } - public SendResult sendMessage(// - final String addr, // 1 - final String brokerName, // 2 - final Message msg, // 3 - final SendMessageRequestHeader requestHeader, // 4 - final long timeoutMillis, // 5 - final CommunicationMode communicationMode, // 6 - final SendCallback sendCallback, // 7 - final TopicPublishInfo topicPublishInfo, // 8 - final MQClientInstance instance, // 9 - final int retryTimesWhenSendFailed, // 10 - final SendMessageContext context, // 11 - final DefaultMQProducerImpl producer // 12 + public SendResult sendMessage( + final String addr, + final String brokerName, + final Message msg, + final SendMessageRequestHeader requestHeader, + final long timeoutMillis, + final CommunicationMode communicationMode, + final SendCallback sendCallback, + final TopicPublishInfo topicPublishInfo, + final MQClientInstance instance, + final int retryTimesWhenSendFailed, + final SendMessageContext context, + final DefaultMQProducerImpl producer ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = null; if (sendSmartMsg || msg instanceof MessageBatch) { @@ -341,31 +341,31 @@ public class MQClientAPIImpl { return null; } - private SendResult sendMessageSync(// - final String addr, // - final String brokerName, // - final Message msg, // - final long timeoutMillis, // - final RemotingCommand request// + private SendResult sendMessageSync( + final String addr, + final String brokerName, + final Message msg, + final long timeoutMillis, + final RemotingCommand request ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; return this.processSendResponse(brokerName, msg, response); } - private void sendMessageAsync(// - final String addr, // - final String brokerName, // - final Message msg, // - final long timeoutMillis, // - final RemotingCommand request, // - final SendCallback sendCallback, // - final TopicPublishInfo topicPublishInfo, // - final MQClientInstance instance, // - final int retryTimesWhenSendFailed, // - final AtomicInteger times, // - final SendMessageContext context, // - final DefaultMQProducerImpl producer // + private void sendMessageAsync( + final String addr, + final String brokerName, + final Message msg, + final long timeoutMillis, + final RemotingCommand request, + final SendCallback sendCallback, + final TopicPublishInfo topicPublishInfo, + final MQClientInstance instance, + final int retryTimesWhenSendFailed, + final AtomicInteger times, + final SendMessageContext context, + final DefaultMQProducerImpl producer ) throws InterruptedException, RemotingException { this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override @@ -380,7 +380,6 @@ public class MQClientAPIImpl { context.getProducer().executeSendMessageHookAfter(context); } } catch (Throwable e) { - // } producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); @@ -428,19 +427,19 @@ public class MQClientAPIImpl { }); } - private void onExceptionImpl(final String brokerName, // - final Message msg, // - final long timeoutMillis, // - final RemotingCommand request, // - final SendCallback sendCallback, // - final TopicPublishInfo topicPublishInfo, // - final MQClientInstance instance, // - final int timesTotal, // - final AtomicInteger curTimes, // - final Exception e, // - final SendMessageContext context, // - final boolean needRetry, // - final DefaultMQProducerImpl producer // 12 + private void onExceptionImpl(final String brokerName, + final Message msg, + final long timeoutMillis, + final RemotingCommand request, + final SendCallback sendCallback, + final TopicPublishInfo topicPublishInfo, + final MQClientInstance instance, + final int timesTotal, + final AtomicInteger curTimes, + final Exception e, + final SendMessageContext context, + final boolean needRetry, + final DefaultMQProducerImpl producer ) { int tmp = curTimes.incrementAndGet(); if (needRetry && tmp <= timesTotal) { @@ -485,16 +484,15 @@ public class MQClientAPIImpl { } } - private SendResult processSendResponse(// - final String brokerName, // - final Message msg, // - final RemotingCommand response// + private SendResult processSendResponse( + final String brokerName, + final Message msg, + final RemotingCommand response ) throws MQBrokerException, RemotingCommandException { switch (response.getCode()) { case ResponseCode.FLUSH_DISK_TIMEOUT: case ResponseCode.FLUSH_SLAVE_TIMEOUT: case ResponseCode.SLAVE_NOT_AVAILABLE: { - // TODO LOG } case ResponseCode.SUCCESS: { SendStatus sendStatus = SendStatus.SEND_OK; @@ -553,12 +551,12 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public PullResult pullMessage(// - final String addr, // - final PullMessageRequestHeader requestHeader, // - final long timeoutMillis, // - final CommunicationMode communicationMode, // - final PullCallback pullCallback// + public PullResult pullMessage( + final String addr, + final PullMessageRequestHeader requestHeader, + final long timeoutMillis, + final CommunicationMode communicationMode, + final PullCallback pullCallback ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); @@ -579,11 +577,11 @@ public class MQClientAPIImpl { return null; } - private void pullMessageAsync(// - final String addr, // 1 - final RemotingCommand request, // - final long timeoutMillis, // - final PullCallback pullCallback// + private void pullMessageAsync( + final String addr, + final RemotingCommand request, + final long timeoutMillis, + final PullCallback pullCallback ) throws RemotingException, InterruptedException { this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override @@ -611,10 +609,10 @@ public class MQClientAPIImpl { }); } - private PullResult pullMessageSync(// - final String addr, // 1 - final RemotingCommand request, // 2 - final long timeoutMillis// 3 + private PullResult pullMessageSync( + final String addr, + final RemotingCommand request, + final long timeoutMillis ) throws RemotingException, InterruptedException, MQBrokerException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; @@ -720,9 +718,9 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public List getConsumerIdListByGroup(// - final String addr, // - final String consumerGroup, // + public List getConsumerIdListByGroup( + final String addr, + final String consumerGroup, final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader(); @@ -796,10 +794,10 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public long queryConsumerOffset(// - final String addr, // - final QueryConsumerOffsetRequestHeader requestHeader, // - final long timeoutMillis// + public long queryConsumerOffset( + final String addr, + final QueryConsumerOffsetRequestHeader requestHeader, + final long timeoutMillis ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader); @@ -820,10 +818,10 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public void updateConsumerOffset(// - final String addr, // - final UpdateConsumerOffsetRequestHeader requestHeader, // - final long timeoutMillis// + public void updateConsumerOffset( + final String addr, + final UpdateConsumerOffsetRequestHeader requestHeader, + final long timeoutMillis ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader); @@ -841,10 +839,10 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public void updateConsumerOffsetOneway(// - final String addr, // - final UpdateConsumerOffsetRequestHeader requestHeader, // - final long timeoutMillis// + public void updateConsumerOffsetOneway( + final String addr, + final UpdateConsumerOffsetRequestHeader requestHeader, + final long timeoutMillis ) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader); @@ -852,10 +850,10 @@ public class MQClientAPIImpl { this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); } - public int sendHearbeat(// - final String addr, // - final HeartbeatData heartbeatData, // - final long timeoutMillis// + public int sendHearbeat( + final String addr, + final HeartbeatData heartbeatData, + final long timeoutMillis ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null); @@ -873,12 +871,12 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public void unregisterClient(// - final String addr, // - final String clientID, // - final String producerGroup, // - final String consumerGroup, // - final long timeoutMillis// + public void unregisterClient( + final String addr, + final String clientID, + final String producerGroup, + final String consumerGroup, + final long timeoutMillis ) throws RemotingException, MQBrokerException, InterruptedException { final UnregisterClientRequestHeader requestHeader = new UnregisterClientRequestHeader(); requestHeader.setClientID(clientID); @@ -899,11 +897,11 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public void endTransactionOneway(// - final String addr, // - final EndTransactionRequestHeader requestHeader, // - final String remark, // - final long timeoutMillis// + public void endTransactionOneway( + final String addr, + final EndTransactionRequestHeader requestHeader, + final String remark, + final long timeoutMillis ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader); @@ -965,9 +963,9 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public Set lockBatchMQ(// - final String addr, // - final LockBatchRequestBody requestBody, // + public Set lockBatchMQ( + final String addr, + final LockBatchRequestBody requestBody, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null); @@ -987,11 +985,11 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public void unlockBatchMQ(// - final String addr, // - final UnlockBatchRequestBody requestBody, // - final long timeoutMillis, // - final boolean oneway// + public void unlockBatchMQ( + final String addr, + final UnlockBatchRequestBody requestBody, + final long timeoutMillis, + final boolean oneway ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null); @@ -1213,7 +1211,7 @@ public class MQClientAPIImpl { if (allowTopicNotExist && !topic.equals(MixAll.DEFAULT_TOPIC)) { log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic); } - // TODO :- Log when if condition is not satisfied + break; } case ResponseCode.SUCCESS: { @@ -1566,12 +1564,12 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public void registerMessageFilterClass(final String addr, // - final String consumerGroup, // - final String topic, // - final String className, // - final int classCRC, // - final byte[] classBody, // + public void registerMessageFilterClass(final String addr, + final String consumerGroup, + final String topic, + final String className, + final int classCRC, + final byte[] classBody, final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { RegisterMessageFilterClassRequestHeader requestHeader = new RegisterMessageFilterClassRequestHeader(); @@ -1706,10 +1704,10 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr, // - String consumerGroup, // - String clientId, // - String msgId, // + public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr, + String consumerGroup, + String clientId, + String msgId, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { ConsumeMessageDirectlyResultRequestHeader requestHeader = new ConsumeMessageDirectlyResultRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); @@ -1912,7 +1910,6 @@ public class MQClientAPIImpl { public Set getClusterList(String topic, long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { - // todo:jodie return Collections.EMPTY_SET; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java index f566ed0f..961e062b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java @@ -69,12 +69,12 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); this.consumeRequestQueue = new LinkedBlockingQueue(); - this.consumeExecutor = new ThreadPoolExecutor(// - this.defaultMQPushConsumer.getConsumeThreadMin(), // - this.defaultMQPushConsumer.getConsumeThreadMax(), // - 1000 * 60, // - TimeUnit.MILLISECONDS, // - this.consumeRequestQueue, // + this.consumeExecutor = new ThreadPoolExecutor( + this.defaultMQPushConsumer.getConsumeThreadMin(), + this.defaultMQPushConsumer.getConsumeThreadMax(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_")); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); @@ -100,8 +100,8 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService @Override public void updateCorePoolSize(int corePoolSize) { - if (corePoolSize > 0 // - && corePoolSize <= Short.MAX_VALUE // + if (corePoolSize > 0 + && corePoolSize <= Short.MAX_VALUE && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) { this.consumeExecutor.setCorePoolSize(corePoolSize); } @@ -115,11 +115,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize() // + 1); // } - // // log.info("incCorePoolSize Concurrently from {} to {}, ConsumerGroup: - // {}", // - // corePoolSize,// - // this.consumeExecutor.getCorePoolSize(),// + // {}", + // corePoolSize, + // this.consumeExecutor.getCorePoolSize(), // this.consumerGroup); } @@ -131,11 +130,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize() // - 1); // } - // // log.info("decCorePoolSize Concurrently from {} to {}, ConsumerGroup: - // {}", // - // corePoolSize,// - // this.consumeExecutor.getCorePoolSize(),// + // {}", + // corePoolSize, + // this.consumeExecutor.getCorePoolSize(), // this.consumerGroup); } @@ -185,10 +183,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService result.setConsumeResult(CMResult.CR_THROW_EXCEPTION); result.setRemark(RemotingHelper.exceptionSimpleDesc(e)); - log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", // - RemotingHelper.exceptionSimpleDesc(e), // - ConsumeMessageConcurrentlyService.this.consumerGroup, // - msgs, // + log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", + RemotingHelper.exceptionSimpleDesc(e), + ConsumeMessageConcurrentlyService.this.consumerGroup, + msgs, mq), e); } @@ -200,10 +198,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService } @Override - public void submitConsumeRequest(// - final List msgs, // - final ProcessQueue processQueue, // - final MessageQueue messageQueue, // + public void submitConsumeRequest( + final List msgs, + final ProcessQueue processQueue, + final MessageQueue messageQueue, final boolean dispatchToConsume) { final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); if (msgs.size() <= consumeBatchSize) { @@ -258,10 +256,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService } } - public void processConsumeResult(// - final ConsumeConcurrentlyStatus status, // - final ConsumeConcurrentlyContext context, // - final ConsumeRequest consumeRequest// + public void processConsumeResult( + final ConsumeConcurrentlyStatus status, + final ConsumeConcurrentlyContext context, + final ConsumeRequest consumeRequest ) { int ackIndex = context.getAckIndex(); @@ -338,10 +336,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService return false; } - private void submitConsumeRequestLater(// - final List msgs, // - final ProcessQueue processQueue, // - final MessageQueue messageQueue// + private void submitConsumeRequestLater( + final List msgs, + final ProcessQueue processQueue, + final MessageQueue messageQueue ) { this.scheduledExecutorService.schedule(new Runnable() { @@ -353,7 +351,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService }, 5000, TimeUnit.MILLISECONDS); } - private void submitConsumeRequestLater(final ConsumeRequest consumeRequest// + private void submitConsumeRequestLater(final ConsumeRequest consumeRequest ) { this.scheduledExecutorService.schedule(new Runnable() { @@ -419,7 +417,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", - RemotingHelper.exceptionSimpleDesc(e), // + RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index 1fa474ca..abdad79f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -70,12 +70,12 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); this.consumeRequestQueue = new LinkedBlockingQueue(); - this.consumeExecutor = new ThreadPoolExecutor(// - this.defaultMQPushConsumer.getConsumeThreadMin(), // - this.defaultMQPushConsumer.getConsumeThreadMax(), // - 1000 * 60, // - TimeUnit.MILLISECONDS, // - this.consumeRequestQueue, // + this.consumeExecutor = new ThreadPoolExecutor( + this.defaultMQPushConsumer.getConsumeThreadMin(), + this.defaultMQPushConsumer.getConsumeThreadMax(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_")); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); @@ -107,8 +107,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { @Override public void updateCorePoolSize(int corePoolSize) { - if (corePoolSize > 0 // - && corePoolSize <= Short.MAX_VALUE // + if (corePoolSize > 0 + && corePoolSize <= Short.MAX_VALUE && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) { this.consumeExecutor.setCorePoolSize(corePoolSize); } @@ -171,10 +171,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { result.setConsumeResult(CMResult.CR_THROW_EXCEPTION); result.setRemark(RemotingHelper.exceptionSimpleDesc(e)); - log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", // - RemotingHelper.exceptionSimpleDesc(e), // - ConsumeMessageOrderlyService.this.consumerGroup, // - msgs, // + log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", + RemotingHelper.exceptionSimpleDesc(e), + ConsumeMessageOrderlyService.this.consumerGroup, + msgs, mq), e); } @@ -187,10 +187,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { } @Override - public void submitConsumeRequest(// - final List msgs, // - final ProcessQueue processQueue, // - final MessageQueue messageQueue, // + public void submitConsumeRequest( + final List msgs, + final ProcessQueue processQueue, + final MessageQueue messageQueue, final boolean dispathToConsume) { if (dispathToConsume) { ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue); @@ -226,10 +226,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { return false; } - private void submitConsumeRequestLater(// - final ProcessQueue processQueue, // - final MessageQueue messageQueue, // - final long suspendTimeMillis// + private void submitConsumeRequestLater( + final ProcessQueue processQueue, + final MessageQueue messageQueue, + final long suspendTimeMillis ) { long timeMillis = suspendTimeMillis; if (timeMillis == -1) { @@ -251,11 +251,11 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { }, timeMillis, TimeUnit.MILLISECONDS); } - public boolean processConsumeResult(// - final List msgs, // - final ConsumeOrderlyStatus status, // - final ConsumeOrderlyContext context, // - final ConsumeRequest consumeRequest// + public boolean processConsumeResult( + final List msgs, + final ConsumeOrderlyStatus status, + final ConsumeOrderlyContext context, + final ConsumeRequest consumeRequest ) { boolean continueConsume = true; long commitOffset = -1L; @@ -273,9 +273,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); if (checkReconsumeTimes(msgs)) { consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs); - this.submitConsumeRequestLater(// - consumeRequest.getProcessQueue(), // - consumeRequest.getMessageQueue(), // + this.submitConsumeRequestLater( + consumeRequest.getProcessQueue(), + consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis()); continueConsume = false; } else { @@ -295,9 +295,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { break; case ROLLBACK: consumeRequest.getProcessQueue().rollback(); - this.submitConsumeRequestLater(// - consumeRequest.getProcessQueue(), // - consumeRequest.getMessageQueue(), // + this.submitConsumeRequestLater( + consumeRequest.getProcessQueue(), + consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis()); continueConsume = false; break; @@ -305,9 +305,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); if (checkReconsumeTimes(msgs)) { consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs); - this.submitConsumeRequestLater(// - consumeRequest.getProcessQueue(), // - consumeRequest.getMessageQueue(), // + this.submitConsumeRequestLater( + consumeRequest.getProcessQueue(), + consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis()); continueConsume = false; } @@ -468,22 +468,22 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { - log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", // - RemotingHelper.exceptionSimpleDesc(e), // - ConsumeMessageOrderlyService.this.consumerGroup, // - msgs, // + log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", + RemotingHelper.exceptionSimpleDesc(e), + ConsumeMessageOrderlyService.this.consumerGroup, + msgs, messageQueue); hasException = true; } finally { this.processQueue.getLockConsume().unlock(); } - if (null == status // - || ConsumeOrderlyStatus.ROLLBACK == status// + if (null == status + || ConsumeOrderlyStatus.ROLLBACK == status || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { - log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", // - ConsumeMessageOrderlyService.this.consumerGroup, // - msgs, // + log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", + ConsumeMessageOrderlyService.this.consumerGroup, + msgs, messageQueue); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java index 8742191b..0f6f3bb3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java @@ -36,9 +36,9 @@ public interface ConsumeMessageService { ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName); - void submitConsumeRequest(// - final List msgs, // - final ProcessQueue processQueue, // - final MessageQueue messageQueue, // + void submitConsumeRequest( + final List msgs, + final ProcessQueue processQueue, + final MessageQueue messageQueue, final boolean dispathToConsume); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 35ee16fe..8640d2d6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -97,8 +97,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { - throw new MQClientException("The consumer service state not OK, "// - + this.serviceState// + throw new MQClientException("The consumer service state not OK, " + + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); } @@ -185,7 +185,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { SubscriptionData subscriptionData; try { - subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), // + subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), mq.getTopic(), subExpression); } catch (Exception e) { throw new MQClientException("parse subscription error", e); @@ -193,18 +193,18 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; - PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(// - mq, // 1 - subscriptionData.getSubString(), // 2 - 0L, // 3 - offset, // 4 - maxNums, // 5 - sysFlag, // 6 - 0, // 7 - this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8 - timeoutMillis, // 9 - CommunicationMode.SYNC, // 10 - null// 11 + PullResult pullResult = this.pullAPIWrapper.pullKernelImpl( + mq, + subscriptionData.getSubString(), + 0L, + offset, + maxNums, + sysFlag, + 0, + this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), + timeoutMillis, + CommunicationMode.SYNC, + null ); this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); if (!this.consumeMessageHookList.isEmpty()) { @@ -225,7 +225,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { public void subscriptionAutomatically(final String topic) { if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) { try { - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), // + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), topic, SubscriptionData.SUB_ALL); this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData); } catch (Exception ignore) { @@ -372,13 +372,13 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout); } - private void pullAsyncImpl(// - final MessageQueue mq, // - final String subExpression, // - final long offset, // - final int maxNums, // - final PullCallback pullCallback, // - final boolean block, // + private void pullAsyncImpl( + final MessageQueue mq, + final String subExpression, + final long offset, + final int maxNums, + final PullCallback pullCallback, + final boolean block, final long timeout) throws MQClientException, RemotingException, InterruptedException { this.makeSureStateOK(); @@ -405,7 +405,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { final SubscriptionData subscriptionData; try { - subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), // + subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), mq.getTopic(), subExpression); } catch (Exception e) { throw new MQClientException("parse subscription error", e); @@ -413,17 +413,17 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; - this.pullAPIWrapper.pullKernelImpl(// - mq, // 1 - subscriptionData.getSubString(), // 2 - 0L, // 3 - offset, // 4 - maxNums, // 5 - sysFlag, // 6 - 0, // 7 - this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8 - timeoutMillis, // 9 - CommunicationMode.ASYNC, // 10 + this.pullAPIWrapper.pullKernelImpl( + mq, + subscriptionData.getSubString(), + 0L, + offset, + maxNums, + sysFlag, + 0, + this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), + timeoutMillis, + CommunicationMode.ASYNC, new PullCallback() { @Override @@ -551,8 +551,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); - this.pullAPIWrapper = new PullAPIWrapper(// - mQClientFactory, // + this.pullAPIWrapper = new PullAPIWrapper( + mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode()); this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); @@ -589,8 +589,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: - throw new MQClientException("The PullConsumer service state not OK, maybe started once, "// - + this.serviceState// + throw new MQClientException("The PullConsumer service state not OK, maybe started once, " + + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: @@ -606,42 +606,42 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { // consumerGroup if (null == this.defaultMQPullConsumer.getConsumerGroup()) { throw new MQClientException( - "consumerGroup is null" // - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // + "consumerGroup is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); } // consumerGroup if (this.defaultMQPullConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) { throw new MQClientException( - "consumerGroup can not equal "// - + MixAll.DEFAULT_CONSUMER_GROUP // - + ", please specify another one."// - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // + "consumerGroup can not equal " + + MixAll.DEFAULT_CONSUMER_GROUP + + ", please specify another one." + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); } // messageModel if (null == this.defaultMQPullConsumer.getMessageModel()) { throw new MQClientException( - "messageModel is null" // - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // + "messageModel is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); } // allocateMessageQueueStrategy if (null == this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()) { throw new MQClientException( - "allocateMessageQueueStrategy is null" // - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // + "allocateMessageQueueStrategy is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); } // allocateMessageQueueStrategy if (this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis()) { throw new MQClientException( - "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis" // - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // + "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); } } @@ -651,7 +651,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { Set registerTopics = this.defaultMQPullConsumer.getRegisterTopics(); if (registerTopics != null) { for (final String topic : registerTopics) { - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), // + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), topic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 9bf34be8..7eda7c1b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -297,10 +297,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); - DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(// - pullResult.getMsgFoundList(), // - processQueue, // - pullRequest.getMessageQueue(), // + DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( + pullResult.getMsgFoundList(), + processQueue, + pullRequest.getMessageQueue(), dispathToConsume); if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { @@ -311,12 +311,12 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } } - if (pullResult.getNextBeginOffset() < prevRequestOffset// + if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) { log.warn( - "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", // - pullResult.getNextBeginOffset(), // - firstMsgOffset, // + "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", + pullResult.getNextBeginOffset(), + firstMsgOffset, prevRequestOffset); } @@ -336,7 +336,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case OFFSET_ILLEGAL: - log.warn("the pull request offset illegal, {} {}", // + log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString()); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); @@ -396,26 +396,26 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { classFilter = sd.isClassFilterMode(); } - int sysFlag = PullSysFlag.buildSysFlag(// + int sysFlag = PullSysFlag.buildSysFlag( commitOffsetEnable, // commitOffset true, // suspend subExpression != null, // subscription classFilter // class filter ); try { - this.pullAPIWrapper.pullKernelImpl(// - pullRequest.getMessageQueue(), // 1 - subExpression, // 2 - subscriptionData.getExpressionType(), // 3 - subscriptionData.getSubVersion(), // 4 - pullRequest.getNextOffset(), // 5 - this.defaultMQPushConsumer.getPullBatchSize(), // 6 - sysFlag, // 7 - commitOffsetValue, // 8 - BROKER_SUSPEND_MAX_TIME_MILLIS, // 9 - CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 10 - CommunicationMode.ASYNC, // 11 - pullCallback // 12 + this.pullAPIWrapper.pullKernelImpl( + pullRequest.getMessageQueue(), + subExpression, + subscriptionData.getExpressionType(), + subscriptionData.getSubVersion(), + pullRequest.getNextOffset(), + this.defaultMQPushConsumer.getPullBatchSize(), + sysFlag, + commitOffsetValue, + BROKER_SUSPEND_MAX_TIME_MILLIS, + CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, + CommunicationMode.ASYNC, + pullCallback ); } catch (Exception e) { log.error("pullKernelImpl exception", e); @@ -425,8 +425,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { - throw new MQClientException("The consumer service state not OK, "// - + this.serviceState// + throw new MQClientException("The consumer service state not OK, " + + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); } @@ -608,8 +608,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: - throw new MQClientException("The PushConsumer service state not OK, maybe started once, "// - + this.serviceState// + throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: @@ -764,7 +764,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { for (final Map.Entry entry : sub.entrySet()) { final String topic = entry.getKey(); final String subString = entry.getValue(); - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, subString); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } @@ -779,7 +779,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { break; case CLUSTERING: final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), retryTopic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break; @@ -811,7 +811,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { public void subscribe(String topic, String subExpression) throws MQClientException { try { - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, subExpression); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); if (this.mQClientFactory != null) { @@ -824,7 +824,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException { try { - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, "*"); subscriptionData.setSubString(fullClassName); subscriptionData.setClassFilterMode(true); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index 634e0f0e..ef27ff84 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -73,9 +73,9 @@ public abstract class RebalanceImpl { try { this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000, oneway); - log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}", // - this.consumerGroup, // - this.mQClientFactory.getClientId(), // + log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}", + this.consumerGroup, + this.mQClientFactory.getClientId(), mq); } catch (Exception e) { log.error("unlockBatchMQ exception, " + mq, e); @@ -245,10 +245,10 @@ public abstract class RebalanceImpl { boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); if (changed) { this.messageQueueChanged(topic, mqSet, mqSet); - log.info("messageQueueChanged {} {} {} {}", // - consumerGroup, // - topic, // - mqSet, // + log.info("messageQueueChanged {} {} {} {}", + consumerGroup, + topic, + mqSet, mqSet); } } else { @@ -280,10 +280,10 @@ public abstract class RebalanceImpl { List allocateResult = null; try { - allocateResult = strategy.allocate(// - this.consumerGroup, // - this.mQClientFactory.getClientId(), // - mqAll, // + allocateResult = strategy.allocate( + this.consumerGroup, + this.mQClientFactory.getClientId(), + mqAll, cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index 112bceeb..2f4f745a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -74,8 +74,8 @@ public class RebalancePushImpl extends RebalanceImpl { pq.getLockConsume().unlock(); } } else { - log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", // - mq, // + log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", + mq, pq.getTryUnlockTimes()); pq.incTryUnlockTimes(); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index f146be9b..6ef594b0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -148,10 +148,10 @@ public class MQClientInstance { this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService); - log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}", // - this.instanceIndex, // - this.clientId, // - this.clientConfig, // + log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}", + this.instanceIndex, + this.clientId, + this.clientConfig, MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer()); } @@ -727,13 +727,13 @@ public class MQClientInstance { classBody = filterClassSource.getBytes(MixAll.DEFAULT_CHARSET); classCRC = UtilAll.crc32(classBody); } catch (Exception e1) { - log.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}", // - fullClassName, // + log.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}", + fullClassName, RemotingHelper.exceptionSimpleDesc(e1)); } TopicRouteData topicRouteData = this.topicRouteTable.get(topic); - if (topicRouteData != null // + if (topicRouteData != null && topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) { Iterator>> it = topicRouteData.getFilterServerTable().entrySet().iterator(); while (it.hasNext()) { @@ -1006,10 +1006,10 @@ public class MQClientInstance { return null; } - public FindBrokerResult findBrokerAddressInSubscribe(// - final String brokerName, // - final long brokerId, // - final boolean onlyThisBroker// + public FindBrokerResult findBrokerAddressInSubscribe( + final String brokerName, + final long brokerId, + final boolean onlyThisBroker ) { String brokerAddr = null; boolean slave = false; @@ -1102,7 +1102,6 @@ public class MQClientInstance { try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { - // } Iterator iterator = processQueueTable.keySet().iterator(); @@ -1171,8 +1170,8 @@ public class MQClientInstance { return topicRouteTable; } - public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, // - final String consumerGroup, // + public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, + final String consumerGroup, final String brokerName) { MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup); if (null != mqConsumerInner) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 12f8a367..602fedd3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -116,11 +116,11 @@ public class DefaultMQProducerImpl implements MQProducerInner { public void initTransactionEnv() { TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer; this.checkRequestQueue = new LinkedBlockingQueue(producer.getCheckRequestHoldMax()); - this.checkExecutor = new ThreadPoolExecutor(// - producer.getCheckThreadPoolMinSize(), // - producer.getCheckThreadPoolMaxSize(), // - 1000 * 60, // - TimeUnit.MILLISECONDS, // + this.checkExecutor = new ThreadPoolExecutor( + producer.getCheckThreadPoolMinSize(), + producer.getCheckThreadPoolMaxSize(), + 1000 * 60, + TimeUnit.MILLISECONDS, this.checkRequestQueue); } @@ -172,8 +172,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: - throw new MQClientException("The producer service state not OK, maybe started once, "// - + this.serviceState// + throw new MQClientException("The producer service state not OK, maybe started once, " + + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: @@ -268,18 +268,18 @@ public class DefaultMQProducerImpl implements MQProducerInner { exception = e; } - this.processTransactionState(// - localTransactionState, // - group, // + this.processTransactionState( + localTransactionState, + group, exception); } else { log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group); } } - private void processTransactionState(// - final LocalTransactionState localTransactionState, // - final String producerGroup, // + private void processTransactionState( + final LocalTransactionState localTransactionState, + final String producerGroup, final Throwable exception) { final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader(); thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset()); @@ -354,8 +354,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { - throw new MQClientException("The producer service state not OK, "// - + this.serviceState// + throw new MQClientException("The producer service state not OK, " + + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); } @@ -428,11 +428,11 @@ public class DefaultMQProducerImpl implements MQProducerInner { this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation); } - private SendResult sendDefaultImpl(// - Message msg, // - final CommunicationMode communicationMode, // - final SendCallback sendCallback, // - final long timeout// + private SendResult sendDefaultImpl( + Message msg, + final CommunicationMode communicationMode, + final SendCallback sendCallback, + final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); @@ -579,11 +579,11 @@ public class DefaultMQProducerImpl implements MQProducerInner { } } - private SendResult sendKernelImpl(final Message msg, // - final MessageQueue mq, // - final CommunicationMode communicationMode, // - final SendCallback sendCallback, // - final TopicPublishInfo topicPublishInfo, // + private SendResult sendKernelImpl(final Message msg, + final MessageQueue mq, + final CommunicationMode communicationMode, + final SendCallback sendCallback, + final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { @@ -674,18 +674,18 @@ public class DefaultMQProducerImpl implements MQProducerInner { SendResult sendResult = null; switch (communicationMode) { case ASYNC: - sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(// - brokerAddr, // 1 - mq.getBrokerName(), // 2 - msg, // 3 - requestHeader, // 4 - timeout, // 5 - communicationMode, // 6 - sendCallback, // 7 - topicPublishInfo, // 8 - this.mQClientFactory, // 9 - this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10 - context, // + sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( + brokerAddr, + mq.getBrokerName(), + msg, + requestHeader, + timeout, + communicationMode, + sendCallback, + topicPublishInfo, + this.mQClientFactory, + this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), + context, this); break; case ONEWAY: @@ -887,12 +887,12 @@ public class DefaultMQProducerImpl implements MQProducerInner { return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout); } - private SendResult sendSelectImpl(// - Message msg, // - MessageQueueSelector selector, // - Object arg, // - final CommunicationMode communicationMode, // - final SendCallback sendCallback, final long timeout// + private SendResult sendSelectImpl( + Message msg, + MessageQueueSelector selector, + Object arg, + final CommunicationMode communicationMode, + final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); @@ -1017,9 +1017,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { return send(msg, this.defaultMQProducer.getSendMsgTimeout()); } - public void endTransaction(// - final SendResult sendResult, // - final LocalTransactionState localTransactionState, // + public void endTransaction( + final SendResult sendResult, + final LocalTransactionState localTransactionState, final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { final MessageId id; if (sendResult.getOffsetMsgId() != null) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java index 5b2039ec..dfd485dd 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java @@ -28,9 +28,9 @@ public interface MQProducerInner { TransactionCheckListener checkListener(); - void checkTransactionState(// - final String addr, // - final MessageExt msg, // + void checkTransactionState( + final String addr, + final MessageExt msg, final CheckTransactionStateRequestHeader checkRequestHeader); void updateTopicPublishInfo(final String topic, final TopicPublishInfo info); diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java index b85f6f5b..4795cced 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java @@ -46,24 +46,14 @@ public class TopicConfig { public String encode() { StringBuilder sb = new StringBuilder(); - - // 1 sb.append(this.topicName); sb.append(SEPARATOR); - - // 2 sb.append(this.readQueueNums); sb.append(SEPARATOR); - - // 3 sb.append(this.writeQueueNums); sb.append(SEPARATOR); - - // 4 sb.append(this.perm); sb.append(SEPARATOR); - - // 5 sb.append(this.topicFilterType); return sb.toString(); diff --git a/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java b/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java index eea0da1f..5d950beb 100644 --- a/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java +++ b/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java @@ -18,44 +18,44 @@ package org.apache.rocketmq.common.help; public class FAQUrl { - public static final String APPLY_TOPIC_URL = // + public static final String APPLY_TOPIC_URL = "http://rocketmq.apache.org/docs/faq/"; - public static final String NAME_SERVER_ADDR_NOT_EXIST_URL = // + public static final String NAME_SERVER_ADDR_NOT_EXIST_URL = "http://rocketmq.apache.org/docs/faq/"; - public static final String GROUP_NAME_DUPLICATE_URL = // + public static final String GROUP_NAME_DUPLICATE_URL = "http://rocketmq.apache.org/docs/faq/"; - public static final String CLIENT_PARAMETER_CHECK_URL = // + public static final String CLIENT_PARAMETER_CHECK_URL = "http://rocketmq.apache.org/docs/faq/"; - public static final String SUBSCRIPTION_GROUP_NOT_EXIST = // + public static final String SUBSCRIPTION_GROUP_NOT_EXIST = "http://rocketmq.apache.org/docs/faq/"; - public static final String CLIENT_SERVICE_NOT_OK = // + public static final String CLIENT_SERVICE_NOT_OK = "http://rocketmq.apache.org/docs/faq/"; // FAQ: No route info of this topic, TopicABC - public static final String NO_TOPIC_ROUTE_INFO = // + public static final String NO_TOPIC_ROUTE_INFO = "http://rocketmq.apache.org/docs/faq/"; - public static final String LOAD_JSON_EXCEPTION = // + public static final String LOAD_JSON_EXCEPTION = "http://rocketmq.apache.org/docs/faq/"; - public static final String SAME_GROUP_DIFFERENT_TOPIC = // + public static final String SAME_GROUP_DIFFERENT_TOPIC = "http://rocketmq.apache.org/docs/faq/"; - public static final String MQLIST_NOT_EXIST = // + public static final String MQLIST_NOT_EXIST = "http://rocketmq.apache.org/docs/faq/"; - public static final String UNEXPECTED_EXCEPTION_URL = // + public static final String UNEXPECTED_EXCEPTION_URL = "http://rocketmq.apache.org/docs/faq/"; - public static final String SEND_MSG_FAILED = // + public static final String SEND_MSG_FAILED = "http://rocketmq.apache.org/docs/faq/"; - public static final String UNKNOWN_HOST_EXCEPTION = // + public static final String UNKNOWN_HOST_EXCEPTION = "http://rocketmq.apache.org/docs/faq/"; private static final String TIP_STRING_BEGIN = "\nSee "; diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java index a1d3ede2..d0b202ec 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java @@ -42,7 +42,7 @@ public class MessageClientIDSetter { tempBuffer.put(createFakeIP()); } tempBuffer.position(6); - tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode()); //4 + tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode()); FIX_STRING = UtilAll.bytes2string(tempBuffer.array()); setStartTime(System.currentTimeMillis()); COUNTER = new AtomicInteger(0); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java index 41e76fcb..d7942eb4 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java @@ -130,15 +130,15 @@ public class ConsumerRunningInfo extends RemotingSerializable { if (orderMsg) { if (!pq.isLocked()) { - sb.append(String.format("%s %s can't lock for a while, %dms%n", // - clientId, // - mq, // + sb.append(String.format("%s %s can't lock for a while, %dms%n", + clientId, + mq, System.currentTimeMillis() - pq.getLastLockTimestamp())); } else { if (pq.isDroped() && (pq.getTryUnlockTimes() > 0)) { - sb.append(String.format("%s %s unlock %d times, still failed%n", // - clientId, // - mq, // + sb.append(String.format("%s %s unlock %d times, still failed%n", + clientId, + mq, pq.getTryUnlockTimes())); } } @@ -147,9 +147,9 @@ public class ConsumerRunningInfo extends RemotingSerializable { long diff = System.currentTimeMillis() - pq.getLastConsumeTimestamp(); if (diff > (1000 * 60) && pq.getCachedMsgCount() > 0) { - sb.append(String.format("%s %s can't consume for a while, maybe blocked, %dms%n", // - clientId, // - mq, // + sb.append(String.format("%s %s can't consume for a while, maybe blocked, %dms%n", + clientId, + mq, diff)); } } @@ -211,10 +211,10 @@ public class ConsumerRunningInfo extends RemotingSerializable { int i = 0; while (it.hasNext()) { SubscriptionData next = it.next(); - String item = String.format("%03d Topic: %-40s ClassFilter: %-8s SubExpression: %s%n", // - ++i, // - next.getTopic(), // - next.isClassFilterMode(), // + String item = String.format("%03d Topic: %-40s ClassFilter: %-8s SubExpression: %s%n", + ++i, + next.getTopic(), + next.isClassFilterMode(), next.getSubString()); sb.append(item); @@ -223,20 +223,20 @@ public class ConsumerRunningInfo extends RemotingSerializable { { sb.append("\n\n#Consumer Offset#\n"); - sb.append(String.format("%-32s %-32s %-4s %-20s%n", // - "#Topic", // - "#Broker Name", // - "#QID", // - "#Consumer Offset"// + sb.append(String.format("%-32s %-32s %-4s %-20s%n", + "#Topic", + "#Broker Name", + "#QID", + "#Consumer Offset" )); Iterator> it = this.mqTable.entrySet().iterator(); while (it.hasNext()) { Entry next = it.next(); - String item = String.format("%-32s %-32s %-4d %-20d%n", // - next.getKey().getTopic(), // - next.getKey().getBrokerName(), // - next.getKey().getQueueId(), // + String item = String.format("%-32s %-32s %-4d %-20d%n", + next.getKey().getTopic(), + next.getKey().getBrokerName(), + next.getKey().getQueueId(), next.getValue().getCommitOffset()); sb.append(item); @@ -245,20 +245,20 @@ public class ConsumerRunningInfo extends RemotingSerializable { { sb.append("\n\n#Consumer MQ Detail#\n"); - sb.append(String.format("%-32s %-32s %-4s %-20s%n", // - "#Topic", // - "#Broker Name", // - "#QID", // - "#ProcessQueueInfo"// + sb.append(String.format("%-32s %-32s %-4s %-20s%n", + "#Topic", + "#Broker Name", + "#QID", + "#ProcessQueueInfo" )); Iterator> it = this.mqTable.entrySet().iterator(); while (it.hasNext()) { Entry next = it.next(); - String item = String.format("%-32s %-32s %-4d %s%n", // - next.getKey().getTopic(), // - next.getKey().getBrokerName(), // - next.getKey().getQueueId(), // + String item = String.format("%-32s %-32s %-4d %s%n", + next.getKey().getTopic(), + next.getKey().getBrokerName(), + next.getKey().getQueueId(), next.getValue().toString()); sb.append(item); @@ -267,27 +267,27 @@ public class ConsumerRunningInfo extends RemotingSerializable { { sb.append("\n\n#Consumer RT&TPS#\n"); - sb.append(String.format("%-32s %14s %14s %14s %14s %18s %25s%n", // - "#Topic", // - "#Pull RT", // - "#Pull TPS", // - "#Consume RT", // - "#ConsumeOK TPS", // - "#ConsumeFailed TPS", // - "#ConsumeFailedMsgsInHour"// + sb.append(String.format("%-32s %14s %14s %14s %14s %18s %25s%n", + "#Topic", + "#Pull RT", + "#Pull TPS", + "#Consume RT", + "#ConsumeOK TPS", + "#ConsumeFailed TPS", + "#ConsumeFailedMsgsInHour" )); Iterator> it = this.statusTable.entrySet().iterator(); while (it.hasNext()) { Entry next = it.next(); - String item = String.format("%-32s %14.2f %14.2f %14.2f %14.2f %18.2f %25d%n", // - next.getKey(), // - next.getValue().getPullRT(), // - next.getValue().getPullTPS(), // - next.getValue().getConsumeRT(), // - next.getValue().getConsumeOKTPS(), // - next.getValue().getConsumeFailedTPS(), // - next.getValue().getConsumeFailedMsgs()// + String item = String.format("%-32s %14.2f %14.2f %14.2f %14.2f %18.2f %25d%n", + next.getKey(), + next.getValue().getPullRT(), + next.getValue().getPullTPS(), + next.getValue().getConsumeRT(), + next.getValue().getConsumeOKTPS(), + next.getValue().getConsumeFailedTPS(), + next.getValue().getConsumeFailedMsgs() ); sb.append(item); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java index ba6b1293..6ba069e1 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java @@ -27,8 +27,6 @@ public class GetConsumeStatsRequestHeader implements CommandCustomHeader { @Override public void checkFields() throws RemotingCommandException { - // TODO Auto-generated method stub - } public String getConsumerGroup() { diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerStatusRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerStatusRequestHeader.java index 20990a69..ca26a869 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerStatusRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerStatusRequestHeader.java @@ -32,7 +32,6 @@ public class GetConsumerStatusRequestHeader implements CommandCustomHeader { @Override public void checkFields() throws RemotingCommandException { - // TODO Auto-generated method stub } public String getTopic() { diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java index 222382e9..c64381fb 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java @@ -32,7 +32,6 @@ public class GetEarliestMsgStoretimeRequestHeader implements CommandCustomHeader @Override public void checkFields() throws RemotingCommandException { - // TODO Auto-generated method stub } public String getTopic() { diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryCorrectionOffsetHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryCorrectionOffsetHeader.java index 6a998d9f..93fa7227 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryCorrectionOffsetHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryCorrectionOffsetHeader.java @@ -33,7 +33,7 @@ public class QueryCorrectionOffsetHeader implements CommandCustomHeader { @Override public void checkFields() throws RemotingCommandException { - // TODO Auto-generated method stub + } public String getFilterGroups() { diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java index 113e46f6..3685ef9d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java @@ -34,7 +34,7 @@ public class SearchOffsetRequestHeader implements CommandCustomHeader { @Override public void checkFields() throws RemotingCommandException { - // TODO Auto-generated method stub + } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientRequestHeader.java index 082329c0..95e18d01 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientRequestHeader.java @@ -57,7 +57,7 @@ public class UnregisterClientRequestHeader implements CommandCustomHeader { @Override public void checkFields() throws RemotingCommandException { - // TODO Auto-generated method stub + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientResponseHeader.java index 6ae69294..f61f0cd2 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientResponseHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientResponseHeader.java @@ -24,7 +24,7 @@ public class UnregisterClientResponseHeader implements CommandCustomHeader { @Override public void checkFields() throws RemotingCommandException { - // TODO Auto-generated method stub + } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoResponseHeader.java deleted file mode 100644 index 64081ead..00000000 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoResponseHeader.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * $Id: GetRouteInfoResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $ - */ -package org.apache.rocketmq.common.protocol.header.namesrv; - -import org.apache.rocketmq.remoting.CommandCustomHeader; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; - -public class GetRouteInfoResponseHeader implements CommandCustomHeader { - - @Override - public void checkFields() throws RemotingCommandException { - // TODO Auto-generated method stub - - } -} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java index 93069fed..8307e20b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java @@ -32,7 +32,7 @@ public class RegisterOrderTopicRequestHeader implements CommandCustomHeader { @Override public void checkFields() throws RemotingCommandException { - // TODO Auto-generated method stub + } public String getTopic() { diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java index 9966a90f..8fd86286 100644 --- a/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java +++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java @@ -16,11 +16,7 @@ */ package org.apache.rocketmq.common.sysflag; -/** - * - * - */ public class TopicSysFlag { private final static int FLAG_UNIT = 0x1 << 0; diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java index 28ead5cd..e43ae410 100644 --- a/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java @@ -114,9 +114,7 @@ public class IOTinyUtils { fileOrDir.delete(); } - /** - */ public static void cleanDirectory(File directory) throws IOException { if (!directory.exists()) { String message = directory + " does not exist"; diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java index c8252d03..9bd9ea17 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java @@ -35,9 +35,7 @@ public class PushConsumer { consumer.setConsumeTimestamp("20170422221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { - /** - */ @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); diff --git a/example/src/main/resources/MessageFilterImpl.java b/example/src/main/resources/MessageFilterImpl.java index 23e4a79b..6cb5d158 100644 --- a/example/src/main/resources/MessageFilterImpl.java +++ b/example/src/main/resources/MessageFilterImpl.java @@ -28,7 +28,7 @@ public class MessageFilterImpl implements MessageFilter { String property = msg.getProperty("SequenceId"); if (property != null) { int id = Integer.parseInt(property); - if (((id % 10) == 0) && // + if (((id % 10) == 0) && (id > 100)) { return true; } diff --git a/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.java b/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.java index 74e5501b..2948c106 100644 --- a/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.java +++ b/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.java @@ -56,7 +56,6 @@ public class SelectorParser implements SelectorParserConstants { // convertStringExpressions = true; // sql = sql.substring(CONVERT_STRING_EXPRESSIONS_PREFIX.length()); // } - // // if( convertStringExpressions ) { // ComparisonExpression.CONVERT_STRING_EXPRESSIONS.set(true); // } diff --git a/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.jj b/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.jj index 5d1a4a7d..b533ac17 100644 --- a/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.jj +++ b/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.jj @@ -82,7 +82,6 @@ public class SelectorParser { // convertStringExpressions = true; // sql = sql.substring(CONVERT_STRING_EXPRESSIONS_PREFIX.length()); // } -// // if( convertStringExpressions ) { // ComparisonExpression.CONVERT_STRING_EXPRESSIONS.set(true); // } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java index be13bd6b..376a8143 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java @@ -72,10 +72,10 @@ public class KVConfigManager { final String prev = kvTable.put(key, value); if (null != prev) { - log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}", // + log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}", namespace, key, value); } else { - log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}", // + log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}", namespace, key, value); } } finally { @@ -119,7 +119,7 @@ public class KVConfigManager { HashMap kvTable = this.configTable.get(namespace); if (null != kvTable) { String value = kvTable.remove(key); - log.info("deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}", // + log.info("deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}", namespace, key, value); } } finally { diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index 7479fcc5..35790c95 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -131,9 +131,9 @@ public class RouteInfoManager { String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr); - if (null != topicConfigWrapper // + if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { - if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())// + if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { ConcurrentMap tcTable = topicConfigWrapper.getTopicConfigTable(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java index 4ed156d5..6e99b32f 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; public class NettyDecoder extends LengthFieldBasedFrameDecoder { private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); - private static final int FRAME_MAX_LENGTH = // + private static final int FRAME_MAX_LENGTH = Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216")); public NettyDecoder() { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index ba74b532..b66e7de2 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -188,7 +188,7 @@ public abstract class NettyRemotingAbstract { log.error(cmd.toString()); if (!cmd.isOnewayRPC()) { - final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, // + final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e)); response.setOpaque(opaque); ctx.writeAndFlush(response); @@ -210,9 +210,9 @@ public abstract class NettyRemotingAbstract { pair.getObject2().submit(requestTask); } catch (RejectedExecutionException e) { if ((System.currentTimeMillis() % 10000) == 0) { - log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) // - + ", too many requests and system thread pool busy, RejectedExecutionException " // - + pair.getObject2().toString() // + log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + + ", too many requests and system thread pool busy, RejectedExecutionException " + + pair.getObject2().toString() + " request code: " + cmd.getCode()); } @@ -422,10 +422,10 @@ public abstract class NettyRemotingAbstract { throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast"); } else { String info = - String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", // - timeoutMillis, // - this.semaphoreAsync.getQueueLength(), // - this.semaphoreAsync.availablePermits()// + String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", + timeoutMillis, + this.semaphoreAsync.getQueueLength(), + this.semaphoreAsync.availablePermits() ); log.warn(info); throw new RemotingTimeoutException(info); @@ -459,10 +459,10 @@ public abstract class NettyRemotingAbstract { throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast"); } else { String info = String.format( - "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", // - timeoutMillis, // - this.semaphoreOneway.getQueueLength(), // - this.semaphoreOneway.availablePermits()// + "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", + timeoutMillis, + this.semaphoreOneway.getQueueLength(), + this.semaphoreOneway.availablePermits() ); log.warn(info); throw new RemotingTimeoutException(info); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index db6a7e4b..ecf9ab2a 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -92,7 +92,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti this(nettyClientConfig, null); } - public NettyRemotingClient(final NettyClientConfig nettyClientConfig, // + public NettyRemotingClient(final NettyClientConfig nettyClientConfig, final ChannelEventListener channelEventListener) { super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue()); this.nettyClientConfig = nettyClientConfig; @@ -130,8 +130,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti @Override public void start() { - this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(// - nettyClientConfig.getClientWorkerThreads(), // + this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( + nettyClientConfig.getClientWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @@ -142,7 +142,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } }); - Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)// + Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, false) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java index c4354e92..0570c841 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java @@ -16,11 +16,7 @@ */ package org.apache.rocketmq.remoting.netty; -/** - * - * - */ public class NettyServerConfig implements Cloneable { private int listenPort = 8888; private int serverWorkerThreads = 8; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java index 52556fc0..2e0a81e8 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java @@ -20,23 +20,23 @@ package org.apache.rocketmq.remoting.netty; public class NettySystemConfig { public static final String COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = "com.rocketmq.remoting.nettyPooledByteBufAllocatorEnable"; - public static final String COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE = // + public static final String COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE = "com.rocketmq.remoting.socket.sndbuf.size"; - public static final String COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE = // + public static final String COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE = "com.rocketmq.remoting.socket.rcvbuf.size"; - public static final String COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE = // + public static final String COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE = "com.rocketmq.remoting.clientAsyncSemaphoreValue"; - public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE = // + public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE = "com.rocketmq.remoting.clientOnewaySemaphoreValue"; - public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = // + public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = Boolean .parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false")); - public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = // + public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535")); - public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE = // + public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE = Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535")); - public static int socketSndbufSize = // + public static int socketSndbufSize = Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535")); - public static int socketRcvbufSize = // + public static int socketRcvbufSize = Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535")); } diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 0810d0ca..a2cb629e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -125,11 +125,11 @@ public class CommitLog { return this.mappedFileQueue.remainHowManyDataToFlush(); } - public int deleteExpiredFile(// - final long expiredTime, // - final int deleteFilesInterval, // - final long intervalForcibly, // - final boolean cleanImmediately// + public int deleteExpiredFile( + final long expiredTime, + final int deleteFilesInterval, + final long intervalForcibly, + final boolean cleanImmediately ) { return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately); } @@ -244,43 +244,30 @@ public class CommitLog { byte[] bytesContent = new byte[totalSize]; - // 3 BODYCRC int bodyCRC = byteBuffer.getInt(); - // 4 QUEUEID int queueId = byteBuffer.getInt(); - // 5 FLAG int flag = byteBuffer.getInt(); - // 6 QUEUEOFFSET long queueOffset = byteBuffer.getLong(); - // 7 PHYSICALOFFSET long physicOffset = byteBuffer.getLong(); - // 8 SYSFLAG int sysFlag = byteBuffer.getInt(); - // 9 BORNTIMESTAMP long bornTimeStamp = byteBuffer.getLong(); - // 10 ByteBuffer byteBuffer1 = byteBuffer.get(bytesContent, 0, 8); - // 11 STORETIMESTAMP long storeTimestamp = byteBuffer.getLong(); - // 12 ByteBuffer byteBuffer2 = byteBuffer.get(bytesContent, 0, 8); - // 13 RECONSUMETIMES int reconsumeTimes = byteBuffer.getInt(); - // 14 Prepared Transaction Offset long preparedTransactionOffset = byteBuffer.getLong(); - // 15 BODY int bodyLen = byteBuffer.getInt(); if (bodyLen > 0) { if (readBody) { @@ -298,7 +285,6 @@ public class CommitLog { } } - // 16 TOPIC byte topicLen = byteBuffer.get(); byteBuffer.get(bytesContent, 0, topicLen); String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8); @@ -307,7 +293,6 @@ public class CommitLog { String keys = ""; String uniqKey = null; - // 17 properties short propertiesLength = byteBuffer.getShort(); Map propertiesMap = null; if (propertiesLength > 0) { @@ -355,19 +340,19 @@ public class CommitLog { return new DispatchRequest(totalSize, false/* success */); } - return new DispatchRequest(// - topic, // 1 - queueId, // 2 - physicOffset, // 3 - totalSize, // 4 - tagsCode, // 5 - storeTimestamp, // 6 - queueOffset, // 7 - keys, // 8 - uniqKey, //9 - sysFlag, // 10 - preparedTransactionOffset, // 11 - propertiesMap // 12 + return new DispatchRequest( + topic, + queueId, + physicOffset, + totalSize, + tagsCode, + storeTimestamp, + queueOffset, + keys, + uniqKey, + sysFlag, + preparedTransactionOffset, + propertiesMap ); } catch (Exception e) { } @@ -376,24 +361,23 @@ public class CommitLog { } private static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) { - final int msgLen = 4 // 1 TOTALSIZE - + 4 // 2 MAGICCODE - + 4 // 3 BODYCRC - + 4 // 4 QUEUEID - + 4 // 5 FLAG - + 8 // 6 QUEUEOFFSET - + 8 // 7 PHYSICALOFFSET - + 4 // 8 SYSFLAG - + 8 // 9 BORNTIMESTAMP - + 8 // 10 BORNHOST - + 8 // 11 STORETIMESTAMP - + 8 // 12 STOREHOSTADDRESS - + 4 // 13 RECONSUMETIMES - + 8 // 14 Prepared Transaction Offset - + 4 + (bodyLength > 0 ? bodyLength : 0) // 14 BODY - + 1 + topicLength // 15 TOPIC - + 2 + (propertiesLength > 0 ? propertiesLength : 0) // 16 - // propertiesLength + final int msgLen = 4 //TOTALSIZE + + 4 //MAGICCODE + + 4 //BODYCRC + + 4 //QUEUEID + + 4 //FLAG + + 8 //QUEUEOFFSET + + 8 //PHYSICALOFFSET + + 4 //SYSFLAG + + 8 //BORNTIMESTAMP + + 8 //BORNHOST + + 8 //STORETIMESTAMP + + 8 //STOREHOSTADDRESS + + 4 //RECONSUMETIMES + + 8 //Prepared Transaction Offset + + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY + + 1 + topicLength //TOPIC + + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength + 0; return msgLen; } @@ -500,18 +484,18 @@ public class CommitLog { return false; } - if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()// + if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable() && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) { if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) { - log.info("find check timestamp, {} {}", // - storeTimestamp, // + log.info("find check timestamp, {} {}", + storeTimestamp, UtilAll.timeMillisToHumanString(storeTimestamp)); return true; } } else { if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) { - log.info("find check timestamp, {} {}", // - storeTimestamp, // + log.info("find check timestamp, {} {}", + storeTimestamp, UtilAll.timeMillisToHumanString(storeTimestamp)); return true; } @@ -547,7 +531,7 @@ public class CommitLog { int queueId = msg.getQueueId(); final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); - if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE// + if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // Delay Delivery if (msg.getDelayTimeLevel() > 0) { @@ -1270,8 +1254,6 @@ public class CommitLog { // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 3 The remaining space may be any value - // - // Here the length of the specially set maxBlank final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank); @@ -1391,7 +1373,6 @@ public class CommitLog { // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 3 The remaining space may be any value - // //ignore previous read messagesByteBuff.reset(); // Here the length of the specially set maxBlank diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 49a1ebae..36c15d49 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -218,9 +218,7 @@ public class DefaultMessageStore implements MessageStore { this.shutdown = false; } - /** - */ public void shutdown() { if (!this.shutdown) { this.shutdown = true; @@ -392,7 +390,7 @@ public class DefaultMessageStore implements MessageStore { long begin = this.getCommitLog().getBeginTimeInLock(); long diff = this.systemClock.now() - begin; - if (diff < 10000000 // + if (diff < 10000000 && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills()) { return true; } @@ -579,9 +577,7 @@ public class DefaultMessageStore implements MessageStore { return getResult; } - /** - */ public long getMaxOffsetInQueue(String topic, int queueId) { ConsumeQueue logic = this.findConsumeQueue(topic, queueId); if (logic != null) { @@ -592,9 +588,7 @@ public class DefaultMessageStore implements MessageStore { return 0; } - /** - */ public long getMinOffsetInQueue(String topic, int queueId) { ConsumeQueue logic = this.findConsumeQueue(topic, queueId); if (logic != null) { @@ -891,9 +885,9 @@ public class DefaultMessageStore implements MessageStore { ConcurrentMap queueTable = next.getValue(); for (ConsumeQueue cq : queueTable.values()) { cq.destroy(); - log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", // - cq.getTopic(), // - cq.getQueueId() // + log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", + cq.getTopic(), + cq.getQueueId() ); this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId()); @@ -922,17 +916,17 @@ public class DefaultMessageStore implements MessageStore { long maxCLOffsetInConsumeQueue = nextQT.getValue().getLastOffset(); if (maxCLOffsetInConsumeQueue == -1) { - log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.", // - nextQT.getValue().getTopic(), // - nextQT.getValue().getQueueId(), // - nextQT.getValue().getMaxPhysicOffset(), // + log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.", + nextQT.getValue().getTopic(), + nextQT.getValue().getQueueId(), + nextQT.getValue().getMaxPhysicOffset(), nextQT.getValue().getMinLogicOffset()); } else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) { log.info( - "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", // - topic, // - nextQT.getKey(), // - minCommitLogOffset, // + "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", + topic, + nextQT.getKey(), + minCommitLogOffset, maxCLOffsetInConsumeQueue); DefaultMessageStore.this.commitLog.removeQueueFromTopicQueueTable(nextQT.getValue().getTopic(), @@ -1072,11 +1066,11 @@ public class DefaultMessageStore implements MessageStore { ConsumeQueue logic = map.get(queueId); if (null == logic) { - ConsumeQueue newLogic = new ConsumeQueue(// - topic, // - queueId, // - StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), // - this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), // + ConsumeQueue newLogic = new ConsumeQueue( + topic, + queueId, + StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), + this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), this); ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic); if (oldLogic != null) { @@ -1462,11 +1456,11 @@ public class DefaultMessageStore implements MessageStore { boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately; - log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", // - fileReservedTime, // - timeup, // - spacefull, // - manualDeleteFileSeveralTimes, // + log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", + fileReservedTime, + timeup, + spacefull, + manualDeleteFileSeveralTimes, cleanAtOnce); fileReservedTime *= 60 * 60 * 1000; @@ -1725,7 +1719,7 @@ public class DefaultMessageStore implements MessageStore { private void doReput() { for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { - if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() // + if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { break; } @@ -1751,7 +1745,7 @@ public class DefaultMessageStore implements MessageStore { dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); } - // FIXED BUG By shijia + this.reputFromOffset += size; readSize += size; if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { diff --git a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java index 3d33eaf3..819bb948 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java +++ b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java @@ -66,23 +66,14 @@ public class DispatchRequest { } public DispatchRequest(int size) { - // 1 this.topic = ""; - // 2 this.queueId = 0; - // 3 this.commitLogOffset = 0; - // 4 this.msgSize = size; - // 5 this.tagsCode = 0; - // 6 this.storeTimestamp = 0; - // 7 this.consumeQueueOffset = 0; - // 8 this.keys = ""; - //9 this.uniqKey = null; this.sysFlag = 0; this.preparedTransactionOffset = 0; @@ -91,23 +82,14 @@ public class DispatchRequest { } public DispatchRequest(int size, boolean success) { - // 1 this.topic = ""; - // 2 this.queueId = 0; - // 3 this.commitLogOffset = 0; - // 4 this.msgSize = size; - // 5 this.tagsCode = 0; - // 6 this.storeTimestamp = 0; - // 7 this.consumeQueueOffset = 0; - // 8 this.keys = ""; - // 9 this.uniqKey = null; this.sysFlag = 0; this.preparedTransactionOffset = 0; diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java index 42504509..81cf0f78 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java @@ -404,9 +404,7 @@ public class MappedFile extends ReferenceResource { return null; } - /** - */ public SelectMappedBufferResult selectMappedBuffer(int pos) { int readPosition = getReadPosition(); if (pos < readPosition && pos >= 0) { diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java index a8fa3648..edf4c918 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java @@ -405,7 +405,6 @@ public class MappedFileQueue { break; } - // TODO: Externalize this hardcoded value if (destroy && mappedFile.destroy(1000 * 60)) { files.add(mappedFile); deleteCount++; diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java index 3967b643..e0c51a19 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java @@ -78,10 +78,7 @@ public class HAConnection { return socketChannel; } - /** - * - */ class ReadSocketService extends ServiceThread { private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024; private final Selector selector; @@ -194,10 +191,7 @@ public class HAConnection { } } - /** - * - */ class WriteSocketService extends ServiceThread { private final Selector selector; private final SocketChannel socketChannel; @@ -333,9 +327,7 @@ public class HAConnection { HAConnection.log.info(this.getServiceName() + " service end"); } - /** - */ private boolean transferData() throws Exception { int writeSizeZeroTimes = 0; // Write Header diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java index f507b368..6fc73357 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java @@ -85,9 +85,7 @@ public class HAService { return result; } - /** - */ public void notifyTransferSome(final long offset) { for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) { boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset); @@ -374,17 +372,6 @@ public class HAService { return !this.reportOffset.hasRemaining(); } - // private void reallocateByteBuffer() { - // ByteBuffer bb = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE); - // int remain = this.byteBufferRead.limit() - this.dispatchPostion; - // bb.put(this.byteBufferRead.array(), this.dispatchPostion, remain); - // this.dispatchPostion = 0; - // this.byteBufferRead = bb; - // } - - /** - - */ private void reallocateByteBuffer() { int remain = READ_MAX_BUFFER_SIZE - this.dispatchPostion; if (remain > 0) { @@ -426,7 +413,6 @@ public class HAService { break; } } else { - // TODO ERROR log.info("HAClient, processReadEvent read socket < 0"); return false; } @@ -598,8 +584,6 @@ public class HAService { log.info(this.getServiceName() + " service end"); } - - // // private void disableWriteFlag() { // if (this.socketChannel != null) { // SelectionKey sk = this.socketChannel.keyFor(this.selector); @@ -610,8 +594,6 @@ public class HAService { // } // } // } - // - // // private void enableWriteFlag() { // if (this.socketChannel != null) { // SelectionKey sk = this.socketChannel.keyFor(this.selector); diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java index 54f57327..862e620e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java @@ -208,7 +208,6 @@ public class IndexFile { if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() || this.indexHeader.getIndexCount() <= 1) { - // TODO NOTFOUND } else { for (int nextIndexToRead = slotValue;;) { if (phyOffsets.size() >= maxNum) { diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java index 5102a216..3195448e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java @@ -20,11 +20,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -/** - * - * - */ public class IndexHeader { public static final int INDEX_HEADER_SIZE = 40; private static int beginTimestampIndex = 0; diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index 25640a46..35b8e856 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -336,10 +336,7 @@ public class ScheduleMessageService extends ConfigManager { } } // end of if (bufferCQ != null) else { - /* - - */ long cqMinOffset = cq.getMinOffsetInQueue(); if (offset < cqMinOffset) { failScheduleOffset = cqMinOffset; diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java index 5555b8b6..64b4097e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java @@ -38,9 +38,7 @@ public class BrokerStats { this.defaultMessageStore = defaultMessageStore; } - /** - */ public void record() { this.msgPutTotalYesterdayMorning = this.msgPutTotalTodayMorning; this.msgGetTotalYesterdayMorning = this.msgGetTotalTodayMorning; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 157ae21e..12aea8ac 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -119,7 +119,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The adminExt group[" + this.defaultMQAdminExt.getAdminExtGroup() - + "] has created already, specifed another name please."// + + "] has created already, specifed another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } @@ -132,8 +132,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: - throw new MQClientException("The AdminExt service state not OK, maybe started once, "// - + this.serviceState// + throw new MQClientException("The AdminExt service state not OK, maybe started once, " + + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; @@ -185,13 +185,11 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { @Override public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) { - // TODO Auto-generated method stub return null; } @Override public TopicConfig examineTopicConfig(String addr, String topic) { - // TODO Auto-generated method stub return null; } @@ -344,8 +342,6 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { @Override public void putKVConfig(String namespace, String key, String value) { - // TODO Auto-generated method stub - } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java index ac485199..d46d6fb6 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java @@ -51,7 +51,6 @@ public class PrintMessageSubCommand implements SubCommand { System.out.printf("MSGID: %s %s BODY: %s%n", msg.getMsgId(), msg.toString(), printBody ? new String(msg.getBody(), charsetName) : "NOT PRINT BODY"); } catch (UnsupportedEncodingException e) { - // } } } @@ -108,10 +107,10 @@ public class PrintMessageSubCommand implements SubCommand { try { String topic = commandLine.getOptionValue('t').trim(); - String charsetName = // + String charsetName = !commandLine.hasOption('c') ? "UTF-8" : commandLine.getOptionValue('c').trim(); - String subExpression = // + String subExpression = !commandLine.hasOption('s') ? "*" : commandLine.getOptionValue('s').trim(); boolean printBody = !commandLine.hasOption('d') || Boolean.parseBoolean(commandLine.getOptionValue('d').trim()); -- GitLab