diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 53968fa59e8cb1fa6d3a98d099da2be1c4289136..c8624c4f55b284994138ddbea85eb024473d0c62 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -210,7 +210,7 @@ public class BrokerController { this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager)); } catch (IOException e) { result = false; - e.printStackTrace(); + log.error("Failed to initialize", e); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 10945da97c986c01e61507e501198b3a0583720d..fb7ea2030a979726202a96a910162600d2b1ddca 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -67,7 +67,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PullMessageProcessor implements NettyRequestProcessor { - private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; private List consumeMessageHookList; @@ -94,9 +94,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { response.setOpaque(request.getOpaque()); - if (LOG.isDebugEnabled()) { - LOG.debug("receive PullMessage request command, {}", request); - } + log.debug("receive PullMessage request command, {}", request); if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) { response.setCode(ResponseCode.NO_PERMISSION); @@ -126,7 +124,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (null == topicConfig) { - LOG.error("The topic {} not exist, consumer: {} ", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel)); + log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel)); response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL))); return response; @@ -141,7 +139,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) { String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress()); - LOG.warn(errorInfo); + log.warn(errorInfo); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(errorInfo); return response; @@ -162,7 +160,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { assert consumerFilterData != null; } } catch (Exception e) { - LOG.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), // + log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), // requestHeader.getConsumerGroup()); response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); response.setRemark("parse the consumer's subscription failed"); @@ -172,7 +170,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup()); if (null == consumerGroupInfo) { - LOG.warn("The consumer's group info not exist, group: {}", requestHeader.getConsumerGroup()); + log.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup()); response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST); response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC)); return response; @@ -187,14 +185,14 @@ public class PullMessageProcessor implements NettyRequestProcessor { subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic()); if (null == subscriptionData) { - LOG.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic()); + log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic()); response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST); response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC)); return response; } if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) { - LOG.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(), + log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(), subscriptionData.getSubString()); response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST); response.setRemark("the consumer's subscription not latest"); @@ -209,7 +207,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { return response; } if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) { - LOG.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}", + log.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}", requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion()); response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST); response.setRemark("the consumer's consumer filter data not latest"); @@ -287,12 +285,12 @@ public class PullMessageProcessor implements NettyRequestProcessor { response.setCode(ResponseCode.PULL_OFFSET_MOVED); // XXX: warn and notify me - LOG.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", // - requestHeader.getQueueOffset(), // - getMessageResult.getNextBeginOffset(), // - requestHeader.getTopic(), // - requestHeader.getQueueId(), // - requestHeader.getConsumerGroup()// + log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", // + requestHeader.getQueueOffset(), // + getMessageResult.getNextBeginOffset(), // + requestHeader.getTopic(), // + requestHeader.getQueueId(), // + requestHeader.getConsumerGroup()// ); } else { response.setCode(ResponseCode.PULL_NOT_FOUND); @@ -307,16 +305,17 @@ public class PullMessageProcessor implements NettyRequestProcessor { case OFFSET_OVERFLOW_BADLY: response.setCode(ResponseCode.PULL_OFFSET_MOVED); // XXX: warn and notify me - LOG.info("The request offset:{} over flow badly, broker max offset:{} , consumer: {}", requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress()); + log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}", + requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress()); break; case OFFSET_OVERFLOW_ONE: response.setCode(ResponseCode.PULL_NOT_FOUND); break; case OFFSET_TOO_SMALL: response.setCode(ResponseCode.PULL_OFFSET_MOVED); - LOG.info("The request offset is too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}", - requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(), - getMessageResult.getMinOffset(), channel.remoteAddress()); + log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}", + requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(), + getMessageResult.getMinOffset(), channel.remoteAddress()); break; default: assert false; @@ -391,12 +390,12 @@ public class PullMessageProcessor implements NettyRequestProcessor { public void operationComplete(ChannelFuture future) throws Exception { getMessageResult.release(); if (!future.isSuccess()) { - LOG.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), future.cause()); + log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause()); } } }); } catch (Throwable e) { - LOG.error("Error occurred when transferring messages from page cache", e); + log.error("transfer many message by pagecache exception", e); getMessageResult.release(); } @@ -437,16 +436,16 @@ public class PullMessageProcessor implements NettyRequestProcessor { event.setOffsetRequest(requestHeader.getQueueOffset()); event.setOffsetNew(getMessageResult.getNextBeginOffset()); this.generateOffsetMovedEvent(event); - LOG.warn( - "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}", - requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(), - responseHeader.getSuggestWhichBrokerId()); + log.warn( + "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}", + requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(), + responseHeader.getSuggestWhichBrokerId()); } else { responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId()); response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); - LOG.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}", - requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(), - responseHeader.getSuggestWhichBrokerId()); + log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}", + requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(), + responseHeader.getSuggestWhichBrokerId()); } break; @@ -525,7 +524,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); } catch (Exception e) { - LOG.warn(String.format("GenerateOffsetMovedEvent Exception, %s", event.toString()), e); + log.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), e); } } @@ -544,20 +543,21 @@ public class PullMessageProcessor implements NettyRequestProcessor { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { - LOG.error("ProcessRequestWrapper response to {} failed", future.channel().remoteAddress(), future.cause()); - LOG.error(request.toString()); - LOG.error(response.toString()); + log.error("processRequestWrapper response to {} failed", + future.channel().remoteAddress(), future.cause()); + log.error(request.toString()); + log.error(response.toString()); } } }); } catch (Throwable e) { - LOG.error("ProcessRequestWrapper process request over, but response failed", e); - LOG.error(request.toString()); - LOG.error(response.toString()); + log.error("processRequestWrapper process request over, but response failed", e); + log.error(request.toString()); + log.error(response.toString()); } } } catch (RemotingCommandException e1) { - LOG.error("ExecuteRequestWhenWakeup run", e1); + log.error("excuteRequestWhenWakeup run", e1); } } }; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 3bcafc0ed00cb4a7d4fb7fa61eb9af9be421b08b..0d10a16239e5e8e5ca79ad266619701537f14652 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -41,7 +41,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TopicConfigManager extends ConfigManager { - private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final long LOCK_TIMEOUT_MILLIS = 3000; private transient final Lock lockTopicConfigTable = new ReentrantLock(); @@ -181,15 +181,17 @@ public class TopicConfigManager extends ConfigManager { topicConfig.setTopicSysFlag(topicSysFlag); topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType()); } else { - LOG.warn("Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]", - defaultTopic, defaultTopicConfig.getPerm(), remoteAddress); + log.warn("Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]", + defaultTopic, defaultTopicConfig.getPerm(), remoteAddress); } } else { - LOG.warn("Create new topic failed, because the default topic[{}] not exist. producer:[{}]", defaultTopic, remoteAddress); + log.warn("Create new topic failed, because the default topic[{}] not exist. producer:[{}]", + defaultTopic, remoteAddress); } if (topicConfig != null) { - LOG.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]", defaultTopic, topicConfig, remoteAddress); + log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]", + defaultTopic, topicConfig, remoteAddress); this.topicConfigTable.put(topic, topicConfig); @@ -204,7 +206,7 @@ public class TopicConfigManager extends ConfigManager { } } } catch (InterruptedException e) { - LOG.error("createTopicInSendMessageMethod exception", e); + log.error("createTopicInSendMessageMethod exception", e); } if (createNew) { @@ -238,7 +240,7 @@ public class TopicConfigManager extends ConfigManager { topicConfig.setPerm(perm); topicConfig.setTopicSysFlag(topicSysFlag); - LOG.info("create new topic {}", topicConfig); + log.info("create new topic {}", topicConfig); this.topicConfigTable.put(topic, topicConfig); createNew = true; this.dataVersion.nextVersion(); @@ -248,7 +250,7 @@ public class TopicConfigManager extends ConfigManager { } } } catch (InterruptedException e) { - LOG.error("createTopicInSendMessageBackMethod exception", e); + log.error("createTopicInSendMessageBackMethod exception", e); } if (createNew) { @@ -269,8 +271,8 @@ public class TopicConfigManager extends ConfigManager { topicConfig.setTopicSysFlag(TopicSysFlag.clearUnitFlag(oldTopicSysFlag)); } - LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag, - topicConfig.getTopicSysFlag()); + log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag, + topicConfig.getTopicSysFlag()); this.topicConfigTable.put(topic, topicConfig); @@ -289,8 +291,8 @@ public class TopicConfigManager extends ConfigManager { topicConfig.setTopicSysFlag(TopicSysFlag.setUnitSubFlag(oldTopicSysFlag)); } - LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag, - topicConfig.getTopicSysFlag()); + log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag, + topicConfig.getTopicSysFlag()); this.topicConfigTable.put(topic, topicConfig); @@ -304,9 +306,9 @@ public class TopicConfigManager extends ConfigManager { public void updateTopicConfig(final TopicConfig topicConfig) { TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); if (old != null) { - LOG.info("update topic config, old:[{}] new:[{}]", old, topicConfig); + log.info("update topic config, old:[{}] new:[{}]", old, topicConfig); } else { - LOG.info("create new topic [{}]", topicConfig); + log.info("create new topic [{}]", topicConfig); } this.dataVersion.nextVersion(); @@ -324,7 +326,7 @@ public class TopicConfigManager extends ConfigManager { if (topicConfig != null && !topicConfig.isOrder()) { topicConfig.setOrder(true); isChange = true; - LOG.info("update order topic config, topic={}, order={}", topic, true); + log.info("update order topic config, topic={}, order={}", topic, true); } } @@ -335,7 +337,7 @@ public class TopicConfigManager extends ConfigManager { if (topicConfig.isOrder()) { topicConfig.setOrder(false); isChange = true; - LOG.info("update order topic config, topic={}, order={}", topic, false); + log.info("update order topic config, topic={}, order={}", topic, false); } } } @@ -359,11 +361,11 @@ public class TopicConfigManager extends ConfigManager { public void deleteTopicConfig(final String topic) { TopicConfig old = this.topicConfigTable.remove(topic); if (old != null) { - LOG.info("Delete topic config OK, topic:{}", old); + log.info("delete topic config OK, topic: {}", old); this.dataVersion.nextVersion(); this.persist(); } else { - LOG.warn("Delete topic config failed, topic:{} not exist", topic); + log.warn("delete topic config failed, topic: {} not exists", topic); } } @@ -409,7 +411,7 @@ public class TopicConfigManager extends ConfigManager { Iterator> it = tcs.getTopicConfigTable().entrySet().iterator(); while (it.hasNext()) { Entry next = it.next(); - LOG.info("load exist local topic, {}", next.getValue().toString()); + log.info("load exist local topic, {}", next.getValue().toString()); } } diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 5bce013da80cacb78a3065250584153a33f744c7..f22729c14c9a8f879e1de552a4a9f9e5f9a47745 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -16,13 +16,19 @@ */ package org.apache.rocketmq.common; -import java.net.InetAddress; -import java.net.UnknownHostException; import org.apache.rocketmq.common.annotation.ImportantField; +import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; public class BrokerConfig { + private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); + private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); @ImportantField private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)); @@ -121,16 +127,6 @@ public class BrokerConfig { private boolean filterSupportRetry = false; private boolean enablePropertyFilter = false; - public static String localHostName() { - try { - return InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - e.printStackTrace(); - } - - return "DEFAULT_BROKER"; - } - public boolean isTraceOn() { return traceOn; } @@ -179,6 +175,16 @@ public class BrokerConfig { this.slaveReadEnable = slaveReadEnable; } + public static String localHostName() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + log.error("Failed to obtain the host name", e); + } + + return "DEFAULT_BROKER"; + } + public int getRegisterBrokerTimeoutMills() { return registerBrokerTimeoutMills; } diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java index f826a2eb939545834b3c52742606d918aa061fdd..c33ebdf324e61b6a41234f55694fd58318046395 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java +++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java @@ -22,7 +22,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public abstract class ConfigManager { - private static final Logger PLOG = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); + private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); public abstract String encode(); @@ -36,11 +36,11 @@ public abstract class ConfigManager { return this.loadBak(); } else { this.decode(jsonString); - PLOG.info("load {} OK", fileName); + log.info("load {} OK", fileName); return true; } } catch (Exception e) { - PLOG.error("load " + fileName + " Failed, and try to load backup file", e); + log.error("load [{}] failed, and try to load backup file", fileName, e); return this.loadBak(); } } @@ -54,11 +54,11 @@ public abstract class ConfigManager { String jsonString = MixAll.file2String(fileName + ".bak"); if (jsonString != null && jsonString.length() > 0) { this.decode(jsonString); - PLOG.info("load " + fileName + " OK"); + log.info("load [{}] OK", fileName); return true; } } catch (Exception e) { - PLOG.error("load " + fileName + " Failed", e); + log.error("load [{}] Failed", fileName, e); return false; } @@ -74,7 +74,7 @@ public abstract class ConfigManager { try { MixAll.string2File(jsonString, fileName); } catch (IOException e) { - PLOG.error("persist file Exception, " + fileName, e); + log.error("persist file [{}] exception", fileName, e); } } } diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index 36d81d09bc1c0ca9b377e593f512113f28528aab..f8e9b4e1f673c37b0b649712f37ce7fca3824cc4 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -22,7 +22,6 @@ import java.io.FileInputStream; import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; -import java.io.UnsupportedEncodingException; import java.lang.annotation.Annotation; import java.lang.reflect.Field; import java.lang.reflect.Method; @@ -43,10 +42,14 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.common.annotation.ImportantField; +import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.help.FAQUrl; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MixAll { + private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); + public static final String ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME"; public static final String ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir"; public static final String NAMESRV_ADDR_ENV = "NAMESRV_ADDR"; @@ -243,11 +246,11 @@ public class MixAll { return url.getPath(); } - public static void printObjectProperties(final Logger log, final Object object) { - printObjectProperties(log, object, false); + public static void printObjectProperties(final Logger logger, final Object object) { + printObjectProperties(logger, object, false); } - public static void printObjectProperties(final Logger log, final Object object, final boolean onlyImportantField) { + public static void printObjectProperties(final Logger logger, final Object object, final boolean onlyImportantField) { Field[] fields = object.getClass().getDeclaredFields(); for (Field field : fields) { if (!Modifier.isStatic(field.getModifiers())) { @@ -261,7 +264,7 @@ public class MixAll { value = ""; } } catch (IllegalAccessException e) { - e.printStackTrace(); + log.error("Failed to obtain object properties", e); } if (onlyImportantField) { @@ -271,8 +274,9 @@ public class MixAll { } } - if (log != null) { - log.info(name + "=" + value); + if (logger != null) { + logger.info(name + "=" + value); + } else { } } } @@ -294,11 +298,8 @@ public class MixAll { try { InputStream in = new ByteArrayInputStream(str.getBytes(DEFAULT_CHARSET)); properties.load(in); - } catch (UnsupportedEncodingException e) { - e.printStackTrace(); - return null; - } catch (IOException e) { - e.printStackTrace(); + } catch (Exception e) { + log.error("Failed to handle properties", e); return null; } @@ -318,7 +319,7 @@ public class MixAll { field.setAccessible(true); value = field.get(object); } catch (IllegalAccessException e) { - e.printStackTrace(); + log.error("Failed to handle properties", e); } if (value != null) { diff --git a/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java b/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java index 6a30e4e052799c81d78a1f2fb74eaaa51ea793bc..7b968807ccad397e9ace2c25663574c6ac9d3931 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java +++ b/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java @@ -23,7 +23,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public abstract class ServiceThread implements Runnable { - private static final Logger STLOG = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); + private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); + private static final long JOIN_TIME = 90 * 1000; protected final Thread thread; @@ -47,7 +48,7 @@ public abstract class ServiceThread implements Runnable { public void shutdown(final boolean interrupt) { this.stopped = true; - STLOG.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt); + log.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt); if (hasNotified.compareAndSet(false, true)) { waitPoint.countDown(); // notify @@ -63,10 +64,10 @@ public abstract class ServiceThread implements Runnable { this.thread.join(this.getJointime()); } long eclipseTime = System.currentTimeMillis() - beginTime; - STLOG.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " " - + this.getJointime()); + log.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " " + + this.getJointime()); } catch (InterruptedException e) { - e.printStackTrace(); + log.error("Interrupted", e); } } @@ -80,7 +81,7 @@ public abstract class ServiceThread implements Runnable { public void stop(final boolean interrupt) { this.stopped = true; - STLOG.info("stop thread " + this.getServiceName() + " interrupt " + interrupt); + log.info("stop thread " + this.getServiceName() + " interrupt " + interrupt); if (hasNotified.compareAndSet(false, true)) { waitPoint.countDown(); // notify @@ -93,7 +94,7 @@ public abstract class ServiceThread implements Runnable { public void makeStop() { this.stopped = true; - STLOG.info("makestop thread " + this.getServiceName()); + log.info("makestop thread " + this.getServiceName()); } public void wakeup() { @@ -114,7 +115,7 @@ public abstract class ServiceThread implements Runnable { try { waitPoint.await(interval, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - e.printStackTrace(); + log.error("Interrupted", e); } finally { hasNotified.set(false); this.onWaitEnd(); diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java index e9d926fedf95cc8e0edde3c5803d2aefd31903a7..15d41087c0a691f8b713213cc5b3b669518c4123 100644 --- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java @@ -36,9 +36,16 @@ import java.util.Map; import java.util.zip.CRC32; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; + +import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class UtilAll { + private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); + public static final String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss"; public static final String YYYY_MM_DD_HH_MM_SS_SSS = "yyyy-MM-dd#HH:mm:ss:SSS"; public static final String YYYYMMDDHHMMSS = "yyyyMMddHHmmss"; @@ -269,15 +276,18 @@ public class UtilAll { } finally { try { byteArrayInputStream.close(); - } catch (IOException ignored) { + } catch (IOException e) { + log.error("Failed to close the stream", e); } try { inflaterInputStream.close(); - } catch (IOException ignored) { + } catch (IOException e) { + log.error("Failed to close the stream", e); } try { byteArrayOutputStream.close(); - } catch (IOException ignored) { + } catch (IOException e) { + log.error("Failed to close the stream", e); } } diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java index 63f0fe063969aa61248ee28557027c4e8d518a15..6f740f7fd2a9fa5615a15f8e96acc883bdc7eaff 100644 --- a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java @@ -28,8 +28,8 @@ import org.slf4j.LoggerFactory; public class NamesrvConfig { private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); - private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); + private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json"; private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties"; private String productEnvName = "center"; diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java index 990e7486d7731d4e3394b590547ad3ad78c58540..57af0e709bd3ab44f3567d41b7725113e54b34a2 100644 --- a/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java +++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; public class TopAddressing { private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); + private String nsAddr; private String wsAddr; private String unitName; diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java index bff733313f06fabfff12a9cbebb7e1854a9a8b5c..c1cd69cd6ee7ec297e96d9d0af393039c7dc6833 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java @@ -17,11 +17,16 @@ package org.apache.rocketmq.common.protocol; +import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MQProtosHelper { + private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); + public static boolean registerBrokerToNameServer(final String nsaddr, final String brokerAddr, final long timeoutMillis) { RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); @@ -36,7 +41,7 @@ public class MQProtosHelper { return ResponseCode.SUCCESS == response.getCode(); } } catch (Exception e) { - e.printStackTrace(); + log.error("Failed to register broker", e); } return false; diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java index e0a94d70afeb8c81ac867eeee65e33126e105e4b..92bbf8dad47182b297bccbd08eb1a6caf417210b 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java @@ -44,7 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DynaCode { - private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); + private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); private static final String FILE_SP = System.getProperty("file.separator"); @@ -231,7 +231,7 @@ public class DynaCode { loadClass.put(getFullClassName(code), null); } if (null != srcFile) { - LOGGER.warn("Dyna Create Java Source File:---->" + srcFile.getAbsolutePath()); + log.warn("Dyna Create Java Source File:----> {}", srcFile.getAbsolutePath()); srcFileAbsolutePaths.add(srcFile.getAbsolutePath()); srcFile.deleteOnExit(); } @@ -277,9 +277,9 @@ public class DynaCode { Class classz = classLoader.loadClass(key); if (null != classz) { loadClass.put(key, classz); - LOGGER.info("Dyna Load Java Class File OK:----> className: " + key); + log.info("Dyna Load Java Class File OK:----> className: {}", key); } else { - LOGGER.error("Dyna Load Java Class File Fail:----> className: " + key); + log.error("Dyna Load Java Class File Fail:----> className: {}", key); } } } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java index 2fdd4fb6de5c6c0cb67d85596b819332b1d1d45b..04cf870278978d09d34566a7f4fb1c4f18a52068 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java @@ -45,7 +45,7 @@ public class ClusterTestRequestProcessor extends DefaultRequestProcessor { try { adminExt.start(); } catch (MQClientException e) { - e.printStackTrace(); + log.error("Failed to start processor", e); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java index 8c4fab7922319f07d5feb8be48eca0401a227f3d..40c594313a89571e797b3aebde6d987c35d497c8 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java @@ -26,11 +26,15 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RemotingHelper { public static final String ROCKETMQ_REMOTING = "RocketmqRemoting"; public static final String DEFAULT_CHARSET = "UTF-8"; + private static final Logger log = LoggerFactory.getLogger(ROCKETMQ_REMOTING); + public static String exceptionSimpleDesc(final Throwable e) { StringBuffer sb = new StringBuffer(); if (e != null) { @@ -126,7 +130,7 @@ public class RemotingHelper { byteBufferBody.flip(); return RemotingCommand.decode(byteBufferBody); } catch (IOException e) { - e.printStackTrace(); + log.error("invokeSync failure", e); if (sendRequestOK) { throw new RemotingTimeoutException(addr, timeoutMillis); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java index f64f9e153ef1398c3eff760b9106817fd3e61f23..8d24e76b4f886cfc784a2f54832286d57cf05a53 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java @@ -26,8 +26,6 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.NetworkInterface; import java.net.SocketAddress; -import java.net.SocketException; -import java.net.UnknownHostException; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; @@ -130,10 +128,8 @@ public class RemotingUtil { //If failed to find,fall back to localhost final InetAddress localHost = InetAddress.getLocalHost(); return normalizeHostAddress(localHost); - } catch (SocketException e) { - e.printStackTrace(); - } catch (UnknownHostException e) { - e.printStackTrace(); + } catch (Exception e) { + log.error("Failed to obtain local address", e); } return null; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java index 7c7e89ba26f3e4309f7e3820e79552aade116525..843618900dd4e452d6809909b2f485de867f716c 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java @@ -23,7 +23,8 @@ import org.slf4j.LoggerFactory; * Base class for background thread */ public abstract class ServiceThread implements Runnable { - private static final Logger STLOG = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); + private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); + private static final long JOIN_TIME = 90 * 1000; protected final Thread thread; protected volatile boolean hasNotified = false; @@ -45,7 +46,7 @@ public abstract class ServiceThread implements Runnable { public void shutdown(final boolean interrupt) { this.stopped = true; - STLOG.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt); + log.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt); synchronized (this) { if (!this.hasNotified) { this.hasNotified = true; @@ -61,10 +62,10 @@ public abstract class ServiceThread implements Runnable { long beginTime = System.currentTimeMillis(); this.thread.join(this.getJointime()); long eclipseTime = System.currentTimeMillis() - beginTime; - STLOG.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " " - + this.getJointime()); + log.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " " + + this.getJointime()); } catch (InterruptedException e) { - e.printStackTrace(); + log.error("Interrupted", e); } } @@ -78,7 +79,7 @@ public abstract class ServiceThread implements Runnable { public void stop(final boolean interrupt) { this.stopped = true; - STLOG.info("stop thread " + this.getServiceName() + " interrupt " + interrupt); + log.info("stop thread " + this.getServiceName() + " interrupt " + interrupt); synchronized (this) { if (!this.hasNotified) { this.hasNotified = true; @@ -93,7 +94,7 @@ public abstract class ServiceThread implements Runnable { public void makeStop() { this.stopped = true; - STLOG.info("makestop thread " + this.getServiceName()); + log.info("makestop thread " + this.getServiceName()); } public void wakeup() { @@ -116,7 +117,7 @@ public abstract class ServiceThread implements Runnable { try { this.wait(interval); } catch (InterruptedException e) { - e.printStackTrace(); + log.error("Interrupted", e); } finally { this.hasNotified = false; this.onWaitEnd(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java index 33cef332481bc48bc9ad3fe24f257746e9b9f55a..4ed156d5450054676e034a969fc03ebbeee20836 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; public class NettyDecoder extends LengthFieldBasedFrameDecoder { private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); + private static final int FRAME_MAX_LENGTH = // Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216")); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 0ba714a79801aea25c01a838579ed26c76a4e843..ba74b53260642af9b8b0633ae87369048ba3b687 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -53,7 +53,7 @@ public abstract class NettyRemotingAbstract { /** * Remoting logger instance. */ - private static final Logger PLOG = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); + private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); /** * Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint. @@ -175,17 +175,17 @@ public abstract class NettyRemotingAbstract { try { ctx.writeAndFlush(response); } catch (Throwable e) { - PLOG.error("process request over, but response failed", e); - PLOG.error(cmd.toString()); - PLOG.error(response.toString()); + log.error("process request over, but response failed", e); + log.error(cmd.toString()); + log.error(response.toString()); } } else { } } } catch (Throwable e) { - PLOG.error("process request exception", e); - PLOG.error(cmd.toString()); + log.error("process request exception", e); + log.error(cmd.toString()); if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, // @@ -210,7 +210,7 @@ public abstract class NettyRemotingAbstract { pair.getObject2().submit(requestTask); } catch (RejectedExecutionException e) { if ((System.currentTimeMillis() % 10000) == 0) { - PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) // + log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) // + ", too many requests and system thread pool busy, RejectedExecutionException " // + pair.getObject2().toString() // + " request code: " + cmd.getCode()); @@ -229,7 +229,7 @@ public abstract class NettyRemotingAbstract { RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); response.setOpaque(opaque); ctx.writeAndFlush(response); - PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error); + log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error); } } @@ -254,8 +254,8 @@ public abstract class NettyRemotingAbstract { responseFuture.putResponse(cmd); } } else { - PLOG.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - PLOG.warn(cmd.toString()); + log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + log.warn(cmd.toString()); } } @@ -274,13 +274,13 @@ public abstract class NettyRemotingAbstract { try { responseFuture.executeInvokeCallback(); } catch (Throwable e) { - PLOG.warn("execute callback in executor exception, and callback throw", e); + log.warn("execute callback in executor exception, and callback throw", e); } } }); } catch (Exception e) { runInThisThread = true; - PLOG.warn("execute callback in executor exception, maybe executor busy", e); + log.warn("execute callback in executor exception, maybe executor busy", e); } } else { runInThisThread = true; @@ -290,7 +290,7 @@ public abstract class NettyRemotingAbstract { try { responseFuture.executeInvokeCallback(); } catch (Throwable e) { - PLOG.warn("executeInvokeCallback Exception", e); + log.warn("executeInvokeCallback Exception", e); } } } @@ -324,7 +324,7 @@ public abstract class NettyRemotingAbstract { rep.release(); it.remove(); rfList.add(rep); - PLOG.warn("remove timeout request, " + rep); + log.warn("remove timeout request, " + rep); } } @@ -332,7 +332,7 @@ public abstract class NettyRemotingAbstract { try { executeInvokeCallback(rf); } catch (Throwable e) { - PLOG.warn("scanResponseTable, operationComplete Exception", e); + log.warn("scanResponseTable, operationComplete Exception", e); } } } @@ -358,7 +358,7 @@ public abstract class NettyRemotingAbstract { responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(null); - PLOG.warn("send a request command to channel <" + addr + "> failed."); + log.warn("send a request command to channel <" + addr + "> failed."); } }); @@ -404,17 +404,17 @@ public abstract class NettyRemotingAbstract { try { executeInvokeCallback(responseFuture); } catch (Throwable e) { - PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e); + log.warn("excute callback in writeAndFlush addListener, and callback throw", e); } finally { responseFuture.release(); } - PLOG.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); + log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); } }); } catch (Exception e) { responseFuture.release(); - PLOG.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e); + log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { @@ -427,7 +427,7 @@ public abstract class NettyRemotingAbstract { this.semaphoreAsync.getQueueLength(), // this.semaphoreAsync.availablePermits()// ); - PLOG.warn(info); + log.warn(info); throw new RemotingTimeoutException(info); } } @@ -445,13 +445,13 @@ public abstract class NettyRemotingAbstract { public void operationComplete(ChannelFuture f) throws Exception { once.release(); if (!f.isSuccess()) { - PLOG.warn("send a request command to channel <" + channel.remoteAddress() + "> failed."); + log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed."); } } }); } catch (Exception e) { once.release(); - PLOG.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed."); + log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed."); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { @@ -464,7 +464,7 @@ public abstract class NettyRemotingAbstract { this.semaphoreOneway.getQueueLength(), // this.semaphoreOneway.availablePermits()// ); - PLOG.warn(info); + log.warn(info); throw new RemotingTimeoutException(info); } } @@ -478,13 +478,13 @@ public abstract class NettyRemotingAbstract { if (this.eventQueue.size() <= maxSize) { this.eventQueue.add(event); } else { - PLOG.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString()); + log.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString()); } } @Override public void run() { - PLOG.info(this.getServiceName() + " service started"); + log.info(this.getServiceName() + " service started"); final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener(); @@ -511,11 +511,11 @@ public abstract class NettyRemotingAbstract { } } } catch (Exception e) { - PLOG.warn(this.getServiceName() + " service has exception. ", e); + log.warn(this.getServiceName() + " service has exception. ", e); } } - PLOG.info(this.getServiceName() + " service end"); + log.info(this.getServiceName() + " service end"); } @Override diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java index abb8385642e23c93119fe2fe45b54ea5ef21debd..ad8e65dca40ed8540325b8895e2805a3ee917e06 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java @@ -127,7 +127,7 @@ public class AllocateMappedFileService extends ServiceThread { try { this.thread.join(this.getJointime()); } catch (InterruptedException e) { - e.printStackTrace(); + log.error("Interrupted", e); } for (AllocateRequest req : this.requestTable.values()) { diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index b44211c5712cf5e0c2828212d0db5a267b221e9e..0810d0ca3d6f85f86d757577178ca19fa9ce9e3e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -903,35 +903,6 @@ public class CommitLog { - public static class GroupCommitRequest { - private final long nextOffset; - private final CountDownLatch countDownLatch = new CountDownLatch(1); - private volatile boolean flushOK = false; - - public GroupCommitRequest(long nextOffset) { - this.nextOffset = nextOffset; - } - - public long getNextOffset() { - return nextOffset; - } - - public void wakeupCustomer(final boolean flushOK) { - this.flushOK = flushOK; - this.countDownLatch.countDown(); - } - - public boolean waitForFlush(long timeout) { - try { - this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); - return this.flushOK; - } catch (InterruptedException e) { - e.printStackTrace(); - return false; - } - } - } - abstract class FlushCommitLogService extends ServiceThread { protected static final int RETRY_TIMES_OVER = 10; } @@ -1070,6 +1041,39 @@ public class CommitLog { } } + public static class GroupCommitRequest { + private final long nextOffset; + private final CountDownLatch countDownLatch = new CountDownLatch(1); + private volatile boolean flushOK = false; + + + public GroupCommitRequest(long nextOffset) { + this.nextOffset = nextOffset; + } + + + public long getNextOffset() { + return nextOffset; + } + + + public void wakeupCustomer(final boolean flushOK) { + this.flushOK = flushOK; + this.countDownLatch.countDown(); + } + + + public boolean waitForFlush(long timeout) { + try { + this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); + return this.flushOK; + } catch (InterruptedException e) { + log.error("Interrupted", e); + return false; + } + } + } + /** * GroupCommit Service */ diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index d03ff0f32ebe790855665b244024b394e125eee3..275334c05f3e45df9e628d21e08bf9865242cf2e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -25,9 +25,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ConsumeQueue { + private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); public static final int CQ_STORE_UNIT_SIZE = 20; - private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME); private final DefaultMessageStore defaultMessageStore; diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java index a9a00a857c0e575288a06dcb47217c773c16a6c8..42504509aeebbac0d07b5fd0e744a20b2adad1fe 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java @@ -512,7 +512,7 @@ public class MappedFile extends ReferenceResource { try { Thread.sleep(0); } catch (InterruptedException e) { - e.printStackTrace(); + log.error("Interrupted", e); } } } diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java b/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java index 5484dce10fd2873f5f0e7eddccd50999d5466281..c5981c6d476cbd71d5d1357c8da3f9eb4797dec8 100644 --- a/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java +++ b/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java @@ -71,7 +71,7 @@ public class StoreCheckpoint { try { this.fileChannel.close(); } catch (IOException e) { - e.printStackTrace(); + log.error("Failed to properly close the channel", e); } } diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java b/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java index fb283d609e2c7d056ba284cbf9fe256d6f88c253..6aba37529a4bc22eddd4c11762877df3b4115615 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java @@ -16,9 +16,14 @@ */ package org.apache.rocketmq.store.ha; +import org.apache.rocketmq.common.constant.LoggerName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.HashMap; public class WaitNotifyObject { + private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); protected final HashMap waitingThreadTable = new HashMap(16); @@ -45,7 +50,7 @@ public class WaitNotifyObject { try { this.wait(interval); } catch (InterruptedException e) { - e.printStackTrace(); + log.error("Interrupted", e); } finally { this.hasNotified = false; this.onWaitEnd(); @@ -84,7 +89,7 @@ public class WaitNotifyObject { try { this.wait(interval); } catch (InterruptedException e) { - e.printStackTrace(); + log.error("Interrupted", e); } finally { this.waitingThreadTable.put(currentThreadId, false); this.onWaitEnd(); diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java index fb75cf9f2ae1f37e9283f379f8606ff401ae28dc..54f573270872084f189f6e6e9c55725622b6e3bc 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java @@ -147,7 +147,7 @@ public class IndexFile { try { fileLock.release(); } catch (IOException e) { - e.printStackTrace(); + log.error("Failed to release the lock", e); } } } @@ -254,7 +254,7 @@ public class IndexFile { try { fileLock.release(); } catch (IOException e) { - e.printStackTrace(); + log.error("Failed to release the lock", e); } } diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java index 1ebf52a6dc48029ba0cd1ae676f14c3494648fea..c434df58e797085072cacbd22748f98e704b2b49 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java @@ -275,7 +275,7 @@ public class IndexService { log.info("Tried to create index file " + times + " times"); Thread.sleep(1000); } catch (InterruptedException e) { - e.printStackTrace(); + log.error("Interrupted", e); } } diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index 012a4f0550a980ed38d448ba968749c5d2600888..25640a467e4442d46f9999de75a026311a8ff497 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -44,8 +44,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ScheduleMessageService extends ConfigManager { - public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX"; private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + + public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX"; private static final long FIRST_DELAY_TIME = 1000L; private static final long DELAY_FOR_A_WHILE = 100L; private static final long DELAY_FOR_A_PERIOD = 10000L; diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java index 0d2747df02fc71129d84d31611916ebb11f29e85..5555b8b6ba236e3a79803c2c9c1b06c49f52ae46 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; public class BrokerStats { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private final DefaultMessageStore defaultMessageStore; private volatile long msgPutTotalYesterdayMorning;