提交 889d1339 编写于 作者: M Minghao Li

sync refactor

上级 ebeb4bb7
......@@ -47,6 +47,7 @@ SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len);
cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry);
char* syncEntry2Str(const SSyncRaftEntry* pEntry);
void syncEntryPrint(const SSyncRaftEntry* pEntry);
void syncEntryPrint2(char *s, const SSyncRaftEntry* pEntry);
#ifdef __cplusplus
}
......
......@@ -108,4 +108,10 @@ void syncEntryPrint(const SSyncRaftEntry* pEntry) {
char* s = syncEntry2Str(pEntry);
sTrace("%s", s);
free(s);
}
void syncEntryPrint2(char* s, const SSyncRaftEntry* pEntry) {
char* ss = syncEntry2Str(pEntry);
sTrace("%s | %s", s, ss);
free(ss);
}
\ No newline at end of file
......@@ -53,9 +53,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
char* serialized = syncEntrySerialize(pEntry, &len);
assert(serialized != NULL);
walWrite(pWal, pEntry->index, pEntry->msgType, serialized, len);
walFsync(pWal, true);
int code;
code = walWrite(pWal, pEntry->index, pEntry->msgType, serialized, len);
assert(code == 0);
walFsync(pWal, true);
free(serialized);
}
......@@ -84,23 +86,20 @@ int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
// return index of last entry
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {
/*
SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore);
SyncIndex lastIndex = pLastEntry->index;
free(pLastEntry);
*/
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
int64_t last = walGetLastVer(pWal);
SyncIndex lastIndex = last < 0 ? 0 : last;
SyncIndex lastIndex = walGetLastVer(pWal);
return lastIndex;
}
// return term of last entry
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
SyncTerm lastTerm = 0;
SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore);
SyncTerm lastTerm = pLastEntry->term;
free(pLastEntry);
if (pLastEntry != NULL) {
lastTerm = pLastEntry->term;
free(pLastEntry);
}
return lastTerm;
}
......@@ -121,8 +120,11 @@ SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
SyncIndex lastIndex = walGetLastVer(pWal);
SSyncRaftEntry* pEntry;
pEntry = logStoreGetEntry(pLogStore, lastIndex);
SSyncRaftEntry* pEntry = NULL;
if (lastIndex > 0) {
pEntry = logStoreGetEntry(pLogStore, lastIndex);
}
return pEntry;
}
......@@ -143,7 +145,7 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) {
cJSON* pEntries = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "pEntries", pEntries);
SyncIndex lastIndex = logStoreLastIndex(pLogStore);
for (SyncIndex i = 1; i <= lastIndex; ++i) {
for (SyncIndex i = 0; i <= lastIndex; ++i) {
SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i);
cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
syncEntryDestory(pEntry);
......@@ -164,6 +166,8 @@ char* logStore2Str(SSyncLogStore* pLogStore) {
// for debug
void logStorePrint(SSyncLogStore* pLogStore) {
char* s = logStore2Str(pLogStore);
sTrace("%s", s);
// sTrace("%s", s);
fprintf(stderr, "logStorePrint: [len:%lu]| %s \n", strlen(s), s);
free(s);
}
\ No newline at end of file
......@@ -35,16 +35,19 @@ SSyncNode* syncNodeInit() {
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
int code = walInit();
assert(code == 0);
SWalCfg walCfg;
memset(&walCfg, 0, sizeof(SWalCfg));
walCfg.vgId = syncInfo.vgId;
walCfg.fsyncPeriod = 1000;
walCfg.retentionPeriod = 1000;
walCfg.rollPeriod = 1000;
walCfg.retentionSize = 100000;
walCfg.segSize = 100000;
walCfg.level = TAOS_WAL_WRITE;
walCfg.retentionSize = 1000;
walCfg.segSize = 1000;
walCfg.level = TAOS_WAL_FSYNC;
pWal = walOpen("./wal_test", &walCfg);
assert(pWal != NULL);
syncInfo.pWal = pWal;
......@@ -80,8 +83,20 @@ SSyncNode* syncInitTest() { return syncNodeInit(); }
void logStoreTest() {
logStorePrint(pSyncNode->pLogStore);
for (int i = 0; i < 5; ++i) {
SSyncRaftEntry* pEntry;
int32_t dataLen = 10;
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
assert(pEntry != NULL);
pEntry->msgType = 1;
pEntry->originalRpcType = 2;
pEntry->seqNum = 3;
pEntry->isWeak = true;
pEntry->term = 100;
pEntry->index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;
snprintf(pEntry->data, dataLen, "value%d", i);
//syncEntryPrint2((char*)"write entry:", pEntry);
pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry);
syncEntryDestory(pEntry);
}
logStorePrint(pSyncNode->pLogStore);
......@@ -117,16 +132,10 @@ int main(int argc, char** argv) {
pSyncNode = syncInitTest();
assert(pSyncNode != NULL);
syncNodePrint((char*)"syncLogStoreTest", pSyncNode);
initRaftId(pSyncNode);
//--------------------------------------------------------------
//syncNodePrint((char*)"syncLogStoreTest", pSyncNode);
//initRaftId(pSyncNode);
logStoreTest();
//--------------------------------------------------------------
// walClose(pWal);
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册