提交 5009b782 编写于 作者: M Minghao Li

enh(sync): add log index manager

上级 fb4d3722
......@@ -212,13 +212,17 @@ void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId);
void syncNodeVoteForSelf(SSyncNode* pSyncNode);
// snapshot --------------
bool syncNodeHasSnapshot(SSyncNode* pSyncNode);
bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index);
bool syncNodeHasSnapshot(SSyncNode* pSyncNode);
bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index);
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode);
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode);
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm);
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode);
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index);
SyncTerm syncNodeGetPreITerm(SSyncNode* pSyncNode, SyncIndex index);
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index);
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm);
// for debug --------------
......
......@@ -1285,9 +1285,14 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
}
bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(syncNodeHasSnapshot(pSyncNode));
ASSERT(pSyncNode->pFsm->FpGetSnapshot != NULL);
ASSERT(index >= SYNC_INDEX_BEGIN);
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
bool b = (index <= snapshot.lastApplyIndex);
return b;
}
......@@ -1303,18 +1308,33 @@ SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) {
}
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
}
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
SyncTerm lastTerm = 0;
if (logLastIndex >= snapshot.lastApplyIndex) {
lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
if (syncNodeHasSnapshot(pSyncNode)) {
// has snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
}
if (pSyncNode->pLogStore->syncLogEntryCount(pSyncNode->pLogStore) > 0) {
// has log
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
if (logLastIndex > snapshot.lastApplyIndex) {
lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
} else {
lastTerm = snapshot.lastApplyTerm;
}
} else {
// no log
lastTerm = snapshot.lastApplyTerm;
}
} else {
lastTerm = snapshot.lastApplyTerm;
// no snapshot
lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
}
return lastTerm;
}
......@@ -1325,39 +1345,89 @@ int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, Sy
return 0;
}
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
return syncStartIndex;
}
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(index >= SYNC_INDEX_BEGIN);
SyncIndex preIndex = index - 1;
SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode);
ASSERT(index <= syncStartIndex);
SyncIndex preIndex;
if (syncNodeHasSnapshot(pSyncNode)) {
// has snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
}
ASSERT(index > snapshot.lastApplyIndex);
preIndex = index - 1;
} else {
// no snapshot
preIndex = index - 1;
}
return preIndex;
}
SyncTerm syncNodeGetPreITerm(SSyncNode* pSyncNode, SyncIndex index) {
SyncTerm preTerm = 0;
SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, index);
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(index >= SYNC_INDEX_BEGIN);
SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode);
ASSERT(index <= syncStartIndex);
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
if (index == SYNC_INDEX_BEGIN) {
return 0;
}
if (syncNodeIsIndexInSnapshot(pSyncNode, preIndex) && preIndex == snapshot.lastApplyIndex) {
preTerm = snapshot.lastApplyTerm;
} else {
SSyncRaftEntry* pPreEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
ASSERT(code == 0);
if (pPreEntry != NULL) {
SyncTerm preTerm = 0;
if (syncNodeHasSnapshot(pSyncNode)) {
// has snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
}
ASSERT(index > snapshot.lastApplyIndex);
if (index > snapshot.lastApplyIndex + 1) {
// should be log preTerm
SSyncRaftEntry* pPreEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index - 1, &pPreEntry);
ASSERT(code == 0);
ASSERT(pPreEntry != NULL);
preTerm = pPreEntry->term;
taosMemoryFree(pPreEntry);
} else if (index == snapshot.lastApplyIndex + 1) {
preTerm = snapshot.lastApplyTerm;
} else {
ASSERT(0);
}
} else {
// no snapshot
ASSERT(index > SYNC_INDEX_BEGIN);
SSyncRaftEntry* pPreEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index - 1, &pPreEntry);
ASSERT(code == 0);
ASSERT(pPreEntry != NULL);
preTerm = pPreEntry->term;
taosMemoryFree(pPreEntry);
}
return preTerm;
}
// get pre index and term of "index"
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
*pPreIndex = syncNodeGetPreIndex(pSyncNode, index);
*pPreTerm = syncNodeGetPreITerm(pSyncNode, index);
*pPreTerm = syncNodeGetPreTerm(pSyncNode, index);
return 0;
}
......
......@@ -437,11 +437,11 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) {
snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex);
cJSON_AddStringToObject(pRoot, "beginIndex", u64buf);
SyncIndex endIndex = raftLogEndIndex(pLogStore);
snprintf(u64buf, sizeof(u64buf), "%ld", endIndex);
cJSON_AddStringToObject(pRoot, "endIndex", u64buf);
int32_t count = raftLogEntryCount(pLogStore);
cJSON_AddNumberToObject(pRoot, "entryCount", count);
......
......@@ -34,7 +34,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
void init() {
walInit();
SWalCfg walCfg;
memset(&walCfg, 0, sizeof(SWalCfg));
walCfg.vgId = 1000;
......
......@@ -22,9 +22,18 @@ SWal* pWal;
SSyncLogStore* pLogStore;
const char* pWalPath = "./syncLogStoreTest_wal";
SyncIndex gSnapshotLastApplyIndex;
SyncIndex gSnapshotLastApplyTerm;
int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
pSnapshot->data = NULL;
pSnapshot->lastApplyIndex = gSnapshotLastApplyIndex;
pSnapshot->lastApplyTerm = gSnapshotLastApplyTerm;
return 0;
}
void init() {
walInit();
taosRemoveDir(pWalPath);
SWalCfg walCfg;
memset(&walCfg, 0, sizeof(SWalCfg));
......@@ -41,6 +50,9 @@ void init() {
pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
memset(pSyncNode, 0, sizeof(SSyncNode));
pSyncNode->pWal = pWal;
pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
pSyncNode->pFsm->FpGetSnapshot = GetSnapshotCb;
}
void cleanup() {
......@@ -49,14 +61,56 @@ void cleanup() {
taosMemoryFree(pSyncNode);
}
void logStoreTest() {
void test1() {
// no snapshot
// no log
taosRemoveDir(pWalPath);
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
assert(pLogStore->getLastIndex(pLogStore) == SYNC_INDEX_INVALID);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest1 ----- ", pLogStore);
gSnapshotLastApplyIndex = -1;
gSnapshotLastApplyTerm = 0;
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
SyncIndex testIndex = 0;
SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, testIndex);
SyncTerm preTerm = syncNodeGetPreTerm(pSyncNode, testIndex);
logStoreLog2((char*)"logStoreTest", pLogStore);
sTrace("test1");
sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
sTrace("lastIndex: %ld", lastIndex);
sTrace("lastTerm: %lu", lastTerm);
sTrace("%ld's preIndex: %ld", testIndex, preIndex);
sTrace("%ld's preTerm: %lu", testIndex, preTerm);
for (int i = 0; i < 5; ++i) {
logStoreDestory(pLogStore);
cleanup();
}
void test2() {
// no snapshot
// whole log
taosRemoveDir(pWalPath);
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest2 ----- ", pLogStore);
for (int i = 0; i <= 10; ++i) {
int32_t dataLen = 10;
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
assert(pEntry != NULL);
......@@ -65,34 +119,200 @@ void logStoreTest() {
pEntry->seqNum = 3;
pEntry->isWeak = true;
pEntry->term = 100 + i;
pEntry->index = pLogStore->getLastIndex(pLogStore) + 1;
pEntry->index = pLogStore->syncLogWriteIndex(pLogStore);
snprintf(pEntry->data, dataLen, "value%d", i);
syncEntryLog2((char*)"==write entry== :", pEntry);
pLogStore->appendEntry(pLogStore, pEntry);
pLogStore->syncLogAppendEntry(pLogStore, pEntry);
syncEntryDestory(pEntry);
}
logStoreLog2((char*)"test2 after appendEntry", pLogStore);
gSnapshotLastApplyIndex = -1;
gSnapshotLastApplyTerm = 0;
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
if (i == 0) {
assert(pLogStore->getLastIndex(pLogStore) == SYNC_INDEX_BEGIN);
}
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
sTrace("test2");
sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
sTrace("lastIndex: %ld", lastIndex);
sTrace("lastTerm: %lu", lastTerm);
for (SyncIndex i = 11; i >= 0; --i) {
SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, i);
SyncTerm preTerm = syncNodeGetPreTerm(pSyncNode, i);
sTrace("%ld's preIndex: %ld", i, preIndex);
sTrace("%ld's preTerm: %lu", i, preTerm);
}
logStoreLog2((char*)"after appendEntry", pLogStore);
pLogStore->truncate(pLogStore, 3);
logStoreLog2((char*)"after truncate 3", pLogStore);
logStoreDestory(pLogStore);
cleanup();
}
void test3() {
// has snapshot
// no log
taosRemoveDir(pWalPath);
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest3 ----- ", pLogStore);
gSnapshotLastApplyIndex = 5;
gSnapshotLastApplyTerm = 100;
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, 6);
SyncTerm preTerm = syncNodeGetPreTerm(pSyncNode, 6);
sTrace("test3");
sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
sTrace("lastIndex: %ld", lastIndex);
sTrace("lastTerm: %lu", lastTerm);
sTrace("%d's preIndex: %ld", 6, preIndex);
sTrace("%d's preTerm: %lu", 6, preTerm);
logStoreDestory(pLogStore);
cleanup();
}
int main(int argc, char** argv) {
tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
void test4() {
// has snapshot
// whole log
taosRemoveDir(pWalPath);
init();
logStoreTest();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest4 ----- ", pLogStore);
taosMsleep(2000);
for (int i = 0; i <= 10; ++i) {
int32_t dataLen = 10;
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
assert(pEntry != NULL);
pEntry->msgType = 1;
pEntry->originalRpcType = 2;
pEntry->seqNum = 3;
pEntry->isWeak = true;
pEntry->term = 100 + i;
pEntry->index = pLogStore->syncLogWriteIndex(pLogStore);
snprintf(pEntry->data, dataLen, "value%d", i);
pLogStore->syncLogAppendEntry(pLogStore, pEntry);
syncEntryDestory(pEntry);
}
logStoreLog2((char*)"test4 after appendEntry", pLogStore);
gSnapshotLastApplyIndex = 5;
gSnapshotLastApplyTerm = 100;
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
sTrace("test4");
sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
sTrace("lastIndex: %ld", lastIndex);
sTrace("lastTerm: %lu", lastTerm);
for (SyncIndex i = 11; i >= 6; --i) {
SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, i);
SyncTerm preTerm = syncNodeGetPreTerm(pSyncNode, i);
sTrace("%ld's preIndex: %ld", i, preIndex);
sTrace("%ld's preTerm: %lu", i, preTerm);
}
logStoreDestory(pLogStore);
cleanup();
}
void test5() {
// has snapshot
// partial log
taosRemoveDir(pWalPath);
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest5 ----- ", pLogStore);
pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, 6);
for (int i = 6; i <= 10; ++i) {
int32_t dataLen = 10;
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
assert(pEntry != NULL);
pEntry->msgType = 1;
pEntry->originalRpcType = 2;
pEntry->seqNum = 3;
pEntry->isWeak = true;
pEntry->term = 100 + i;
pEntry->index = pLogStore->syncLogWriteIndex(pLogStore);
snprintf(pEntry->data, dataLen, "value%d", i);
pLogStore->syncLogAppendEntry(pLogStore, pEntry);
syncEntryDestory(pEntry);
}
logStoreLog2((char*)"test5 after appendEntry", pLogStore);
gSnapshotLastApplyIndex = 5;
gSnapshotLastApplyTerm = 100;
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
sTrace("test5");
sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
sTrace("lastIndex: %ld", lastIndex);
sTrace("lastTerm: %lu", lastTerm);
for (SyncIndex i = 11; i >= 6; --i) {
SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, i);
SyncTerm preTerm = syncNodeGetPreTerm(pSyncNode, i);
sTrace("%ld's preIndex: %ld", i, preIndex);
sTrace("%ld's preTerm: %lu", i, preTerm);
}
logStoreDestory(pLogStore);
cleanup();
}
int main(int argc, char** argv) {
tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_INFO + DEBUG_SCREEN + DEBUG_FILE;
test1();
test2();
test3();
test4();
test5();
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册