diff --git a/README.md b/README.md
index 9b198627795eb278514374b0510bbb07865cad9f..fbbd0d718d6f99bc1d748df9eef96098fbcf0134 100644
--- a/README.md
+++ b/README.md
@@ -31,7 +31,7 @@ It offers a variety of features:
* Docs:
* Issues:
* Ask:
-* Slack:
+* Slack:
----------
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index d3caf8d84a8f317e619e6c9b14d07962385740e7..03b1151643ff7e750902e3ed1f7a79f6a9da1895 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -158,7 +158,7 @@ public class CommitLog {
/**
* When the normal exit, data recovery, all memory data have been flush
*/
- public void recoverNormally() {
+ public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
@@ -206,6 +206,12 @@ public class CommitLog {
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
+
+ // Clear ConsumeQueue redundant data
+ if (maxPhyOffsetOfConsumeQueue >= processOffset) {
+ log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
+ this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
+ }
}
}
@@ -390,7 +396,7 @@ public class CommitLog {
this.confirmOffset = phyOffset;
}
- public void recoverAbnormally() {
+ public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
// recover by the minimum time stamp
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List mappedFiles = this.mappedFileQueue.getMappedFiles();
@@ -418,41 +424,41 @@ public class CommitLog {
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
- // Normal data
- if (size > 0) {
- mappedFileOffset += size;
+ if (dispatchRequest.isSuccess()) {
+ // Normal data
+ if (size > 0) {
+ mappedFileOffset += size;
- if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
- if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
+ if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
+ if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
+ this.defaultMessageStore.doDispatch(dispatchRequest);
+ }
+ } else {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
- } else {
- this.defaultMessageStore.doDispatch(dispatchRequest);
}
- }
- // Intermediate file read error
- else if (size == -1) {
+ // Come the end of the file, switch to the next file
+ // Since the return 0 representatives met last hole, this can
+ // not be included in truncate offset
+ else if (size == 0) {
+ index++;
+ if (index >= mappedFiles.size()) {
+ // The current branch under normal circumstances should
+ // not happen
+ log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
+ break;
+ } else {
+ mappedFile = mappedFiles.get(index);
+ byteBuffer = mappedFile.sliceByteBuffer();
+ processOffset = mappedFile.getFileFromOffset();
+ mappedFileOffset = 0;
+ log.info("recover next physics file, " + mappedFile.getFileName());
+ }
+ }
+ } else {
log.info("recover physics file end, " + mappedFile.getFileName());
break;
}
- // Come the end of the file, switch to the next file
- // Since the return 0 representatives met last hole, this can
- // not be included in truncate offset
- else if (size == 0) {
- index++;
- if (index >= mappedFiles.size()) {
- // The current branch under normal circumstances should
- // not happen
- log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
- break;
- } else {
- mappedFile = mappedFiles.get(index);
- byteBuffer = mappedFile.sliceByteBuffer();
- processOffset = mappedFile.getFileFromOffset();
- mappedFileOffset = 0;
- log.info("recover next physics file, " + mappedFile.getFileName());
- }
- }
}
processOffset += mappedFileOffset;
@@ -461,7 +467,10 @@ public class CommitLog {
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// Clear ConsumeQueue redundant data
- this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
+ if (maxPhyOffsetOfConsumeQueue >= processOffset) {
+ log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
+ this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
+ }
}
// Commitlog case files are deleted
else {
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index ff431ed889a8fadabce0c320b1ac1e7e94ea6907..e0aef4f37450dd546d49687d2af96ec212bbe3e3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1276,12 +1276,12 @@ public class DefaultMessageStore implements MessageStore {
}
private void recover(final boolean lastExitOK) {
- this.recoverConsumeQueue();
+ long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
if (lastExitOK) {
- this.commitLog.recoverNormally();
+ this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
} else {
- this.commitLog.recoverAbnormally();
+ this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
}
this.recoverTopicQueueTable();
@@ -1306,12 +1306,18 @@ public class DefaultMessageStore implements MessageStore {
}
}
- private void recoverConsumeQueue() {
+ private long recoverConsumeQueue() {
+ long maxPhysicOffset = -1;
for (ConcurrentMap maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
logic.recover();
+ if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
+ maxPhysicOffset = logic.getMaxPhysicOffset();
+ }
}
}
+
+ return maxPhysicOffset;
}
private void recoverTopicQueueTable() {
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 20f94f09a0593c0f319b1af06763af22e34bf12c..57b6999c43795781ae20761e221ea897d32cb6a0 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -18,9 +18,12 @@
package org.apache.rocketmq.store;
import java.io.File;
+import java.io.RandomAccessFile;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
import java.nio.channels.OverlappingFileLockException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -29,6 +32,7 @@ import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.junit.After;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Before;
@@ -171,6 +175,120 @@ public class DefaultMessageStoreTest {
assertThat(getMessageResult45.getMessageBufferList().size()).isEqualTo(10);
}
+ @Test
+ public void testRecover() throws Exception {
+ String topic = "recoverTopic";
+ MessageBody = StoreMessage.getBytes();
+ for (int i = 0; i < 100; i++) {
+ MessageExtBrokerInner messageExtBrokerInner = buildMessage();
+ messageExtBrokerInner.setTopic(topic);
+ messageExtBrokerInner.setQueueId(0);
+ messageStore.putMessage(messageExtBrokerInner);
+ }
+
+ Thread.sleep(100);//wait for build consumer queue
+ long maxPhyOffset = messageStore.getMaxPhyOffset();
+ long maxCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
+
+ //1.just reboot
+ messageStore.shutdown();
+ messageStore = buildMessageStore();
+ boolean load = messageStore.load();
+ assertTrue(load);
+ messageStore.start();
+ assertTrue(maxPhyOffset == messageStore.getMaxPhyOffset());
+ assertTrue(maxCqOffset == messageStore.getMaxOffsetInQueue(topic, 0));
+
+ //2.damage commitlog and reboot normal
+ for (int i = 0; i < 100; i++) {
+ MessageExtBrokerInner messageExtBrokerInner = buildMessage();
+ messageExtBrokerInner.setTopic(topic);
+ messageExtBrokerInner.setQueueId(0);
+ messageStore.putMessage(messageExtBrokerInner);
+ }
+ Thread.sleep(100);
+ long secondLastPhyOffset = messageStore.getMaxPhyOffset();
+ long secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
+
+ MessageExtBrokerInner messageExtBrokerInner = buildMessage();
+ messageExtBrokerInner.setTopic(topic);
+ messageExtBrokerInner.setQueueId(0);
+ messageStore.putMessage(messageExtBrokerInner);
+
+ messageStore.shutdown();
+
+ //damage last message
+ damageCommitlog(secondLastPhyOffset);
+
+ //reboot
+ messageStore = buildMessageStore();
+ load = messageStore.load();
+ assertTrue(load);
+ messageStore.start();
+ assertTrue(secondLastPhyOffset == messageStore.getMaxPhyOffset());
+ assertTrue(secondLastCqOffset == messageStore.getMaxOffsetInQueue(topic, 0));
+
+ //3.damage commitlog and reboot abnormal
+ for (int i = 0; i < 100; i++) {
+ messageExtBrokerInner = buildMessage();
+ messageExtBrokerInner.setTopic(topic);
+ messageExtBrokerInner.setQueueId(0);
+ messageStore.putMessage(messageExtBrokerInner);
+ }
+ Thread.sleep(100);
+ secondLastPhyOffset = messageStore.getMaxPhyOffset();
+ secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
+
+ messageExtBrokerInner = buildMessage();
+ messageExtBrokerInner.setTopic(topic);
+ messageExtBrokerInner.setQueueId(0);
+ messageStore.putMessage(messageExtBrokerInner);
+ messageStore.shutdown();
+
+ //damage last message
+ damageCommitlog(secondLastPhyOffset);
+ //add abort file
+ String fileName = StorePathConfigHelper.getAbortFile(((DefaultMessageStore) messageStore).getMessageStoreConfig().getStorePathRootDir());
+ File file = new File(fileName);
+ MappedFile.ensureDirOK(file.getParent());
+ file.createNewFile();
+
+ messageStore = buildMessageStore();
+ load = messageStore.load();
+ assertTrue(load);
+ messageStore.start();
+ assertTrue(secondLastPhyOffset == messageStore.getMaxPhyOffset());
+ assertTrue(secondLastCqOffset == messageStore.getMaxOffsetInQueue(topic, 0));
+
+ //message write again
+ for (int i = 0; i < 100; i++) {
+ messageExtBrokerInner = buildMessage();
+ messageExtBrokerInner.setTopic(topic);
+ messageExtBrokerInner.setQueueId(0);
+ messageStore.putMessage(messageExtBrokerInner);
+ }
+ }
+
+ private void damageCommitlog(long offset) throws Exception {
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ File file = new File(messageStoreConfig.getStorePathCommitLog() + File.separator + "00000000000000000000");
+
+ FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();
+ MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 10);
+
+ int bodyLen = mappedByteBuffer.getInt((int) offset + 84);
+ int topicLenIndex = (int) offset + 84 + bodyLen + 4;
+ mappedByteBuffer.position(topicLenIndex);
+ mappedByteBuffer.putInt(0);
+ mappedByteBuffer.putInt(0);
+ mappedByteBuffer.putInt(0);
+ mappedByteBuffer.putInt(0);
+
+ mappedByteBuffer.force();
+ fileChannel.force(true);
+ fileChannel.close();
+ }
+
private class MyMessageArrivingListener implements MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,