提交 7f96008c 编写于 作者: Y yukon

Remove unused class GetRouteInfoResponseHeader and meaningless comments

上级 ffad6566
......@@ -135,11 +135,11 @@ public class BrokerController {
private BrokerFastFailure brokerFastFailure;
private Configuration configuration;
public BrokerController(//
final BrokerConfig brokerConfig, //
final NettyServerConfig nettyServerConfig, //
final NettyClientConfig nettyClientConfig, //
final MessageStoreConfig messageStoreConfig //
public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
final NettyClientConfig nettyClientConfig,
final MessageStoreConfig messageStoreConfig
) {
this.brokerConfig = brokerConfig;
this.nettyServerConfig = nettyServerConfig;
......@@ -255,7 +255,6 @@ public class BrokerController {
this.registerProcessor();
// TODO remove in future
final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
......
......@@ -190,10 +190,10 @@ public class BrokerStartup {
MixAll.printObjectProperties(log, nettyClientConfig);
MixAll.printObjectProperties(log, messageStoreConfig);
final BrokerController controller = new BrokerController(//
brokerConfig, //
nettyServerConfig, //
nettyClientConfig, //
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
......
......@@ -52,9 +52,9 @@ public class RebalanceLockManager {
lockEntry = new LockEntry();
lockEntry.setClientId(clientId);
groupValue.put(mq, lockEntry);
log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
group, //
clientId, //
log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}",
group,
clientId,
mq);
}
......@@ -69,19 +69,19 @@ public class RebalanceLockManager {
lockEntry.setClientId(clientId);
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
log.warn(
"tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
group, //
oldClientId, //
clientId, //
"tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",
group,
oldClientId,
clientId,
mq);
return true;
}
log.warn(
"tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
group, //
oldClientId, //
clientId, //
"tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",
group,
oldClientId,
clientId,
mq);
return false;
} finally {
......@@ -144,9 +144,9 @@ public class RebalanceLockManager {
lockEntry.setClientId(clientId);
groupValue.put(mq, lockEntry);
log.info(
"tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
group, //
clientId, //
"tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}",
group,
clientId,
mq);
}
......@@ -162,20 +162,20 @@ public class RebalanceLockManager {
lockEntry.setClientId(clientId);
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
log.warn(
"tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
group, //
oldClientId, //
clientId, //
"tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",
group,
oldClientId,
clientId,
mq);
lockedMqs.add(mq);
continue;
}
log.warn(
"tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
group, //
oldClientId, //
clientId, //
"tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",
group,
oldClientId,
clientId,
mq);
}
} finally {
......
......@@ -111,9 +111,7 @@ public class FilterServerManager {
}
}
/**
*/
public void scanNotActiveChannel() {
Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
......
......@@ -189,10 +189,10 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
}
log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(//
requestHeader.getTopic(), //
requestHeader.getDefaultTopic(), //
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
requestHeader.getTopic(),
requestHeader.getDefaultTopic(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
if (null == topicConfig) {
......
......@@ -116,6 +116,7 @@ import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AdminBrokerProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
......@@ -432,9 +433,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);
Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(//
requestBody.getConsumerGroup(), //
requestBody.getMqSet(), //
Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(
requestBody.getConsumerGroup(),
requestBody.getMqSet(),
requestBody.getClientId());
LockBatchResponseBody responseBody = new LockBatchResponseBody();
......@@ -450,9 +451,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
UnlockBatchRequestBody requestBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class);
this.brokerController.getRebalanceLockManager().unlockBatch(//
requestBody.getConsumerGroup(), //
requestBody.getMqSet(), //
this.brokerController.getRebalanceLockManager().unlockBatch(
requestBody.getConsumerGroup(),
requestBody.getMqSet(),
requestBody.getClientId());
response.setCode(ResponseCode.SUCCESS);
......@@ -657,14 +658,12 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
continue;
}
/**
*/
{
SubscriptionData findSubscriptionData =
this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);
if (null == findSubscriptionData //
if (null == findSubscriptionData
&& this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) {
log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", requestHeader.getConsumerGroup(), topic);
continue;
......@@ -683,9 +682,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
if (brokerOffset < 0)
brokerOffset = 0;
long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(//
requestHeader.getConsumerGroup(), //
topic, //
long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
requestHeader.getConsumerGroup(),
topic,
i);
if (consumerOffset < 0)
consumerOffset = 0;
......@@ -925,9 +924,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
/**
*/
private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final GetConsumerRunningInfoRequestHeader requestHeader =
(GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
......@@ -1007,9 +1004,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
continue;
}
/**
*/
if (!requestHeader.isOffline()) {
SubscriptionData findSubscriptionData =
......@@ -1107,13 +1102,11 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
if (isOrder && !topicConfig.isOrder()) {
continue;
}
/**
*/
{
SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic);
if (null == findSubscriptionData //
if (null == findSubscriptionData
&& this.brokerController.getConsumerManager().findSubscriptionDataCount(group) > 0) {
log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", group, topic);
continue;
......@@ -1129,9 +1122,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
if (brokerOffset < 0)
brokerOffset = 0;
long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(//
group, //
topic, //
long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
group,
topic,
i);
if (consumerOffset < 0)
consumerOffset = 0;
......@@ -1215,10 +1208,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return runtimeInfo;
}
private RemotingCommand callConsumer(//
final int requestCode, //
final RemotingCommand request, //
final String consumerGroup, //
private RemotingCommand callConsumer(
final int requestCode,
final RemotingCommand request,
final String consumerGroup,
final String clientId) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId);
......@@ -1231,8 +1224,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
if (clientChannelInfo.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT", //
clientId, //
response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT",
clientId,
MQVersion.getVersionDesc(clientChannelInfo.getVersion())));
return response;
}
......
......@@ -160,7 +160,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
assert consumerFilterData != null;
}
} catch (Exception e) {
log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark("parse the consumer's subscription failed");
......@@ -176,7 +176,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
return response;
}
if (!subscriptionGroupConfig.isConsumeBroadcastEnable() //
if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
&& consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
......@@ -285,12 +285,12 @@ public class PullMessageProcessor implements NettyRequestProcessor {
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", //
requestHeader.getQueueOffset(), //
getMessageResult.getNextBeginOffset(), //
requestHeader.getTopic(), //
requestHeader.getQueueId(), //
requestHeader.getConsumerGroup()//
log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
requestHeader.getQueueOffset(),
getMessageResult.getNextBeginOffset(),
requestHeader.getTopic(),
requestHeader.getQueueId(),
requestHeader.getConsumerGroup()
);
} else {
response.setCode(ResponseCode.PULL_NOT_FOUND);
......
......@@ -139,9 +139,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(//
newTopic, //
subscriptionGroupConfig.getRetryQueueNums(), //
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
......@@ -175,13 +175,13 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
DLQ_NUMS_PER_GROUP, //
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0
);
if (null == topicConfig) {
......@@ -268,8 +268,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
if (reconsumeTimes >= maxReconsumeTimes) {
newTopic = MixAll.getDLQTopic(groupName);
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
DLQ_NUMS_PER_GROUP, //
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0
);
msg.setTopic(newTopic);
......@@ -289,9 +289,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return true;
}
private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
final RemotingCommand request, //
final SendMessageContext sendMessageContext, //
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
......@@ -464,9 +464,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
return response;
}
private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, //
final RemotingCommand request, //
final SendMessageContext sendMessageContext, //
private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
......
......@@ -38,10 +38,10 @@ public class BrokerControllerTest {
@Test
public void testBrokerRestart() throws Exception {
for (int i = 0; i < 2; i++) {
BrokerController brokerController = new BrokerController(//
new BrokerConfig(), //
new NettyServerConfig(), //
new NettyClientConfig(), //
BrokerController brokerController = new BrokerController(
new BrokerConfig(),
new NettyServerConfig(),
new NettyClientConfig(),
new MessageStoreConfig());
assertThat(brokerController.initialize());
brokerController.start();
......
......@@ -52,9 +52,9 @@ public class LocalFileOffsetStore implements OffsetStore {
public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {
this.mQClientFactory = mQClientFactory;
this.groupName = groupName;
this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + //
this.mQClientFactory.getClientId() + File.separator + //
this.groupName + File.separator + //
this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
this.mQClientFactory.getClientId() + File.separator +
this.groupName + File.separator +
"offsets.json";
}
......@@ -217,8 +217,8 @@ public class LocalFileOffsetStore implements OffsetStore {
OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
} catch (Exception e) {
log.warn("readLocalOffset Exception", e);
throw new MQClientException("readLocalOffset Exception, maybe fastjson version too low" //
+ FAQUrl.suggestTodo(FAQUrl.LOAD_JSON_EXCEPTION), //
throw new MQClientException("readLocalOffset Exception, maybe fastjson version too low"
+ FAQUrl.suggestTodo(FAQUrl.LOAD_JSON_EXCEPTION),
e);
}
return offsetSerializeWrapper;
......
......@@ -204,7 +204,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
MQBrokerException, InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
// TODO Here may be heavily overhead for Name Server,need tuning
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
......@@ -232,7 +232,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
// TODO Here may be heavily overhead for Name Server,need tuning
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
......
......@@ -159,7 +159,7 @@ public class MQAdminImpl {
}
} catch (Exception e) {
throw new MQClientException(
"Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), //
"Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),
e);
}
......
......@@ -285,32 +285,32 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
public SendResult sendMessage(//
final String addr, // 1
final String brokerName, // 2
final Message msg, // 3
final SendMessageRequestHeader requestHeader, // 4
final long timeoutMillis, // 5
final CommunicationMode communicationMode, // 6
final SendMessageContext context, // 7
final DefaultMQProducerImpl producer // 8
public SendResult sendMessage(
final String addr,
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);
}
public SendResult sendMessage(//
final String addr, // 1
final String brokerName, // 2
final Message msg, // 3
final SendMessageRequestHeader requestHeader, // 4
final long timeoutMillis, // 5
final CommunicationMode communicationMode, // 6
final SendCallback sendCallback, // 7
final TopicPublishInfo topicPublishInfo, // 8
final MQClientInstance instance, // 9
final int retryTimesWhenSendFailed, // 10
final SendMessageContext context, // 11
final DefaultMQProducerImpl producer // 12
public SendResult sendMessage(
final String addr,
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = null;
if (sendSmartMsg || msg instanceof MessageBatch) {
......@@ -341,31 +341,31 @@ public class MQClientAPIImpl {
return null;
}
private SendResult sendMessageSync(//
final String addr, //
final String brokerName, //
final Message msg, //
final long timeoutMillis, //
final RemotingCommand request//
private SendResult sendMessageSync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response);
}
private void sendMessageAsync(//
final String addr, //
final String brokerName, //
final Message msg, //
final long timeoutMillis, //
final RemotingCommand request, //
final SendCallback sendCallback, //
final TopicPublishInfo topicPublishInfo, //
final MQClientInstance instance, //
final int retryTimesWhenSendFailed, //
final AtomicInteger times, //
final SendMessageContext context, //
final DefaultMQProducerImpl producer //
private void sendMessageAsync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final AtomicInteger times,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
......@@ -380,7 +380,6 @@ public class MQClientAPIImpl {
context.getProducer().executeSendMessageHookAfter(context);
}
} catch (Throwable e) {
//
}
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
......@@ -428,19 +427,19 @@ public class MQClientAPIImpl {
});
}
private void onExceptionImpl(final String brokerName, //
final Message msg, //
final long timeoutMillis, //
final RemotingCommand request, //
final SendCallback sendCallback, //
final TopicPublishInfo topicPublishInfo, //
final MQClientInstance instance, //
final int timesTotal, //
final AtomicInteger curTimes, //
final Exception e, //
final SendMessageContext context, //
final boolean needRetry, //
final DefaultMQProducerImpl producer // 12
private void onExceptionImpl(final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int timesTotal,
final AtomicInteger curTimes,
final Exception e,
final SendMessageContext context,
final boolean needRetry,
final DefaultMQProducerImpl producer
) {
int tmp = curTimes.incrementAndGet();
if (needRetry && tmp <= timesTotal) {
......@@ -485,16 +484,15 @@ public class MQClientAPIImpl {
}
}
private SendResult processSendResponse(//
final String brokerName, //
final Message msg, //
final RemotingCommand response//
private SendResult processSendResponse(
final String brokerName,
final Message msg,
final RemotingCommand response
) throws MQBrokerException, RemotingCommandException {
switch (response.getCode()) {
case ResponseCode.FLUSH_DISK_TIMEOUT:
case ResponseCode.FLUSH_SLAVE_TIMEOUT:
case ResponseCode.SLAVE_NOT_AVAILABLE: {
// TODO LOG
}
case ResponseCode.SUCCESS: {
SendStatus sendStatus = SendStatus.SEND_OK;
......@@ -553,12 +551,12 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public PullResult pullMessage(//
final String addr, //
final PullMessageRequestHeader requestHeader, //
final long timeoutMillis, //
final CommunicationMode communicationMode, //
final PullCallback pullCallback//
public PullResult pullMessage(
final String addr,
final PullMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
......@@ -579,11 +577,11 @@ public class MQClientAPIImpl {
return null;
}
private void pullMessageAsync(//
final String addr, // 1
final RemotingCommand request, //
final long timeoutMillis, //
final PullCallback pullCallback//
private void pullMessageAsync(
final String addr,
final RemotingCommand request,
final long timeoutMillis,
final PullCallback pullCallback
) throws RemotingException, InterruptedException {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
......@@ -611,10 +609,10 @@ public class MQClientAPIImpl {
});
}
private PullResult pullMessageSync(//
final String addr, // 1
final RemotingCommand request, // 2
final long timeoutMillis// 3
private PullResult pullMessageSync(
final String addr,
final RemotingCommand request,
final long timeoutMillis
) throws RemotingException, InterruptedException, MQBrokerException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
......@@ -720,9 +718,9 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public List<String> getConsumerIdListByGroup(//
final String addr, //
final String consumerGroup, //
public List<String> getConsumerIdListByGroup(
final String addr,
final String consumerGroup,
final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
MQBrokerException, InterruptedException {
GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
......@@ -796,10 +794,10 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public long queryConsumerOffset(//
final String addr, //
final QueryConsumerOffsetRequestHeader requestHeader, //
final long timeoutMillis//
public long queryConsumerOffset(
final String addr,
final QueryConsumerOffsetRequestHeader requestHeader,
final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);
......@@ -820,10 +818,10 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public void updateConsumerOffset(//
final String addr, //
final UpdateConsumerOffsetRequestHeader requestHeader, //
final long timeoutMillis//
public void updateConsumerOffset(
final String addr,
final UpdateConsumerOffsetRequestHeader requestHeader,
final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
......@@ -841,10 +839,10 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public void updateConsumerOffsetOneway(//
final String addr, //
final UpdateConsumerOffsetRequestHeader requestHeader, //
final long timeoutMillis//
public void updateConsumerOffsetOneway(
final String addr,
final UpdateConsumerOffsetRequestHeader requestHeader,
final long timeoutMillis
) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException,
InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
......@@ -852,10 +850,10 @@ public class MQClientAPIImpl {
this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
}
public int sendHearbeat(//
final String addr, //
final HeartbeatData heartbeatData, //
final long timeoutMillis//
public int sendHearbeat(
final String addr,
final HeartbeatData heartbeatData,
final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
......@@ -873,12 +871,12 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public void unregisterClient(//
final String addr, //
final String clientID, //
final String producerGroup, //
final String consumerGroup, //
final long timeoutMillis//
public void unregisterClient(
final String addr,
final String clientID,
final String producerGroup,
final String consumerGroup,
final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
final UnregisterClientRequestHeader requestHeader = new UnregisterClientRequestHeader();
requestHeader.setClientID(clientID);
......@@ -899,11 +897,11 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public void endTransactionOneway(//
final String addr, //
final EndTransactionRequestHeader requestHeader, //
final String remark, //
final long timeoutMillis//
public void endTransactionOneway(
final String addr,
final EndTransactionRequestHeader requestHeader,
final String remark,
final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
......@@ -965,9 +963,9 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public Set<MessageQueue> lockBatchMQ(//
final String addr, //
final LockBatchRequestBody requestBody, //
public Set<MessageQueue> lockBatchMQ(
final String addr,
final LockBatchRequestBody requestBody,
final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
......@@ -987,11 +985,11 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public void unlockBatchMQ(//
final String addr, //
final UnlockBatchRequestBody requestBody, //
final long timeoutMillis, //
final boolean oneway//
public void unlockBatchMQ(
final String addr,
final UnlockBatchRequestBody requestBody,
final long timeoutMillis,
final boolean oneway
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
......@@ -1213,7 +1211,7 @@ public class MQClientAPIImpl {
if (allowTopicNotExist && !topic.equals(MixAll.DEFAULT_TOPIC)) {
log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
}
// TODO :- Log when if condition is not satisfied
break;
}
case ResponseCode.SUCCESS: {
......@@ -1566,12 +1564,12 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
public void registerMessageFilterClass(final String addr, //
final String consumerGroup, //
final String topic, //
final String className, //
final int classCRC, //
final byte[] classBody, //
public void registerMessageFilterClass(final String addr,
final String consumerGroup,
final String topic,
final String className,
final int classCRC,
final byte[] classBody,
final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException, MQBrokerException {
RegisterMessageFilterClassRequestHeader requestHeader = new RegisterMessageFilterClassRequestHeader();
......@@ -1706,10 +1704,10 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr, //
String consumerGroup, //
String clientId, //
String msgId, //
public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr,
String consumerGroup,
String clientId,
String msgId,
final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
ConsumeMessageDirectlyResultRequestHeader requestHeader = new ConsumeMessageDirectlyResultRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
......@@ -1912,7 +1910,6 @@ public class MQClientAPIImpl {
public Set<String> getClusterList(String topic,
long timeoutMillis) throws MQClientException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
// todo:jodie
return Collections.EMPTY_SET;
}
......
......@@ -69,12 +69,12 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(//
this.defaultMQPushConsumer.getConsumeThreadMin(), //
this.defaultMQPushConsumer.getConsumeThreadMax(), //
1000 * 60, //
TimeUnit.MILLISECONDS, //
this.consumeRequestQueue, //
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
......@@ -100,8 +100,8 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
@Override
public void updateCorePoolSize(int corePoolSize) {
if (corePoolSize > 0 //
&& corePoolSize <= Short.MAX_VALUE //
if (corePoolSize > 0
&& corePoolSize <= Short.MAX_VALUE
&& corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
this.consumeExecutor.setCorePoolSize(corePoolSize);
}
......@@ -115,11 +115,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
// this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
// + 1);
// }
//
// log.info("incCorePoolSize Concurrently from {} to {}, ConsumerGroup:
// {}", //
// corePoolSize,//
// this.consumeExecutor.getCorePoolSize(),//
// {}",
// corePoolSize,
// this.consumeExecutor.getCorePoolSize(),
// this.consumerGroup);
}
......@@ -131,11 +130,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
// this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
// - 1);
// }
//
// log.info("decCorePoolSize Concurrently from {} to {}, ConsumerGroup:
// {}", //
// corePoolSize,//
// this.consumeExecutor.getCorePoolSize(),//
// {}",
// corePoolSize,
// this.consumeExecutor.getCorePoolSize(),
// this.consumerGroup);
}
......@@ -185,10 +183,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
result.setRemark(RemotingHelper.exceptionSimpleDesc(e));
log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", //
RemotingHelper.exceptionSimpleDesc(e), //
ConsumeMessageConcurrentlyService.this.consumerGroup, //
msgs, //
log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
mq), e);
}
......@@ -200,10 +198,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
}
@Override
public void submitConsumeRequest(//
final List<MessageExt> msgs, //
final ProcessQueue processQueue, //
final MessageQueue messageQueue, //
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
......@@ -258,10 +256,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
}
}
public void processConsumeResult(//
final ConsumeConcurrentlyStatus status, //
final ConsumeConcurrentlyContext context, //
final ConsumeRequest consumeRequest//
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
......@@ -338,10 +336,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
return false;
}
private void submitConsumeRequestLater(//
final List<MessageExt> msgs, //
final ProcessQueue processQueue, //
final MessageQueue messageQueue//
private void submitConsumeRequestLater(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue
) {
this.scheduledExecutorService.schedule(new Runnable() {
......@@ -353,7 +351,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
}, 5000, TimeUnit.MILLISECONDS);
}
private void submitConsumeRequestLater(final ConsumeRequest consumeRequest//
private void submitConsumeRequestLater(final ConsumeRequest consumeRequest
) {
this.scheduledExecutorService.schedule(new Runnable() {
......@@ -419,7 +417,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e), //
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
......
......@@ -70,12 +70,12 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(//
this.defaultMQPushConsumer.getConsumeThreadMin(), //
this.defaultMQPushConsumer.getConsumeThreadMax(), //
1000 * 60, //
TimeUnit.MILLISECONDS, //
this.consumeRequestQueue, //
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
......@@ -107,8 +107,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
@Override
public void updateCorePoolSize(int corePoolSize) {
if (corePoolSize > 0 //
&& corePoolSize <= Short.MAX_VALUE //
if (corePoolSize > 0
&& corePoolSize <= Short.MAX_VALUE
&& corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
this.consumeExecutor.setCorePoolSize(corePoolSize);
}
......@@ -171,10 +171,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
result.setRemark(RemotingHelper.exceptionSimpleDesc(e));
log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", //
RemotingHelper.exceptionSimpleDesc(e), //
ConsumeMessageOrderlyService.this.consumerGroup, //
msgs, //
log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
mq), e);
}
......@@ -187,10 +187,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
}
@Override
public void submitConsumeRequest(//
final List<MessageExt> msgs, //
final ProcessQueue processQueue, //
final MessageQueue messageQueue, //
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
......@@ -226,10 +226,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
return false;
}
private void submitConsumeRequestLater(//
final ProcessQueue processQueue, //
final MessageQueue messageQueue, //
final long suspendTimeMillis//
private void submitConsumeRequestLater(
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final long suspendTimeMillis
) {
long timeMillis = suspendTimeMillis;
if (timeMillis == -1) {
......@@ -251,11 +251,11 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
}, timeMillis, TimeUnit.MILLISECONDS);
}
public boolean processConsumeResult(//
final List<MessageExt> msgs, //
final ConsumeOrderlyStatus status, //
final ConsumeOrderlyContext context, //
final ConsumeRequest consumeRequest//
public boolean processConsumeResult(
final List<MessageExt> msgs,
final ConsumeOrderlyStatus status,
final ConsumeOrderlyContext context,
final ConsumeRequest consumeRequest
) {
boolean continueConsume = true;
long commitOffset = -1L;
......@@ -273,9 +273,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(//
consumeRequest.getProcessQueue(), //
consumeRequest.getMessageQueue(), //
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
} else {
......@@ -295,9 +295,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
break;
case ROLLBACK:
consumeRequest.getProcessQueue().rollback();
this.submitConsumeRequestLater(//
consumeRequest.getProcessQueue(), //
consumeRequest.getMessageQueue(), //
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
break;
......@@ -305,9 +305,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(//
consumeRequest.getProcessQueue(), //
consumeRequest.getMessageQueue(), //
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
}
......@@ -468,22 +468,22 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
RemotingHelper.exceptionSimpleDesc(e), //
ConsumeMessageOrderlyService.this.consumerGroup, //
msgs, //
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock();
}
if (null == status //
|| ConsumeOrderlyStatus.ROLLBACK == status//
if (null == status
|| ConsumeOrderlyStatus.ROLLBACK == status
|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", //
ConsumeMessageOrderlyService.this.consumerGroup, //
msgs, //
log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
}
......
......@@ -36,9 +36,9 @@ public interface ConsumeMessageService {
ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
void submitConsumeRequest(//
final List<MessageExt> msgs, //
final ProcessQueue processQueue, //
final MessageQueue messageQueue, //
void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume);
}
......@@ -97,8 +97,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
private void makeSureStateOK() throws MQClientException {
if (this.serviceState != ServiceState.RUNNING) {
throw new MQClientException("The consumer service state not OK, "//
+ this.serviceState//
throw new MQClientException("The consumer service state not OK, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
}
......@@ -185,7 +185,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
SubscriptionData subscriptionData;
try {
subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
mq.getTopic(), subExpression);
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
......@@ -193,18 +193,18 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(//
mq, // 1
subscriptionData.getSubString(), // 2
0L, // 3
offset, // 4
maxNums, // 5
sysFlag, // 6
0, // 7
this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
timeoutMillis, // 9
CommunicationMode.SYNC, // 10
null// 11
PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
mq,
subscriptionData.getSubString(),
0L,
offset,
maxNums,
sysFlag,
0,
this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
timeoutMillis,
CommunicationMode.SYNC,
null
);
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
if (!this.consumeMessageHookList.isEmpty()) {
......@@ -225,7 +225,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public void subscriptionAutomatically(final String topic) {
if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
topic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);
} catch (Exception ignore) {
......@@ -372,13 +372,13 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout);
}
private void pullAsyncImpl(//
final MessageQueue mq, //
final String subExpression, //
final long offset, //
final int maxNums, //
final PullCallback pullCallback, //
final boolean block, //
private void pullAsyncImpl(
final MessageQueue mq,
final String subExpression,
final long offset,
final int maxNums,
final PullCallback pullCallback,
final boolean block,
final long timeout) throws MQClientException, RemotingException, InterruptedException {
this.makeSureStateOK();
......@@ -405,7 +405,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
final SubscriptionData subscriptionData;
try {
subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
mq.getTopic(), subExpression);
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
......@@ -413,17 +413,17 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
this.pullAPIWrapper.pullKernelImpl(//
mq, // 1
subscriptionData.getSubString(), // 2
0L, // 3
offset, // 4
maxNums, // 5
sysFlag, // 6
0, // 7
this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
timeoutMillis, // 9
CommunicationMode.ASYNC, // 10
this.pullAPIWrapper.pullKernelImpl(
mq,
subscriptionData.getSubString(),
0L,
offset,
maxNums,
sysFlag,
0,
this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
timeoutMillis,
CommunicationMode.ASYNC,
new PullCallback() {
@Override
......@@ -551,8 +551,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(//
mQClientFactory, //
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
......@@ -589,8 +589,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PullConsumer service state not OK, maybe started once, "//
+ this.serviceState//
throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
......@@ -606,42 +606,42 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
// consumerGroup
if (null == this.defaultMQPullConsumer.getConsumerGroup()) {
throw new MQClientException(
"consumerGroup is null" //
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
"consumerGroup is null"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
// consumerGroup
if (this.defaultMQPullConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
throw new MQClientException(
"consumerGroup can not equal "//
+ MixAll.DEFAULT_CONSUMER_GROUP //
+ ", please specify another one."//
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
"consumerGroup can not equal "
+ MixAll.DEFAULT_CONSUMER_GROUP
+ ", please specify another one."
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
// messageModel
if (null == this.defaultMQPullConsumer.getMessageModel()) {
throw new MQClientException(
"messageModel is null" //
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
"messageModel is null"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
// allocateMessageQueueStrategy
if (null == this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()) {
throw new MQClientException(
"allocateMessageQueueStrategy is null" //
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
"allocateMessageQueueStrategy is null"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
// allocateMessageQueueStrategy
if (this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis()) {
throw new MQClientException(
"Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis" //
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
"Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
}
......@@ -651,7 +651,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics();
if (registerTopics != null) {
for (final String topic : registerTopics) {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
topic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
......
......@@ -297,10 +297,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
pullResult.getMsgFoundList(), //
processQueue, //
pullRequest.getMessageQueue(), //
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispathToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
......@@ -311,12 +311,12 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset//
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", //
pullResult.getNextBeginOffset(), //
firstMsgOffset, //
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
......@@ -336,7 +336,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}", //
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
......@@ -396,26 +396,26 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
classFilter = sd.isClassFilterMode();
}
int sysFlag = PullSysFlag.buildSysFlag(//
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
try {
this.pullAPIWrapper.pullKernelImpl(//
pullRequest.getMessageQueue(), // 1
subExpression, // 2
subscriptionData.getExpressionType(), // 3
subscriptionData.getSubVersion(), // 4
pullRequest.getNextOffset(), // 5
this.defaultMQPushConsumer.getPullBatchSize(), // 6
sysFlag, // 7
commitOffsetValue, // 8
BROKER_SUSPEND_MAX_TIME_MILLIS, // 9
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 10
CommunicationMode.ASYNC, // 11
pullCallback // 12
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
......@@ -425,8 +425,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private void makeSureStateOK() throws MQClientException {
if (this.serviceState != ServiceState.RUNNING) {
throw new MQClientException("The consumer service state not OK, "//
+ this.serviceState//
throw new MQClientException("The consumer service state not OK, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
}
......@@ -608,8 +608,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
+ this.serviceState//
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
......@@ -764,7 +764,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subString);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
......@@ -779,7 +779,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
break;
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
......@@ -811,7 +811,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
......@@ -824,7 +824,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, "*");
subscriptionData.setSubString(fullClassName);
subscriptionData.setClassFilterMode(true);
......
......@@ -73,9 +73,9 @@ public abstract class RebalanceImpl {
try {
this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000, oneway);
log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}", //
this.consumerGroup, //
this.mQClientFactory.getClientId(), //
log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}",
this.consumerGroup,
this.mQClientFactory.getClientId(),
mq);
} catch (Exception e) {
log.error("unlockBatchMQ exception, " + mq, e);
......@@ -245,10 +245,10 @@ public abstract class RebalanceImpl {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}", //
consumerGroup, //
topic, //
mqSet, //
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
......@@ -280,10 +280,10 @@ public abstract class RebalanceImpl {
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(//
this.consumerGroup, //
this.mQClientFactory.getClientId(), //
mqAll, //
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
......
......@@ -74,8 +74,8 @@ public class RebalancePushImpl extends RebalanceImpl {
pq.getLockConsume().unlock();
}
} else {
log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
mq, //
log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
mq,
pq.getTryUnlockTimes());
pq.incTryUnlockTimes();
......
......@@ -148,10 +148,10 @@ public class MQClientInstance {
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}", //
this.instanceIndex, //
this.clientId, //
this.clientConfig, //
log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}",
this.instanceIndex,
this.clientId,
this.clientConfig,
MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}
......@@ -727,13 +727,13 @@ public class MQClientInstance {
classBody = filterClassSource.getBytes(MixAll.DEFAULT_CHARSET);
classCRC = UtilAll.crc32(classBody);
} catch (Exception e1) {
log.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}", //
fullClassName, //
log.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}",
fullClassName,
RemotingHelper.exceptionSimpleDesc(e1));
}
TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
if (topicRouteData != null //
if (topicRouteData != null
&& topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) {
Iterator<Entry<String, List<String>>> it = topicRouteData.getFilterServerTable().entrySet().iterator();
while (it.hasNext()) {
......@@ -1006,10 +1006,10 @@ public class MQClientInstance {
return null;
}
public FindBrokerResult findBrokerAddressInSubscribe(//
final String brokerName, //
final long brokerId, //
final boolean onlyThisBroker//
public FindBrokerResult findBrokerAddressInSubscribe(
final String brokerName,
final long brokerId,
final boolean onlyThisBroker
) {
String brokerAddr = null;
boolean slave = false;
......@@ -1102,7 +1102,6 @@ public class MQClientInstance {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
//
}
Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
......@@ -1171,8 +1170,8 @@ public class MQClientInstance {
return topicRouteTable;
}
public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, //
final String consumerGroup, //
public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg,
final String consumerGroup,
final String brokerName) {
MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
if (null != mqConsumerInner) {
......
......@@ -116,11 +116,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void initTransactionEnv() {
TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
this.checkExecutor = new ThreadPoolExecutor(//
producer.getCheckThreadPoolMinSize(), //
producer.getCheckThreadPoolMaxSize(), //
1000 * 60, //
TimeUnit.MILLISECONDS, //
this.checkExecutor = new ThreadPoolExecutor(
producer.getCheckThreadPoolMinSize(),
producer.getCheckThreadPoolMaxSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.checkRequestQueue);
}
......@@ -172,8 +172,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "//
+ this.serviceState//
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
......@@ -268,18 +268,18 @@ public class DefaultMQProducerImpl implements MQProducerInner {
exception = e;
}
this.processTransactionState(//
localTransactionState, //
group, //
this.processTransactionState(
localTransactionState,
group,
exception);
} else {
log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
}
private void processTransactionState(//
final LocalTransactionState localTransactionState, //
final String producerGroup, //
private void processTransactionState(
final LocalTransactionState localTransactionState,
final String producerGroup,
final Throwable exception) {
final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
......@@ -354,8 +354,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private void makeSureStateOK() throws MQClientException {
if (this.serviceState != ServiceState.RUNNING) {
throw new MQClientException("The producer service state not OK, "//
+ this.serviceState//
throw new MQClientException("The producer service state not OK, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
}
......@@ -428,11 +428,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}
private SendResult sendDefaultImpl(//
Message msg, //
final CommunicationMode communicationMode, //
final SendCallback sendCallback, //
final long timeout//
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
......@@ -579,11 +579,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
}
private SendResult sendKernelImpl(final Message msg, //
final MessageQueue mq, //
final CommunicationMode communicationMode, //
final SendCallback sendCallback, //
final TopicPublishInfo topicPublishInfo, //
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
......@@ -674,18 +674,18 @@ public class DefaultMQProducerImpl implements MQProducerInner {
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
brokerAddr, // 1
mq.getBrokerName(), // 2
msg, // 3
requestHeader, // 4
timeout, // 5
communicationMode, // 6
sendCallback, // 7
topicPublishInfo, // 8
this.mQClientFactory, // 9
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
context, //
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
......@@ -887,12 +887,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
}
private SendResult sendSelectImpl(//
Message msg, //
MessageQueueSelector selector, //
Object arg, //
final CommunicationMode communicationMode, //
final SendCallback sendCallback, final long timeout//
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
......@@ -1017,9 +1017,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
public void endTransaction(//
final SendResult sendResult, //
final LocalTransactionState localTransactionState, //
public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
......
......@@ -28,9 +28,9 @@ public interface MQProducerInner {
TransactionCheckListener checkListener();
void checkTransactionState(//
final String addr, //
final MessageExt msg, //
void checkTransactionState(
final String addr,
final MessageExt msg,
final CheckTransactionStateRequestHeader checkRequestHeader);
void updateTopicPublishInfo(final String topic, final TopicPublishInfo info);
......
......@@ -46,24 +46,14 @@ public class TopicConfig {
public String encode() {
StringBuilder sb = new StringBuilder();
// 1
sb.append(this.topicName);
sb.append(SEPARATOR);
// 2
sb.append(this.readQueueNums);
sb.append(SEPARATOR);
// 3
sb.append(this.writeQueueNums);
sb.append(SEPARATOR);
// 4
sb.append(this.perm);
sb.append(SEPARATOR);
// 5
sb.append(this.topicFilterType);
return sb.toString();
......
......@@ -18,44 +18,44 @@ package org.apache.rocketmq.common.help;
public class FAQUrl {
public static final String APPLY_TOPIC_URL = //
public static final String APPLY_TOPIC_URL =
"http://rocketmq.apache.org/docs/faq/";
public static final String NAME_SERVER_ADDR_NOT_EXIST_URL = //
public static final String NAME_SERVER_ADDR_NOT_EXIST_URL =
"http://rocketmq.apache.org/docs/faq/";
public static final String GROUP_NAME_DUPLICATE_URL = //
public static final String GROUP_NAME_DUPLICATE_URL =
"http://rocketmq.apache.org/docs/faq/";
public static final String CLIENT_PARAMETER_CHECK_URL = //
public static final String CLIENT_PARAMETER_CHECK_URL =
"http://rocketmq.apache.org/docs/faq/";
public static final String SUBSCRIPTION_GROUP_NOT_EXIST = //
public static final String SUBSCRIPTION_GROUP_NOT_EXIST =
"http://rocketmq.apache.org/docs/faq/";
public static final String CLIENT_SERVICE_NOT_OK = //
public static final String CLIENT_SERVICE_NOT_OK =
"http://rocketmq.apache.org/docs/faq/";
// FAQ: No route info of this topic, TopicABC
public static final String NO_TOPIC_ROUTE_INFO = //
public static final String NO_TOPIC_ROUTE_INFO =
"http://rocketmq.apache.org/docs/faq/";
public static final String LOAD_JSON_EXCEPTION = //
public static final String LOAD_JSON_EXCEPTION =
"http://rocketmq.apache.org/docs/faq/";
public static final String SAME_GROUP_DIFFERENT_TOPIC = //
public static final String SAME_GROUP_DIFFERENT_TOPIC =
"http://rocketmq.apache.org/docs/faq/";
public static final String MQLIST_NOT_EXIST = //
public static final String MQLIST_NOT_EXIST =
"http://rocketmq.apache.org/docs/faq/";
public static final String UNEXPECTED_EXCEPTION_URL = //
public static final String UNEXPECTED_EXCEPTION_URL =
"http://rocketmq.apache.org/docs/faq/";
public static final String SEND_MSG_FAILED = //
public static final String SEND_MSG_FAILED =
"http://rocketmq.apache.org/docs/faq/";
public static final String UNKNOWN_HOST_EXCEPTION = //
public static final String UNKNOWN_HOST_EXCEPTION =
"http://rocketmq.apache.org/docs/faq/";
private static final String TIP_STRING_BEGIN = "\nSee ";
......
......@@ -42,7 +42,7 @@ public class MessageClientIDSetter {
tempBuffer.put(createFakeIP());
}
tempBuffer.position(6);
tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode()); //4
tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
setStartTime(System.currentTimeMillis());
COUNTER = new AtomicInteger(0);
......
......@@ -130,15 +130,15 @@ public class ConsumerRunningInfo extends RemotingSerializable {
if (orderMsg) {
if (!pq.isLocked()) {
sb.append(String.format("%s %s can't lock for a while, %dms%n", //
clientId, //
mq, //
sb.append(String.format("%s %s can't lock for a while, %dms%n",
clientId,
mq,
System.currentTimeMillis() - pq.getLastLockTimestamp()));
} else {
if (pq.isDroped() && (pq.getTryUnlockTimes() > 0)) {
sb.append(String.format("%s %s unlock %d times, still failed%n", //
clientId, //
mq, //
sb.append(String.format("%s %s unlock %d times, still failed%n",
clientId,
mq,
pq.getTryUnlockTimes()));
}
}
......@@ -147,9 +147,9 @@ public class ConsumerRunningInfo extends RemotingSerializable {
long diff = System.currentTimeMillis() - pq.getLastConsumeTimestamp();
if (diff > (1000 * 60) && pq.getCachedMsgCount() > 0) {
sb.append(String.format("%s %s can't consume for a while, maybe blocked, %dms%n", //
clientId, //
mq, //
sb.append(String.format("%s %s can't consume for a while, maybe blocked, %dms%n",
clientId,
mq,
diff));
}
}
......@@ -211,10 +211,10 @@ public class ConsumerRunningInfo extends RemotingSerializable {
int i = 0;
while (it.hasNext()) {
SubscriptionData next = it.next();
String item = String.format("%03d Topic: %-40s ClassFilter: %-8s SubExpression: %s%n", //
++i, //
next.getTopic(), //
next.isClassFilterMode(), //
String item = String.format("%03d Topic: %-40s ClassFilter: %-8s SubExpression: %s%n",
++i,
next.getTopic(),
next.isClassFilterMode(),
next.getSubString());
sb.append(item);
......@@ -223,20 +223,20 @@ public class ConsumerRunningInfo extends RemotingSerializable {
{
sb.append("\n\n#Consumer Offset#\n");
sb.append(String.format("%-32s %-32s %-4s %-20s%n", //
"#Topic", //
"#Broker Name", //
"#QID", //
"#Consumer Offset"//
sb.append(String.format("%-32s %-32s %-4s %-20s%n",
"#Topic",
"#Broker Name",
"#QID",
"#Consumer Offset"
));
Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = this.mqTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueueInfo> next = it.next();
String item = String.format("%-32s %-32s %-4d %-20d%n", //
next.getKey().getTopic(), //
next.getKey().getBrokerName(), //
next.getKey().getQueueId(), //
String item = String.format("%-32s %-32s %-4d %-20d%n",
next.getKey().getTopic(),
next.getKey().getBrokerName(),
next.getKey().getQueueId(),
next.getValue().getCommitOffset());
sb.append(item);
......@@ -245,20 +245,20 @@ public class ConsumerRunningInfo extends RemotingSerializable {
{
sb.append("\n\n#Consumer MQ Detail#\n");
sb.append(String.format("%-32s %-32s %-4s %-20s%n", //
"#Topic", //
"#Broker Name", //
"#QID", //
"#ProcessQueueInfo"//
sb.append(String.format("%-32s %-32s %-4s %-20s%n",
"#Topic",
"#Broker Name",
"#QID",
"#ProcessQueueInfo"
));
Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = this.mqTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueueInfo> next = it.next();
String item = String.format("%-32s %-32s %-4d %s%n", //
next.getKey().getTopic(), //
next.getKey().getBrokerName(), //
next.getKey().getQueueId(), //
String item = String.format("%-32s %-32s %-4d %s%n",
next.getKey().getTopic(),
next.getKey().getBrokerName(),
next.getKey().getQueueId(),
next.getValue().toString());
sb.append(item);
......@@ -267,27 +267,27 @@ public class ConsumerRunningInfo extends RemotingSerializable {
{
sb.append("\n\n#Consumer RT&TPS#\n");
sb.append(String.format("%-32s %14s %14s %14s %14s %18s %25s%n", //
"#Topic", //
"#Pull RT", //
"#Pull TPS", //
"#Consume RT", //
"#ConsumeOK TPS", //
"#ConsumeFailed TPS", //
"#ConsumeFailedMsgsInHour"//
sb.append(String.format("%-32s %14s %14s %14s %14s %18s %25s%n",
"#Topic",
"#Pull RT",
"#Pull TPS",
"#Consume RT",
"#ConsumeOK TPS",
"#ConsumeFailed TPS",
"#ConsumeFailedMsgsInHour"
));
Iterator<Entry<String, ConsumeStatus>> it = this.statusTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConsumeStatus> next = it.next();
String item = String.format("%-32s %14.2f %14.2f %14.2f %14.2f %18.2f %25d%n", //
next.getKey(), //
next.getValue().getPullRT(), //
next.getValue().getPullTPS(), //
next.getValue().getConsumeRT(), //
next.getValue().getConsumeOKTPS(), //
next.getValue().getConsumeFailedTPS(), //
next.getValue().getConsumeFailedMsgs()//
String item = String.format("%-32s %14.2f %14.2f %14.2f %14.2f %18.2f %25d%n",
next.getKey(),
next.getValue().getPullRT(),
next.getValue().getPullTPS(),
next.getValue().getConsumeRT(),
next.getValue().getConsumeOKTPS(),
next.getValue().getConsumeFailedTPS(),
next.getValue().getConsumeFailedMsgs()
);
sb.append(item);
......
......@@ -27,8 +27,6 @@ public class GetConsumeStatsRequestHeader implements CommandCustomHeader {
@Override
public void checkFields() throws RemotingCommandException {
// TODO Auto-generated method stub
}
public String getConsumerGroup() {
......
......@@ -32,7 +32,6 @@ public class GetConsumerStatusRequestHeader implements CommandCustomHeader {
@Override
public void checkFields() throws RemotingCommandException {
// TODO Auto-generated method stub
}
public String getTopic() {
......
......@@ -32,7 +32,6 @@ public class GetEarliestMsgStoretimeRequestHeader implements CommandCustomHeader
@Override
public void checkFields() throws RemotingCommandException {
// TODO Auto-generated method stub
}
public String getTopic() {
......
......@@ -33,7 +33,7 @@ public class QueryCorrectionOffsetHeader implements CommandCustomHeader {
@Override
public void checkFields() throws RemotingCommandException {
// TODO Auto-generated method stub
}
public String getFilterGroups() {
......
......@@ -34,7 +34,7 @@ public class SearchOffsetRequestHeader implements CommandCustomHeader {
@Override
public void checkFields() throws RemotingCommandException {
// TODO Auto-generated method stub
}
......
......@@ -57,7 +57,7 @@ public class UnregisterClientRequestHeader implements CommandCustomHeader {
@Override
public void checkFields() throws RemotingCommandException {
// TODO Auto-generated method stub
}
}
......@@ -24,7 +24,7 @@ public class UnregisterClientResponseHeader implements CommandCustomHeader {
@Override
public void checkFields() throws RemotingCommandException {
// TODO Auto-generated method stub
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* $Id: GetRouteInfoResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
package org.apache.rocketmq.common.protocol.header.namesrv;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetRouteInfoResponseHeader implements CommandCustomHeader {
@Override
public void checkFields() throws RemotingCommandException {
// TODO Auto-generated method stub
}
}
......@@ -32,7 +32,7 @@ public class RegisterOrderTopicRequestHeader implements CommandCustomHeader {
@Override
public void checkFields() throws RemotingCommandException {
// TODO Auto-generated method stub
}
public String getTopic() {
......
......@@ -16,11 +16,7 @@
*/
package org.apache.rocketmq.common.sysflag;
/**
*
*
*/
public class TopicSysFlag {
private final static int FLAG_UNIT = 0x1 << 0;
......
......@@ -114,9 +114,7 @@ public class IOTinyUtils {
fileOrDir.delete();
}
/**
*/
public static void cleanDirectory(File directory) throws IOException {
if (!directory.exists()) {
String message = directory + " does not exist";
......
......@@ -35,9 +35,7 @@ public class PushConsumer {
consumer.setConsumeTimestamp("20170422221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
......
......@@ -28,7 +28,7 @@ public class MessageFilterImpl implements MessageFilter {
String property = msg.getProperty("SequenceId");
if (property != null) {
int id = Integer.parseInt(property);
if (((id % 10) == 0) && //
if (((id % 10) == 0) &&
(id > 100)) {
return true;
}
......
......@@ -56,7 +56,6 @@ public class SelectorParser implements SelectorParserConstants {
// convertStringExpressions = true;
// sql = sql.substring(CONVERT_STRING_EXPRESSIONS_PREFIX.length());
// }
//
// if( convertStringExpressions ) {
// ComparisonExpression.CONVERT_STRING_EXPRESSIONS.set(true);
// }
......
......@@ -82,7 +82,6 @@ public class SelectorParser {
// convertStringExpressions = true;
// sql = sql.substring(CONVERT_STRING_EXPRESSIONS_PREFIX.length());
// }
//
// if( convertStringExpressions ) {
// ComparisonExpression.CONVERT_STRING_EXPRESSIONS.set(true);
// }
......
......@@ -72,10 +72,10 @@ public class KVConfigManager {
final String prev = kvTable.put(key, value);
if (null != prev) {
log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}", //
log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}",
namespace, key, value);
} else {
log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}", //
log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}",
namespace, key, value);
}
} finally {
......@@ -119,7 +119,7 @@ public class KVConfigManager {
HashMap<String, String> kvTable = this.configTable.get(namespace);
if (null != kvTable) {
String value = kvTable.remove(key);
log.info("deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}", //
log.info("deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}",
namespace, key, value);
}
} finally {
......
......@@ -131,9 +131,9 @@ public class RouteInfoManager {
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
if (null != topicConfigWrapper //
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())//
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
......
......@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static final int FRAME_MAX_LENGTH = //
private static final int FRAME_MAX_LENGTH =
Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));
public NettyDecoder() {
......
......@@ -188,7 +188,7 @@ public abstract class NettyRemotingAbstract {
log.error(cmd.toString());
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
......@@ -210,9 +210,9 @@ public abstract class NettyRemotingAbstract {
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
+ ", too many requests and system thread pool busy, RejectedExecutionException " //
+ pair.getObject2().toString() //
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
}
......@@ -422,10 +422,10 @@ public abstract class NettyRemotingAbstract {
throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
} else {
String info =
String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
timeoutMillis, //
this.semaphoreAsync.getQueueLength(), //
this.semaphoreAsync.availablePermits()//
String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
timeoutMillis,
this.semaphoreAsync.getQueueLength(),
this.semaphoreAsync.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
......@@ -459,10 +459,10 @@ public abstract class NettyRemotingAbstract {
throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
} else {
String info = String.format(
"invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
timeoutMillis, //
this.semaphoreOneway.getQueueLength(), //
this.semaphoreOneway.availablePermits()//
"invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
timeoutMillis,
this.semaphoreOneway.getQueueLength(),
this.semaphoreOneway.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
......
......@@ -92,7 +92,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
this(nettyClientConfig, null);
}
public NettyRemotingClient(final NettyClientConfig nettyClientConfig, //
public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
final ChannelEventListener channelEventListener) {
super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
this.nettyClientConfig = nettyClientConfig;
......@@ -130,8 +130,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//
nettyClientConfig.getClientWorkerThreads(), //
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
......@@ -142,7 +142,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
});
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)//
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
......
......@@ -16,11 +16,7 @@
*/
package org.apache.rocketmq.remoting.netty;
/**
*
*
*/
public class NettyServerConfig implements Cloneable {
private int listenPort = 8888;
private int serverWorkerThreads = 8;
......
......@@ -20,23 +20,23 @@ package org.apache.rocketmq.remoting.netty;
public class NettySystemConfig {
public static final String COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE =
"com.rocketmq.remoting.nettyPooledByteBufAllocatorEnable";
public static final String COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE = //
public static final String COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE =
"com.rocketmq.remoting.socket.sndbuf.size";
public static final String COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE = //
public static final String COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE =
"com.rocketmq.remoting.socket.rcvbuf.size";
public static final String COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE = //
public static final String COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE =
"com.rocketmq.remoting.clientAsyncSemaphoreValue";
public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE = //
public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE =
"com.rocketmq.remoting.clientOnewaySemaphoreValue";
public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = //
public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE =
Boolean
.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));
public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = //
public static final int CLIENT_ASYNC_SEMAPHORE_VALUE =
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535"));
public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE = //
public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE =
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535"));
public static int socketSndbufSize = //
public static int socketSndbufSize =
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535"));
public static int socketRcvbufSize = //
public static int socketRcvbufSize =
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535"));
}
......@@ -125,11 +125,11 @@ public class CommitLog {
return this.mappedFileQueue.remainHowManyDataToFlush();
}
public int deleteExpiredFile(//
final long expiredTime, //
final int deleteFilesInterval, //
final long intervalForcibly, //
final boolean cleanImmediately//
public int deleteExpiredFile(
final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately
) {
return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
}
......@@ -244,43 +244,30 @@ public class CommitLog {
byte[] bytesContent = new byte[totalSize];
// 3 BODYCRC
int bodyCRC = byteBuffer.getInt();
// 4 QUEUEID
int queueId = byteBuffer.getInt();
// 5 FLAG
int flag = byteBuffer.getInt();
// 6 QUEUEOFFSET
long queueOffset = byteBuffer.getLong();
// 7 PHYSICALOFFSET
long physicOffset = byteBuffer.getLong();
// 8 SYSFLAG
int sysFlag = byteBuffer.getInt();
// 9 BORNTIMESTAMP
long bornTimeStamp = byteBuffer.getLong();
// 10
ByteBuffer byteBuffer1 = byteBuffer.get(bytesContent, 0, 8);
// 11 STORETIMESTAMP
long storeTimestamp = byteBuffer.getLong();
// 12
ByteBuffer byteBuffer2 = byteBuffer.get(bytesContent, 0, 8);
// 13 RECONSUMETIMES
int reconsumeTimes = byteBuffer.getInt();
// 14 Prepared Transaction Offset
long preparedTransactionOffset = byteBuffer.getLong();
// 15 BODY
int bodyLen = byteBuffer.getInt();
if (bodyLen > 0) {
if (readBody) {
......@@ -298,7 +285,6 @@ public class CommitLog {
}
}
// 16 TOPIC
byte topicLen = byteBuffer.get();
byteBuffer.get(bytesContent, 0, topicLen);
String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8);
......@@ -307,7 +293,6 @@ public class CommitLog {
String keys = "";
String uniqKey = null;
// 17 properties
short propertiesLength = byteBuffer.getShort();
Map<String, String> propertiesMap = null;
if (propertiesLength > 0) {
......@@ -355,19 +340,19 @@ public class CommitLog {
return new DispatchRequest(totalSize, false/* success */);
}
return new DispatchRequest(//
topic, // 1
queueId, // 2
physicOffset, // 3
totalSize, // 4
tagsCode, // 5
storeTimestamp, // 6
queueOffset, // 7
keys, // 8
uniqKey, //9
sysFlag, // 10
preparedTransactionOffset, // 11
propertiesMap // 12
return new DispatchRequest(
topic,
queueId,
physicOffset,
totalSize,
tagsCode,
storeTimestamp,
queueOffset,
keys,
uniqKey,
sysFlag,
preparedTransactionOffset,
propertiesMap
);
} catch (Exception e) {
}
......@@ -376,24 +361,23 @@ public class CommitLog {
}
private static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
final int msgLen = 4 // 1 TOTALSIZE
+ 4 // 2 MAGICCODE
+ 4 // 3 BODYCRC
+ 4 // 4 QUEUEID
+ 4 // 5 FLAG
+ 8 // 6 QUEUEOFFSET
+ 8 // 7 PHYSICALOFFSET
+ 4 // 8 SYSFLAG
+ 8 // 9 BORNTIMESTAMP
+ 8 // 10 BORNHOST
+ 8 // 11 STORETIMESTAMP
+ 8 // 12 STOREHOSTADDRESS
+ 4 // 13 RECONSUMETIMES
+ 8 // 14 Prepared Transaction Offset
+ 4 + (bodyLength > 0 ? bodyLength : 0) // 14 BODY
+ 1 + topicLength // 15 TOPIC
+ 2 + (propertiesLength > 0 ? propertiesLength : 0) // 16
// propertiesLength
final int msgLen = 4 //TOTALSIZE
+ 4 //MAGICCODE
+ 4 //BODYCRC
+ 4 //QUEUEID
+ 4 //FLAG
+ 8 //QUEUEOFFSET
+ 8 //PHYSICALOFFSET
+ 4 //SYSFLAG
+ 8 //BORNTIMESTAMP
+ 8 //BORNHOST
+ 8 //STORETIMESTAMP
+ 8 //STOREHOSTADDRESS
+ 4 //RECONSUMETIMES
+ 8 //Prepared Transaction Offset
+ 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
+ 1 + topicLength //TOPIC
+ 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
+ 0;
return msgLen;
}
......@@ -500,18 +484,18 @@ public class CommitLog {
return false;
}
if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()//
if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
&& this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
log.info("find check timestamp, {} {}", //
storeTimestamp, //
log.info("find check timestamp, {} {}",
storeTimestamp,
UtilAll.timeMillisToHumanString(storeTimestamp));
return true;
}
} else {
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
log.info("find check timestamp, {} {}", //
storeTimestamp, //
log.info("find check timestamp, {} {}",
storeTimestamp,
UtilAll.timeMillisToHumanString(storeTimestamp));
return true;
}
......@@ -547,7 +531,7 @@ public class CommitLog {
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
......@@ -1270,8 +1254,6 @@ public class CommitLog {
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
//
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
......@@ -1391,7 +1373,6 @@ public class CommitLog {
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
//
//ignore previous read
messagesByteBuff.reset();
// Here the length of the specially set maxBlank
......
......@@ -218,9 +218,7 @@ public class DefaultMessageStore implements MessageStore {
this.shutdown = false;
}
/**
*/
public void shutdown() {
if (!this.shutdown) {
this.shutdown = true;
......@@ -392,7 +390,7 @@ public class DefaultMessageStore implements MessageStore {
long begin = this.getCommitLog().getBeginTimeInLock();
long diff = this.systemClock.now() - begin;
if (diff < 10000000 //
if (diff < 10000000
&& diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills()) {
return true;
}
......@@ -579,9 +577,7 @@ public class DefaultMessageStore implements MessageStore {
return getResult;
}
/**
*/
public long getMaxOffsetInQueue(String topic, int queueId) {
ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
......@@ -592,9 +588,7 @@ public class DefaultMessageStore implements MessageStore {
return 0;
}
/**
*/
public long getMinOffsetInQueue(String topic, int queueId) {
ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
......@@ -891,9 +885,9 @@ public class DefaultMessageStore implements MessageStore {
ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue();
for (ConsumeQueue cq : queueTable.values()) {
cq.destroy();
log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", //
cq.getTopic(), //
cq.getQueueId() //
log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned",
cq.getTopic(),
cq.getQueueId()
);
this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId());
......@@ -922,17 +916,17 @@ public class DefaultMessageStore implements MessageStore {
long maxCLOffsetInConsumeQueue = nextQT.getValue().getLastOffset();
if (maxCLOffsetInConsumeQueue == -1) {
log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.", //
nextQT.getValue().getTopic(), //
nextQT.getValue().getQueueId(), //
nextQT.getValue().getMaxPhysicOffset(), //
log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.",
nextQT.getValue().getTopic(),
nextQT.getValue().getQueueId(),
nextQT.getValue().getMaxPhysicOffset(),
nextQT.getValue().getMinLogicOffset());
} else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) {
log.info(
"cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", //
topic, //
nextQT.getKey(), //
minCommitLogOffset, //
"cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}",
topic,
nextQT.getKey(),
minCommitLogOffset,
maxCLOffsetInConsumeQueue);
DefaultMessageStore.this.commitLog.removeQueueFromTopicQueueTable(nextQT.getValue().getTopic(),
......@@ -1072,11 +1066,11 @@ public class DefaultMessageStore implements MessageStore {
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
ConsumeQueue newLogic = new ConsumeQueue(//
topic, //
queueId, //
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
ConsumeQueue newLogic = new ConsumeQueue(
topic,
queueId,
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
this);
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
......@@ -1462,11 +1456,11 @@ public class DefaultMessageStore implements MessageStore {
boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", //
fileReservedTime, //
timeup, //
spacefull, //
manualDeleteFileSeveralTimes, //
log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
fileReservedTime,
timeup,
spacefull,
manualDeleteFileSeveralTimes,
cleanAtOnce);
fileReservedTime *= 60 * 60 * 1000;
......@@ -1725,7 +1719,7 @@ public class DefaultMessageStore implements MessageStore {
private void doReput() {
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() //
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
......@@ -1751,7 +1745,7 @@ public class DefaultMessageStore implements MessageStore {
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
// FIXED BUG By shijia
this.reputFromOffset += size;
readSize += size;
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
......
......@@ -66,23 +66,14 @@ public class DispatchRequest {
}
public DispatchRequest(int size) {
// 1
this.topic = "";
// 2
this.queueId = 0;
// 3
this.commitLogOffset = 0;
// 4
this.msgSize = size;
// 5
this.tagsCode = 0;
// 6
this.storeTimestamp = 0;
// 7
this.consumeQueueOffset = 0;
// 8
this.keys = "";
//9
this.uniqKey = null;
this.sysFlag = 0;
this.preparedTransactionOffset = 0;
......@@ -91,23 +82,14 @@ public class DispatchRequest {
}
public DispatchRequest(int size, boolean success) {
// 1
this.topic = "";
// 2
this.queueId = 0;
// 3
this.commitLogOffset = 0;
// 4
this.msgSize = size;
// 5
this.tagsCode = 0;
// 6
this.storeTimestamp = 0;
// 7
this.consumeQueueOffset = 0;
// 8
this.keys = "";
// 9
this.uniqKey = null;
this.sysFlag = 0;
this.preparedTransactionOffset = 0;
......
......@@ -404,9 +404,7 @@ public class MappedFile extends ReferenceResource {
return null;
}
/**
*/
public SelectMappedBufferResult selectMappedBuffer(int pos) {
int readPosition = getReadPosition();
if (pos < readPosition && pos >= 0) {
......
......@@ -405,7 +405,6 @@ public class MappedFileQueue {
break;
}
// TODO: Externalize this hardcoded value
if (destroy && mappedFile.destroy(1000 * 60)) {
files.add(mappedFile);
deleteCount++;
......
......@@ -78,10 +78,7 @@ public class HAConnection {
return socketChannel;
}
/**
*
*/
class ReadSocketService extends ServiceThread {
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
private final Selector selector;
......@@ -194,10 +191,7 @@ public class HAConnection {
}
}
/**
*
*/
class WriteSocketService extends ServiceThread {
private final Selector selector;
private final SocketChannel socketChannel;
......@@ -333,9 +327,7 @@ public class HAConnection {
HAConnection.log.info(this.getServiceName() + " service end");
}
/**
*/
private boolean transferData() throws Exception {
int writeSizeZeroTimes = 0;
// Write Header
......
......@@ -85,9 +85,7 @@ public class HAService {
return result;
}
/**
*/
public void notifyTransferSome(final long offset) {
for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
......@@ -374,17 +372,6 @@ public class HAService {
return !this.reportOffset.hasRemaining();
}
// private void reallocateByteBuffer() {
// ByteBuffer bb = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
// int remain = this.byteBufferRead.limit() - this.dispatchPostion;
// bb.put(this.byteBufferRead.array(), this.dispatchPostion, remain);
// this.dispatchPostion = 0;
// this.byteBufferRead = bb;
// }
/**
*/
private void reallocateByteBuffer() {
int remain = READ_MAX_BUFFER_SIZE - this.dispatchPostion;
if (remain > 0) {
......@@ -426,7 +413,6 @@ public class HAService {
break;
}
} else {
// TODO ERROR
log.info("HAClient, processReadEvent read socket < 0");
return false;
}
......@@ -598,8 +584,6 @@ public class HAService {
log.info(this.getServiceName() + " service end");
}
//
// private void disableWriteFlag() {
// if (this.socketChannel != null) {
// SelectionKey sk = this.socketChannel.keyFor(this.selector);
......@@ -610,8 +594,6 @@ public class HAService {
// }
// }
// }
//
//
// private void enableWriteFlag() {
// if (this.socketChannel != null) {
// SelectionKey sk = this.socketChannel.keyFor(this.selector);
......
......@@ -208,7 +208,6 @@ public class IndexFile {
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {
// TODO NOTFOUND
} else {
for (int nextIndexToRead = slotValue;;) {
if (phyOffsets.size() >= maxNum) {
......
......@@ -20,11 +20,7 @@ import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
*
*
*/
public class IndexHeader {
public static final int INDEX_HEADER_SIZE = 40;
private static int beginTimestampIndex = 0;
......
......@@ -336,10 +336,7 @@ public class ScheduleMessageService extends ConfigManager {
}
} // end of if (bufferCQ != null)
else {
/*
*/
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
......
......@@ -38,9 +38,7 @@ public class BrokerStats {
this.defaultMessageStore = defaultMessageStore;
}
/**
*/
public void record() {
this.msgPutTotalYesterdayMorning = this.msgPutTotalTodayMorning;
this.msgGetTotalYesterdayMorning = this.msgGetTotalTodayMorning;
......
......@@ -119,7 +119,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The adminExt group[" + this.defaultMQAdminExt.getAdminExtGroup()
+ "] has created already, specifed another name please."//
+ "] has created already, specifed another name please."
+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
}
......@@ -132,8 +132,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The AdminExt service state not OK, maybe started once, "//
+ this.serviceState//
throw new MQClientException("The AdminExt service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
default:
break;
......@@ -185,13 +185,11 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
@Override
public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) {
// TODO Auto-generated method stub
return null;
}
@Override
public TopicConfig examineTopicConfig(String addr, String topic) {
// TODO Auto-generated method stub
return null;
}
......@@ -344,8 +342,6 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
@Override
public void putKVConfig(String namespace, String key, String value) {
// TODO Auto-generated method stub
}
@Override
......
......@@ -51,7 +51,6 @@ public class PrintMessageSubCommand implements SubCommand {
System.out.printf("MSGID: %s %s BODY: %s%n", msg.getMsgId(), msg.toString(),
printBody ? new String(msg.getBody(), charsetName) : "NOT PRINT BODY");
} catch (UnsupportedEncodingException e) {
//
}
}
}
......@@ -108,10 +107,10 @@ public class PrintMessageSubCommand implements SubCommand {
try {
String topic = commandLine.getOptionValue('t').trim();
String charsetName = //
String charsetName =
!commandLine.hasOption('c') ? "UTF-8" : commandLine.getOptionValue('c').trim();
String subExpression = //
String subExpression =
!commandLine.hasOption('s') ? "*" : commandLine.getOptionValue('s').trim();
boolean printBody = !commandLine.hasOption('d') || Boolean.parseBoolean(commandLine.getOptionValue('d').trim());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册