提交 46f91479 编写于 作者: L lindzh 提交者: yukon

[ROCKETMQ-359] Replace slf4j api used in RocketMQ with InternalLogger (#221)

上级 bf3f87c5
......@@ -67,6 +67,8 @@ import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
......@@ -88,13 +90,11 @@ import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BrokerController {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
private static final Logger LOG_WATER_MARK = LoggerFactory.getLogger(LoggerName.WATER_MARK_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger LOG_PROTECTION = InternalLoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
private static final InternalLogger LOG_WATER_MARK = InternalLoggerFactory.getLogger(LoggerName.WATER_MARK_LOGGER_NAME);
private final BrokerConfig brokerConfig;
private final NettyServerConfig nettyServerConfig;
private final NettyClientConfig nettyClientConfig;
......
......@@ -31,6 +31,8 @@ import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
......@@ -41,7 +43,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
......@@ -50,7 +51,7 @@ public class BrokerStartup {
public static Properties properties = null;
public static CommandLine commandLine = null;
public static String configFile = null;
public static Logger log;
public static InternalLogger log;
public static void main(String[] args) {
start(createBrokerController(args));
......@@ -177,14 +178,14 @@ public class BrokerStartup {
configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
if (commandLine.hasOption('p')) {
Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
MixAll.printObjectProperties(console, nettyClientConfig);
MixAll.printObjectProperties(console, messageStoreConfig);
System.exit(0);
} else if (commandLine.hasOption('m')) {
Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig, true);
MixAll.printObjectProperties(console, nettyServerConfig, true);
MixAll.printObjectProperties(console, nettyClientConfig, true);
......@@ -192,7 +193,7 @@ public class BrokerStartup {
System.exit(0);
}
log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
MixAll.printObjectProperties(log, brokerConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
MixAll.printObjectProperties(log, nettyClientConfig);
......
......@@ -23,12 +23,12 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClientHousekeepingService implements ChannelEventListener {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
private ScheduledExecutorService scheduledExecutorService = Executors
......
......@@ -26,14 +26,14 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConsumerGroupInfo {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final String groupName;
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
new ConcurrentHashMap<String, SubscriptionData>();
......
......@@ -25,16 +25,16 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConsumerManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
......
......@@ -25,13 +25,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long LOCK_TIMEOUT_MILLIS = 3000;
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private final Lock groupChannelLock = new ReentrantLock();
......
......@@ -34,6 +34,8 @@ import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueForC;
import org.apache.rocketmq.common.protocol.RequestCode;
......@@ -50,11 +52,9 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Broker2Client {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
public Broker2Client(BrokerController brokerController) {
......
......@@ -23,12 +23,12 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RebalanceLockManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.REBALANCE_LOCK_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.REBALANCE_LOCK_LOGGER_NAME);
private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
"rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
private final Lock lock = new ReentrantLock();
......
......@@ -19,11 +19,11 @@ package org.apache.rocketmq.broker.filter;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.filter.util.BitsArray;
import org.apache.rocketmq.store.CommitLogDispatcher;
import org.apache.rocketmq.store.DispatchRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Iterator;
......@@ -33,7 +33,7 @@ import java.util.Iterator;
*/
public class CommitLogDispatcherCalcBitMap implements CommitLogDispatcher {
private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);
protected final BrokerConfig brokerConfig;
protected final ConsumerFilterManager consumerFilterManager;
......
......@@ -22,14 +22,14 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.filter.FilterFactory;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.filter.util.BloomFilter;
import org.apache.rocketmq.filter.util.BloomFilterData;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.HashSet;
......@@ -42,7 +42,7 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class ConsumerFilterManager extends ConfigManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);
private static final long MS_24_HOUR = 24 * 3600 * 1000;
......
......@@ -19,21 +19,21 @@ package org.apache.rocketmq.broker.filter;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.filter.util.BitsArray;
import org.apache.rocketmq.filter.util.BloomFilter;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.MessageFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.Map;
public class ExpressionMessageFilter implements MessageFilter {
protected static final Logger log = LoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);
protected final SubscriptionData subscriptionData;
protected final ConsumerFilterData consumerFilterData;
......
......@@ -31,14 +31,14 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerStartup;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FilterServerManager {
public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000;
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ConcurrentMap<Channel, FilterServerInfo> filterServerTable =
new ConcurrentHashMap<Channel, FilterServerInfo>(16);
private final BrokerController brokerController;
......
......@@ -17,10 +17,11 @@
package org.apache.rocketmq.broker.filtersrv;
import org.slf4j.Logger;
import org.apache.rocketmq.logging.InternalLogger;
public class FilterServerUtil {
public static void callShell(final String shellString, final Logger log) {
public static void callShell(final String shellString, final InternalLogger log) {
Process process = null;
try {
String[] cmdArray = splitShellString(shellString);
......
......@@ -23,17 +23,17 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* BrokerFastFailure will cover {@link BrokerController#sendThreadPoolQueue} and
* {@link BrokerController#pullThreadPoolQueue}
*/
public class BrokerFastFailure {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"BrokerFastFailureScheduledThread"));
private final BrokerController brokerController;
......
......@@ -25,12 +25,12 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PullRequestHoldService extends ServiceThread {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final String TOPIC_QUEUEID_SEPARATOR = "@";
private final BrokerController brokerController;
private final SystemClock systemClock = new SystemClock();
......
......@@ -29,12 +29,12 @@ import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConsumerOffsetManager extends ConfigManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final String TOPIC_GROUP_SEPARATOR = "@";
private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
......
......@@ -22,6 +22,8 @@ import java.util.List;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.RequestCode;
......@@ -44,11 +46,9 @@ import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BrokerOuterAPI {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final RemotingClient remotingClient;
private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
private String nameSrvAddr = null;
......
......@@ -27,6 +27,8 @@ import org.apache.rocketmq.common.constant.DBMsgConstants;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
......@@ -43,8 +45,6 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
......@@ -53,7 +53,7 @@ import java.util.Map;
import java.util.Random;
public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor {
protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
protected final static int DLQ_NUMS_PER_GROUP = 1;
protected final BrokerController brokerController;
......
......@@ -44,6 +44,8 @@ import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
......@@ -113,11 +115,9 @@ import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AdminBrokerProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
public AdminBrokerProcessor(final BrokerController brokerController) {
......
......@@ -23,6 +23,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
......@@ -35,15 +36,14 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.filter.FilterFactory;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClientManageProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
public ClientManageProcessor(final BrokerController brokerController) {
......
......@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
......@@ -30,15 +31,14 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHead
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConsumerManageProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
......
......@@ -20,6 +20,8 @@ import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
......@@ -34,11 +36,9 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EndTransactionProcessor implements NettyRequestProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
private final BrokerController brokerController;
public EndTransactionProcessor(final BrokerController brokerController) {
......
......@@ -19,13 +19,13 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ForwardRequestProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
......
......@@ -41,6 +41,8 @@ import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
......@@ -63,11 +65,9 @@ import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PullMessageProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
private List<ConsumeMessageHook> consumeMessageHookList;
......
......@@ -25,6 +25,8 @@ import org.apache.rocketmq.broker.pagecache.OneMessageTransfer;
import org.apache.rocketmq.broker.pagecache.QueryMessageTransfer;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
......@@ -35,11 +37,9 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class QueryMessageProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
......
......@@ -21,15 +21,15 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SlaveSynchronize {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
private volatile String masterAddr = null;
......
......@@ -26,13 +26,13 @@ import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SubscriptionGroupManager extends ConfigManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
......
......@@ -34,14 +34,14 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TopicConfigManager extends ConfigManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long LOCK_TIMEOUT_MILLIS = 3000;
private transient final Lock lockTopicConfigTable = new ReentrantLock();
......
......@@ -31,11 +31,11 @@ import org.apache.rocketmq.broker.transaction.TransactionRecord;
import org.apache.rocketmq.broker.transaction.TransactionStore;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public class JDBCTransactionStore implements TransactionStore {
private static final Logger log = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
private final JDBCTransactionStoreConfig jdbcTransactionStoreConfig;
private Connection connection;
private AtomicLong totalRecordsValue = new AtomicLong(0);
......
......@@ -46,6 +46,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
......
......@@ -20,9 +20,9 @@ import java.util.Set;
import java.util.TreeSet;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
public class MQHelper {
public static void resetOffsetByTimestamp(
......@@ -48,7 +48,7 @@ public class MQHelper {
final String consumerGroup,
final String topic,
final long timestamp) throws Exception {
final Logger log = ClientLogger.getLog();
final InternalLogger log = ClientLogger.getLog();
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerGroup);
consumer.setInstanceName(instanceName);
......
......@@ -26,15 +26,15 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
/**
* Schedule service for pull consumer
*/
public class MQPullConsumerScheduleService {
private final Logger log = ClientLogger.getLog();
private final InternalLogger log = ClientLogger.getLog();
private final MessageQueueListener messageQueueListener = new MessageQueueListenerImpl();
private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
......
......@@ -20,14 +20,14 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
/**
* Average Hashing queue algorithm
*/
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
private final Logger log = ClientLogger.getLog();
private final InternalLogger log = ClientLogger.getLog();
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
......
......@@ -20,14 +20,14 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
/**
* Cycle average Hashing queue algorithm
*/
public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
private final Logger log = ClientLogger.getLog();
private final InternalLogger log = ClientLogger.getLog();
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
......
......@@ -24,14 +24,14 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
import org.apache.rocketmq.common.consistenthash.HashFunction;
import org.apache.rocketmq.common.consistenthash.Node;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
/**
* Consistent Hashing queue algorithm
*/
public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {
private final Logger log = ClientLogger.getLog();
private final InternalLogger log = ClientLogger.getLog();
private final int virtualNodeCnt;
private final HashFunction customHashFunction;
......
......@@ -31,9 +31,9 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
/**
* Local storage implementation
......@@ -42,7 +42,7 @@ public class LocalFileOffsetStore implements OffsetStore {
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
"rocketmq.client.localOffsetStoreDir",
System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
private final static Logger log = ClientLogger.getLog();
private final static InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mQClientFactory;
private final String groupName;
private final String storePath;
......
......@@ -30,17 +30,17 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
/**
* Remote storage implementation
*/
public class RemoteBrokerOffsetStore implements OffsetStore {
private final static Logger log = ClientLogger.getLog();
private final static InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mQClientFactory;
private final String groupName;
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
......
......@@ -24,6 +24,7 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.MQProducerInner;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
......@@ -44,10 +45,10 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
public class ClientRemotingProcessor implements NettyRequestProcessor {
private final Logger log = ClientLogger.getLog();
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory;
public ClientRemotingProcessor(final MQClientInstance mqClientFactory) {
......
......@@ -35,6 +35,7 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
......@@ -52,11 +53,10 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
public class MQAdminImpl {
private final Logger log = ClientLogger.getLog();
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mQClientFactory;
private long timeoutMillis = 6000;
......
......@@ -48,6 +48,7 @@ import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
......@@ -150,11 +151,10 @@ import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.slf4j.Logger;
public class MQClientAPIImpl {
private final static Logger log = ClientLogger.getLog();
private final static InternalLogger log = ClientLogger.getLog();
public static boolean sendSmartMsg =
Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));
......
......@@ -22,11 +22,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.slf4j.Logger;
public class MQClientManager {
private final static Logger log = ClientLogger.getLog();
private final static InternalLogger log = ClientLogger.getLog();
private static MQClientManager instance = new MQClientManager();
private AtomicInteger factoryIndexGenerator = new AtomicInteger();
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
......
......@@ -39,6 +39,7 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
......@@ -46,10 +47,9 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.CMResult;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
private static final Logger log = ClientLogger.getLog();
private static final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
private final DefaultMQPushConsumer defaultMQPushConsumer;
private final MessageListenerConcurrently messageListener;
......
......@@ -37,6 +37,7 @@ import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
......@@ -46,10 +47,9 @@ import org.apache.rocketmq.common.protocol.body.CMResult;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
private static final Logger log = ClientLogger.getLog();
private static final InternalLogger log = ClientLogger.getLog();
private final static long MAX_TIME_CONSUME_CONTINUOUSLY =
Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "60000"));
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
......
......@@ -48,6 +48,7 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
......@@ -61,10 +62,9 @@ import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
public class DefaultMQPullConsumerImpl implements MQConsumerInner {
private final Logger log = ClientLogger.getLog();
private final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPullConsumer defaultMQPullConsumer;
private final long consumerStartTimestamp = System.currentTimeMillis();
private final RPCHook rpcHook;
......
......@@ -56,6 +56,7 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
......@@ -74,7 +75,6 @@ import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
/**
......@@ -91,7 +91,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000;
private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
private final Logger log = ClientLogger.getLog();
private final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPushConsumer defaultMQPushConsumer;
private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
......
......@@ -28,11 +28,11 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
import org.slf4j.Logger;
/**
* Queue consumption snapshot
......@@ -42,7 +42,7 @@ public class ProcessQueue {
Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
private final Logger log = ClientLogger.getLog();
private final InternalLogger log = ClientLogger.getLog();
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
private final AtomicLong msgCount = new AtomicLong();
......
......@@ -37,6 +37,7 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
......@@ -47,10 +48,9 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
public class PullAPIWrapper {
private final Logger log = ClientLogger.getLog();
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mQClientFactory;
private final String consumerGroup;
private final boolean unitMode;
......
......@@ -24,11 +24,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.slf4j.Logger;
public class PullMessageService extends ServiceThread {
private final Logger log = ClientLogger.getLog();
private final InternalLogger log = ClientLogger.getLog();
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
private final MQClientInstance mQClientFactory;
private final ScheduledExecutorService scheduledExecutorService = Executors
......
......@@ -32,19 +32,19 @@ import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.slf4j.Logger;
/**
* Base class for rebalance algorithm
*/
public abstract class RebalanceImpl {
protected static final Logger log = ClientLogger.getLog();
protected static final InternalLogger log = ClientLogger.getLog();
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
new ConcurrentHashMap<String, Set<MessageQueue>>();
......
......@@ -19,13 +19,13 @@ package org.apache.rocketmq.client.impl.consumer;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.ServiceThread;
import org.slf4j.Logger;
import org.apache.rocketmq.logging.InternalLogger;
public class RebalanceService extends ServiceThread {
private static long waitInterval =
Long.parseLong(System.getProperty(
"rocketmq.client.rebalance.waitInterval", "20000"));
private final Logger log = ClientLogger.getLog();
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory;
public RebalanceService(MQClientInstance mqClientFactory) {
......
......@@ -63,6 +63,7 @@ import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
......@@ -80,11 +81,10 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
public class MQClientInstance {
private final static long LOCK_TIMEOUT_MILLIS = 3000;
private final Logger log = ClientLogger.getLog();
private final InternalLogger log = ClientLogger.getLog();
private final ClientConfig clientConfig;
private final int instanceIndex;
private final String clientId;
......
......@@ -31,6 +31,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
......@@ -79,10 +81,9 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.slf4j.Logger;
public class DefaultMQProducerImpl implements MQProducerInner {
private final Logger log = ClientLogger.getLog();
private final InternalLogger log = ClientLogger.getLog();
private final Random random = new Random();
private final DefaultMQProducer defaultMQProducer;
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
......
......@@ -19,11 +19,11 @@ package org.apache.rocketmq.client.latency;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
public class MQFaultStrategy {
private final static Logger log = ClientLogger.getLog();
private final static InternalLogger log = ClientLogger.getLog();
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
private boolean sendLatencyFaultEnable = false;
......
......@@ -17,104 +17,82 @@
package org.apache.rocketmq.client.log;
import org.apache.rocketmq.common.constant.LoggerName;
import org.slf4j.ILoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Method;
import java.net.URL;
import org.apache.rocketmq.logging.InnerLoggerFactory;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.logging.inner.Appender;
import org.apache.rocketmq.logging.inner.Layout;
import org.apache.rocketmq.logging.inner.Level;
import org.apache.rocketmq.logging.inner.Logger;
import org.apache.rocketmq.logging.inner.LoggingBuilder;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ClientLogger {
public static final String CLIENT_LOG_USESLF4J = "rocketmq.client.logUseSlf4j";
public static final String CLIENT_LOG_ROOT = "rocketmq.client.logRoot";
public static final String CLIENT_LOG_MAXINDEX = "rocketmq.client.logFileMaxIndex";
public static final String CLIENT_LOG_FILESIZE = "rocketmq.client.logFileMaxSize";
public static final String CLIENT_LOG_LEVEL = "rocketmq.client.logLevel";
public static final String CLIENT_LOG_FILENAME = "rocketmq.client.logFileName";
public static final String CLIENT_LOG_ASYNC_QUEUESIZE = "rocketmq.client.logAsyncQueueSize";
public static final String ROCKETMQ_CLIENT_APPENDER_NAME = "RocketmqClientAppender";
private static Logger log;
private static Logger createLogger(final String loggerName) {
String logConfigFilePath = System.getProperty("rocketmq.client.log.configFile", System.getenv("ROCKETMQ_CLIENT_LOG_CONFIGFILE"));
Boolean isloadconfig =
Boolean.parseBoolean(System.getProperty("rocketmq.client.log.loadconfig", "true"));
private static final InternalLogger CLIENT_LOGGER;
final String log4JResourceFile =
System.getProperty("rocketmq.client.log4j.resource.fileName", "log4j_rocketmq_client.xml");
private static final boolean CLIENT_USE_SLF4J;
final String logbackResourceFile =
System.getProperty("rocketmq.client.logback.resource.fileName", "logback_rocketmq_client.xml");
private static Appender rocketmqClientAppender = null;
final String log4J2ResourceFile =
System.getProperty("rocketmq.client.log4j2.resource.fileName", "log4j2_rocketmq_client.xml");
static {
CLIENT_USE_SLF4J = Boolean.parseBoolean(System.getProperty(CLIENT_LOG_USESLF4J, "false"));
if (!CLIENT_USE_SLF4J) {
InternalLoggerFactory.setCurrentLoggerType(InnerLoggerFactory.LOGGER_INNER);
CLIENT_LOGGER = createLogger(LoggerName.CLIENT_LOGGER_NAME);
createLogger(LoggerName.COMMON_LOGGER_NAME);
createLogger(RemotingHelper.ROCKETMQ_REMOTING);
} else {
CLIENT_LOGGER = InternalLoggerFactory.getLogger(LoggerName.CLIENT_LOGGER_NAME);
}
}
private static synchronized void createClientAppender() {
String clientLogRoot = System.getProperty(CLIENT_LOG_ROOT, System.getProperty("user.home") + "/logs/rocketmqlogs");
System.setProperty("client.logRoot", clientLogRoot);
String clientLogLevel = System.getProperty(CLIENT_LOG_LEVEL, "INFO");
System.setProperty("client.logLevel", clientLogLevel);
String clientLogMaxIndex = System.getProperty(CLIENT_LOG_MAXINDEX, "10");
System.setProperty("client.logFileMaxIndex", clientLogMaxIndex);
if (isloadconfig) {
try {
ILoggerFactory iLoggerFactory = LoggerFactory.getILoggerFactory();
Class classType = iLoggerFactory.getClass();
if (classType.getName().equals("org.slf4j.impl.Log4jLoggerFactory")) {
Class<?> domconfigurator;
Object domconfiguratorobj;
domconfigurator = Class.forName("org.apache.log4j.xml.DOMConfigurator");
domconfiguratorobj = domconfigurator.newInstance();
if (null == logConfigFilePath) {
Method configure = domconfiguratorobj.getClass().getMethod("configure", URL.class);
URL url = ClientLogger.class.getClassLoader().getResource(log4JResourceFile);
configure.invoke(domconfiguratorobj, url);
} else {
Method configure = domconfiguratorobj.getClass().getMethod("configure", String.class);
configure.invoke(domconfiguratorobj, logConfigFilePath);
}
} else if (classType.getName().equals("ch.qos.logback.classic.LoggerContext")) {
Class<?> joranConfigurator;
Class<?> context = Class.forName("ch.qos.logback.core.Context");
Object joranConfiguratoroObj;
joranConfigurator = Class.forName("ch.qos.logback.classic.joran.JoranConfigurator");
joranConfiguratoroObj = joranConfigurator.newInstance();
Method setContext = joranConfiguratoroObj.getClass().getMethod("setContext", context);
setContext.invoke(joranConfiguratoroObj, iLoggerFactory);
if (null == logConfigFilePath) {
URL url = ClientLogger.class.getClassLoader().getResource(logbackResourceFile);
Method doConfigure =
joranConfiguratoroObj.getClass().getMethod("doConfigure", URL.class);
doConfigure.invoke(joranConfiguratoroObj, url);
} else {
Method doConfigure =
joranConfiguratoroObj.getClass().getMethod("doConfigure", String.class);
doConfigure.invoke(joranConfiguratoroObj, logConfigFilePath);
}
} else if (classType.getName().equals("org.apache.logging.slf4j.Log4jLoggerFactory")) {
Class<?> joranConfigurator = Class.forName("org.apache.logging.log4j.core.config.Configurator");
Method initialize = joranConfigurator.getDeclaredMethod("initialize", String.class, String.class);
if (null == logConfigFilePath) {
initialize.invoke(joranConfigurator, "log4j2", log4J2ResourceFile);
} else {
initialize.invoke(joranConfigurator, "log4j2", logConfigFilePath);
}
}
} catch (Exception e) {
System.err.println(e);
}
}
return LoggerFactory.getLogger(LoggerName.CLIENT_LOGGER_NAME);
String clientLogFileName = System.getProperty(CLIENT_LOG_FILENAME, "rocketmq_client.log");
String maxFileSize = System.getProperty(CLIENT_LOG_FILESIZE, "1073741824");
String asyncQueueSize = System.getProperty(CLIENT_LOG_ASYNC_QUEUESIZE, "1024");
String logFileName = clientLogRoot + "/" + clientLogFileName;
int maxFileIndex = Integer.parseInt(clientLogMaxIndex);
int queueSize = Integer.parseInt(asyncQueueSize);
Layout layout = LoggingBuilder.newLayoutBuilder().withDefaultLayout().build();
rocketmqClientAppender = LoggingBuilder.newAppenderBuilder()
.withRollingFileAppender(logFileName, maxFileSize, maxFileIndex)
.withAsync(false, queueSize).withName(ROCKETMQ_CLIENT_APPENDER_NAME).withLayout(layout).build();
Logger.getRootLogger().addAppender(rocketmqClientAppender);
}
public static Logger getLog() {
if (log == null) {
log = createLogger(LoggerName.CLIENT_LOGGER_NAME);
return log;
} else {
return log;
private static InternalLogger createLogger(final String loggerName) {
String clientLogLevel = System.getProperty(CLIENT_LOG_LEVEL, "INFO");
InternalLogger logger = InternalLoggerFactory.getLogger(loggerName);
InnerLoggerFactory.InnerLogger innerLogger = (InnerLoggerFactory.InnerLogger) logger;
Logger realLogger = innerLogger.getLogger();
if (rocketmqClientAppender == null) {
createClientAppender();
}
realLogger.addAppender(rocketmqClientAppender);
realLogger.setLevel(Level.toLevel(clientLogLevel));
return logger;
}
public static void setLog(Logger log) {
ClientLogger.log = log;
public static InternalLogger getLog() {
return CLIENT_LOGGER;
}
}
......@@ -18,15 +18,15 @@
package org.apache.rocketmq.client.stat;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
import org.apache.rocketmq.common.stats.StatsItemSet;
import org.apache.rocketmq.common.stats.StatsSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConsumerStatsManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.CLIENT_LOGGER_NAME);
private static final InternalLogger log = ClientLogger.getLog();
private static final String TOPIC_AND_GROUP_CONSUME_OK_TPS = "CONSUME_OK_TPS";
private static final String TOPIC_AND_GROUP_CONSUME_FAILED_TPS = "CONSUME_FAILED_TPS";
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE xml>
<Configuration status="warn" name="RocketmqClient">
<Appenders>
<Console name="STDOUT-APPENDER">
<PatternLayout pattern="%-5p %c{2} , %m%n"/>
</Console>
<RollingFile name="RocketmqClientAppender" fileName="${sys:client.logRoot}/rocketmq_client.log"
filePattern="${sys:client.logRoot}/rocketmq_client-%d{yyyy-MM-dd}-%i.log">
<PatternLayout pattern="%d{yyy-MM-dd HH\:mm\:ss,SSS} %p %c{1}(%L) - %m%n"/>
<Policies>
<TimeBasedTriggeringPolicy/>
<SizeBasedTriggeringPolicy size="1 GB"/>
</Policies>
<DefaultRolloverStrategy max="${sys:client.logFileMaxIndex}"/>
</RollingFile>
</Appenders>
<Loggers>
<logger name="RocketmqClient" level="${sys:client.logLevel}" additivity="false">
<appender-ref ref="RocketmqClientAppender"/>
</logger>
<logger name="RocketmqCommon" level="${sys:client.logLevel}" additivity="false">
<appender-ref ref="RocketmqClientAppender"/>
</logger>
<logger name="RocketmqRemoting" level="${sys:client.logLevel}" additivity="false">
<appender-ref ref="RocketmqClientAppender"/>
</logger>
</Loggers>
</Configuration>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="STDOUT-APPENDER" class="org.apache.log4j.ConsoleAppender">
<param name="encoding" value="UTF-8"/>
<param name="target" value="System.out"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%-5p %c{2} , %m%n"/>
</layout>
</appender>
<appender name="RocketmqClientAppender" class="org.apache.log4j.RollingFileAppender">
<param name="file" value="${client.logRoot}/rocketmq_client.log"/>
<param name="append" value="true"/>
<param name="encoding" value="UTF-8"/>
<param name="maxFileSize" value="1073741824"/>
<param name="maxBackupIndex" value="${client.logFileMaxIndex}"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d{yyy-MM-dd HH\:mm\:ss,SSS} %p %c{1}(%L) - %m%n"/>
</layout>
</appender>
<logger name="RocketmqClient" additivity="false">
<level value="${client.logLevel}"/>
<appender-ref ref="RocketmqClientAppender"/>
</logger>
<logger name="RocketmqCommon" additivity="false">
<level value="${client.logLevel}"/>
<appender-ref ref="RocketmqClientAppender"/>
</logger>
<logger name="RocketmqRemoting" additivity="false">
<level value="${client.logLevel}"/>
<appender-ref ref="RocketmqClientAppender"/>
</logger>
</log4j:configuration>
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<appender name="RocketmqClientAppender"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${client.logRoot}/rocketmq_client.log</file>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>${client.logRoot}/otherdays/rocketmq_client.%i.log
</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>${client.logFileMaxIndex}</maxIndex>
</rollingPolicy>
<triggeringPolicy
class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
<pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
<charset class="java.nio.charset.Charset">UTF-8</charset>
</encoder>
</appender>
<logger name="RocketmqCommon" additivity="false">
<level value="${client.logLevel}"/>
<appender-ref ref="RocketmqClientAppender"/>
</logger>
<logger name="RocketmqRemoting" additivity="false">
<level value="${client.logLevel}"/>
<appender-ref ref="RocketmqClientAppender"/>
</logger>
<logger name="RocketmqClient" additivity="false">
<level value="${client.logLevel}"/>
<appender-ref ref="RocketmqClientAppender"/>
</logger>
</configuration>
......@@ -58,6 +58,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
......@@ -79,6 +80,7 @@ public class DefaultMQPushConsumerTest {
private String topic = "FooBar";
private String brokerName = "BrokerA";
private MQClientInstance mQClientFactory;
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private PullAPIWrapper pullAPIWrapper;
......
......@@ -23,7 +23,6 @@ import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.junit.Assert;
import org.junit.Before;
......
......@@ -17,14 +17,17 @@
package org.apache.rocketmq.client.log;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
public class ClientLoggerTest {
......@@ -35,18 +38,29 @@ public class ClientLoggerTest {
LOG_DIR = System.getProperty(CLIENT_LOG_ROOT, System.getProperty("user.home") + "/logs/rocketmqlogs");
}
@Test
public void testClientlog() throws IOException {
InternalLogger logger = ClientLogger.getLog();
InternalLogger rocketmqCommon = InternalLoggerFactory.getLogger("RocketmqCommon");
InternalLogger rocketmqRemoting = InternalLoggerFactory.getLogger("RocketmqRemoting");
for (int i = 0; i < 10; i++) {
logger.info("testClientlog test {}", i);
rocketmqCommon.info("common message {}", i, new RuntimeException());
rocketmqRemoting.info("remoting message {}", i, new RuntimeException());
}
String content = MixAll.file2String(LOG_DIR + "/rocketmq_client.log");
Assert.assertTrue(content.contains("testClientlog"));
Assert.assertTrue(content.contains("RocketmqClient"));
Assert.assertTrue(content.contains("RocketmqCommon"));
Assert.assertTrue(content.contains("RocketmqRemoting"));
}
@After
public void cleanFiles() {
UtilAll.deleteFile(new File(LOG_DIR));
}
// FIXME: Workaround for concrete implementation for slf4j, is there any better solution for all slf4j implementations in one class ? 2017/8/1
@Test
public void testLog4j2() throws Exception {
Logger logger = ClientLogger.getLog();
System.out.println(logger);
assertEquals("org.apache.logging.slf4j.Log4jLogger", logger.getClass().getName());
}
}
......@@ -19,15 +19,15 @@ package org.apache.rocketmq.common;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
public class BrokerConfig {
private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
@ImportantField
......
......@@ -18,11 +18,11 @@ package org.apache.rocketmq.common;
import java.io.IOException;
import org.apache.rocketmq.common.constant.LoggerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public abstract class ConfigManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public abstract String encode();
......@@ -36,11 +36,11 @@ public abstract class ConfigManager {
return this.loadBak();
} else {
this.decode(jsonString);
log.info("load {} OK", fileName);
log.info("load " + fileName + " OK");
return true;
}
} catch (Exception e) {
log.error("load [{}] failed, and try to load backup file", fileName, e);
log.error("load " + fileName + " failed, and try to load backup file", e);
return this.loadBak();
}
}
......@@ -54,11 +54,11 @@ public abstract class ConfigManager {
String jsonString = MixAll.file2String(fileName + ".bak");
if (jsonString != null && jsonString.length() > 0) {
this.decode(jsonString);
log.info("load [{}] OK", fileName);
log.info("load " + fileName + " OK");
return true;
}
} catch (Exception e) {
log.error("load [{}] Failed", fileName, e);
log.error("load " + fileName + " Failed", e);
return false;
}
......@@ -74,7 +74,7 @@ public abstract class ConfigManager {
try {
MixAll.string2File(jsonString, fileName);
} catch (IOException e) {
log.error("persist file [{}] exception", fileName, e);
log.error("persist file " + fileName + " exception", e);
}
}
}
......
......@@ -17,6 +17,8 @@
package org.apache.rocketmq.common;
import org.apache.rocketmq.logging.InternalLogger;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
......@@ -25,11 +27,10 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
public class Configuration {
private final Logger log;
private final InternalLogger log;
private List<Object> configObjectList = new ArrayList<Object>(4);
private String storePath;
......@@ -43,11 +44,11 @@ public class Configuration {
*/
private Properties allConfigs = new Properties();
public Configuration(Logger log) {
public Configuration(InternalLogger log) {
this.log = log;
}
public Configuration(Logger log, Object... configObjects) {
public Configuration(InternalLogger log, Object... configObjects) {
this.log = log;
if (configObjects == null || configObjects.length == 0) {
return;
......@@ -57,7 +58,7 @@ public class Configuration {
}
}
public Configuration(Logger log, String storePath, Object... configObjects) {
public Configuration(InternalLogger log, String storePath, Object... configObjects) {
this(log, configObjects);
this.storePath = storePath;
}
......
......@@ -41,11 +41,11 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public class MixAll {
private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public static final String ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME";
public static final String ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir";
......@@ -227,11 +227,11 @@ public class MixAll {
return null;
}
public static void printObjectProperties(final Logger logger, final Object object) {
public static void printObjectProperties(final InternalLogger logger, final Object object) {
printObjectProperties(logger, object, false);
}
public static void printObjectProperties(final Logger logger, final Object object,
public static void printObjectProperties(final InternalLogger logger, final Object object,
final boolean onlyImportantField) {
Field[] fields = object.getClass().getDeclaredFields();
for (Field field : fields) {
......
......@@ -19,11 +19,11 @@ package org.apache.rocketmq.common;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.constant.LoggerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public abstract class ServiceThread implements Runnable {
private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private static final long JOIN_TIME = 90 * 1000;
......
......@@ -38,13 +38,12 @@ import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class UtilAll {
private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public static final String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss";
public static final String YYYY_MM_DD_HH_MM_SS_SSS = "yyyy-MM-dd#HH:mm:ss:SSS";
......
......@@ -23,11 +23,11 @@ package org.apache.rocketmq.common.namesrv;
import java.io.File;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public class NamesrvConfig {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
......
......@@ -25,12 +25,12 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.utils.HttpTinyClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TopAddressing {
private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private String nsAddr;
private String wsAddr;
......@@ -79,7 +79,7 @@ public class TopAddressing {
log.error("fetch nameserver address is null");
}
} else {
log.error("fetch nameserver address failed. statusCode={}", result.code);
log.error("fetch nameserver address failed. statusCode=" + result.code);
}
} catch (IOException e) {
if (verbose) {
......
......@@ -18,14 +18,14 @@
package org.apache.rocketmq.common.protocol;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MQProtosHelper {
private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public static boolean registerBrokerToNameServer(final String nsaddr, final String brokerAddr,
final long timeoutMillis) {
......
......@@ -22,14 +22,14 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.common.constant.LoggerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
/**
* thread safe
*/
public class ConcurrentTreeMap<K, V> {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ReentrantLock lock;
private TreeMap<K, V> tree;
private RoundQueue<K> roundQueue;
......@@ -58,7 +58,7 @@ public class ConcurrentTreeMap<K, V> {
tree.put(key, value);
exsit = value;
}
log.warn("putIfAbsentAndRetExsit success. {}", key);
log.warn("putIfAbsentAndRetExsit success. " + key);
return exsit;
} else {
V exsit = tree.get(key);
......
......@@ -21,7 +21,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.UtilAll;
import org.slf4j.Logger;
import org.apache.rocketmq.logging.InternalLogger;
public class MomentStatsItem {
......@@ -30,10 +30,10 @@ public class MomentStatsItem {
private final String statsName;
private final String statsKey;
private final ScheduledExecutorService scheduledExecutorService;
private final Logger log;
private final InternalLogger log;
public MomentStatsItem(String statsName, String statsKey,
ScheduledExecutorService scheduledExecutorService, Logger log) {
ScheduledExecutorService scheduledExecutorService, InternalLogger log) {
this.statsName = statsName;
this.statsKey = statsKey;
this.scheduledExecutorService = scheduledExecutorService;
......
......@@ -24,16 +24,16 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.UtilAll;
import org.slf4j.Logger;
import org.apache.rocketmq.logging.InternalLogger;
public class MomentStatsItemSet {
private final ConcurrentMap<String/* key */, MomentStatsItem> statsItemTable =
new ConcurrentHashMap<String, MomentStatsItem>(128);
private final String statsName;
private final ScheduledExecutorService scheduledExecutorService;
private final Logger log;
private final InternalLogger log;
public MomentStatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, Logger log) {
public MomentStatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, InternalLogger log) {
this.statsName = statsName;
this.scheduledExecutorService = scheduledExecutorService;
this.log = log;
......
......@@ -22,7 +22,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.UtilAll;
import org.slf4j.Logger;
import org.apache.rocketmq.logging.InternalLogger;
public class StatsItem {
......@@ -39,10 +39,10 @@ public class StatsItem {
private final String statsName;
private final String statsKey;
private final ScheduledExecutorService scheduledExecutorService;
private final Logger log;
private final InternalLogger log;
public StatsItem(String statsName, String statsKey, ScheduledExecutorService scheduledExecutorService,
Logger log) {
InternalLogger log) {
this.statsName = statsName;
this.statsKey = statsKey;
this.scheduledExecutorService = scheduledExecutorService;
......
......@@ -24,7 +24,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.UtilAll;
import org.slf4j.Logger;
import org.apache.rocketmq.logging.InternalLogger;
public class StatsItemSet {
private final ConcurrentMap<String/* key */, StatsItem> statsItemTable =
......@@ -32,9 +32,9 @@ public class StatsItemSet {
private final String statsName;
private final ScheduledExecutorService scheduledExecutorService;
private final Logger log;
private final InternalLogger log;
public StatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, Logger log) {
public StatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, InternalLogger log) {
this.statsName = statsName;
this.scheduledExecutorService = scheduledExecutorService;
this.log = log;
......
......@@ -26,11 +26,11 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.constant.LoggerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public final class ThreadUtils {
private static final Logger log = LoggerFactory.getLogger(LoggerName.TOOLS_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TOOLS_LOGGER_NAME);
public static ExecutorService newThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, String processName, boolean isDaemon) {
......
......@@ -33,11 +33,11 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.slf4j.Logger;
public class Producer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
......@@ -56,7 +56,7 @@ public class Producer {
System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s%n", topic, threadCount, messageSize, keyEnable);
final Logger log = ClientLogger.getLog();
final InternalLogger log = ClientLogger.getLog();
final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
......
......@@ -25,6 +25,8 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
import org.apache.rocketmq.filtersrv.filter.FilterClassManager;
import org.apache.rocketmq.filtersrv.processor.DefaultRequestProcessor;
......@@ -32,11 +34,9 @@ import org.apache.rocketmq.filtersrv.stats.FilterServerStatsManager;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FiltersrvController {
private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
private final FiltersrvConfig filtersrvConfig;
......
......@@ -30,16 +30,17 @@ import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.NettySystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.srvutil.ShutdownHookThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FiltersrvStartup {
public static Logger log;
public static InternalLogger log;
public static void main(String[] args) {
start(createController(args));
......@@ -125,7 +126,7 @@ public class FiltersrvStartup {
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(filtersrvConfig.getRocketmqHome() + "/conf/logback_filtersrv.xml");
log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
log = InternalLoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
final FiltersrvController controller =
new FiltersrvController(filtersrvConfig, nettyServerConfig);
......
......@@ -39,12 +39,11 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DynaCode {
private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
private static final String FILE_SP = System.getProperty("file.separator");
......
......@@ -29,12 +29,12 @@ import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.filter.MessageFilter;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.filtersrv.FiltersrvController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FilterClassManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
private final Object compileLock = new Object();
private final FiltersrvController filtersrvController;
......
......@@ -18,13 +18,13 @@
package org.apache.rocketmq.filtersrv.filter;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.utils.HttpTinyClient;
import org.apache.rocketmq.common.utils.HttpTinyClient.HttpResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HttpFilterClassFetchMethod implements FilterClassFetchMethod {
private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
private final String url;
public HttpFilterClassFetchMethod(String url) {
......
......@@ -31,6 +31,8 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.filter.FilterContext;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
......@@ -47,11 +49,9 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.CommitLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DefaultRequestProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
private final FiltersrvController filtersrvController;
......@@ -61,7 +61,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
if (log.isDebugEnabled()) {
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
......
......@@ -21,12 +21,12 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.stats.StatsItemSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FilterServerStatsManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSStatsThread"));
......
......@@ -24,6 +24,10 @@ import java.util.Map;
public class InnerLoggerFactory extends InternalLoggerFactory {
public InnerLoggerFactory() {
doRegister();
}
@Override
protected InternalLogger getLoggerInstance(String name) {
return new InnerLogger(name);
......
......@@ -73,10 +73,6 @@ public abstract class InternalLoggerFactory {
}
}
public InternalLoggerFactory() {
doRegister();
}
protected void doRegister() {
String loggerType = getLoggerType();
if (loggerFactoryCache.get(loggerType) != null) {
......
......@@ -22,6 +22,11 @@ import org.slf4j.LoggerFactory;
public class Slf4jLoggerFactory extends InternalLoggerFactory {
public Slf4jLoggerFactory() {
LoggerFactory.getILoggerFactory();
doRegister();
}
@Override
protected String getLoggerType() {
return InternalLoggerFactory.LOGGER_SLF4J;
......
......@@ -23,6 +23,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.namesrv.kvconfig.KVConfigManager;
import org.apache.rocketmq.namesrv.processor.ClusterTestRequestProcessor;
......@@ -35,11 +37,10 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.srvutil.FileWatchService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NamesrvController {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvConfig namesrvConfig;
......
......@@ -30,12 +30,13 @@ import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.srvutil.ShutdownHookThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NamesrvStartup {
......@@ -95,7 +96,7 @@ public class NamesrvStartup {
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
......
......@@ -24,13 +24,12 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KVConfigManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvController namesrvController;
......
......@@ -20,6 +20,8 @@ import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
......@@ -28,11 +30,9 @@ import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClusterTestRequestProcessor extends DefaultRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final DefaultMQAdminExt adminExt;
private final String productEnvName;
......
......@@ -25,6 +25,8 @@ import org.apache.rocketmq.common.MQVersion.Version;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
......@@ -50,11 +52,9 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DefaultRequestProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private static InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
protected final NamesrvController namesrvController;
......@@ -65,13 +65,15 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (log.isDebugEnabled()) {
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
......
......@@ -18,13 +18,13 @@ package org.apache.rocketmq.namesrv.routeinfo;
import io.netty.channel.Channel;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BrokerHousekeepingService implements ChannelEventListener {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvController namesrvController;
public BrokerHousekeepingService(NamesrvController namesrvController) {
......
......@@ -33,6 +33,8 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
......@@ -42,11 +44,9 @@ import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RouteInfoManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
......@@ -395,9 +395,7 @@ public class RouteInfoManager {
log.error("pickupTopicRouteData Exception", e);
}
if (log.isDebugEnabled()) {
log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
}
log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
if (foundBrokerData && foundQueueData) {
return topicRouteData;
......
......@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
......@@ -44,7 +45,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.assertj.core.util.Maps;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
......@@ -61,7 +61,7 @@ public class DefaultRequestProcessorTest {
private RouteInfoManager routeInfoManager;
private Logger logger;
private InternalLogger logger;
@Before
public void init() throws Exception {
......@@ -78,8 +78,7 @@ public class DefaultRequestProcessorTest {
registerRouteInfoManager();
logger = mock(Logger.class);
when(logger.isInfoEnabled()).thenReturn(false);
logger = mock(InternalLogger.class);
setFinalStatic(DefaultRequestProcessor.class.getDeclaredField("log"), logger);
}
......
......@@ -37,11 +37,11 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.slf4j.Logger;
class LocalMessageCache implements ServiceLifecycle {
private final BlockingQueue<ConsumeRequest> consumeRequestCache;
......@@ -51,7 +51,7 @@ class LocalMessageCache implements ServiceLifecycle {
private final ClientConfig clientConfig;
private final ScheduledExecutorService cleanExpireMsgExecutors;
private final static Logger log = ClientLogger.getLog();
private final static InternalLogger log = ClientLogger.getLog();
LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final ClientConfig clientConfig) {
consumeRequestCache = new LinkedBlockingQueue<>(clientConfig.getRmqPullMessageCacheCapacity());
......
......@@ -34,9 +34,9 @@ import org.apache.rocketmq.client.consumer.PullTaskContext;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
public class PullConsumerImpl implements PullConsumer {
private final DefaultMQPullConsumer rocketmqPullConsumer;
......@@ -47,7 +47,7 @@ public class PullConsumerImpl implements PullConsumer {
private final LocalMessageCache localMessageCache;
private final ClientConfig clientConfig;
final static Logger log = ClientLogger.getLog();
final static InternalLogger log = ClientLogger.getLog();
public PullConsumerImpl(final String queueName, final KeyValue properties) {
this.properties = properties;
......
......@@ -34,15 +34,15 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.slf4j.Logger;
import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName;
abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
final static Logger log = ClientLogger.getLog();
final static InternalLogger log = ClientLogger.getLog();
final KeyValue properties;
final DefaultMQProducer rocketmqProducer;
private boolean started = false;
......
......@@ -19,13 +19,14 @@ package io.openmessaging.rocketmq.promise;
import io.openmessaging.Promise;
import io.openmessaging.PromiseListener;
import io.openmessaging.exception.OMSRuntimeException;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DefaultPromise<V> implements Promise<V> {
private static final Logger LOG = LoggerFactory.getLogger(DefaultPromise.class);
private static final InternalLogger LOG = InternalLoggerFactory.getLogger(DefaultPromise.class);
private final Object lock = new Object();
private volatile FutureState state = FutureState.DOING;
private V result = null;
......
......@@ -25,10 +25,10 @@ import java.util.Properties;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.log.ClientLogger;
import org.slf4j.Logger;
import org.apache.rocketmq.logging.InternalLogger;
public final class BeanUtils {
final static Logger log = ClientLogger.getLog();
final static InternalLogger log = ClientLogger.getLog();
/**
* Maps primitive {@code Class}es to their corresponding wrapper {@code Class}.
......
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册