未验证 提交 edf97032 编写于 作者: D duheng 提交者: GitHub

Merge pull request #501 from suiyuzeng/issue_recover

[ISSUE #467] fix Message missed after recovering from abnormal shutdown
...@@ -31,7 +31,7 @@ It offers a variety of features: ...@@ -31,7 +31,7 @@ It offers a variety of features:
* Docs: <https://rocketmq.apache.org/docs/quick-start/> * Docs: <https://rocketmq.apache.org/docs/quick-start/>
* Issues: <https://github.com/apache/rocketmq/issues> * Issues: <https://github.com/apache/rocketmq/issues>
* Ask: <https://stackoverflow.com/questions/tagged/rocketmq> * Ask: <https://stackoverflow.com/questions/tagged/rocketmq>
* Slack: <https://rocketmq-community.slack.com/> * Slack: <https://rocketmq-invite-automation.herokuapp.com/>
---------- ----------
......
...@@ -158,7 +158,7 @@ public class CommitLog { ...@@ -158,7 +158,7 @@ public class CommitLog {
/** /**
* When the normal exit, data recovery, all memory data have been flush * When the normal exit, data recovery, all memory data have been flush
*/ */
public void recoverNormally() { public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) { if (!mappedFiles.isEmpty()) {
...@@ -206,6 +206,12 @@ public class CommitLog { ...@@ -206,6 +206,12 @@ public class CommitLog {
this.mappedFileQueue.setFlushedWhere(processOffset); this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset); this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset); this.mappedFileQueue.truncateDirtyFiles(processOffset);
// Clear ConsumeQueue redundant data
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
} }
} }
...@@ -390,7 +396,7 @@ public class CommitLog { ...@@ -390,7 +396,7 @@ public class CommitLog {
this.confirmOffset = phyOffset; this.confirmOffset = phyOffset;
} }
public void recoverAbnormally() { public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
// recover by the minimum time stamp // recover by the minimum time stamp
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
...@@ -418,41 +424,41 @@ public class CommitLog { ...@@ -418,41 +424,41 @@ public class CommitLog {
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover); DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize(); int size = dispatchRequest.getMsgSize();
// Normal data if (dispatchRequest.isSuccess()) {
if (size > 0) { // Normal data
mappedFileOffset += size; if (size > 0) {
mappedFileOffset += size;
if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) { if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) { if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
} else {
this.defaultMessageStore.doDispatch(dispatchRequest); this.defaultMessageStore.doDispatch(dispatchRequest);
} }
} else {
this.defaultMessageStore.doDispatch(dispatchRequest);
} }
} // Come the end of the file, switch to the next file
// Intermediate file read error // Since the return 0 representatives met last hole, this can
else if (size == -1) { // not be included in truncate offset
else if (size == 0) {
index++;
if (index >= mappedFiles.size()) {
// The current branch under normal circumstances should
// not happen
log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
} else {
log.info("recover physics file end, " + mappedFile.getFileName()); log.info("recover physics file end, " + mappedFile.getFileName());
break; break;
} }
// Come the end of the file, switch to the next file
// Since the return 0 representatives met last hole, this can
// not be included in truncate offset
else if (size == 0) {
index++;
if (index >= mappedFiles.size()) {
// The current branch under normal circumstances should
// not happen
log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
} }
processOffset += mappedFileOffset; processOffset += mappedFileOffset;
...@@ -461,7 +467,10 @@ public class CommitLog { ...@@ -461,7 +467,10 @@ public class CommitLog {
this.mappedFileQueue.truncateDirtyFiles(processOffset); this.mappedFileQueue.truncateDirtyFiles(processOffset);
// Clear ConsumeQueue redundant data // Clear ConsumeQueue redundant data
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset); if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
} }
// Commitlog case files are deleted // Commitlog case files are deleted
else { else {
......
...@@ -1276,12 +1276,12 @@ public class DefaultMessageStore implements MessageStore { ...@@ -1276,12 +1276,12 @@ public class DefaultMessageStore implements MessageStore {
} }
private void recover(final boolean lastExitOK) { private void recover(final boolean lastExitOK) {
this.recoverConsumeQueue(); long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
if (lastExitOK) { if (lastExitOK) {
this.commitLog.recoverNormally(); this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
} else { } else {
this.commitLog.recoverAbnormally(); this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
} }
this.recoverTopicQueueTable(); this.recoverTopicQueueTable();
...@@ -1306,12 +1306,18 @@ public class DefaultMessageStore implements MessageStore { ...@@ -1306,12 +1306,18 @@ public class DefaultMessageStore implements MessageStore {
} }
} }
private void recoverConsumeQueue() { private long recoverConsumeQueue() {
long maxPhysicOffset = -1;
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) { for (ConsumeQueue logic : maps.values()) {
logic.recover(); logic.recover();
if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
maxPhysicOffset = logic.getMaxPhysicOffset();
}
} }
} }
return maxPhysicOffset;
} }
private void recoverTopicQueueTable() { private void recoverTopicQueueTable() {
......
...@@ -18,9 +18,12 @@ ...@@ -18,9 +18,12 @@
package org.apache.rocketmq.store; package org.apache.rocketmq.store;
import java.io.File; import java.io.File;
import java.io.RandomAccessFile;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.OverlappingFileLockException; import java.nio.channels.OverlappingFileLockException;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
...@@ -29,6 +32,7 @@ import org.apache.rocketmq.common.BrokerConfig; ...@@ -29,6 +32,7 @@ import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.junit.After; import org.junit.After;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Before; import org.junit.Before;
...@@ -171,6 +175,120 @@ public class DefaultMessageStoreTest { ...@@ -171,6 +175,120 @@ public class DefaultMessageStoreTest {
assertThat(getMessageResult45.getMessageBufferList().size()).isEqualTo(10); assertThat(getMessageResult45.getMessageBufferList().size()).isEqualTo(10);
} }
@Test
public void testRecover() throws Exception {
String topic = "recoverTopic";
MessageBody = StoreMessage.getBytes();
for (int i = 0; i < 100; i++) {
MessageExtBrokerInner messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
Thread.sleep(100);//wait for build consumer queue
long maxPhyOffset = messageStore.getMaxPhyOffset();
long maxCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
//1.just reboot
messageStore.shutdown();
messageStore = buildMessageStore();
boolean load = messageStore.load();
assertTrue(load);
messageStore.start();
assertTrue(maxPhyOffset == messageStore.getMaxPhyOffset());
assertTrue(maxCqOffset == messageStore.getMaxOffsetInQueue(topic, 0));
//2.damage commitlog and reboot normal
for (int i = 0; i < 100; i++) {
MessageExtBrokerInner messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
Thread.sleep(100);
long secondLastPhyOffset = messageStore.getMaxPhyOffset();
long secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
MessageExtBrokerInner messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
messageStore.shutdown();
//damage last message
damageCommitlog(secondLastPhyOffset);
//reboot
messageStore = buildMessageStore();
load = messageStore.load();
assertTrue(load);
messageStore.start();
assertTrue(secondLastPhyOffset == messageStore.getMaxPhyOffset());
assertTrue(secondLastCqOffset == messageStore.getMaxOffsetInQueue(topic, 0));
//3.damage commitlog and reboot abnormal
for (int i = 0; i < 100; i++) {
messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
Thread.sleep(100);
secondLastPhyOffset = messageStore.getMaxPhyOffset();
secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
messageStore.shutdown();
//damage last message
damageCommitlog(secondLastPhyOffset);
//add abort file
String fileName = StorePathConfigHelper.getAbortFile(((DefaultMessageStore) messageStore).getMessageStoreConfig().getStorePathRootDir());
File file = new File(fileName);
MappedFile.ensureDirOK(file.getParent());
file.createNewFile();
messageStore = buildMessageStore();
load = messageStore.load();
assertTrue(load);
messageStore.start();
assertTrue(secondLastPhyOffset == messageStore.getMaxPhyOffset());
assertTrue(secondLastCqOffset == messageStore.getMaxOffsetInQueue(topic, 0));
//message write again
for (int i = 0; i < 100; i++) {
messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
}
private void damageCommitlog(long offset) throws Exception {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
File file = new File(messageStoreConfig.getStorePathCommitLog() + File.separator + "00000000000000000000");
FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 10);
int bodyLen = mappedByteBuffer.getInt((int) offset + 84);
int topicLenIndex = (int) offset + 84 + bodyLen + 4;
mappedByteBuffer.position(topicLenIndex);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.force();
fileChannel.force(true);
fileChannel.close();
}
private class MyMessageArrivingListener implements MessageArrivingListener { private class MyMessageArrivingListener implements MessageArrivingListener {
@Override @Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册