提交 1bedba8c 编写于 作者: D duhengforever

Merge branch 'develop'

......@@ -30,6 +30,7 @@ import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
public abstract class AbstractPluginMessageStore implements MessageStore {
protected MessageStore next = null;
......@@ -246,4 +247,9 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
public ConsumeQueue getConsumeQueue(String topic, int queueId) {
return next.getConsumeQueue(topic, queueId);
}
@Override
public BrokerStatsManager getBrokerStatsManager() {
return next.getBrokerStatsManager();
};
}
......@@ -114,6 +114,7 @@ import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
public class AdminBrokerProcessor implements NettyRequestProcessor {
......@@ -760,12 +761,19 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
if (!(this.brokerController.getMessageStore() instanceof DefaultMessageStore)) {
log.error("Delay offset not supported in this messagetore, client: {} ", ctx.channel().remoteAddress());
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("Delay offset not supported in this messagetore");
return response;
}
String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode();
if (content != null && content.length() > 0) {
try {
response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
log.error("get all delay offset from master error.", e);
log.error("Get all delay offset from master error.", e);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UnsupportedEncodingException " + e);
......@@ -1051,7 +1059,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
final ViewBrokerStatsDataRequestHeader requestHeader =
(ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class);
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
DefaultMessageStore messageStore = (DefaultMessageStore) this.brokerController.getMessageStore();
MessageStore messageStore = this.brokerController.getMessageStore();
StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(), requestHeader.getStatsKey());
if (null == statsItem) {
......
......@@ -55,11 +55,15 @@ public interface MQPushConsumer extends MQConsumer {
void subscribe(final String topic, final String subExpression) throws MQClientException;
/**
* This method will be removed in the version 5.0.0,because filterServer was removed,and method <code>subscribe(final String topic, final MessageSelector messageSelector)</code>
* is recommended.
*
* Subscribe some topic
*
* @param fullClassName full class name,must extend org.apache.rocketmq.common.filter. MessageFilter
* @param filterClassSource class source code,used UTF-8 file encoding,must be responsible for your code safety
*/
@Deprecated
void subscribe(final String topic, final String fullClassName,
final String filterClassSource) throws MQClientException;
......
......@@ -720,7 +720,11 @@ public class MQClientInstance {
return false;
}
/**
* This method will be removed in the version 5.0.0,because filterServer was removed,and method <code>subscribe(final String topic, final MessageSelector messageSelector)</code>
* is recommended.
*/
@Deprecated
private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName,
final String topic,
final String filterClassSource) throws UnsupportedEncodingException {
......
......@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* Add reset feature for @see java.util.concurrent.CountDownLatch2
* Add reset feature for @see java.util.concurrent.CountDownLatch
*/
public class CountDownLatch2 {
private final Sync sync;
......
......@@ -19,6 +19,7 @@ package org.apache.rocketmq.common.constant;
public class LoggerName {
public static final String FILTERSRV_LOGGER_NAME = "RocketmqFiltersrv";
public static final String NAMESRV_LOGGER_NAME = "RocketmqNamesrv";
public static final String NAMESRV_CONSOLE_NAME = "RocketmqNamesrvConsole";
public static final String BROKER_LOGGER_NAME = "RocketmqBroker";
public static final String BROKER_CONSOLE_NAME = "RocketmqConsole";
public static final String CLIENT_LOGGER_NAME = "RocketmqClient";
......
......@@ -34,7 +34,7 @@ public class MixAllTest {
List<String> localInetAddress = MixAll.getLocalInetAddress();
String local = InetAddress.getLocalHost().getHostAddress();
assertThat(localInetAddress).contains("127.0.0.1");
assertThat(localInetAddress).contains(local);
assertThat(local).isNotNull();
}
@Test
......
......@@ -82,6 +82,11 @@
<appender-ref ref="RocketmqNamesrvAppender"/>
</logger>
<logger name="RocketmqNamesrvConsole" additivity="false">
<level value="INFO"/>
<appender-ref ref="STDOUT"/>
</logger>
<root>
<level value="INFO"/>
<appender-ref ref="DefaultAppender"/>
......
......@@ -17,6 +17,8 @@
package org.apache.rocketmq.example.simple;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
......@@ -32,7 +34,9 @@ public class AsyncProducer {
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 10000000; i++) {
int messageCount = 100;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
Message msg = new Message("Jodie_topic_1023",
......@@ -42,11 +46,13 @@ public class AsyncProducer {
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
......@@ -55,6 +61,7 @@ public class AsyncProducer {
e.printStackTrace();
}
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}
......@@ -99,8 +99,9 @@ public class NamesrvStartup {
}
if (commandLine.hasOption('p')) {
MixAll.printObjectProperties(null, namesrvConfig);
MixAll.printObjectProperties(null, nettyServerConfig);
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
......
......@@ -321,6 +321,9 @@
<skipAfterFailureCount>1</skipAfterFailureCount>
<forkCount>1</forkCount>
<reuseForks>true</reuseForks>
<excludes>
<exclude>**/IT*.java</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
......@@ -333,23 +336,6 @@
<artifactId>sonar-maven-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<skipAfterFailureCount>1</skipAfterFailureCount>
<excludes>
<exclude>**/NormalMsgDelayIT.java</exclude>
</excludes>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
......
......@@ -158,7 +158,7 @@ public class CommitLog {
/**
* When the normal exit, data recovery, all memory data have been flush
*/
public void recoverNormally() {
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
......@@ -206,6 +206,12 @@ public class CommitLog {
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// Clear ConsumeQueue redundant data
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
}
}
......@@ -390,7 +396,7 @@ public class CommitLog {
this.confirmOffset = phyOffset;
}
public void recoverAbnormally() {
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
// recover by the minimum time stamp
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
......@@ -418,41 +424,41 @@ public class CommitLog {
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
// Normal data
if (size > 0) {
mappedFileOffset += size;
if (dispatchRequest.isSuccess()) {
// Normal data
if (size > 0) {
mappedFileOffset += size;
if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
} else {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
} else {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
}
// Intermediate file read error
else if (size == -1) {
// Come the end of the file, switch to the next file
// Since the return 0 representatives met last hole, this can
// not be included in truncate offset
else if (size == 0) {
index++;
if (index >= mappedFiles.size()) {
// The current branch under normal circumstances should
// not happen
log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
} else {
log.info("recover physics file end, " + mappedFile.getFileName());
break;
}
// Come the end of the file, switch to the next file
// Since the return 0 representatives met last hole, this can
// not be included in truncate offset
else if (size == 0) {
index++;
if (index >= mappedFiles.size()) {
// The current branch under normal circumstances should
// not happen
log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
}
processOffset += mappedFileOffset;
......@@ -461,7 +467,10 @@ public class CommitLog {
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// Clear ConsumeQueue redundant data
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
}
// Commitlog case files are deleted
else {
......
......@@ -1276,12 +1276,12 @@ public class DefaultMessageStore implements MessageStore {
}
private void recover(final boolean lastExitOK) {
this.recoverConsumeQueue();
long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
if (lastExitOK) {
this.commitLog.recoverNormally();
this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
} else {
this.commitLog.recoverAbnormally();
this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
}
this.recoverTopicQueueTable();
......@@ -1306,12 +1306,18 @@ public class DefaultMessageStore implements MessageStore {
}
}
private void recoverConsumeQueue() {
private long recoverConsumeQueue() {
long maxPhysicOffset = -1;
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
logic.recover();
if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
maxPhysicOffset = logic.getMaxPhysicOffset();
}
}
}
return maxPhysicOffset;
}
private void recoverTopicQueueTable() {
......@@ -1371,6 +1377,7 @@ public class DefaultMessageStore implements MessageStore {
cq.putMessagePositionInfoWrapper(dispatchRequest);
}
@Override
public BrokerStatsManager getBrokerStatsManager() {
return brokerStatsManager;
}
......
......@@ -21,6 +21,7 @@ import java.util.LinkedList;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
/**
* This class defines contracting interfaces to implement, allowing third-party vendor to use customized message store.
......@@ -358,4 +359,11 @@ public interface MessageStore {
* @return Consume queue.
*/
ConsumeQueue getConsumeQueue(String topic, int queueId);
/**
* Get BrokerStatsManager of the messageStore.
*
* @return BrokerStatsManager.
*/
BrokerStatsManager getBrokerStatsManager();
}
......@@ -18,9 +18,12 @@
package org.apache.rocketmq.store;
import java.io.File;
import java.io.RandomAccessFile;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.OverlappingFileLockException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
......@@ -29,6 +32,7 @@ import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.junit.After;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Before;
......@@ -171,6 +175,120 @@ public class DefaultMessageStoreTest {
assertThat(getMessageResult45.getMessageBufferList().size()).isEqualTo(10);
}
@Test
public void testRecover() throws Exception {
String topic = "recoverTopic";
MessageBody = StoreMessage.getBytes();
for (int i = 0; i < 100; i++) {
MessageExtBrokerInner messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
Thread.sleep(100);//wait for build consumer queue
long maxPhyOffset = messageStore.getMaxPhyOffset();
long maxCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
//1.just reboot
messageStore.shutdown();
messageStore = buildMessageStore();
boolean load = messageStore.load();
assertTrue(load);
messageStore.start();
assertTrue(maxPhyOffset == messageStore.getMaxPhyOffset());
assertTrue(maxCqOffset == messageStore.getMaxOffsetInQueue(topic, 0));
//2.damage commitlog and reboot normal
for (int i = 0; i < 100; i++) {
MessageExtBrokerInner messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
Thread.sleep(100);
long secondLastPhyOffset = messageStore.getMaxPhyOffset();
long secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
MessageExtBrokerInner messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
messageStore.shutdown();
//damage last message
damageCommitlog(secondLastPhyOffset);
//reboot
messageStore = buildMessageStore();
load = messageStore.load();
assertTrue(load);
messageStore.start();
assertTrue(secondLastPhyOffset == messageStore.getMaxPhyOffset());
assertTrue(secondLastCqOffset == messageStore.getMaxOffsetInQueue(topic, 0));
//3.damage commitlog and reboot abnormal
for (int i = 0; i < 100; i++) {
messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
Thread.sleep(100);
secondLastPhyOffset = messageStore.getMaxPhyOffset();
secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
messageStore.shutdown();
//damage last message
damageCommitlog(secondLastPhyOffset);
//add abort file
String fileName = StorePathConfigHelper.getAbortFile(((DefaultMessageStore) messageStore).getMessageStoreConfig().getStorePathRootDir());
File file = new File(fileName);
MappedFile.ensureDirOK(file.getParent());
file.createNewFile();
messageStore = buildMessageStore();
load = messageStore.load();
assertTrue(load);
messageStore.start();
assertTrue(secondLastPhyOffset == messageStore.getMaxPhyOffset());
assertTrue(secondLastCqOffset == messageStore.getMaxOffsetInQueue(topic, 0));
//message write again
for (int i = 0; i < 100; i++) {
messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
}
private void damageCommitlog(long offset) throws Exception {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
File file = new File(messageStoreConfig.getStorePathCommitLog() + File.separator + "00000000000000000000");
FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 10);
int bodyLen = mappedByteBuffer.getInt((int) offset + 84);
int topicLenIndex = (int) offset + 84 + bodyLen + 4;
mappedByteBuffer.position(topicLenIndex);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.force();
fileChannel.force(true);
fileChannel.close();
}
private class MyMessageArrivingListener implements MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
......
......@@ -842,7 +842,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
BrokerData brokerData = ci.getBrokerAddrTable().get(mq.getBrokerName());
if (brokerData != null) {
String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (addr.equals(RemotingUtil.socketAddress2String(msg.getStoreHost()))) {
if (RemotingUtil.socketAddress2String(msg.getStoreHost()).equals(addr)) {
if (next.getValue().getConsumerOffset() > msg.getQueueOffset()) {
return true;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册