提交 aa1c7577 编写于 作者: L lindzh 提交者: lollipop

[ROCKETMQ-254]Fix logger appender unit tests which cost too long

Author: lindzh <linsony0@163.com>

Closes #141 from lindzh/logger_appender_test.
上级 bcc65e54
...@@ -18,11 +18,15 @@ ...@@ -18,11 +18,15 @@
package org.apache.rocketmq.broker; package org.apache.rocketmq.broker;
import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Test; import org.junit.Test;
import java.io.File;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
public class BrokerControllerTest { public class BrokerControllerTest {
...@@ -44,4 +48,9 @@ public class BrokerControllerTest { ...@@ -44,4 +48,9 @@ public class BrokerControllerTest {
brokerController.shutdown(); brokerController.shutdown();
} }
} }
@After
public void destory(){
UtilAll.deleteFile(new File(new MessageStoreConfig().getStorePathRootDir()));
}
} }
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.rocketmq.broker.filter; package org.apache.rocketmq.broker.filter;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
...@@ -232,7 +233,7 @@ public class ConsumerFilterManagerTest { ...@@ -232,7 +233,7 @@ public class ConsumerFilterManagerTest {
assertThat(filterData.isDead()).isTrue(); assertThat(filterData.isDead()).isTrue();
assertThat(filterData.getCompiledExpression()).isNotNull(); assertThat(filterData.getCompiledExpression()).isNotNull();
} finally { } finally {
deleteDirectory("./unit_test"); UtilAll.deleteFile(new File("./unit_test"));
} }
} }
...@@ -269,23 +270,8 @@ public class ConsumerFilterManagerTest { ...@@ -269,23 +270,8 @@ public class ConsumerFilterManagerTest {
assertThat(topicData).isNullOrEmpty(); assertThat(topicData).isNullOrEmpty();
} finally { } finally {
deleteDirectory("./unit_test"); UtilAll.deleteFile(new File("./unit_test"));
} }
} }
protected void deleteDirectory(String rootPath) {
File file = new File(rootPath);
deleteFile(file);
}
protected void deleteFile(File file) {
File[] subFiles = file.listFiles();
if (subFiles != null) {
for (File sub : subFiles) {
deleteFile(sub);
}
}
file.delete();
}
} }
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.rocketmq.broker.filter; package org.apache.rocketmq.broker.filter;
import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
...@@ -174,22 +175,6 @@ public class MessageStoreWithFilterTest { ...@@ -174,22 +175,6 @@ public class MessageStoreWithFilterTest {
return msgs; return msgs;
} }
protected void deleteDirectory(String rootPath) {
File file = new File(rootPath);
deleteFile(file);
}
protected void deleteFile(File file) {
File[] subFiles = file.listFiles();
if (subFiles != null) {
for (File sub : subFiles) {
deleteFile(sub);
}
}
file.delete();
}
protected List<MessageExtBrokerInner> filtered(List<MessageExtBrokerInner> msgs, ConsumerFilterData filterData) { protected List<MessageExtBrokerInner> filtered(List<MessageExtBrokerInner> msgs, ConsumerFilterData filterData) {
List<MessageExtBrokerInner> filteredMsgs = new ArrayList<MessageExtBrokerInner>(); List<MessageExtBrokerInner> filteredMsgs = new ArrayList<MessageExtBrokerInner>();
...@@ -301,7 +286,7 @@ public class MessageStoreWithFilterTest { ...@@ -301,7 +286,7 @@ public class MessageStoreWithFilterTest {
} finally { } finally {
master.shutdown(); master.shutdown();
master.destroy(); master.destroy();
deleteDirectory(storePath); UtilAll.deleteFile(new File(storePath));
} }
} }
...@@ -386,7 +371,7 @@ public class MessageStoreWithFilterTest { ...@@ -386,7 +371,7 @@ public class MessageStoreWithFilterTest {
} finally { } finally {
master.shutdown(); master.shutdown();
master.destroy(); master.destroy();
deleteDirectory(storePath); UtilAll.deleteFile(new File(storePath));
} }
} }
} }
...@@ -505,4 +505,19 @@ public class UtilAll { ...@@ -505,4 +505,19 @@ public class UtilAll {
throw new RuntimeException("Can not get local ip", e); throw new RuntimeException("Can not get local ip", e);
} }
} }
public static void deleteFile(File file) {
if (!file.exists()) {
return;
}
if (file.isFile()) {
file.delete();
} else if (file.isDirectory()) {
File[] files = file.listFiles();
for (File file1 : files) {
deleteFile(file1);
}
file.delete();
}
}
} }
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
*/ */
package org.apache.rocketmq.logappender.common; package org.apache.rocketmq.logappender.common;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer; import org.apache.rocketmq.client.producer.MQProducer;
...@@ -39,20 +40,26 @@ public class ProducerInstance { ...@@ -39,20 +40,26 @@ public class ProducerInstance {
public static final String DEFAULT_GROUP = "rocketmq_appender"; public static final String DEFAULT_GROUP = "rocketmq_appender";
private static ConcurrentHashMap<String, MQProducer> producerMap = new ConcurrentHashMap<String, MQProducer>(); private ConcurrentHashMap<String, MQProducer> producerMap = new ConcurrentHashMap<String, MQProducer>();
private static String genKey(String nameServerAddress, String group) { private static ProducerInstance instance = new ProducerInstance();
public static ProducerInstance getProducerInstance() {
return instance;
}
private String genKey(String nameServerAddress, String group) {
return nameServerAddress + "_" + group; return nameServerAddress + "_" + group;
} }
public static MQProducer getInstance(String nameServerAddress, String group) throws MQClientException { public MQProducer getInstance(String nameServerAddress, String group) throws MQClientException {
if (group == null) { if (StringUtils.isBlank(group)) {
group = DEFAULT_GROUP; group = DEFAULT_GROUP;
} }
String genKey = genKey(nameServerAddress, group); String genKey = genKey(nameServerAddress, group);
MQProducer p = producerMap.get(genKey); MQProducer p = getProducerInstance().producerMap.get(genKey);
if (p != null) { if (p != null) {
return p; return p;
} }
...@@ -60,8 +67,7 @@ public class ProducerInstance { ...@@ -60,8 +67,7 @@ public class ProducerInstance {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group); DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group);
defaultMQProducer.setNamesrvAddr(nameServerAddress); defaultMQProducer.setNamesrvAddr(nameServerAddress);
MQProducer beforeProducer = null; MQProducer beforeProducer = null;
//cas put producer beforeProducer = getProducerInstance().producerMap.putIfAbsent(genKey, defaultMQProducer);
beforeProducer = producerMap.putIfAbsent(genKey, defaultMQProducer);
if (beforeProducer != null) { if (beforeProducer != null) {
return beforeProducer; return beforeProducer;
} }
...@@ -70,22 +76,22 @@ public class ProducerInstance { ...@@ -70,22 +76,22 @@ public class ProducerInstance {
} }
public static void removeAndClose(String nameServerAddress, String group) { public void removeAndClose(String nameServerAddress, String group) {
if (group == null) { if (group == null) {
group = DEFAULT_GROUP; group = DEFAULT_GROUP;
} }
String genKey = genKey(nameServerAddress, group); String genKey = genKey(nameServerAddress, group);
MQProducer producer = producerMap.remove(genKey); MQProducer producer = getProducerInstance().producerMap.remove(genKey);
if (producer != null) { if (producer != null) {
producer.shutdown(); producer.shutdown();
} }
} }
public static void closeAll() { public void closeAll() {
Set<Map.Entry<String, MQProducer>> entries = producerMap.entrySet(); Set<Map.Entry<String, MQProducer>> entries = getProducerInstance().producerMap.entrySet();
for (Map.Entry<String, MQProducer> entry : entries) { for (Map.Entry<String, MQProducer> entry : entries) {
producerMap.remove(entry.getKey()); getProducerInstance().producerMap.remove(entry.getKey());
entry.getValue().shutdown(); entry.getValue().shutdown();
} }
} }
......
...@@ -66,7 +66,7 @@ public class RocketmqLog4jAppender extends AppenderSkeleton { ...@@ -66,7 +66,7 @@ public class RocketmqLog4jAppender extends AppenderSkeleton {
return; return;
} }
try { try {
producer = ProducerInstance.getInstance(nameServerAddress, producerGroup); producer = ProducerInstance.getProducerInstance().getInstance(nameServerAddress, producerGroup);
} catch (Exception e) { } catch (Exception e) {
LogLog.error("activateOptions nameserver:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); LogLog.error("activateOptions nameserver:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage());
} }
...@@ -129,7 +129,7 @@ public class RocketmqLog4jAppender extends AppenderSkeleton { ...@@ -129,7 +129,7 @@ public class RocketmqLog4jAppender extends AppenderSkeleton {
this.closed = true; this.closed = true;
try { try {
ProducerInstance.removeAndClose(this.nameServerAddress, this.producerGroup); ProducerInstance.getProducerInstance().removeAndClose(this.nameServerAddress, this.producerGroup);
} catch (Exception e) { } catch (Exception e) {
LogLog.error("Closing RocketmqLog4jAppender [" + name + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); LogLog.error("Closing RocketmqLog4jAppender [" + name + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage());
} }
......
...@@ -81,7 +81,7 @@ public class RocketmqLog4j2Appender extends AbstractAppender { ...@@ -81,7 +81,7 @@ public class RocketmqLog4j2Appender extends AbstractAppender {
this.nameServerAddress = nameServerAddress; this.nameServerAddress = nameServerAddress;
this.producerGroup = producerGroup; this.producerGroup = producerGroup;
try { try {
this.producer = ProducerInstance.getInstance(this.nameServerAddress, this.producerGroup); this.producer = ProducerInstance.getProducerInstance().getInstance(this.nameServerAddress, this.producerGroup);
} catch (Exception e) { } catch (Exception e) {
ErrorHandler handler = this.getHandler(); ErrorHandler handler = this.getHandler();
if (handler != null) { if (handler != null) {
...@@ -127,7 +127,7 @@ public class RocketmqLog4j2Appender extends AbstractAppender { ...@@ -127,7 +127,7 @@ public class RocketmqLog4j2Appender extends AbstractAppender {
public boolean stop(long timeout, TimeUnit timeUnit) { public boolean stop(long timeout, TimeUnit timeUnit) {
this.setStopping(); this.setStopping();
try { try {
ProducerInstance.removeAndClose(this.nameServerAddress, this.producerGroup); ProducerInstance.getProducerInstance().removeAndClose(this.nameServerAddress, this.producerGroup);
} catch (Exception e) { } catch (Exception e) {
ErrorHandler handler = this.getHandler(); ErrorHandler handler = this.getHandler();
if (handler != null) { if (handler != null) {
......
...@@ -97,7 +97,7 @@ public class RocketmqLogbackAppender extends AppenderBase<ILoggingEvent> { ...@@ -97,7 +97,7 @@ public class RocketmqLogbackAppender extends AppenderBase<ILoggingEvent> {
return; return;
} }
try { try {
producer = ProducerInstance.getInstance(nameServerAddress, producerGroup); producer = ProducerInstance.getProducerInstance().getInstance(nameServerAddress, producerGroup);
} catch (Exception e) { } catch (Exception e) {
addError("Starting RocketmqLogbackAppender [" + this.getName() addError("Starting RocketmqLogbackAppender [" + this.getName()
+ "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage());
...@@ -119,7 +119,7 @@ public class RocketmqLogbackAppender extends AppenderBase<ILoggingEvent> { ...@@ -119,7 +119,7 @@ public class RocketmqLogbackAppender extends AppenderBase<ILoggingEvent> {
this.started = false; this.started = false;
try { try {
ProducerInstance.removeAndClose(this.nameServerAddress, this.producerGroup); ProducerInstance.getProducerInstance().removeAndClose(this.nameServerAddress, this.producerGroup);
} catch (Exception e) { } catch (Exception e) {
addError("Closeing RocketmqLogbackAppender [" + this.getName() addError("Closeing RocketmqLogbackAppender [" + this.getName()
+ "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage());
......
...@@ -16,139 +16,58 @@ ...@@ -16,139 +16,58 @@
*/ */
package org.apache.rocketmq.logappender; package org.apache.rocketmq.logappender;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.common.message.*;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.logappender.common.ProducerInstance; import org.apache.rocketmq.logappender.common.ProducerInstance;
import org.apache.rocketmq.namesrv.NamesrvController; import org.junit.Before;
import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.mockito.invocation.InvocationOnMock;
import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.mockito.stubbing.Answer;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.config.MessageStoreConfig; import static org.mockito.Mockito.*;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.util.List; import java.lang.reflect.Field;
import java.util.Random; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
* Basic test rocketmq broker and name server init * Basic test rocketmq broker and name server init
*/ */
public class AbstractTestCase { public class AbstractTestCase {
private static String nameServer = "localhost:9876"; private static CopyOnWriteArrayList<Message> messages = new CopyOnWriteArrayList<>();
private static NamesrvController namesrvController;
private static BrokerController brokerController;
private static String topic = "TopicTest";
@BeforeClass
public static void startRocketmqService() throws Exception {
startNamesrv();
startBroker();
}
/**
* Start rocketmq name server
* @throws Exception
*/
private static void startNamesrv() throws Exception {
NamesrvConfig namesrvConfig = new NamesrvConfig();
NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
boolean initResult = namesrvController.initialize();
if (!initResult) {
namesrvController.shutdown();
throw new Exception();
}
namesrvController.start();
}
/** @Before
* Start rocketmq broker service public void mockLoggerAppender() throws Exception {
* @throws Exception DefaultMQProducer defaultMQProducer = spy(new DefaultMQProducer("loggerAppender"));
*/ doAnswer(new Answer<Void>() {
private static void startBroker() throws Exception { @Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); Message message = (Message) invocationOnMock.getArgument(0);
messages.add(message);
BrokerConfig brokerConfig = new BrokerConfig(); return null;
brokerConfig.setNamesrvAddr(nameServer); }
brokerConfig.setBrokerId(MixAll.MASTER_ID); }).when(defaultMQProducer).sendOneway(any(Message.class));
NettyServerConfig nettyServerConfig = new NettyServerConfig(); ProducerInstance spy = mock(ProducerInstance.class);
nettyServerConfig.setListenPort(10911); Field instance = ProducerInstance.class.getDeclaredField("instance");
NettyClientConfig nettyClientConfig = new NettyClientConfig(); instance.setAccessible(true);
MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); instance.set(ProducerInstance.class, spy);
doReturn(defaultMQProducer).when(spy).getInstance(anyString(), anyString());
brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig);
boolean initResult = brokerController.initialize();
if (!initResult) {
brokerController.shutdown();
throw new Exception();
}
brokerController.start();
} }
@AfterClass public void clear() {
public static void stop() {
ProducerInstance.closeAll();
if (brokerController != null) {
brokerController.shutdown();
}
if (namesrvController != null) {
namesrvController.shutdown();
}
} }
protected int consumeMessages(int count,final String key,int timeout) throws MQClientException, InterruptedException { protected int consumeMessages(int count, final String key, int timeout) {
final AtomicInteger cc = new AtomicInteger(0); final AtomicInteger cc = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(count); for (Message message : messages) {
String body = new String(message.getBody());
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hello"); if (body.contains(key)) {
consumer.setNamesrvAddr(nameServer); cc.incrementAndGet();
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(topic, "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String body = new String(msg.getBody());
if(key==null||body.contains(key)){
countDownLatch.countDown();
cc.incrementAndGet();
continue;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} }
}); }
consumer.start();
countDownLatch.await(timeout, TimeUnit.SECONDS);
consumer.shutdown();
return cc.get(); return cc.get();
} }
} }
...@@ -32,12 +32,13 @@ public abstract class Log4jTest extends AbstractTestCase{ ...@@ -32,12 +32,13 @@ public abstract class Log4jTest extends AbstractTestCase{
@Test @Test
public void testLog4j() throws InterruptedException, MQClientException { public void testLog4j() throws InterruptedException, MQClientException {
clear();
Logger logger = Logger.getLogger("testLogger"); Logger logger = Logger.getLogger("testLogger");
for (int i = 0; i < 50; i++) { for (int i = 0; i < 10; i++) {
logger.info("log4j " + this.getType() + " simple test message " + i); logger.info("log4j " + this.getType() + " simple test message " + i);
} }
int received = consumeMessages(30, "log4j",30); int received = consumeMessages(10, "log4j",10);
Assert.assertTrue(received>20); Assert.assertTrue(received>5);
} }
} }
...@@ -44,11 +44,12 @@ public class LogbackTest extends AbstractTestCase{ ...@@ -44,11 +44,12 @@ public class LogbackTest extends AbstractTestCase{
@Test @Test
public void testLogback() throws InterruptedException, MQClientException { public void testLogback() throws InterruptedException, MQClientException {
clear();
Logger logger = LoggerFactory.getLogger("testLogger"); Logger logger = LoggerFactory.getLogger("testLogger");
for (int i = 0; i < 50; i++) { for (int i = 0; i < 10; i++) {
logger.info("logback test message " + i); logger.info("logback test message " + i);
} }
int received = consumeMessages(30, "logback",30); int received = consumeMessages(10, "logback",10);
Assert.assertTrue(received>20); Assert.assertTrue(received>=5);
} }
} }
...@@ -34,11 +34,12 @@ public class log4j2Test extends AbstractTestCase{ ...@@ -34,11 +34,12 @@ public class log4j2Test extends AbstractTestCase{
@Test @Test
public void testLog4j2() throws InterruptedException, MQClientException { public void testLog4j2() throws InterruptedException, MQClientException {
clear();
Logger logger = LogManager.getLogger("test"); Logger logger = LogManager.getLogger("test");
for (int i = 0; i < 50; i++) { for (int i = 0; i < 10; i++) {
logger.info("log4j2 log message " + i); logger.info("log4j2 log message " + i);
} }
int received = consumeMessages(30, "log4j2",30); int received = consumeMessages(10, "log4j2",10);
Assert.assertTrue(received>20); Assert.assertTrue(received>5);
} }
} }
...@@ -32,7 +32,7 @@ log4j.appender.store.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] ...@@ -32,7 +32,7 @@ log4j.appender.store.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t]
log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender
log4j.appender.mq.Tag=log log4j.appender.mq.Tag=log
log4j.appender.mq.Topic=TopicTest log4j.appender.mq.Topic=TopicTest
log4j.appender.mq.ProducerGroup=log4jp log4j.appender.mq.ProducerGroup=loggerAppender
log4j.appender.mq.NameServerAddress=127.0.0.1:9876 log4j.appender.mq.NameServerAddress=127.0.0.1:9876
log4j.appender.mq.layout=org.apache.log4j.PatternLayout log4j.appender.mq.layout=org.apache.log4j.PatternLayout
log4j.appender.mq.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p - %m%n log4j.appender.mq.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p - %m%n
\ No newline at end of file
...@@ -29,22 +29,16 @@ limitations under the License. ...@@ -29,22 +29,16 @@ limitations under the License.
<appender name="mqAppender1" class="org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender"> <appender name="mqAppender1" class="org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender">
<param name="Tag" value="log1" /> <param name="Tag" value="log1" />
<param name="Topic" value="TopicTest" /> <param name="Topic" value="TopicTest" />
<param name="ProducerGroup" value="log4jxml" /> <param name="ProducerGroup" value="loggerAppender" />
<param name="NameServerAddress" value="127.0.0.1:9876"/> <param name="NameServerAddress" value="127.0.0.1:9876"/>
<layout class="org.apache.log4j.PatternLayout"> <layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}-%p %t %c - %m%n" /> <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}-%p %t %c - %m%n" />
</layout> </layout>
</appender> </appender>
<appender name="mqAsyncAppender1" class="org.apache.log4j.AsyncAppender">
<param name="BufferSize" value="1024" />
<param name="Blocking" value="false" />
<appender-ref ref="mqAppender1"/>
</appender>
<logger name="testLogger" additivity="false"> <logger name="testLogger" additivity="false">
<level value="INFO" /> <level value="INFO" />
<appender-ref ref="mqAsyncAppender1" /> <appender-ref ref="mqAppender1" />
<appender-ref ref="consoleAppender" /> <appender-ref ref="consoleAppender" />
</logger> </logger>
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
<Configuration status="warn" name="Rocketmq"> <Configuration status="warn" name="Rocketmq">
<Appenders> <Appenders>
<RocketMQ name="rocketmqAppender" producerGroup="log4j2" nameServerAddress="127.0.0.1:9876" <RocketMQ name="rocketmqAppender" producerGroup="loggerAppender" nameServerAddress="127.0.0.1:9876"
topic="TopicTest" tag="log"> topic="TopicTest" tag="log">
<PatternLayout pattern="%d [%p] hahahah %c %m%n"/> <PatternLayout pattern="%d [%p] hahahah %c %m%n"/>
</RocketMQ> </RocketMQ>
......
...@@ -58,21 +58,13 @@ ...@@ -58,21 +58,13 @@
<appender name="mqAppender1" class="org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender"> <appender name="mqAppender1" class="org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender">
<tag>log1</tag> <tag>log1</tag>
<topic>TopicTest</topic> <topic>TopicTest</topic>
<producerGroup>logback</producerGroup> <producerGroup>loggerAppender</producerGroup>
<nameServerAddress>127.0.0.1:9876</nameServerAddress> <nameServerAddress>127.0.0.1:9876</nameServerAddress>
<layout> <layout>
<pattern>%date %p %t - %m%n</pattern> <pattern>%date %p %t - %m%n</pattern>
</layout> </layout>
</appender> </appender>
<appender name="mqAsyncAppender1" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>1024</queueSize>
<discardingThreshold>80</discardingThreshold>
<maxFlushTime>2000</maxFlushTime>
<neverBlock>true</neverBlock>
<appender-ref ref="mqAppender1"/>
</appender>
<root> <root>
<level value="debug"/> <level value="debug"/>
<appender-ref ref="consoleAppender"/> <appender-ref ref="consoleAppender"/>
...@@ -83,7 +75,7 @@ ...@@ -83,7 +75,7 @@
</logger> </logger>
<logger name="testLogger" level="debug" additivity="false"> <logger name="testLogger" level="debug" additivity="false">
<appender-ref ref="mqAsyncAppender1"/> <appender-ref ref="mqAppender1"/>
<appender-ref ref="consoleAppender"/> <appender-ref ref="consoleAppender"/>
</logger> </logger>
</configuration> </configuration>
...@@ -25,11 +25,14 @@ import java.util.ArrayList; ...@@ -25,11 +25,14 @@ import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
...@@ -58,6 +61,11 @@ public class AppendCallbackTest { ...@@ -58,6 +61,11 @@ public class AppendCallbackTest {
callback = commitLog.new DefaultAppendMessageCallback(1024); callback = commitLog.new DefaultAppendMessageCallback(1024);
} }
@After
public void destroy(){
UtilAll.deleteFile(new File(System.getProperty("user.home") + File.separator + "unitteststore"));
}
@Test @Test
public void testAppendMessageBatchEndOfFile() throws Exception{ public void testAppendMessageBatchEndOfFile() throws Exception{
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
package org.apache.rocketmq.store; package org.apache.rocketmq.store;
import org.apache.rocketmq.common.UtilAll;
import org.junit.After;
import org.junit.Test; import org.junit.Test;
import java.io.File; import java.io.File;
...@@ -62,22 +64,6 @@ public class ConsumeQueueExtTest { ...@@ -62,22 +64,6 @@ public class ConsumeQueueExtTest {
return cqExtUnit; return cqExtUnit;
} }
protected void deleteDirectory(String rootPath) {
File file = new File(rootPath);
deleteFile(file);
}
protected void deleteFile(File file) {
File[] subFiles = file.listFiles();
if (subFiles != null) {
for (File sub : subFiles) {
deleteFile(sub);
}
}
file.delete();
}
protected void putSth(ConsumeQueueExt consumeQueueExt, boolean getAfterPut, protected void putSth(ConsumeQueueExt consumeQueueExt, boolean getAfterPut,
boolean unitSameSize, int unitCount) { boolean unitSameSize, int unitCount) {
for (int i = 0; i < unitCount; i++) { for (int i = 0; i < unitCount; i++) {
...@@ -111,7 +97,7 @@ public class ConsumeQueueExtTest { ...@@ -111,7 +97,7 @@ public class ConsumeQueueExtTest {
putSth(consumeQueueExt, true, false, unitCount); putSth(consumeQueueExt, true, false, unitCount);
} finally { } finally {
consumeQueueExt.destroy(); consumeQueueExt.destroy();
deleteDirectory(storePath); UtilAll.deleteFile(new File(storePath));
} }
} }
...@@ -139,7 +125,7 @@ public class ConsumeQueueExtTest { ...@@ -139,7 +125,7 @@ public class ConsumeQueueExtTest {
} }
} finally { } finally {
consumeQueueExt.destroy(); consumeQueueExt.destroy();
deleteDirectory(storePath); UtilAll.deleteFile(new File(storePath));
} }
} }
...@@ -161,7 +147,7 @@ public class ConsumeQueueExtTest { ...@@ -161,7 +147,7 @@ public class ConsumeQueueExtTest {
assertThat(unit).isNull(); assertThat(unit).isNull();
} finally { } finally {
consumeQueueExt.destroy(); consumeQueueExt.destroy();
deleteDirectory(storePath); UtilAll.deleteFile(new File(storePath));
} }
} }
...@@ -199,7 +185,7 @@ public class ConsumeQueueExtTest { ...@@ -199,7 +185,7 @@ public class ConsumeQueueExtTest {
} finally { } finally {
putCqExt.destroy(); putCqExt.destroy();
loadCqExt.destroy(); loadCqExt.destroy();
deleteDirectory(storePath); UtilAll.deleteFile(new File(storePath));
} }
} }
...@@ -222,7 +208,7 @@ public class ConsumeQueueExtTest { ...@@ -222,7 +208,7 @@ public class ConsumeQueueExtTest {
assertThat(expectMinAddress).isEqualTo(minAddress); assertThat(expectMinAddress).isEqualTo(minAddress);
} finally { } finally {
consumeQueueExt.destroy(); consumeQueueExt.destroy();
deleteDirectory(storePath); UtilAll.deleteFile(new File(storePath));
} }
} }
...@@ -245,7 +231,12 @@ public class ConsumeQueueExtTest { ...@@ -245,7 +231,12 @@ public class ConsumeQueueExtTest {
assertThat(expectMaxAddress).isEqualTo(maxAddress); assertThat(expectMaxAddress).isEqualTo(maxAddress);
} finally { } finally {
consumeQueueExt.destroy(); consumeQueueExt.destroy();
deleteDirectory(storePath); UtilAll.deleteFile(new File(storePath));
} }
} }
@After
public void destroy(){
UtilAll.deleteFile(new File(storePath));
}
} }
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.rocketmq.store; package org.apache.rocketmq.store;
import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
...@@ -131,22 +132,6 @@ public class ConsumeQueueTest { ...@@ -131,22 +132,6 @@ public class ConsumeQueueTest {
} }
} }
protected void deleteDirectory(String rootPath) {
File file = new File(rootPath);
deleteFile(file);
}
protected void deleteFile(File file) {
File[] subFiles = file.listFiles();
if (subFiles != null) {
for (File sub : subFiles) {
deleteFile(sub);
}
}
file.delete();
}
@Test @Test
public void testConsumeQueueWithExtendData() { public void testConsumeQueueWithExtendData() {
DefaultMessageStore master = null; DefaultMessageStore master = null;
...@@ -220,7 +205,7 @@ public class ConsumeQueueTest { ...@@ -220,7 +205,7 @@ public class ConsumeQueueTest {
} finally { } finally {
master.shutdown(); master.shutdown();
master.destroy(); master.destroy();
deleteDirectory(storePath); UtilAll.deleteFile(new File(storePath));
} }
} }
} }
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.rocketmq.store; package org.apache.rocketmq.store;
import java.io.File;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
...@@ -24,8 +25,10 @@ import java.util.Map; ...@@ -24,8 +25,10 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
...@@ -47,6 +50,13 @@ public class DefaultMessageStoreTest { ...@@ -47,6 +50,13 @@ public class DefaultMessageStoreTest {
BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0); BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
} }
@After
public void destory() {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
File file = new File(messageStoreConfig.getStorePathRootDir());
UtilAll.deleteFile(file);
}
public MessageStore buildMessageStore() throws Exception { public MessageStore buildMessageStore() throws Exception {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024 * 10); messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024 * 10);
......
...@@ -17,8 +17,13 @@ ...@@ -17,8 +17,13 @@
package org.apache.rocketmq.store; package org.apache.rocketmq.store;
import java.io.File;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Test; import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
...@@ -176,4 +181,10 @@ public class MappedFileQueueTest { ...@@ -176,4 +181,10 @@ public class MappedFileQueueTest {
mappedFileQueue.shutdown(1000); mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy(); mappedFileQueue.destroy();
} }
@After
public void destory() {
File file = new File("target/unit_test_store");
UtilAll.deleteFile(file);
}
} }
...@@ -20,7 +20,11 @@ ...@@ -20,7 +20,11 @@
*/ */
package org.apache.rocketmq.store; package org.apache.rocketmq.store;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import org.apache.rocketmq.common.UtilAll;
import org.junit.After;
import org.junit.Test; import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
...@@ -47,4 +51,10 @@ public class MappedFileTest { ...@@ -47,4 +51,10 @@ public class MappedFileTest {
assertThat(mappedFile.isCleanupOver()).isTrue(); assertThat(mappedFile.isCleanupOver()).isTrue();
assertThat(mappedFile.destroy(1000)).isTrue(); assertThat(mappedFile.destroy(1000)).isTrue();
} }
@After
public void destory() {
File file = new File("target/unit_test_store");
UtilAll.deleteFile(file);
}
} }
...@@ -20,7 +20,11 @@ ...@@ -20,7 +20,11 @@
*/ */
package org.apache.rocketmq.store; package org.apache.rocketmq.store;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import org.apache.rocketmq.common.UtilAll;
import org.junit.After;
import org.junit.Test; import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
...@@ -42,4 +46,10 @@ public class StoreCheckpointTest { ...@@ -42,4 +46,10 @@ public class StoreCheckpointTest {
assertThat(storeCheckpoint.getPhysicMsgTimestamp()).isEqualTo(physicMsgTimestamp); assertThat(storeCheckpoint.getPhysicMsgTimestamp()).isEqualTo(physicMsgTimestamp);
assertThat(storeCheckpoint.getLogicsMsgTimestamp()).isEqualTo(logicsMsgTimestamp); assertThat(storeCheckpoint.getLogicsMsgTimestamp()).isEqualTo(logicsMsgTimestamp);
} }
@After
public void destory() {
File file = new File("target/checkpoint_test");
UtilAll.deleteFile(file);
}
} }
...@@ -20,8 +20,11 @@ ...@@ -20,8 +20,11 @@
*/ */
package org.apache.rocketmq.store.index; package org.apache.rocketmq.store.index;
import java.io.File;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.rocketmq.common.UtilAll;
import org.junit.Test; import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
...@@ -42,6 +45,8 @@ public class IndexFileTest { ...@@ -42,6 +45,8 @@ public class IndexFileTest {
boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis()); boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis());
assertThat(putResult).isFalse(); assertThat(putResult).isFalse();
indexFile.destroy(0); indexFile.destroy(0);
File file = new File("100");
UtilAll.deleteFile(file);
} }
@Test @Test
...@@ -62,5 +67,7 @@ public class IndexFileTest { ...@@ -62,5 +67,7 @@ public class IndexFileTest {
assertThat(phyOffsets).isNotEmpty(); assertThat(phyOffsets).isNotEmpty();
assertThat(phyOffsets.size()).isEqualTo(1); assertThat(phyOffsets.size()).isEqualTo(1);
indexFile.destroy(0); indexFile.destroy(0);
File file = new File("200");
UtilAll.deleteFile(file);
} }
} }
...@@ -25,6 +25,7 @@ import java.util.UUID; ...@@ -25,6 +25,7 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.namesrv.NamesrvConfig; import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyClientConfig;
...@@ -75,7 +76,7 @@ public class IntegrationTestBase { ...@@ -75,7 +76,7 @@ public class IntegrationTestBase {
} }
} }
for (File file : TMPE_FILES) { for (File file : TMPE_FILES) {
deleteFile(file); UtilAll.deleteFile(file);
} }
} catch (Exception e){ } catch (Exception e){
logger.error("Shutdown error", e); logger.error("Shutdown error", e);
...@@ -187,5 +188,5 @@ public class IntegrationTestBase { ...@@ -187,5 +188,5 @@ public class IntegrationTestBase {
file.delete(); file.delete();
} }
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册