提交 2adc0b8a 编写于 作者: M Minghao Li

enh(sync): add log index manager

上级 5908631e
......@@ -149,6 +149,7 @@ typedef struct SSyncLogStore {
int32_t (*syncLogEntryCount)(struct SSyncLogStore* pLogStore);
bool (*syncLogInRange)(struct SSyncLogStore* pLogStore, SyncIndex index);
SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore);
SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore);
SyncTerm (*syncLogLastTerm)(struct SSyncLogStore* pLogStore);
......
......@@ -20,6 +20,7 @@
static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex);
static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore);
static SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore);
static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore);
static bool raftLogIsEmpty(struct SSyncLogStore* pLogStore);
static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore);
static bool raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index);
......@@ -70,7 +71,7 @@ static bool raftLogIsEmpty(struct SSyncLogStore* pLogStore) {
static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore) {
SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
SyncIndex endIndex = raftLogEndIndex(pLogStore);
int32_t count = endIndex - beginIndex;
int32_t count = endIndex - beginIndex + 1;
return count > 0 ? count : 0;
}
......@@ -85,22 +86,48 @@ static bool raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index) {
}
static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) {
SyncIndex lastIndex;
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
SyncIndex lastVer = walGetLastVer(pWal);
return lastVer;
SyncIndex firstVer = walGetFirstVer(pWal);
if (lastVer < firstVer) {
// no record
lastIndex = -1;
} else {
if (firstVer >= 0) {
lastIndex = lastVer;
} else if (firstVer == -1) {
lastIndex = -1;
} else {
ASSERT(0);
}
}
return lastIndex;
}
static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
SyncIndex lastVer = walGetLastVer(pWal);
return lastVer + 1;
}
static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
SyncTerm lastTerm = 0;
if (raftLogEntryCount == 0) {
if (raftLogEntryCount(pLogStore) == 0) {
lastTerm = 0;
} else {
SSyncRaftEntry* pLastEntry;
int32_t code = raftLogGetLastEntry(pLogStore, &pLastEntry);
ASSERT(code == 0);
lastTerm = pLastEntry->term;
taosMemoryFree(pLastEntry);
if (pLastEntry != NULL) {
lastTerm = pLastEntry->term;
taosMemoryFree(pLastEntry);
}
}
return lastTerm;
}
......@@ -109,8 +136,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
SyncIndex lastIndex = raftLogLastIndex(pLogStore);
ASSERT(pEntry->index == lastIndex + 1);
SyncIndex writeIndex = raftLogWriteIndex(pLogStore);
ASSERT(pEntry->index == writeIndex);
int code = 0;
SSyncLogMeta syncMeta;
......@@ -194,8 +221,9 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
}
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) {
*ppLastEntry = NULL;
if (raftLogEntryCount(pLogStore) == 0) {
return -1;
return 0;
}
SyncIndex lastIndex = raftLogLastIndex(pLogStore);
int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry);
......@@ -243,6 +271,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
pLogStore->syncLogAppendEntry = raftLogAppendEntry;
pLogStore->syncLogGetEntry = raftLogGetEntry;
pLogStore->syncLogTruncate = raftLogTruncate;
pLogStore->syncLogWriteIndex = raftLogWriteIndex;
return pLogStore;
}
......@@ -401,35 +430,29 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) {
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
cJSON_AddStringToObject(pRoot, "pWal", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", logStoreLastIndex(pLogStore));
snprintf(u64buf, sizeof(u64buf), "%ld", raftLogLastIndex(pLogStore));
cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore));
snprintf(u64buf, sizeof(u64buf), "%lu", raftLogLastTerm(pLogStore));
cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex);
cJSON_AddStringToObject(pRoot, "beginIndex", u64buf);
SyncIndex endIndex = raftLogEndIndex(pLogStore);
snprintf(u64buf, sizeof(u64buf), "%ld", endIndex);
cJSON_AddStringToObject(pRoot, "endIndex", u64buf);
int32_t count = raftLogEntryCount(pLogStore);
cJSON_AddNumberToObject(pRoot, "entryCount", count);
cJSON* pEntries = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "pEntries", pEntries);
SyncIndex lastIndex = logStoreLastIndex(pLogStore);
for (SyncIndex i = pData->beginIndex; i <= lastIndex; ++i) {
for (SyncIndex i = pData->beginIndex; i <= endIndex; ++i) {
SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i);
cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
syncEntryDestory(pEntry);
}
/*
for (SyncIndex i = 0; i <= lastIndex; ++i) {
SyncIndex walFirstVer = walGetFirstVer(pData->pWal);
if (i != SYNC_INDEX_INVALID && i >= walFirstVer) {
SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i);
cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
syncEntryDestory(pEntry);
}
}
*/
}
cJSON* pJson = cJSON_CreateObject();
......
......@@ -22,6 +22,16 @@ SWal* pWal;
SSyncLogStore* pLogStore;
const char* pWalPath = "./syncLogStoreTest_wal";
SyncIndex gSnapshotLastApplyIndex;
SyncIndex gSnapshotLastApplyTerm;
int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
pSnapshot->data = NULL;
pSnapshot->lastApplyIndex = gSnapshotLastApplyIndex;
pSnapshot->lastApplyTerm = gSnapshotLastApplyTerm;
return 0;
}
void init() {
walInit();
taosRemoveDir(pWalPath);
......@@ -41,6 +51,9 @@ void init() {
pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
memset(pSyncNode, 0, sizeof(SSyncNode));
pSyncNode->pWal = pWal;
pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
pSyncNode->pFsm->FpGetSnapshot = GetSnapshotCb;
}
void cleanup() {
......@@ -49,14 +62,37 @@ void cleanup() {
taosMemoryFree(pSyncNode);
}
void logStoreTest() {
void test1() {
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
logStoreLog2((char*)"\n\n\ntest1 ----- ", pLogStore);
logStoreDestory(pLogStore);
cleanup();
}
void test2() {
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
assert(pLogStore->getLastIndex(pLogStore) == SYNC_INDEX_INVALID);
pLogStore->syncLogSetBeginIndex(pLogStore, 5);
logStoreLog2((char*)"\n\n\ntest2 ----- ", pLogStore);
logStoreDestory(pLogStore);
cleanup();
}
void test3() {
init();
logStoreLog2((char*)"logStoreTest", pLogStore);
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
logStoreLog2((char*)"\n\n\ntest3 ----- ", pLogStore);
for (int i = 0; i < 5; ++i) {
for (int i = 0; i <= 4; ++i) {
int32_t dataLen = 10;
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
assert(pEntry != NULL);
......@@ -65,34 +101,123 @@ void logStoreTest() {
pEntry->seqNum = 3;
pEntry->isWeak = true;
pEntry->term = 100 + i;
pEntry->index = pLogStore->getLastIndex(pLogStore) + 1;
pEntry->index = pLogStore->syncLogWriteIndex(pLogStore);
snprintf(pEntry->data, dataLen, "value%d", i);
syncEntryLog2((char*)"==write entry== :", pEntry);
pLogStore->appendEntry(pLogStore, pEntry);
pLogStore->syncLogAppendEntry(pLogStore, pEntry);
syncEntryDestory(pEntry);
if (i == 0) {
assert(pLogStore->getLastIndex(pLogStore) == SYNC_INDEX_BEGIN);
}
}
logStoreLog2((char*)"after appendEntry", pLogStore);
logStoreLog2((char*)"test3 after appendEntry", pLogStore);
logStoreDestory(pLogStore);
cleanup();
}
void test4() {
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
logStoreLog2((char*)"\n\n\ntest4 ----- ", pLogStore);
pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore->truncate(pLogStore, 3);
logStoreLog2((char*)"after truncate 3", pLogStore);
for (int i = 5; i <= 9; ++i) {
int32_t dataLen = 10;
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
assert(pEntry != NULL);
pEntry->msgType = 1;
pEntry->originalRpcType = 2;
pEntry->seqNum = 3;
pEntry->isWeak = true;
pEntry->term = 100 + i;
pEntry->index = pLogStore->syncLogWriteIndex(pLogStore);
snprintf(pEntry->data, dataLen, "value%d", i);
pLogStore->syncLogAppendEntry(pLogStore, pEntry);
syncEntryDestory(pEntry);
}
logStoreLog2((char*)"test4 after appendEntry", pLogStore);
logStoreDestory(pLogStore);
cleanup();
}
int main(int argc, char** argv) {
tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
void test5() {
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
logStoreLog2((char*)"\n\n\ntest5 ----- ", pLogStore);
pLogStore->syncLogSetBeginIndex(pLogStore, 5);
for (int i = 5; i <= 9; ++i) {
int32_t dataLen = 10;
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
assert(pEntry != NULL);
pEntry->msgType = 1;
pEntry->originalRpcType = 2;
pEntry->seqNum = 3;
pEntry->isWeak = true;
pEntry->term = 100 + i;
pEntry->index = pLogStore->syncLogWriteIndex(pLogStore);
snprintf(pEntry->data, dataLen, "value%d", i);
pLogStore->syncLogAppendEntry(pLogStore, pEntry);
syncEntryDestory(pEntry);
}
logStoreLog2((char*)"test5 after appendEntry", pLogStore);
pLogStore->syncLogTruncate(pLogStore, 7);
logStoreLog2((char*)"after truncate 7", pLogStore);
logStoreDestory(pLogStore);
cleanup();
}
void test6() {
init();
logStoreTest();
taosMsleep(2000);
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
logStoreLog2((char*)"\n\n\ntest6 ----- ", pLogStore);
pLogStore->syncLogSetBeginIndex(pLogStore, 5);
for (int i = 5; i <= 9; ++i) {
int32_t dataLen = 10;
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
assert(pEntry != NULL);
pEntry->msgType = 1;
pEntry->originalRpcType = 2;
pEntry->seqNum = 3;
pEntry->isWeak = true;
pEntry->term = 100 + i;
pEntry->index = pLogStore->syncLogWriteIndex(pLogStore);
snprintf(pEntry->data, dataLen, "value%d", i);
pLogStore->syncLogAppendEntry(pLogStore, pEntry);
syncEntryDestory(pEntry);
}
logStoreLog2((char*)"test6 after appendEntry", pLogStore);
pLogStore->syncLogTruncate(pLogStore, 5);
logStoreLog2((char*)"after truncate 5", pLogStore);
logStoreDestory(pLogStore);
cleanup();
}
int main(int argc, char** argv) {
tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_INFO + DEBUG_SCREEN + DEBUG_FILE;
test1();
test2();
test3();
test4();
test5();
test6();
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册