提交 9165667a 编写于 作者: Y yukon

ROCKETMQ-18 Remove bad practices in broker.

上级 f56e0383
...@@ -202,7 +202,7 @@ public class Broker2Client { ...@@ -202,7 +202,7 @@ public class Broker2Client {
try { try {
this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000); this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}", log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
new Object[]{topic, group, entry.getValue().getClientId()}); topic, group, entry.getValue().getClientId());
} catch (Exception e) { } catch (Exception e) {
log.error("[reset-offset] reset offset exception. topic={}, group={}", log.error("[reset-offset] reset offset exception. topic={}, group={}",
new Object[]{topic, group}, e); new Object[]{topic, group}, e);
...@@ -290,7 +290,7 @@ public class Broker2Client { ...@@ -290,7 +290,7 @@ public class Broker2Client {
consumerStatusTable.put(clientId, body.getMessageQueueTable()); consumerStatusTable.put(clientId, body.getMessageQueueTable());
log.info( log.info(
"[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}", "[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}",
new Object[]{topic, group, clientId}); topic, group, clientId);
} }
} }
default: default:
......
...@@ -37,7 +37,6 @@ public class FilterServerUtil { ...@@ -37,7 +37,6 @@ public class FilterServerUtil {
} }
private static String[] splitShellString(final String shellString) { private static String[] splitShellString(final String shellString) {
String[] split = shellString.split(" "); return shellString.split(" ");
return split;
} }
} }
...@@ -77,7 +77,7 @@ public class BrokerFastFailure { ...@@ -77,7 +77,7 @@ public class BrokerFastFailure {
break; break;
} }
final RequestTask rt = castRunnable(runnable); final RequestTask rt = castRunnable(runnable);
if (rt.isStopRun()) { if (rt == null || rt.isStopRun()) {
break; break;
} }
......
...@@ -58,7 +58,7 @@ public class ConsumerOffsetManager extends ConfigManager { ...@@ -58,7 +58,7 @@ public class ConsumerOffsetManager extends ConfigManager {
Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next(); Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
String topicAtGroup = next.getKey(); String topicAtGroup = next.getKey();
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays != null && arrays.length == 2) { if (arrays.length == 2) {
String topic = arrays[0]; String topic = arrays[0];
String group = arrays[1]; String group = arrays[1];
...@@ -80,11 +80,7 @@ public class ConsumerOffsetManager extends ConfigManager { ...@@ -80,11 +80,7 @@ public class ConsumerOffsetManager extends ConfigManager {
Entry<Integer, Long> next = it.next(); Entry<Integer, Long> next = it.next();
long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, next.getKey()); long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, next.getKey());
long offsetInPersist = next.getValue(); long offsetInPersist = next.getValue();
if (offsetInPersist > minOffsetInStore) { result = offsetInPersist <= minOffsetInStore;
result = false;
} else {
result = true;
}
} }
return result; return result;
...@@ -99,7 +95,7 @@ public class ConsumerOffsetManager extends ConfigManager { ...@@ -99,7 +95,7 @@ public class ConsumerOffsetManager extends ConfigManager {
Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next(); Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
String topicAtGroup = next.getKey(); String topicAtGroup = next.getKey();
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays != null && arrays.length == 2) { if (arrays.length == 2) {
if (group.equals(arrays[1])) { if (group.equals(arrays[1])) {
topics.add(arrays[0]); topics.add(arrays[0]);
} }
...@@ -118,7 +114,7 @@ public class ConsumerOffsetManager extends ConfigManager { ...@@ -118,7 +114,7 @@ public class ConsumerOffsetManager extends ConfigManager {
Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next(); Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
String topicAtGroup = next.getKey(); String topicAtGroup = next.getKey();
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays != null && arrays.length == 2) { if (arrays.length == 2) {
if (topic.equals(arrays[0])) { if (topic.equals(arrays[0])) {
groups.add(arrays[1]); groups.add(arrays[1]);
} }
......
...@@ -88,13 +88,11 @@ public class BrokerOuterAPI { ...@@ -88,13 +88,11 @@ public class BrokerOuterAPI {
public void updateNameServerAddressList(final String addrs) { public void updateNameServerAddressList(final String addrs) {
List<String> lst = new ArrayList<String>(); List<String> lst = new ArrayList<String>();
String[] addrArray = addrs.split(";"); String[] addrArray = addrs.split(";");
if (addrArray != null) { for (String addr : addrArray) {
for (String addr : addrArray) { lst.add(addr);
lst.add(addr);
}
this.remotingClient.updateNameServerAddressList(lst);
} }
this.remotingClient.updateNameServerAddressList(lst);
} }
public RegisterBrokerResult registerBrokerAll( public RegisterBrokerResult registerBrokerAll(
...@@ -159,6 +157,7 @@ public class BrokerOuterAPI { ...@@ -159,6 +157,7 @@ public class BrokerOuterAPI {
try { try {
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills); this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
} catch (RemotingTooMuchRequestException e) { } catch (RemotingTooMuchRequestException e) {
// Ignore
} }
return null; return null;
} }
......
...@@ -37,8 +37,7 @@ public final class MessageStoreFactory { ...@@ -37,8 +37,7 @@ public final class MessageStoreFactory {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>) Class.forName(pluginClass); Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>) Class.forName(pluginClass);
Constructor<AbstractPluginMessageStore> construct = clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class); Constructor<AbstractPluginMessageStore> construct = clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class);
AbstractPluginMessageStore pluginMessageStore = (AbstractPluginMessageStore) construct.newInstance(context, messageStore); messageStore = construct.newInstance(context, messageStore);
messageStore = pluginMessageStore;
} catch (Throwable e) { } catch (Throwable e) {
throw new RuntimeException(String.format( throw new RuntimeException(String.format(
"Initialize plugin's class %s not found!", pluginClass), e); "Initialize plugin's class %s not found!", pluginClass), e);
......
...@@ -176,8 +176,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces ...@@ -176,8 +176,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
return response; return response;
} }
if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) { if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
String errorMsg = String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
"the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
log.warn(errorMsg); log.warn(errorMsg);
response.setCode(ResponseCode.SYSTEM_ERROR); response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorMsg); response.setRemark(errorMsg);
...@@ -273,8 +272,11 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces ...@@ -273,8 +272,11 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
} }
hook.sendMessageBefore(context); hook.sendMessageBefore(context);
requestHeader.setProperties(context.getMsgProps()); if (requestHeader != null) {
requestHeader.setProperties(context.getMsgProps());
}
} catch (Throwable e) { } catch (Throwable e) {
// Ignore
} }
} }
} }
...@@ -319,7 +321,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces ...@@ -319,7 +321,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
} }
hook.sendMessageAfter(context); hook.sendMessageAfter(context);
} catch (Throwable e) { } catch (Throwable e) {
// Ignore
} }
} }
} }
......
...@@ -722,8 +722,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { ...@@ -722,8 +722,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
final ResetOffsetRequestHeader requestHeader = final ResetOffsetRequestHeader requestHeader =
(ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class); (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
log.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}", log.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}",
new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
requestHeader.getTimestamp(), requestHeader.isForce()}); requestHeader.getTimestamp(), requestHeader.isForce());
boolean isC = false; boolean isC = false;
LanguageCode language = request.getLanguage(); LanguageCode language = request.getLanguage();
switch (language) { switch (language) {
...@@ -740,7 +740,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { ...@@ -740,7 +740,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
(GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class); (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
log.info("[get-consumer-status] get consumer status by {}. topic={}, group={}", log.info("[get-consumer-status] get consumer status by {}. topic={}, group={}",
new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup()}); RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup());
return this.brokerController.getBroker2Client().getConsumeStatus(requestHeader.getTopic(), requestHeader.getGroup(), return this.brokerController.getBroker2Client().getConsumeStatus(requestHeader.getTopic(), requestHeader.getGroup(),
requestHeader.getClientAddr()); requestHeader.getClientAddr());
...@@ -1193,9 +1193,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { ...@@ -1193,9 +1193,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
newRequest.setExtFields(request.getExtFields()); newRequest.setExtFields(request.getExtFields());
newRequest.setBody(request.getBody()); newRequest.setBody(request.getBody());
RemotingCommand consumerResponse = return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest);
this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest);
return consumerResponse;
} catch (RemotingTimeoutException e) { } catch (RemotingTimeoutException e) {
response.setCode(ResponseCode.CONSUME_MSG_TIMEOUT); response.setCode(ResponseCode.CONSUME_MSG_TIMEOUT);
response response
......
...@@ -528,7 +528,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { ...@@ -528,7 +528,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
} }
} }
} catch (RemotingCommandException e1) { } catch (RemotingCommandException e1) {
LOG.error("excuteRequestWhenWakeup run", e1); LOG.error("executeRequestWhenWakeup run", e1);
} }
} }
}; };
......
...@@ -116,7 +116,7 @@ public class QueryMessageProcessor implements NettyRequestProcessor { ...@@ -116,7 +116,7 @@ public class QueryMessageProcessor implements NettyRequestProcessor {
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
queryMessageResult.release(); queryMessageResult.release();
if (!future.isSuccess()) { if (!future.isSuccess()) {
log.error("transfer query message by pagecache failed, ", future.cause()); log.error("transfer query message by page cache failed, ", future.cause());
} }
} }
}); });
...@@ -158,7 +158,7 @@ public class QueryMessageProcessor implements NettyRequestProcessor { ...@@ -158,7 +158,7 @@ public class QueryMessageProcessor implements NettyRequestProcessor {
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
selectMappedBufferResult.release(); selectMappedBufferResult.release();
if (!future.isSuccess()) { if (!future.isSuccess()) {
log.error("transfer one message by pagecache failed, ", future.cause()); log.error("transfer one message by page cache failed, ", future.cause());
} }
} }
}); });
......
...@@ -467,6 +467,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -467,6 +467,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
try { try {
hook.consumeMessageAfter(context); hook.consumeMessageAfter(context);
} catch (Throwable e) { } catch (Throwable e) {
// Ignore
} }
} }
} }
......
...@@ -21,26 +21,26 @@ import java.util.List; ...@@ -21,26 +21,26 @@ import java.util.List;
public interface TransactionStore { public interface TransactionStore {
public boolean open(); boolean open();
public void close(); void close();
public boolean put(final List<TransactionRecord> trs); boolean put(final List<TransactionRecord> trs);
public void remove(final List<Long> pks); void remove(final List<Long> pks);
public List<TransactionRecord> traverse(final long pk, final int nums); List<TransactionRecord> traverse(final long pk, final int nums);
public long totalRecords(); long totalRecords();
public long minPK(); long minPK();
public long maxPK(); long maxPK();
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册