From 9165667aa3975900ed1f79fd6c612cfd2e05a01f Mon Sep 17 00:00:00 2001 From: yukon Date: Tue, 27 Dec 2016 21:49:37 +0800 Subject: [PATCH] ROCKETMQ-18 Remove bad practices in broker. --- .../broker/client/net/Broker2Client.java | 4 ++-- .../broker/filtersrv/FilterServerUtil.java | 3 +-- .../broker/latency/BrokerFastFailure.java | 2 +- .../broker/offset/ConsumerOffsetManager.java | 12 ++++-------- .../rocketmq/broker/out/BrokerOuterAPI.java | 11 +++++------ .../broker/plugin/MessageStoreFactory.java | 3 +-- .../processor/AbstractSendMessageProcessor.java | 10 ++++++---- .../broker/processor/AdminBrokerProcessor.java | 10 ++++------ .../broker/processor/PullMessageProcessor.java | 2 +- .../broker/processor/QueryMessageProcessor.java | 4 ++-- .../broker/processor/SendMessageProcessor.java | 1 + .../broker/transaction/TransactionStore.java | 16 ++++++++-------- 12 files changed, 36 insertions(+), 42 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java index 152f373d..70027cdc 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java @@ -202,7 +202,7 @@ public class Broker2Client { try { this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000); log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}", - new Object[]{topic, group, entry.getValue().getClientId()}); + topic, group, entry.getValue().getClientId()); } catch (Exception e) { log.error("[reset-offset] reset offset exception. topic={}, group={}", new Object[]{topic, group}, e); @@ -290,7 +290,7 @@ public class Broker2Client { consumerStatusTable.put(clientId, body.getMessageQueueTable()); log.info( "[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}", - new Object[]{topic, group, clientId}); + topic, group, clientId); } } default: diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java index 1c40c0ef..de4cc374 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java @@ -37,7 +37,6 @@ public class FilterServerUtil { } private static String[] splitShellString(final String shellString) { - String[] split = shellString.split(" "); - return split; + return shellString.split(" "); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java index 4810d771..2f4b568f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java @@ -77,7 +77,7 @@ public class BrokerFastFailure { break; } final RequestTask rt = castRunnable(runnable); - if (rt.isStopRun()) { + if (rt == null || rt.isStopRun()) { break; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index ef9065ec..7188e8de 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -58,7 +58,7 @@ public class ConsumerOffsetManager extends ConfigManager { Entry> next = it.next(); String topicAtGroup = next.getKey(); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); - if (arrays != null && arrays.length == 2) { + if (arrays.length == 2) { String topic = arrays[0]; String group = arrays[1]; @@ -80,11 +80,7 @@ public class ConsumerOffsetManager extends ConfigManager { Entry next = it.next(); long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, next.getKey()); long offsetInPersist = next.getValue(); - if (offsetInPersist > minOffsetInStore) { - result = false; - } else { - result = true; - } + result = offsetInPersist <= minOffsetInStore; } return result; @@ -99,7 +95,7 @@ public class ConsumerOffsetManager extends ConfigManager { Entry> next = it.next(); String topicAtGroup = next.getKey(); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); - if (arrays != null && arrays.length == 2) { + if (arrays.length == 2) { if (group.equals(arrays[1])) { topics.add(arrays[0]); } @@ -118,7 +114,7 @@ public class ConsumerOffsetManager extends ConfigManager { Entry> next = it.next(); String topicAtGroup = next.getKey(); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); - if (arrays != null && arrays.length == 2) { + if (arrays.length == 2) { if (topic.equals(arrays[0])) { groups.add(arrays[1]); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index ab02cebf..335c1054 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -88,13 +88,11 @@ public class BrokerOuterAPI { public void updateNameServerAddressList(final String addrs) { List lst = new ArrayList(); String[] addrArray = addrs.split(";"); - if (addrArray != null) { - for (String addr : addrArray) { - lst.add(addr); - } - - this.remotingClient.updateNameServerAddressList(lst); + for (String addr : addrArray) { + lst.add(addr); } + + this.remotingClient.updateNameServerAddressList(lst); } public RegisterBrokerResult registerBrokerAll( @@ -159,6 +157,7 @@ public class BrokerOuterAPI { try { this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills); } catch (RemotingTooMuchRequestException e) { + // Ignore } return null; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java index d27b6aa4..42793ae5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java @@ -37,8 +37,7 @@ public final class MessageStoreFactory { @SuppressWarnings("unchecked") Class clazz = (Class) Class.forName(pluginClass); Constructor construct = clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class); - AbstractPluginMessageStore pluginMessageStore = (AbstractPluginMessageStore) construct.newInstance(context, messageStore); - messageStore = pluginMessageStore; + messageStore = construct.newInstance(context, messageStore); } catch (Throwable e) { throw new RuntimeException(String.format( "Initialize plugin's class %s not found!", pluginClass), e); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index 81a239be..8a285e86 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -176,8 +176,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces return response; } if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) { - String errorMsg = - "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words."; + String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words."; log.warn(errorMsg); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(errorMsg); @@ -273,8 +272,11 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces } hook.sendMessageBefore(context); - requestHeader.setProperties(context.getMsgProps()); + if (requestHeader != null) { + requestHeader.setProperties(context.getMsgProps()); + } } catch (Throwable e) { + // Ignore } } } @@ -319,7 +321,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces } hook.sendMessageAfter(context); } catch (Throwable e) { - + // Ignore } } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 94aa4140..4588d2d8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -722,8 +722,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { final ResetOffsetRequestHeader requestHeader = (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class); log.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}", - new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(), - requestHeader.getTimestamp(), requestHeader.isForce()}); + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(), + requestHeader.getTimestamp(), requestHeader.isForce()); boolean isC = false; LanguageCode language = request.getLanguage(); switch (language) { @@ -740,7 +740,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class); log.info("[get-consumer-status] get consumer status by {}. topic={}, group={}", - new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup()}); + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup()); return this.brokerController.getBroker2Client().getConsumeStatus(requestHeader.getTopic(), requestHeader.getGroup(), requestHeader.getClientAddr()); @@ -1193,9 +1193,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { newRequest.setExtFields(request.getExtFields()); newRequest.setBody(request.getBody()); - RemotingCommand consumerResponse = - this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest); - return consumerResponse; + return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest); } catch (RemotingTimeoutException e) { response.setCode(ResponseCode.CONSUME_MSG_TIMEOUT); response diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 7f885938..7625d213 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -528,7 +528,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { } } } catch (RemotingCommandException e1) { - LOG.error("excuteRequestWhenWakeup run", e1); + LOG.error("executeRequestWhenWakeup run", e1); } } }; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java index b41e0a5c..5c602553 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java @@ -116,7 +116,7 @@ public class QueryMessageProcessor implements NettyRequestProcessor { public void operationComplete(ChannelFuture future) throws Exception { queryMessageResult.release(); if (!future.isSuccess()) { - log.error("transfer query message by pagecache failed, ", future.cause()); + log.error("transfer query message by page cache failed, ", future.cause()); } } }); @@ -158,7 +158,7 @@ public class QueryMessageProcessor implements NettyRequestProcessor { public void operationComplete(ChannelFuture future) throws Exception { selectMappedBufferResult.release(); if (!future.isSuccess()) { - log.error("transfer one message by pagecache failed, ", future.cause()); + log.error("transfer one message by page cache failed, ", future.cause()); } } }); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index defe7e32..5cebd0e7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -467,6 +467,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement try { hook.consumeMessageAfter(context); } catch (Throwable e) { + // Ignore } } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java index 758eeed3..d6e897ac 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java @@ -21,26 +21,26 @@ import java.util.List; public interface TransactionStore { - public boolean open(); + boolean open(); - public void close(); + void close(); - public boolean put(final List trs); + boolean put(final List trs); - public void remove(final List pks); + void remove(final List pks); - public List traverse(final long pk, final int nums); + List traverse(final long pk, final int nums); - public long totalRecords(); + long totalRecords(); - public long minPK(); + long minPK(); - public long maxPK(); + long maxPK(); } -- GitLab