提交 167cce03 编写于 作者: D dongeforever 提交者: lollipop

ROCKETMQ-18 Clean code closes apache/incubator-rocketmq#21

上级 66722568
......@@ -497,7 +497,7 @@ public class BrokerController {
long diff = this.messageStore.slaveFallBehindMuch();
// XXX: warn and notify me
log.info("slave fall behind master, how much, {} bytes", diff);
log.info("Slave fall behind master: {} bytes", diff);
}
public Broker2Client getBroker2Client() {
......
......@@ -148,10 +148,8 @@ public class BrokerStartup {
if (null != namesrvAddr) {
try {
String[] addrArray = namesrvAddr.split(";");
if (addrArray != null) {
for (String addr : addrArray) {
RemotingUtil.string2SocketAddress(addr);
}
for (String addr : addrArray) {
RemotingUtil.string2SocketAddress(addr);
}
} catch (Exception e) {
System.out.printf(
......@@ -211,13 +209,13 @@ public class BrokerStartup {
@Override
public void run() {
synchronized (this) {
log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet());
log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
if (!this.hasShutdown) {
this.hasShutdown = true;
long begineTime = System.currentTimeMillis();
controller.shutdown();
long consumingTimeTotal = System.currentTimeMillis() - begineTime;
log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal);
log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
}
}
}
......
......@@ -72,7 +72,7 @@ public class ConsumerGroupInfo {
}
public List<Channel> getAllChannel() {
List<Channel> result = new ArrayList<Channel>();
List<Channel> result = new ArrayList<>();
result.addAll(this.channelInfoTable.keySet());
......@@ -80,7 +80,7 @@ public class ConsumerGroupInfo {
}
public List<String> getAllClientId() {
List<String> result = new ArrayList<String>();
List<String> result = new ArrayList<>();
Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
......
......@@ -167,7 +167,7 @@ public class ConsumerManager {
}
public HashSet<String> queryTopicConsumeByWho(final String topic) {
HashSet<String> groups = new HashSet<String>();
HashSet<String> groups = new HashSet<>();
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConsumerGroupInfo> entry = it.next();
......
......@@ -133,7 +133,7 @@ public class ProducerManager {
try {
HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (null == channelTable) {
channelTable = new HashMap<Channel, ClientChannelInfo>();
channelTable = new HashMap<>();
this.groupChannelTable.put(group, channelTable);
}
......
......@@ -230,7 +230,7 @@ public class Broker2Client {
}
private List<MessageQueueForC> convertOffsetTable2OffsetList(Map<MessageQueue, Long> table) {
List<MessageQueueForC> list = new ArrayList<MessageQueueForC>();
List<MessageQueueForC> list = new ArrayList<>();
for (Entry<MessageQueue, Long> entry : table.entrySet()) {
MessageQueue mq = entry.getKey();
MessageQueueForC tmp =
......
......@@ -42,7 +42,7 @@ public class RebalanceLockManager {
try {
ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
if (null == groupValue) {
groupValue = new ConcurrentHashMap<MessageQueue, LockEntry>(32);
groupValue = new ConcurrentHashMap<>(32);
this.mqLockTable.put(group, groupValue);
}
......@@ -132,7 +132,7 @@ public class RebalanceLockManager {
try {
ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
if (null == groupValue) {
groupValue = new ConcurrentHashMap<MessageQueue, LockEntry>(32);
groupValue = new ConcurrentHashMap<>(32);
this.mqLockTable.put(group, groupValue);
}
......
......@@ -137,7 +137,7 @@ public class FilterServerManager {
}
public List<String> buildNewFilterServerList() {
List<String> addr = new ArrayList<String>();
List<String> addr = new ArrayList<>();
Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Channel, FilterServerInfo> next = it.next();
......
......@@ -26,9 +26,9 @@ public class FilterServerUtil {
String[] cmdArray = splitShellString(shellString);
process = Runtime.getRuntime().exec(cmdArray);
process.waitFor();
log.info("callShell: <{}> OK", shellString);
log.info("CallShell: <{}> OK", shellString);
} catch (Throwable e) {
log.error("callShell: readLine IOException, " + shellString, e);
log.error("CallShell: readLine IOException, {}", shellString, e);
} finally {
if (null != process)
process.destroy();
......
......@@ -103,7 +103,6 @@ public class BrokerFastFailure {
}
}
}
public void shutdown() {
this.scheduledExecutorService.shutdown();
}
......
......@@ -20,7 +20,7 @@ import java.util.ArrayList;
import java.util.List;
public class ManyPullRequest {
private final ArrayList<PullRequest> pullRequestList = new ArrayList<PullRequest>();
private final ArrayList<PullRequest> pullRequestList = new ArrayList<>();
public synchronized void addPullRequest(final PullRequest pullRequest) {
this.pullRequestList.add(pullRequest);
......
......@@ -65,7 +65,7 @@ public class PullRequestHoldService extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
......@@ -85,7 +85,7 @@ public class PullRequestHoldService extends ServiceThread {
}
}
log.info(this.getServiceName() + " service end");
log.info("{} service end", this.getServiceName());
}
@Override
......@@ -96,7 +96,7 @@ public class PullRequestHoldService extends ServiceThread {
private void checkHoldRequest() {
for (String key : this.pullRequestTable.keySet()) {
String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (kArray != null && 2 == kArray.length) {
if (2 == kArray.length) {
String topic = kArray[0];
int queueId = Integer.parseInt(kArray[1]);
final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
......@@ -127,9 +127,8 @@ public class PullRequestHoldService extends ServiceThread {
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
}
Long tmp = tagsCode;
if (newestOffset > request.getPullFromThisOffset()) {
if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tmp)) {
if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) {
try {
this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
......
......@@ -75,7 +75,7 @@ public class BrokerOuterAPI {
String addrs = this.topAddressing.fetchNSAddr();
if (addrs != null) {
if (!addrs.equals(this.nameSrvAddr)) {
log.info("name server address changed, old: " + this.nameSrvAddr + " new: " + addrs);
log.info("name server address changed, old: {} new: {}", this.nameSrvAddr, addrs);
this.updateNameServerAddressList(addrs);
this.nameSrvAddr = addrs;
return nameSrvAddr;
......@@ -121,7 +121,7 @@ public class BrokerOuterAPI {
log.info("register broker to name server {} OK", namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, " + namesrvAddr, e);
log.warn("registerBroker Exception, {}", namesrvAddr, e);
}
}
}
......@@ -199,7 +199,7 @@ public class BrokerOuterAPI {
this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);
log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);
} catch (Exception e) {
log.warn("unregisterBroker Exception, " + namesrvAddr, e);
log.warn("unregisterBroker Exception, {}", namesrvAddr, e);
}
}
}
......
......@@ -139,13 +139,12 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
final SendMessageRequestHeader requestHeader, RemotingCommand request,
final RemotingCommand response) {
if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " + requestHeader.getTopic().length());
log.warn("putMessage message topic length too long {}", requestHeader.getTopic().length());
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
return response;
}
if (requestHeader.getProperties() != null && requestHeader.getProperties().length() > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long "
+ requestHeader.getProperties().length());
log.warn("putMessage message properties length too long {}", requestHeader.getProperties().length());
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
return response;
}
......@@ -188,8 +187,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
}
}
log.warn("the topic " + requestHeader.getTopic() + " not exist, producer: "
+ ctx.channel().remoteAddress());
log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(//
requestHeader.getTopic(), //
requestHeader.getDefaultTopic(), //
......
......@@ -267,7 +267,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
} else {
log.error("No topic in this broker, client: " + ctx.channel().remoteAddress());
log.error("No topic in this broker, client: {}", ctx.channel().remoteAddress());
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("No topic in this broker");
return response;
......@@ -290,7 +290,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
String bodyStr = new String(body, MixAll.DEFAULT_CHARSET);
Properties properties = MixAll.string2Properties(bodyStr);
if (properties != null) {
log.info("updateBrokerConfig, new config: " + properties + " client: " + ctx.channel().remoteAddress());
log.info("updateBrokerConfig, new config: [{}] client: {} ", properties, ctx.channel().remoteAddress());
this.brokerController.getConfiguration().update(properties);
if (properties.containsKey("brokerPermission")) {
this.brokerController.registerBrokerAll(false, false);
......@@ -476,7 +476,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
} else {
log.error("No subscription group in this broker, client: " + ctx.channel().remoteAddress());
log.error("No subscription group in this broker, client:{} ", ctx.channel().remoteAddress());
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("No subscription group in this broker");
return response;
......@@ -718,7 +718,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
} else {
log.error("No consumer offset in this broker, client: " + ctx.channel().remoteAddress());
log.error("No consumer offset in this broker, client: {} ", ctx.channel().remoteAddress());
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("No consumer offset in this broker");
return response;
......@@ -745,7 +745,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
} else {
log.error("No delay offset in this broker, client: " + ctx.channel().remoteAddress());
log.error("No delay offset in this broker, client: {} ", ctx.channel().remoteAddress());
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("No delay offset in this broker");
return response;
......
......@@ -88,12 +88,12 @@ public class PullMessageProcessor implements NettyRequestProcessor {
response.setOpaque(request.getOpaque());
if (LOG.isDebugEnabled()) {
LOG.debug("receive PullMessage request command, " + request);
LOG.debug("receive PullMessage request command, {}", request);
}
if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] pulling message is forbidden");
response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
return response;
}
......@@ -101,8 +101,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist, " + requestHeader.getConsumerGroup() + " "
+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
return response;
}
......@@ -120,10 +119,9 @@ public class PullMessageProcessor implements NettyRequestProcessor {
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
LOG.error("the topic " + requestHeader.getTopic() + " not exist, consumer: " + RemotingHelper.parseChannelRemoteAddr(channel));
LOG.error("The topic {} not exist, consumer: {} ", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark(
"topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
return response;
}
......@@ -134,8 +132,8 @@ public class PullMessageProcessor implements NettyRequestProcessor {
}
if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
String errorInfo = "queueId[" + requestHeader.getQueueId() + "] is illagal,Topic :" + requestHeader.getTopic()
+ " topicConfig.readQueueNums: " + topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress();
String errorInfo = String.format("queueId[%d] is illagal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
LOG.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
......@@ -148,8 +146,8 @@ public class PullMessageProcessor implements NettyRequestProcessor {
subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getSubscription());
} catch (Exception e) {
LOG.warn("parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
requestHeader.getConsumerGroup());
LOG.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark("parse the consumer's subscription failed");
return response;
......@@ -158,7 +156,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
if (null == consumerGroupInfo) {
LOG.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
LOG.warn("The consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
......@@ -173,15 +171,15 @@ public class PullMessageProcessor implements NettyRequestProcessor {
subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData) {
LOG.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
LOG.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
}
if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
LOG.warn("the broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
subscriptionData.getSubString());
LOG.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
subscriptionData.getSubString());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
response.setRemark("the consumer's subscription not latest");
return response;
......@@ -261,15 +259,14 @@ public class PullMessageProcessor implements NettyRequestProcessor {
case OFFSET_OVERFLOW_BADLY:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
LOG.info("the request offset: " + requestHeader.getQueueOffset() + " over flow badly, broker max offset: "
+ getMessageResult.getMaxOffset() + ", consumer: " + channel.remoteAddress());
LOG.info("The request offset:{} over flow badly, broker max offset:{} , consumer: {}", requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
break;
case OFFSET_OVERFLOW_ONE:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_TOO_SMALL:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
LOG.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
LOG.info("The request offset is too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
getMessageResult.getMinOffset(), channel.remoteAddress());
break;
......@@ -346,12 +343,12 @@ public class PullMessageProcessor implements NettyRequestProcessor {
public void operationComplete(ChannelFuture future) throws Exception {
getMessageResult.release();
if (!future.isSuccess()) {
LOG.error("transfer many message by pagecache failed, " + channel.remoteAddress(), future.cause());
LOG.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), future.cause());
}
}
});
} catch (Throwable e) {
LOG.error("transfer many message by pagecache exception", e);
LOG.error("Error occurred when transferring messages from page cache", e);
getMessageResult.release();
}
......@@ -480,7 +477,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
} catch (Exception e) {
LOG.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), e);
LOG.warn(String.format("GenerateOffsetMovedEvent Exception, %s", event.toString()), e);
}
}
......@@ -499,21 +496,20 @@ public class PullMessageProcessor implements NettyRequestProcessor {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
LOG.error("processRequestWrapper response to " + future.channel().remoteAddress() + " failed",
future.cause());
LOG.error("ProcessRequestWrapper response to {} failed", future.channel().remoteAddress(), future.cause());
LOG.error(request.toString());
LOG.error(response.toString());
}
}
});
} catch (Throwable e) {
LOG.error("processRequestWrapper process request over, but response failed", e);
LOG.error("ProcessRequestWrapper process request over, but response failed", e);
LOG.error(request.toString());
LOG.error(response.toString());
}
}
} catch (RemotingCommandException e1) {
LOG.error("executeRequestWhenWakeup run", e1);
LOG.error("ExecuteRequestWhenWakeup run", e1);
}
}
};
......
......@@ -146,7 +146,7 @@ public class QueryMessageProcessor implements NettyRequestProcessor {
public void operationComplete(ChannelFuture future) throws Exception {
selectMappedBufferResult.release();
if (!future.isSuccess()) {
log.error("transfer one message by page cache failed, ", future.cause());
log.error("Transfer one message from page cache failed, ", future.cause());
}
}
});
......
......@@ -252,7 +252,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
if (log.isDebugEnabled()) {
log.debug("receive SendMessage request command, " + request);
log.debug("receive SendMessage request command, {}", request);
}
final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
......
......@@ -68,10 +68,10 @@ public class SlaveSynchronize {
.putAll(topicWrapper.getTopicConfigTable());
this.brokerController.getTopicConfigManager().persist();
log.info("update slave topic config from master, {}", masterAddrBak);
log.info("Update slave topic config from master, {}", masterAddrBak);
}
} catch (Exception e) {
log.error("syncTopicConfig Exception, " + masterAddrBak, e);
log.error("SyncTopicConfig Exception, {}", masterAddrBak, e);
}
}
}
......@@ -85,9 +85,9 @@ public class SlaveSynchronize {
this.brokerController.getConsumerOffsetManager().getOffsetTable()
.putAll(offsetWrapper.getOffsetTable());
this.brokerController.getConsumerOffsetManager().persist();
log.info("update slave consumer offset from master, {}", masterAddrBak);
log.info("Update slave consumer offset from master, {}", masterAddrBak);
} catch (Exception e) {
log.error("syncConsumerOffset Exception, " + masterAddrBak, e);
log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
}
}
}
......@@ -106,12 +106,12 @@ public class SlaveSynchronize {
try {
MixAll.string2File(delayOffset, fileName);
} catch (IOException e) {
log.error("persist file Exception, " + fileName, e);
log.error("Persist file Exception, {}", fileName, e);
}
}
log.info("update slave delay offset from master, {}", masterAddrBak);
log.info("Update slave delay offset from master, {}", masterAddrBak);
} catch (Exception e) {
log.error("syncDelayOffset Exception, " + masterAddrBak, e);
log.error("SyncDelayOffset Exception, {}", masterAddrBak, e);
}
}
}
......@@ -134,10 +134,10 @@ public class SlaveSynchronize {
subscriptionGroupManager.getSubscriptionGroupTable().putAll(
subscriptionWrapper.getSubscriptionGroupTable());
subscriptionGroupManager.persist();
log.info("update slave Subscription Group from master, {}", masterAddrBak);
log.info("Update slave Subscription Group from master, {}", masterAddrBak);
}
} catch (Exception e) {
log.error("syncSubscriptionGroup Exception, " + masterAddrBak, e);
log.error("SyncSubscriptionGroup Exception, {}", masterAddrBak, e);
}
}
}
......
......@@ -99,9 +99,9 @@ public class SubscriptionGroupManager extends ConfigManager {
public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
if (old != null) {
log.info("update subscription group config, old: " + old + " new: " + config);
log.info("update subscription group config, old: {} new: {}", old, config);
} else {
log.info("create new subscription group, " + config);
log.info("create new subscription group, {}", config);
}
this.dataVersion.nextVersion();
......@@ -181,11 +181,11 @@ public class SubscriptionGroupManager extends ConfigManager {
public void deleteSubscriptionGroupConfig(final String groupName) {
SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);
if (old != null) {
log.info("delete subscription group OK, subscription group: " + old);
log.info("delete subscription group OK, subscription group:{}", old);
this.dataVersion.nextVersion();
this.persist();
} else {
log.warn("delete subscription group failed, subscription group: " + old + " not exist");
log.warn("delete subscription group failed, subscription group: {} not exist", old);
}
}
}
......@@ -181,18 +181,15 @@ public class TopicConfigManager extends ConfigManager {
topicConfig.setTopicSysFlag(topicSysFlag);
topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
} else {
LOG.warn("create new topic failed, because the default topic[" + defaultTopic
+ "] no perm, " + defaultTopicConfig.getPerm() + " producer: "
+ remoteAddress);
LOG.warn("Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]",
defaultTopic, defaultTopicConfig.getPerm(), remoteAddress);
}
} else {
LOG.warn("create new topic failed, because the default topic[" + defaultTopic
+ "] not exist." + " producer: " + remoteAddress);
LOG.warn("Create new topic failed, because the default topic[{}] not exist. producer:[{}]", defaultTopic, remoteAddress);
}
if (topicConfig != null) {
LOG.info("create new topic by default topic[" + defaultTopic + "], " + topicConfig
+ " producer: " + remoteAddress);
LOG.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]", defaultTopic, topicConfig, remoteAddress);
this.topicConfigTable.put(topic, topicConfig);
......@@ -307,9 +304,9 @@ public class TopicConfigManager extends ConfigManager {
public void updateTopicConfig(final TopicConfig topicConfig) {
TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
if (old != null) {
LOG.info("update topic config, old: " + old + " new: " + topicConfig);
LOG.info("update topic config, old:[{}] new:[{}]", old, topicConfig);
} else {
LOG.info("create new topic, " + topicConfig);
LOG.info("create new topic [{}]", topicConfig);
}
this.dataVersion.nextVersion();
......@@ -362,11 +359,11 @@ public class TopicConfigManager extends ConfigManager {
public void deleteTopicConfig(final String topic) {
TopicConfig old = this.topicConfigTable.remove(topic);
if (old != null) {
LOG.info("delete topic config OK, topic: " + old);
LOG.info("Delete topic config OK, topic:{}", old);
this.dataVersion.nextVersion();
this.persist();
} else {
LOG.warn("delete topic config failed, topic: " + topic + " not exist");
LOG.warn("Delete topic config failed, topic:{} not exist", topic);
}
}
......
......@@ -125,9 +125,8 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
final ResetOffsetRequestHeader requestHeader =
(ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
log.info("invoke reset offset operation from broker. brokerAddr={}, topic={}, group={}, timestamp={}",
new Object[] {
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
requestHeader.getTimestamp()});
requestHeader.getTimestamp());
Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
if (request.getBody() != null) {
ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class);
......
......@@ -212,13 +212,11 @@ public class MQClientAPIImpl {
public void updateNameServerAddressList(final String addrs) {
List<String> lst = new ArrayList<String>();
String[] addrArray = addrs.split(";");
if (addrArray != null) {
for (String addr : addrArray) {
lst.add(addr);
}
this.remotingClient.updateNameServerAddressList(lst);
for (String addr : addrArray) {
lst.add(addr);
}
this.remotingClient.updateNameServerAddressList(lst);
}
public void start() {
......@@ -468,7 +466,7 @@ public class MQClientAPIImpl {
}
try {
sendCallback.onException(e);
} catch (Exception e2) {
} catch (Exception ignored) {
}
}
}
......@@ -1074,8 +1072,7 @@ public class MQClientAPIImpl {
request, timeoutMillis);
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
ConsumerConnection consumerConnection = ConsumerConnection.decode(response.getBody(), ConsumerConnection.class);
return consumerConnection;
return ConsumerConnection.decode(response.getBody(), ConsumerConnection.class);
}
default:
break;
......@@ -1151,8 +1148,7 @@ public class MQClientAPIImpl {
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
ClusterInfo responseBody = ClusterInfo.decode(response.getBody(), ClusterInfo.class);
return responseBody;
return ClusterInfo.decode(response.getBody(), ClusterInfo.class);
}
default:
break;
......@@ -1226,8 +1222,7 @@ public class MQClientAPIImpl {
case ResponseCode.SUCCESS: {
byte[] body = response.getBody();
if (body != null) {
TopicList topicList = TopicList.decode(body, TopicList.class);
return topicList;
return TopicList.decode(body, TopicList.class);
}
}
default:
......
......@@ -20,9 +20,12 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.slf4j.Logger;
public class MQClientManager {
private final static Logger log = ClientLogger.getLog();
private static MQClientManager instance = new MQClientManager();
private AtomicInteger factoryIndexGenerator = new AtomicInteger();
private ConcurrentHashMap<String/* clientId */, MQClientInstance> factoryTable =
......@@ -51,7 +54,7 @@ public class MQClientManager {
if (prev != null) {
instance = prev;
} else {
// TODO log
log.warn("Previous MQClientInstance has created for clientId:[{}]", clientId);
}
}
......
......@@ -227,7 +227,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
topic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);
} catch (Exception e) {
} catch (Exception ignore) {
}
}
}
......@@ -246,7 +246,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
for (ConsumeMessageHook hook : this.consumeMessageHookList) {
try {
hook.consumeMessageBefore(context);
} catch (Throwable e) {
} catch (Throwable ignored) {
}
}
}
......@@ -257,7 +257,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
for (ConsumeMessageHook hook : this.consumeMessageHookList) {
try {
hook.consumeMessageAfter(context);
} catch (Throwable e) {
} catch (Throwable ignored) {
}
}
}
......@@ -314,9 +314,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
this.makeSureStateOK();
Set<MessageQueue> mqs = new HashSet<MessageQueue>();
Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
if (allocateMq != null) {
mqs.addAll(allocateMq);
}
mqs.addAll(allocateMq);
this.offsetStore.persistAll(mqs);
} catch (Exception e) {
log.error("group: " + this.defaultMQPullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
......
......@@ -921,7 +921,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
@Override
public void doRebalance() {
if (this.rebalanceImpl != null && !this.pause) {
if (!this.pause) {
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}
......@@ -932,9 +932,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.makeSureStateOK();
Set<MessageQueue> mqs = new HashSet<MessageQueue>();
Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
if (allocateMq != null) {
mqs.addAll(allocateMq);
}
mqs.addAll(allocateMq);
this.offsetStore.persistAll(mqs);
} catch (Exception e) {
......
......@@ -58,13 +58,11 @@ public class ProcessQueue {
private volatile long msgAccCnt = 0;
public boolean isLockExpired() {
boolean result = (System.currentTimeMillis() - this.lastLockTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
return result;
return (System.currentTimeMillis() - this.lastLockTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
}
public boolean isPullExpired() {
boolean result = (System.currentTimeMillis() - this.lastPullTimestamp) > PULL_MAX_IDLE_TIME;
return result;
return (System.currentTimeMillis() - this.lastPullTimestamp) > PULL_MAX_IDLE_TIME;
}
/**
......
......@@ -472,7 +472,7 @@ public class MQClientInstance {
final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
if (producerEmpty && consumerEmpty) {
log.warn("sending hearbeat, but no consumer and no producer");
log.warn("sending heartbeat, but no consumer and no producer");
return;
}
......@@ -841,13 +841,8 @@ public class MQClientInstance {
if (addr != null) {
try {
this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000);
log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup,
consumerGroup, brokerName, entry1.getKey(), addr);
} catch (RemotingException e) {
log.error("unregister client exception from broker: " + addr, e);
} catch (MQBrokerException e) {
log.error("unregister client exception from broker: " + addr, e);
} catch (InterruptedException e) {
log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);
} catch (RemotingException | InterruptedException | MQBrokerException e) {
log.error("unregister client exception from broker: " + addr, e);
}
}
......@@ -1064,7 +1059,9 @@ public class MQClientInstance {
}
}
} finally {
consumer.resume();
if (consumer != null) {
consumer.resume();
}
}
}
......@@ -1134,14 +1131,14 @@ public class MQClientInstance {
List<String> nsList = this.mQClientAPIImpl.getRemotingClient().getNameServerAddressList();
StringBuffer strBuffer = new StringBuffer();
StringBuilder strBuilder = new StringBuilder();
if (nsList != null) {
for (String addr : nsList) {
strBuffer.append(addr + ";");
strBuilder.append(addr).append(";");
}
}
String nsAddr = strBuffer.toString();
String nsAddr = strBuilder.toString();
consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_NAMESERVER_ADDR, nsAddr);
consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_CONSUME_TYPE, mqConsumerInner.consumeType().name());
consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_CLIENT_VERSION,
......
......@@ -18,9 +18,12 @@
package org.apache.rocketmq.client.latency;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
public class MQFaultStrategy {
private final static Logger log = ClientLogger.getLog();
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
private boolean sendLatencyFaultEnable = false;
......@@ -80,6 +83,7 @@ public class MQFaultStrategy {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
......
......@@ -137,7 +137,7 @@ public class MixAll {
return Math.abs(value);
}
public static final void string2File(final String str, final String fileName) throws IOException {
public static void string2File(final String str, final String fileName) throws IOException {
String tmpFile = fileName + ".tmp";
string2FileNotSafe(str, tmpFile);
......@@ -155,7 +155,8 @@ public class MixAll {
file.renameTo(new File(fileName));
}
public static final void string2FileNotSafe(final String str, final String fileName) throws IOException {
public static void string2FileNotSafe(final String str, final String fileName) throws IOException {
File file = new File(fileName);
File fileParent = file.getParentFile();
if (fileParent != null) {
......@@ -170,21 +171,17 @@ public class MixAll {
throw e;
} finally {
if (fileWriter != null) {
try {
fileWriter.close();
} catch (IOException e) {
throw e;
}
fileWriter.close();
}
}
}
public static final String file2String(final String fileName) {
public static String file2String(final String fileName) {
File file = new File(fileName);
return file2String(file);
}
public static final String file2String(final File file) {
public static String file2String(final File file) {
if (file.exists()) {
char[] data = new char[(int) file.length()];
boolean result = false;
......@@ -213,7 +210,7 @@ public class MixAll {
return null;
}
public static final String file2String(final URL url) {
public static String file2String(final URL url) {
InputStream in = null;
try {
URLConnection urlConnection = url.openConnection();
......@@ -223,12 +220,12 @@ public class MixAll {
byte[] data = new byte[len];
in.read(data, 0, len);
return new String(data, "UTF-8");
} catch (Exception e) {
} catch (Exception ignored) {
} finally {
if (null != in) {
try {
in.close();
} catch (IOException e) {
} catch (IOException ignored) {
}
}
}
......@@ -258,9 +255,7 @@ public class MixAll {
if (null == value) {
value = "";
}
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
} catch (IllegalArgumentException | IllegalAccessException e) {
e.printStackTrace();
}
......@@ -273,7 +268,6 @@ public class MixAll {
if (log != null) {
log.info(name + "=" + value);
} else {
}
}
}
......@@ -318,9 +312,7 @@ public class MixAll {
try {
field.setAccessible(true);
value = field.get(object);
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
} catch (IllegalArgumentException | IllegalAccessException e) {
e.printStackTrace();
}
......@@ -365,10 +357,10 @@ public class MixAll {
} else {
continue;
}
method.invoke(object, new Object[] {arg});
method.invoke(object, arg);
}
}
} catch (Throwable e) {
} catch (Throwable ignored) {
}
}
}
......
......@@ -79,7 +79,7 @@ public class UtilAll {
public static boolean isItTimeToDo(final String when) {
String[] whiles = when.split(";");
if (whiles != null && whiles.length > 0) {
if (whiles.length > 0) {
Calendar now = Calendar.getInstance();
for (String w : whiles) {
int nowHour = Integer.parseInt(w);
......@@ -186,6 +186,7 @@ public class UtilAll {
if (!file.exists()) {
boolean result = file.mkdirs();
if (!result) {
//TO DO
}
}
......@@ -202,7 +203,8 @@ public class UtilAll {
return -1;
}
public static final int crc32(byte[] array) {
public static int crc32(byte[] array) {
if (array != null) {
return crc32(array, 0, array.length);
}
......@@ -210,7 +212,8 @@ public class UtilAll {
return 0;
}
public static final int crc32(byte[] array, int offset, int length) {
public static int crc32(byte[] array, int offset, int length) {
CRC32 crc32 = new CRC32();
crc32.update(array, offset, length);
return (int) (crc32.getValue() & 0x7FFFFFFF);
......@@ -267,15 +270,15 @@ public class UtilAll {
} finally {
try {
byteArrayInputStream.close();
} catch (IOException e) {
} catch (IOException ignored) {
}
try {
inflaterInputStream.close();
} catch (IOException e) {
} catch (IOException ignored) {
}
try {
byteArrayOutputStream.close();
} catch (IOException e) {
} catch (IOException ignored) {
}
}
......
......@@ -46,7 +46,7 @@ public class FilterAPI {
subscriptionData.setSubString(SubscriptionData.SUB_ALL);
} else {
String[] tags = subString.split("\\|\\|");
if (tags != null && tags.length > 0) {
if (tags.length > 0) {
for (String tag : tags) {
if (tag.length() > 0) {
String trimString = tag.trim();
......
......@@ -16,19 +16,19 @@
*/
package org.apache.rocketmq.common.message;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
public class MessageDecoder {
public final static int MSG_ID_LENGTH = 8 + 8;
......@@ -318,10 +318,6 @@ public class MessageDecoder {
}
return msgExt;
} catch (UnknownHostException e) {
byteBuffer.position(byteBuffer.limit());
} catch (BufferUnderflowException e) {
byteBuffer.position(byteBuffer.limit());
} catch (Exception e) {
byteBuffer.position(byteBuffer.limit());
}
......@@ -366,12 +362,10 @@ public class MessageDecoder {
Map<String, String> map = new HashMap<String, String>();
if (properties != null) {
String[] items = properties.split(String.valueOf(PROPERTY_SEPARATOR));
if (items != null) {
for (String i : items) {
String[] nv = i.split(String.valueOf(NAME_VALUE_SEPARATOR));
if (nv != null && 2 == nv.length) {
map.put(nv[0], nv[1]);
}
for (String i : items) {
String[] nv = i.split(String.valueOf(NAME_VALUE_SEPARATOR));
if (2 == nv.length) {
map.put(nv[0], nv[1]);
}
}
}
......
......@@ -19,9 +19,6 @@ package org.apache.rocketmq.common.protocol;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class MQProtosHelper {
......@@ -38,13 +35,7 @@ public class MQProtosHelper {
if (response != null) {
return ResponseCode.SUCCESS == response.getCode();
}
} catch (RemotingConnectException e) {
e.printStackTrace();
} catch (RemotingSendRequestException e) {
e.printStackTrace();
} catch (RemotingTimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
} catch (Exception e) {
e.printStackTrace();
}
......
......@@ -54,7 +54,7 @@ public class MomentStatsItemSet {
public void run() {
try {
printAtMinutes();
} catch (Throwable e) {
} catch (Throwable ignored) {
}
}
}, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS);
......
......@@ -94,7 +94,7 @@ public class StatsItem {
public void run() {
try {
samplingInSeconds();
} catch (Throwable e) {
} catch (Throwable ignored) {
}
}
}, 0, 10, TimeUnit.SECONDS);
......@@ -104,7 +104,7 @@ public class StatsItem {
public void run() {
try {
samplingInMinutes();
} catch (Throwable e) {
} catch (Throwable ignored) {
}
}
}, 0, 10, TimeUnit.MINUTES);
......@@ -114,7 +114,7 @@ public class StatsItem {
public void run() {
try {
samplingInHour();
} catch (Throwable e) {
} catch (Throwable ignored) {
}
}
}, 0, 1, TimeUnit.HOURS);
......
......@@ -47,7 +47,7 @@ public class StatsItemSet {
public void run() {
try {
samplingInSeconds();
} catch (Throwable e) {
} catch (Throwable ignored) {
}
}
}, 0, 10, TimeUnit.SECONDS);
......@@ -57,7 +57,7 @@ public class StatsItemSet {
public void run() {
try {
samplingInMinutes();
} catch (Throwable e) {
} catch (Throwable ignored) {
}
}
}, 0, 10, TimeUnit.MINUTES);
......@@ -67,7 +67,7 @@ public class StatsItemSet {
public void run() {
try {
samplingInHour();
} catch (Throwable e) {
} catch (Throwable ignored) {
}
}
}, 0, 1, TimeUnit.HOURS);
......@@ -77,7 +77,7 @@ public class StatsItemSet {
public void run() {
try {
printAtMinutes();
} catch (Throwable e) {
} catch (Throwable ignored) {
}
}
}, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60, TimeUnit.MILLISECONDS);
......@@ -87,7 +87,7 @@ public class StatsItemSet {
public void run() {
try {
printAtHour();
} catch (Throwable e) {
} catch (Throwable ignored) {
}
}
}, Math.abs(UtilAll.computNextHourTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60, TimeUnit.MILLISECONDS);
......@@ -97,7 +97,7 @@ public class StatsItemSet {
public void run() {
try {
printAtDay();
} catch (Throwable e) {
} catch (Throwable ignored) {
}
}
}, Math.abs(UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS);
......
......@@ -17,10 +17,11 @@
package org.apache.rocketmq.common;
import org.junit.Test;
import org.junit.Assert;
import java.net.InetAddress;
import java.util.List;
import junit.framework.Assert;
import org.junit.Test;
public class MixAllTest {
......
......@@ -17,12 +17,13 @@
package org.apache.rocketmq.common;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.junit.Assert;
import org.junit.Test;
public class RemotingUtilTest {
@Test
public void test() throws Exception {
String a = RemotingUtil.getLocalAddress();
System.out.println(a);
Assert.assertTrue(a.length() > 0);
}
}
......@@ -19,17 +19,22 @@ package org.apache.rocketmq.common.protocol;
import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Assert;
import org.junit.Test;
public class ConsumeStatusTest {
@Test
public void decode_test() throws Exception {
public void decodeTest() throws Exception {
ConsumeStatus cs = new ConsumeStatus();
cs.setConsumeFailedTPS(0L);
cs.setConsumeFailedTPS(10);
cs.setPullRT(100);
cs.setPullTPS(1000);
String json = RemotingSerializable.toJson(cs, true);
System.out.println(json);
ConsumeStatus fromJson = RemotingSerializable.fromJson(json, ConsumeStatus.class);
Assert.assertEquals(fromJson.getPullRT(), cs.getPullRT(), 0.0001);
Assert.assertEquals(fromJson.getPullTPS(), cs.getPullTPS(), 0.0001);
Assert.assertEquals(fromJson.getConsumeFailedTPS(), cs.getConsumeFailedTPS(), 0.0001);
}
}
......@@ -49,7 +49,7 @@ public class Producer {
final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64;
final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 128;
final boolean keyEnable = commandLine.hasOption('k') ? Boolean.parseBoolean(commandLine.getOptionValue('k')) : false;
final boolean keyEnable = commandLine.hasOption('k') && Boolean.parseBoolean(commandLine.getOptionValue('k'));
System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s%n", topic, threadCount, messageSize, keyEnable);
......@@ -140,7 +140,7 @@ public class Producer {
try {
Thread.sleep(3000);
} catch (InterruptedException e1) {
} catch (InterruptedException ignored) {
}
} catch (InterruptedException e) {
statsBenchmark.getSendRequestFailedCount().incrementAndGet();
......@@ -156,7 +156,7 @@ public class Producer {
log.error("[BENCHMARK_PRODUCER] Send Exception", e);
try {
Thread.sleep(3000);
} catch (InterruptedException e1) {
} catch (InterruptedException ignored) {
}
}
}
......
......@@ -43,8 +43,8 @@ public class TransactionProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32;
messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2;
ischeck = args.length >= 3 ? Boolean.parseBoolean(args[2]) : false;
ischeckffalse = args.length >= 4 ? Boolean.parseBoolean(args[3]) : false;
ischeck = args.length >= 3 && Boolean.parseBoolean(args[2]);
ischeckffalse = args.length >= 4 && Boolean.parseBoolean(args[3]);
final Message msg = buildMessage(messageSize);
......
......@@ -54,13 +54,7 @@ public class Producer {
}
producer.shutdown();
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
......
......@@ -45,9 +45,7 @@ public class TransactionProducer {
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
......
......@@ -279,12 +279,10 @@ public class RouteInfoManager {
try {
this.lock.writeLock().lockInterruptibly();
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
if (brokerLiveInfo != null) {
log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
brokerLiveInfo != null ? "OK" : "Failed",
brokerAddr
);
}
);
this.filterServerTable.remove(brokerAddr);
......
......@@ -44,11 +44,11 @@ public class RemotingUtil {
private static boolean isWindowsPlatform = false;
static {
if (OS_NAME != null && OS_NAME.toLowerCase().indexOf("linux") >= 0) {
if (OS_NAME != null && OS_NAME.toLowerCase().contains("linux")) {
isLinuxPlatform = true;
}
if (OS_NAME != null && OS_NAME.toLowerCase().indexOf("windows") >= 0) {
if (OS_NAME != null && OS_NAME.toLowerCase().contains("windows")) {
isWindowsPlatform = true;
}
}
......
......@@ -213,7 +213,7 @@ public class AllocateMappedFileService extends ServiceThread {
requestQueue.offer(req);
try {
Thread.sleep(1);
} catch (InterruptedException e1) {
} catch (InterruptedException ignored) {
}
}
} finally {
......
......@@ -1150,7 +1150,7 @@ public class CommitLog {
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
final short propertiesLength = propertiesData == null ? 0 : (short) propertiesData.length;
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
......@@ -1158,7 +1158,7 @@ public class CommitLog {
}
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData == null ? 0 : topicData.length;
final int topicLength = topicData.length;
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
......@@ -1229,7 +1229,7 @@ public class CommitLog {
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort(propertiesLength);
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.msgStoreItemMemory.put(propertiesData);
......
......@@ -666,7 +666,7 @@ public class DefaultMessageStore implements MessageStore {
final int size = result.getByteBuffer().getInt();
long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size);
return storeTime;
} catch (Exception e) {
} catch (Exception ignored) {
} finally {
result.release();
}
......@@ -1491,7 +1491,7 @@ public class DefaultMessageStore implements MessageStore {
if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
try {
Thread.sleep(deleteLogicsFilesInterval);
} catch (InterruptedException e) {
} catch (InterruptedException ignored) {
}
}
}
......@@ -1592,7 +1592,7 @@ public class DefaultMessageStore implements MessageStore {
for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
} catch (InterruptedException ignored) {
}
}
......
......@@ -24,7 +24,7 @@ public class StoreUtil {
@SuppressWarnings("restriction")
public static long getTotalPhysicalMemorySize() {
long physicalTotal = 1024 * 1024 * 1024 * 24;
long physicalTotal = 1024 * 1024 * 1024 * 24L;
OperatingSystemMXBean osmxb = ManagementFactory.getOperatingSystemMXBean();
if (osmxb instanceof com.sun.management.OperatingSystemMXBean) {
physicalTotal = ((com.sun.management.OperatingSystemMXBean) osmxb).getTotalPhysicalMemorySize();
......
......@@ -74,10 +74,10 @@ public class IndexService {
log.info("load index file OK, " + f.getFileName());
this.indexFileList.add(f);
} catch (IOException e) {
log.error("load file " + file + " error", e);
log.error("load file {} error", file, e);
return false;
} catch (NumberFormatException e) {
continue;
log.error("load file {} error", file, e);
}
}
}
......
......@@ -114,7 +114,7 @@ public class BrokerConsumeStatsSubCommad implements SubCommand {
String lastTime = "-";
try {
lastTime = UtilAll.formatDate(new Date(offsetWrapper.getLastTimestamp()), UtilAll.YYYY_MM_DD_HH_MM_SS);
} catch (Exception e) {
} catch (Exception ignored) {
}
if (offsetWrapper.getLastTimestamp() > 0)
......
......@@ -106,8 +106,7 @@ public class CLusterSendMsgRTCommand implements SubCommand {
long interval = !commandLine.hasOption('i') ? 10 : Long.parseLong(commandLine
.getOptionValue('i').trim());
boolean printAsTlog = !commandLine.hasOption('p') ? false : Boolean
.parseBoolean(commandLine.getOptionValue('p').trim());
boolean printAsTlog = commandLine.hasOption('p') && Boolean.parseBoolean(commandLine.getOptionValue('p').trim());
String machineRoom = !commandLine.hasOption('m') ? "noname" : commandLine
.getOptionValue('m').trim();
......
......@@ -224,18 +224,19 @@ public class ClusterListSubCommand implements SubCommand {
version = kvTable.getTable().get("brokerVersionDesc");
{
String[] tpss = putTps.split(" ");
if (tpss != null && tpss.length > 0) {
if (tpss.length > 0) {
in = Double.parseDouble(tpss[0]);
}
}
{
String[] tpss = getTransferedTps.split(" ");
if (tpss != null && tpss.length > 0) {
if (tpss.length > 0) {
out = Double.parseDouble(tpss[0]);
}
}
} catch (Exception e) {
e.printStackTrace();
}
double hour = 0.0;
......@@ -251,16 +252,16 @@ public class ClusterListSubCommand implements SubCommand {
}
System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s %5s %6s%n",
clusterName,
brokerName,
next1.getKey().longValue(),
next1.getValue(),
version,
String.format("%9.2f(%s,%sms)", in, sendThreadPoolQueueSize, sendThreadPoolQueueHeadWaitTimeMills),
String.format("%9.2f(%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills),
pageCacheLockTimeMills,
String.format("%2.2f", hour),
String.format("%.4f", space)
clusterName,
brokerName,
next1.getKey(),
next1.getValue(),
version,
String.format("%9.2f(%s,%sms)", in, sendThreadPoolQueueSize, sendThreadPoolQueueHeadWaitTimeMills),
String.format("%9.2f(%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills),
pageCacheLockTimeMills,
String.format("%2.2f", hour),
String.format("%.4f", space)
);
}
}
......
......@@ -91,6 +91,7 @@ public class DeleteSubscriptionGroupCommand implements SubCommand {
DeleteTopicSubCommand.deleteTopic(adminExt, clusterName, MixAll.DLQ_GROUP_TOPIC_PREFIX
+ groupName);
} catch (Exception e) {
e.printStackTrace();
}
return;
}
......
......@@ -53,7 +53,6 @@ public class StartMonitoringSubCommand implements SubCommand {
monitorService.start();
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
}
......@@ -93,6 +93,7 @@ public class PrintMessageByQueueCommand implements SubCommand {
System.out.printf("MSGID: %s %s BODY: %s%n", msg.getMsgId(), msg.toString(),
printBody ? new String(msg.getBody(), charsetName) : "NOT PRINT BODY");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
......@@ -160,11 +161,11 @@ public class PrintMessageByQueueCommand implements SubCommand {
String charsetName =
!commandLine.hasOption('c') ? "UTF-8" : commandLine.getOptionValue('c').trim();
boolean printMsg =
!commandLine.hasOption('p') ? false : Boolean.parseBoolean(commandLine.getOptionValue('p').trim());
commandLine.hasOption('p') && Boolean.parseBoolean(commandLine.getOptionValue('p').trim());
boolean printBody =
!commandLine.hasOption('d') ? false : Boolean.parseBoolean(commandLine.getOptionValue('d').trim());
commandLine.hasOption('d') && Boolean.parseBoolean(commandLine.getOptionValue('d').trim());
boolean calByTag =
!commandLine.hasOption('f') ? false : Boolean.parseBoolean(commandLine.getOptionValue('f').trim());
commandLine.hasOption('f') && Boolean.parseBoolean(commandLine.getOptionValue('f').trim());
String subExpression =
!commandLine.hasOption('s') ? "*" : commandLine.getOptionValue('s').trim();
......
......@@ -113,8 +113,7 @@ public class PrintMessageSubCommand implements SubCommand {
String subExpression = //
!commandLine.hasOption('s') ? "*" : commandLine.getOptionValue('s').trim();
boolean printBody = //
!commandLine.hasOption('d') ? true : Boolean.parseBoolean(commandLine.getOptionValue('d').trim());
boolean printBody = !commandLine.hasOption('d') || Boolean.parseBoolean(commandLine.getOptionValue('d').trim());
consumer.start();
......
......@@ -133,7 +133,6 @@ public class Store {
// 5 FLAG
int flag = byteBuffer.getInt();
flag = flag + 0;
// 6 QUEUEOFFSET
long queueOffset = byteBuffer.getLong();
......@@ -146,7 +145,6 @@ public class Store {
// 9 BORNTIMESTAMP
long bornTimeStamp = byteBuffer.getLong();
bornTimeStamp = bornTimeStamp + 0;
// 10 BORNHOST(IP+PORT)
byteBuffer.position(byteBuffer.position() + 8);
......
......@@ -59,7 +59,6 @@ public class DeleteKvConfigCommand implements SubCommand {
defaultMQAdminExt.start();
defaultMQAdminExt.deleteKvConfig(namespace, key);
System.out.printf("delete kv config from namespace success.%n");
return;
} catch (Exception e) {
e.printStackTrace();
} finally {
......
......@@ -55,7 +55,7 @@ public class GetNamesrvConfigCommand implements SubCommand {
if (servers != null && servers.length() > 0) {
String[] serverArray = servers.trim().split(";");
if (serverArray != null && serverArray.length > 0) {
if (serverArray.length > 0) {
serverList = Arrays.asList(serverArray);
}
}
......@@ -71,7 +71,6 @@ public class GetNamesrvConfigCommand implements SubCommand {
System.out.printf("%-50s= %s\n", key, nameServerConfigs.get(server).get(key));
}
}
return;
} catch (Exception e) {
e.printStackTrace();
} finally {
......
......@@ -65,7 +65,6 @@ public class UpdateKvConfigCommand implements SubCommand {
defaultMQAdminExt.start();
defaultMQAdminExt.createAndUpdateKvConfig(namespace, key, value);
System.out.printf("create or update kv config to namespace success.%n");
return;
} catch (Exception e) {
e.printStackTrace();
} finally {
......
......@@ -69,7 +69,7 @@ public class UpdateNamesrvConfigCommand implements SubCommand {
if (servers != null && servers.length() > 0) {
String[] serverArray = servers.trim().split(";");
if (serverArray != null && serverArray.length > 0) {
if (serverArray.length > 0) {
serverList = Arrays.asList(serverArray);
}
}
......@@ -80,7 +80,6 @@ public class UpdateNamesrvConfigCommand implements SubCommand {
System.out.printf("update name server config success!%s\n%s : %s\n",
serverList == null ? "" : serverList, key, value);
return;
} catch (Exception e) {
e.printStackTrace();
} finally {
......
......@@ -76,7 +76,7 @@ public class CloneGroupOffsetCommand implements SubCommand {
defaultMQAdminExt.start();
ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(srcGroup);
Set<MessageQueue> mqs = consumeStats.getOffsetTable().keySet();
if (mqs != null && !mqs.isEmpty()) {
if (!mqs.isEmpty()) {
TopicRouteData topicRoute = defaultMQAdminExt.examineTopicRouteInfo(topic);
for (MessageQueue mq : mqs) {
String addr = null;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册