提交 1a2fe9f8 编写于 作者: D dongeforever

Test the committed pos for dleger

上级 456691bf
......@@ -4,6 +4,7 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.dleger.DLegerLeaderElector;
import org.apache.rocketmq.dleger.MemberState;
import org.apache.rocketmq.dleger.utils.UtilAll;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore;
......@@ -20,6 +21,7 @@ public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHa
}
@Override public void handle(long term, MemberState.Role role) {
long start = System.currentTimeMillis();
try {
log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
switch (role) {
......@@ -41,9 +43,9 @@ public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHa
default:
break;
}
log.info("Finish handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
log.info("Finish handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), UtilAll.elapsed(start));
} catch (Throwable t) {
log.info("Failed handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), t);
log.info("Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), UtilAll.elapsed(start), t);
}
}
}
......@@ -133,12 +133,6 @@ public class DLegerCommitLog extends CommitLog {
return 0;
}
/**
* Read CommitLog data, use data replication
*/
public SelectMappedBufferResult getData(final long offset) {
return this.getData(offset, offset == 0);
}
private static class DLegerSelectMappedBufferResult extends SelectMappedBufferResult {
......@@ -164,14 +158,37 @@ public class DLegerCommitLog extends CommitLog {
}
public SelectMmapBufferResult truncate(SelectMmapBufferResult sbr) {
long committedPos = dLegerFileStore.getCommittedPos();
if (sbr == null || sbr.getStartOffset() == committedPos) {
return null;
}
if (sbr.getStartOffset() + sbr.getSize() <= committedPos) {
return sbr;
} else {
sbr.setSize((int) (committedPos - sbr.getStartOffset()));
return sbr;
}
}
/**
* Read CommitLog data, use data replication
*/
public SelectMappedBufferResult getData(final long offset) {
return this.getData(offset, offset == 0);
}
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
if (offset >= dLegerFileStore.getCommittedPos()) {
return null;
}
int mappedFileSize = this.dLegerServer.getdLegerConfig().getMappedFileSizeForEntryData();
MmapFile mappedFile = this.dLegerFileList.findMappedFileByOffset(offset, returnFirstOnNotFound);
if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize);
SelectMmapBufferResult sbr = mappedFile.selectMappedBuffer(pos);
return convertSbr(sbr);
return convertSbr(truncate(sbr));
}
return null;
......@@ -353,6 +370,9 @@ public class DLegerCommitLog extends CommitLog {
case DISK_FULL:
putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
break;
case WAIT_QUORUM_ACK_TIMEOUT:
putMessageStatus = PutMessageStatus.FLUSH_SLAVE_TIMEOUT;
break;
case LEADER_PENDING_FULL:
putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
break;
......
......@@ -4,15 +4,14 @@ import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.dleger.DLegerServer;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
......@@ -25,7 +24,7 @@ import org.junit.Test;
public class DLegerCommitlogTest extends StoreTestBase {
private DefaultMessageStore createMessageStore(String base) throws Exception {
private DefaultMessageStore createMessageStore(String base, String selfId, String peers, String leaderId) throws Exception {
baseDirs.add(base);
MessageStoreConfig storeConfig = new MessageStoreConfig();
storeConfig.setMapedFileSizeCommitLog(1024 * 100);
......@@ -38,16 +37,21 @@ public class DLegerCommitlogTest extends StoreTestBase {
storeConfig.setEnableDLegerCommitLog(true);
storeConfig.setdLegerGroup(UUID.randomUUID().toString());
storeConfig.setdLegerPeers(String.format("n0-localhost:%d", nextPort()));
storeConfig.setdLegerSelfId("n0");
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLegerCommitlogTest"), new MessageArrivingListener() {
storeConfig.setdLegerPeers(peers);
storeConfig.setdLegerSelfId(selfId);
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLegerCommitlogTest"), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
byte[] filterBitMap, Map<String, String> properties) {
}
}, new BrokerConfig());
if (leaderId != null) {
DLegerServer dLegerServer = ((DLegerCommitLog) defaultMessageStore.getCommitLog()).getdLegerServer();
dLegerServer.getdLegerConfig().setEnableLeaderElector(false);
if (selfId.equals(leaderId)) {
dLegerServer.getMemberState().changeToLeader(-1);
} else {
dLegerServer.getMemberState().changeToFollower(-1, leaderId);
}
}
defaultMessageStore.load();
defaultMessageStore.start();
return defaultMessageStore;
......@@ -56,7 +60,8 @@ public class DLegerCommitlogTest extends StoreTestBase {
@Test
public void testPutAndGetMessage() throws Exception {
String base = createBaseDir();
DefaultMessageStore messageStore = createMessageStore(base);
String peers = String.format("n0-localhost:%d", nextPort());
DefaultMessageStore messageStore = createMessageStore(base, "n0", peers, null);
Thread.sleep(1000);
String topic = UUID.randomUUID().toString();
......@@ -91,4 +96,34 @@ public class DLegerCommitlogTest extends StoreTestBase {
}
@Test
public void testCommittedPos() throws Exception {
String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort());
DefaultMessageStore leaderStore = createMessageStore(createBaseDir(), "n0", peers, "n0");
String topic = UUID.randomUUID().toString();
MessageExtBrokerInner msgInner = buildMessage();
msgInner.setTopic(topic);
msgInner.setQueueId(0);
PutMessageResult putMessageResult = leaderStore.putMessage(msgInner);
Assert.assertEquals(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, putMessageResult.getPutMessageStatus());
Thread.sleep(1000);
Assert.assertTrue(leaderStore.getCommitLog().getMaxOffset() > 0);
Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0));
DefaultMessageStore followerStore = createMessageStore(createBaseDir(), "n1", peers, "n0");
Thread.sleep(2000);
Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(1, followerStore.getMaxOffsetInQueue(topic, 0));
leaderStore.destroy();
followerStore.destroy();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册