diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java index 152f373d46fec4bacbcfc3a831698555a7a6f0f7..70027cdc91e3d9cde1455582f5d4204beb06df55 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java @@ -202,7 +202,7 @@ public class Broker2Client { try { this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000); log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}", - new Object[]{topic, group, entry.getValue().getClientId()}); + topic, group, entry.getValue().getClientId()); } catch (Exception e) { log.error("[reset-offset] reset offset exception. topic={}, group={}", new Object[]{topic, group}, e); @@ -290,7 +290,7 @@ public class Broker2Client { consumerStatusTable.put(clientId, body.getMessageQueueTable()); log.info( "[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}", - new Object[]{topic, group, clientId}); + topic, group, clientId); } } default: diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java index 1c40c0ef13f7303505a772643b8437843c71ffb7..de4cc374645c33ac8af623bf2bb406bbf74e41dc 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java @@ -37,7 +37,6 @@ public class FilterServerUtil { } private static String[] splitShellString(final String shellString) { - String[] split = shellString.split(" "); - return split; + return shellString.split(" "); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java index 4810d771bbf2d45fd00b453fc3e0cf23ad30cfbf..2f4b568f6b3e30e8e40144648e94a5141bad8e3c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java @@ -77,7 +77,7 @@ public class BrokerFastFailure { break; } final RequestTask rt = castRunnable(runnable); - if (rt.isStopRun()) { + if (rt == null || rt.isStopRun()) { break; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index ef9065ecbc8711ea5220f7d8e8ca40db14c0787a..7188e8dedc916618e6cad77f27d52ec2fb31fd76 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -58,7 +58,7 @@ public class ConsumerOffsetManager extends ConfigManager { Entry> next = it.next(); String topicAtGroup = next.getKey(); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); - if (arrays != null && arrays.length == 2) { + if (arrays.length == 2) { String topic = arrays[0]; String group = arrays[1]; @@ -80,11 +80,7 @@ public class ConsumerOffsetManager extends ConfigManager { Entry next = it.next(); long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, next.getKey()); long offsetInPersist = next.getValue(); - if (offsetInPersist > minOffsetInStore) { - result = false; - } else { - result = true; - } + result = offsetInPersist <= minOffsetInStore; } return result; @@ -99,7 +95,7 @@ public class ConsumerOffsetManager extends ConfigManager { Entry> next = it.next(); String topicAtGroup = next.getKey(); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); - if (arrays != null && arrays.length == 2) { + if (arrays.length == 2) { if (group.equals(arrays[1])) { topics.add(arrays[0]); } @@ -118,7 +114,7 @@ public class ConsumerOffsetManager extends ConfigManager { Entry> next = it.next(); String topicAtGroup = next.getKey(); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); - if (arrays != null && arrays.length == 2) { + if (arrays.length == 2) { if (topic.equals(arrays[0])) { groups.add(arrays[1]); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index ab02cebf5f2897d3cc7b2debca947a99bfd6ea6b..335c105482af00ffa5f588621c846c3150e7f1c3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -88,13 +88,11 @@ public class BrokerOuterAPI { public void updateNameServerAddressList(final String addrs) { List lst = new ArrayList(); String[] addrArray = addrs.split(";"); - if (addrArray != null) { - for (String addr : addrArray) { - lst.add(addr); - } - - this.remotingClient.updateNameServerAddressList(lst); + for (String addr : addrArray) { + lst.add(addr); } + + this.remotingClient.updateNameServerAddressList(lst); } public RegisterBrokerResult registerBrokerAll( @@ -159,6 +157,7 @@ public class BrokerOuterAPI { try { this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills); } catch (RemotingTooMuchRequestException e) { + // Ignore } return null; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java index d27b6aa4406479c7c8f6d1b5ea4bc6598089aa0b..42793ae50374d48059c647b98cb95dc96116e340 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java @@ -37,8 +37,7 @@ public final class MessageStoreFactory { @SuppressWarnings("unchecked") Class clazz = (Class) Class.forName(pluginClass); Constructor construct = clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class); - AbstractPluginMessageStore pluginMessageStore = (AbstractPluginMessageStore) construct.newInstance(context, messageStore); - messageStore = pluginMessageStore; + messageStore = construct.newInstance(context, messageStore); } catch (Throwable e) { throw new RuntimeException(String.format( "Initialize plugin's class %s not found!", pluginClass), e); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index 81a239be81e0b80a12de75b66176ef4dee2b8a69..8a285e868a51f634d28de6dd83bf67dc9b361d3c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -176,8 +176,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces return response; } if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) { - String errorMsg = - "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words."; + String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words."; log.warn(errorMsg); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(errorMsg); @@ -273,8 +272,11 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces } hook.sendMessageBefore(context); - requestHeader.setProperties(context.getMsgProps()); + if (requestHeader != null) { + requestHeader.setProperties(context.getMsgProps()); + } } catch (Throwable e) { + // Ignore } } } @@ -319,7 +321,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces } hook.sendMessageAfter(context); } catch (Throwable e) { - + // Ignore } } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 94aa414017eb8f707544ab3fb80ad945cd18d1c9..4588d2d89741a6321c3e7dce0c1274b56f917700 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -722,8 +722,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { final ResetOffsetRequestHeader requestHeader = (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class); log.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}", - new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(), - requestHeader.getTimestamp(), requestHeader.isForce()}); + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(), + requestHeader.getTimestamp(), requestHeader.isForce()); boolean isC = false; LanguageCode language = request.getLanguage(); switch (language) { @@ -740,7 +740,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class); log.info("[get-consumer-status] get consumer status by {}. topic={}, group={}", - new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup()}); + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup()); return this.brokerController.getBroker2Client().getConsumeStatus(requestHeader.getTopic(), requestHeader.getGroup(), requestHeader.getClientAddr()); @@ -1193,9 +1193,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { newRequest.setExtFields(request.getExtFields()); newRequest.setBody(request.getBody()); - RemotingCommand consumerResponse = - this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest); - return consumerResponse; + return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest); } catch (RemotingTimeoutException e) { response.setCode(ResponseCode.CONSUME_MSG_TIMEOUT); response diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 7f8859387b22b122d73e313ae66544202fa0dcbe..7625d2130086832c627f65f2aa4ca46243b8fbac 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -528,7 +528,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { } } } catch (RemotingCommandException e1) { - LOG.error("excuteRequestWhenWakeup run", e1); + LOG.error("executeRequestWhenWakeup run", e1); } } }; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java index b41e0a5c33de28bfcefa166badaa2af72a0bcf06..5c602553a62ee5722736e083c7654e5167653c3c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java @@ -116,7 +116,7 @@ public class QueryMessageProcessor implements NettyRequestProcessor { public void operationComplete(ChannelFuture future) throws Exception { queryMessageResult.release(); if (!future.isSuccess()) { - log.error("transfer query message by pagecache failed, ", future.cause()); + log.error("transfer query message by page cache failed, ", future.cause()); } } }); @@ -158,7 +158,7 @@ public class QueryMessageProcessor implements NettyRequestProcessor { public void operationComplete(ChannelFuture future) throws Exception { selectMappedBufferResult.release(); if (!future.isSuccess()) { - log.error("transfer one message by pagecache failed, ", future.cause()); + log.error("transfer one message by page cache failed, ", future.cause()); } } }); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index defe7e323381078eea73331b913264c871819a7f..5cebd0e7f2e597db78ea5dca8a14a6806216a0a0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -467,6 +467,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement try { hook.consumeMessageAfter(context); } catch (Throwable e) { + // Ignore } } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java index 758eeed3a4541cd95b776c74d56a2049cf05c9f4..d6e897acddc0b644d4b66861b3ddf75280b41c91 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java @@ -21,26 +21,26 @@ import java.util.List; public interface TransactionStore { - public boolean open(); + boolean open(); - public void close(); + void close(); - public boolean put(final List trs); + boolean put(final List trs); - public void remove(final List pks); + void remove(final List pks); - public List traverse(final long pk, final int nums); + List traverse(final long pk, final int nums); - public long totalRecords(); + long totalRecords(); - public long minPK(); + long minPK(); - public long maxPK(); + long maxPK(); }