提交 8f788c3d 编写于 作者: D dongeforever

Add reputoffset test for dledger commitlog test

上级 58496c2f
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
*/ */
package org.apache.rocketmq.store.dledger; package org.apache.rocketmq.store.dledger;
import io.openmessaging.storage.dledger.AppendFuture;
import io.openmessaging.storage.dledger.DLedgerConfig; import io.openmessaging.storage.dledger.DLedgerConfig;
import io.openmessaging.storage.dledger.DLedgerServer; import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dledger.entry.DLedgerEntry; import io.openmessaging.storage.dledger.entry.DLedgerEntry;
...@@ -28,7 +29,6 @@ import io.openmessaging.storage.dledger.store.file.MmapFileList; ...@@ -28,7 +29,6 @@ import io.openmessaging.storage.dledger.store.file.MmapFileList;
import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult; import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageAccessor;
...@@ -357,54 +357,49 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -357,54 +357,49 @@ public class DLedgerCommitLog extends CommitLog {
} }
// Back to Results // Back to Results
AppendMessageResult appendResult = null; PutMessageStatus putMessageStatus = null;
PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR; AppendMessageResult appendResult;
CompletableFuture<AppendEntryResponse> dlegerFuture = null; AppendFuture<AppendEntryResponse> dledgerFuture;
EncodeResult encodeResult = null; EncodeResult encodeResult;
long eclipseTimeInLock = 0L;
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
long queueOffset = -1; long eclipseTimeInLock;
long queueOffset;
try { try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
//TO DO use buffer
encodeResult = this.messageSerializer.serialize(msg); encodeResult = this.messageSerializer.serialize(msg);
queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey); queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
if (encodeResult.status != AppendMessageStatus.PUT_OK) { if (encodeResult.status != AppendMessageStatus.PUT_OK) {
appendResult = new AppendMessageResult(encodeResult.status); return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status));
switch (encodeResult.status) { }
case PROPERTIES_SIZE_EXCEEDED: AppendEntryRequest request = new AppendEntryRequest();
case MESSAGE_SIZE_EXCEEDED: request.setGroup(dLedgerConfig.getGroup());
putMessageStatus = PutMessageStatus.MESSAGE_ILLEGAL; request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
break; request.setBody(encodeResult.data);
} dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
} else { if (dledgerFuture.getPos() == -1) {
AppendEntryRequest request = new AppendEntryRequest(); return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBody(encodeResult.data);
dlegerFuture = dLedgerServer.handleAppend(request);
if (dlegerFuture.isDone() && dlegerFuture.get().getCode() != DLedgerResponseCode.SUCCESS.getCode()) {
//TO DO make sure the local store is ok
appendResult = new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
} else {
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1);
break;
default:
break;
}
}
} }
long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET;
ByteBuffer buffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset);
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, eclipseTimeInLock);
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1);
break;
default:
break;
}
} catch (Exception e) { } catch (Exception e) {
log.error("Put message error", e); log.error("Put message error", e);
appendResult = new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
} finally { } finally {
beginTimeInDledgerLock = 0; beginTimeInDledgerLock = 0;
putMessageLock.unlock(); putMessageLock.unlock();
...@@ -414,34 +409,27 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -414,34 +409,27 @@ public class DLedgerCommitLog extends CommitLog {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, appendResult); log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, appendResult);
} }
if (dlegerFuture != null) { try {
try { AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS);
AppendEntryResponse appendEntryResponse = dlegerFuture.get(3, TimeUnit.SECONDS); switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) { case SUCCESS:
case SUCCESS: putMessageStatus = PutMessageStatus.PUT_OK;
putMessageStatus = PutMessageStatus.PUT_OK; break;
long wroteOffset = appendEntryResponse.getPos() + DLedgerEntry.BODY_OFFSET; case INCONSISTENT_LEADER:
ByteBuffer buffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH); case NOT_LEADER:
String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); case LEADER_NOT_READY:
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, 0); case DISK_FULL:
break; putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
case INCONSISTENT_LEADER: break;
case NOT_LEADER: case WAIT_QUORUM_ACK_TIMEOUT:
case LEADER_NOT_READY: putMessageStatus = PutMessageStatus.FLUSH_SLAVE_TIMEOUT;
case DISK_FULL: break;
putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE; case LEADER_PENDING_FULL:
break; putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
case WAIT_QUORUM_ACK_TIMEOUT: break;
putMessageStatus = PutMessageStatus.FLUSH_SLAVE_TIMEOUT;
break;
case LEADER_PENDING_FULL:
putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
break;
}
} catch (Exception ignored) {
putMessageStatus = PutMessageStatus.FLUSH_SLAVE_TIMEOUT;
appendResult = new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
} }
} catch (Throwable ignored) {
putMessageStatus = PutMessageStatus.FLUSH_SLAVE_TIMEOUT;
} }
PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult); PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
......
...@@ -12,19 +12,21 @@ import io.openmessaging.storage.dledger.DLedgerServer; ...@@ -12,19 +12,21 @@ import io.openmessaging.storage.dledger.DLedgerServer;
import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MappedFile;
import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.StoreTestBase; import org.apache.rocketmq.store.StoreTestBase;
import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public class DLedgerCommitlogTest extends StoreTestBase { public class DLedgerCommitlogTest extends StoreTestBase {
private DefaultMessageStore createMessageStore(String base, String group, String selfId, String peers, String leaderId) throws Exception { private DefaultMessageStore createMessageStore(String base, String group, String selfId, String peers, String leaderId, boolean createAbort) throws Exception {
baseDirs.add(base); baseDirs.add(base);
MessageStoreConfig storeConfig = new MessageStoreConfig(); MessageStoreConfig storeConfig = new MessageStoreConfig();
storeConfig.setMapedFileSizeCommitLog(1024 * 100); storeConfig.setMapedFileSizeCommitLog(1024 * 100);
...@@ -39,7 +41,7 @@ public class DLedgerCommitlogTest extends StoreTestBase { ...@@ -39,7 +41,7 @@ public class DLedgerCommitlogTest extends StoreTestBase {
storeConfig.setdLegerGroup(group); storeConfig.setdLegerGroup(group);
storeConfig.setdLegerPeers(peers); storeConfig.setdLegerPeers(peers);
storeConfig.setdLegerSelfId(selfId); storeConfig.setdLegerSelfId(selfId);
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLegerCommitlogTest"), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> { DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLedgerCommitlogTest"), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {
}, new BrokerConfig()); }, new BrokerConfig());
if (leaderId != null) { if (leaderId != null) {
...@@ -52,17 +54,67 @@ public class DLedgerCommitlogTest extends StoreTestBase { ...@@ -52,17 +54,67 @@ public class DLedgerCommitlogTest extends StoreTestBase {
} }
} }
if (createAbort) {
String fileName = StorePathConfigHelper.getAbortFile(storeConfig.getStorePathRootDir());
File file = new File(fileName);
MappedFile.ensureDirOK(file.getParent());
file.createNewFile();
}
defaultMessageStore.load(); defaultMessageStore.load();
defaultMessageStore.start(); defaultMessageStore.start();
return defaultMessageStore; return defaultMessageStore;
} }
@Test
public void testReputOffset() throws Exception {
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
{
DefaultMessageStore messageStore = createMessageStore(base, group, "n0", peers, null, false);
Thread.sleep(1000);
String topic = UUID.randomUUID().toString();
for (int i = 0; i < 1000; i++) {
MessageExtBrokerInner msgInner = buildMessage();
msgInner.setTopic(topic);
msgInner.setQueueId(0);
PutMessageResult putMessageResult = messageStore.putMessage(msgInner);
Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
Assert.assertEquals(i, putMessageResult.getAppendMessageResult().getLogicsOffset());
}
Thread.sleep(100);
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
messageStore.shutdown();
}
{
//normal recover
DefaultMessageStore messageStore = createMessageStore(base, group, "n0", peers, null, false);
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
messageStore.shutdown();
}
{
//normal recover
DefaultMessageStore messageStore = createMessageStore(base, group, "n0", peers, null, true);
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
messageStore.shutdown();
}
}
@Test @Test
public void testPutAndGetMessage() throws Exception { public void testPutAndGetMessage() throws Exception {
String base = createBaseDir(); String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort()); String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString(); String group = UUID.randomUUID().toString();
DefaultMessageStore messageStore = createMessageStore(base, group, "n0", peers, null); DefaultMessageStore messageStore = createMessageStore(base, group, "n0", peers, null, false);
Thread.sleep(1000); Thread.sleep(1000);
String topic = UUID.randomUUID().toString(); String topic = UUID.randomUUID().toString();
...@@ -102,7 +154,7 @@ public class DLedgerCommitlogTest extends StoreTestBase { ...@@ -102,7 +154,7 @@ public class DLedgerCommitlogTest extends StoreTestBase {
public void testCommittedPos() throws Exception { public void testCommittedPos() throws Exception {
String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort()); String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort());
String group = UUID.randomUUID().toString(); String group = UUID.randomUUID().toString();
DefaultMessageStore leaderStore = createMessageStore(createBaseDir(), group,"n0", peers, "n0"); DefaultMessageStore leaderStore = createMessageStore(createBaseDir(), group,"n0", peers, "n0", false);
String topic = UUID.randomUUID().toString(); String topic = UUID.randomUUID().toString();
MessageExtBrokerInner msgInner = buildMessage(); MessageExtBrokerInner msgInner = buildMessage();
...@@ -117,7 +169,7 @@ public class DLedgerCommitlogTest extends StoreTestBase { ...@@ -117,7 +169,7 @@ public class DLedgerCommitlogTest extends StoreTestBase {
Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0)); Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0));
DefaultMessageStore followerStore = createMessageStore(createBaseDir(), group,"n1", peers, "n0"); DefaultMessageStore followerStore = createMessageStore(createBaseDir(), group,"n1", peers, "n0", false);
Thread.sleep(2000); Thread.sleep(2000);
Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0)); Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册