提交 f5db109e 编写于 作者: M Minghao Li

fix(sync): get last max snapshot index

上级 d71027ac
...@@ -192,6 +192,7 @@ bool syncEnvIsStart(); ...@@ -192,6 +192,7 @@ bool syncEnvIsStart();
const char* syncStr(ESyncState state); const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid); bool syncIsRestoreFinish(int64_t rid);
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg); int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);
......
...@@ -69,7 +69,8 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM ...@@ -69,7 +69,8 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
if (cbMeta.index - sdbGetApplyIndex(pMnode->pSdb) > 100) { if (cbMeta.index - sdbGetApplyIndex(pMnode->pSdb) > 100) {
SSnapshotMeta sMeta = {0}; SSnapshotMeta sMeta = {0};
if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) { // if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, cbMeta.index, &sMeta) == 0) {
sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex); sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex);
} }
sdbWriteFile(pMnode->pSdb); sdbWriteFile(pMnode->pSdb);
...@@ -89,7 +90,10 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) { ...@@ -89,7 +90,10 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
SSnapshotMeta sMeta = {0}; SSnapshotMeta sMeta = {0};
if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) { // if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
SyncIndex snapshotIndex = sdbGetApplyIndex(pMnode->pSdb);
if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, snapshotIndex, &sMeta) == 0) {
sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex); sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex);
} }
......
...@@ -22,8 +22,8 @@ ...@@ -22,8 +22,8 @@
#include "mndSync.h" #include "mndSync.h"
#include "mndUser.h" #include "mndUser.h"
#define TRANS_VER_NUMBER 1 #define TRANS_VER_NUMBER 1
#define TRANS_ARRAY_SIZE 8 #define TRANS_ARRAY_SIZE 8
#define TRANS_RESERVE_SIZE 64 #define TRANS_RESERVE_SIZE 64
static SSdbRaw *mndTransActionEncode(STrans *pTrans); static SSdbRaw *mndTransActionEncode(STrans *pTrans);
...@@ -1426,7 +1426,9 @@ void mndTransPullup(SMnode *pMnode) { ...@@ -1426,7 +1426,9 @@ void mndTransPullup(SMnode *pMnode) {
} }
SSnapshotMeta sMeta = {0}; SSnapshotMeta sMeta = {0};
if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) { // if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
SyncIndex snapshotIndex = sdbGetApplyIndex(pMnode->pSdb);
if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, snapshotIndex, &sMeta) == 0) {
sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex); sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex);
} }
sdbWriteFile(pMnode->pSdb); sdbWriteFile(pMnode->pSdb);
......
...@@ -29,6 +29,8 @@ extern "C" { ...@@ -29,6 +29,8 @@ extern "C" {
#define CONFIG_FILE_LEN 1024 #define CONFIG_FILE_LEN 1024
#define MAX_CONFIG_INDEX_COUNT 512
typedef struct SRaftCfg { typedef struct SRaftCfg {
SSyncCfg cfg; SSyncCfg cfg;
TdFilePtr pFile; TdFilePtr pFile;
...@@ -36,6 +38,10 @@ typedef struct SRaftCfg { ...@@ -36,6 +38,10 @@ typedef struct SRaftCfg {
int8_t isStandBy; int8_t isStandBy;
int8_t snapshotEnable; int8_t snapshotEnable;
SyncIndex lastConfigIndex; SyncIndex lastConfigIndex;
SyncIndex configIndexArr[MAX_CONFIG_INDEX_COUNT];
int32_t configIndexCount;
} SRaftCfg; } SRaftCfg;
SRaftCfg *raftCfgOpen(const char *path); SRaftCfg *raftCfgOpen(const char *path);
......
...@@ -418,6 +418,29 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) { ...@@ -418,6 +418,29 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
return 0; return 0;
} }
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return -1;
}
assert(rid == pSyncNode->rid);
ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);
SyncIndex lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[0];
for (int i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) {
if ((pSyncNode->pRaftCfg->configIndexArr)[i] > lastIndex &&
(pSyncNode->pRaftCfg->configIndexArr)[i] <= snapshotIndex) {
lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i];
}
}
sMeta->lastConfigIndex = lastIndex;
sTrace("sync get snapshot meta by index:%ld lastConfigIndex:%ld", snapshotIndex, sMeta->lastConfigIndex);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return 0;
}
const char* syncGetMyRoleStr(int64_t rid) { const char* syncGetMyRoleStr(int64_t rid) {
const char* s = syncUtilState2String(syncGetMyRole(rid)); const char* s = syncUtilState2String(syncGetMyRole(rid));
return s; return s;
...@@ -2195,8 +2218,8 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE ...@@ -2195,8 +2218,8 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
char* newStr = syncCfg2Str(&newSyncCfg); char* newStr = syncCfg2Str(&newSyncCfg);
syncUtilJson2Line(oldStr); syncUtilJson2Line(oldStr);
syncUtilJson2Line(newStr); syncUtilJson2Line(newStr);
snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, %s --> %s", oldSyncCfg.replicaNum, snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum,
newSyncCfg.replicaNum, oldStr, newStr); newSyncCfg.replicaNum, pEntry->index, oldStr, newStr);
taosMemoryFree(oldStr); taosMemoryFree(oldStr);
taosMemoryFree(newStr); taosMemoryFree(newStr);
...@@ -2212,8 +2235,8 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE ...@@ -2212,8 +2235,8 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
char* newStr = syncCfg2Str(&newSyncCfg); char* newStr = syncCfg2Str(&newSyncCfg);
syncUtilJson2Line(oldStr); syncUtilJson2Line(oldStr);
syncUtilJson2Line(newStr); syncUtilJson2Line(newStr);
snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d, %s --> %s", oldSyncCfg.replicaNum, snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum,
newSyncCfg.replicaNum, oldStr, newStr); newSyncCfg.replicaNum, pEntry->index, oldStr, newStr);
taosMemoryFree(oldStr); taosMemoryFree(oldStr);
taosMemoryFree(newStr); taosMemoryFree(newStr);
...@@ -2295,6 +2318,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ...@@ -2295,6 +2318,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
} }
// restore finish // restore finish
// if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok
if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) { if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
if (ths->restoreFinish == false) { if (ths->restoreFinish == false) {
if (ths->pFsm->FpRestoreFinishCb != NULL) { if (ths->pFsm->FpRestoreFinishCb != NULL) {
......
...@@ -85,16 +85,11 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) { ...@@ -85,16 +85,11 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
} }
return pRoot; return pRoot;
/*
cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SSyncCfg", pRoot);
return pJson;
*/
} }
char *syncCfg2Str(SSyncCfg *pSyncCfg) { char *syncCfg2Str(SSyncCfg *pSyncCfg) {
cJSON *pJson = syncCfg2Json(pSyncCfg); cJSON *pJson = syncCfg2Json(pSyncCfg);
char * serialized = cJSON_Print(pJson); char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
...@@ -154,6 +149,16 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { ...@@ -154,6 +149,16 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
snprintf(buf64, sizeof(buf64), "%ld", pRaftCfg->lastConfigIndex); snprintf(buf64, sizeof(buf64), "%ld", pRaftCfg->lastConfigIndex);
cJSON_AddStringToObject(pRoot, "lastConfigIndex", buf64); cJSON_AddStringToObject(pRoot, "lastConfigIndex", buf64);
cJSON_AddNumberToObject(pRoot, "configIndexCount", pRaftCfg->configIndexCount);
cJSON *pIndexArr = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "configIndexArr", pIndexArr);
for (int i = 0; i < pRaftCfg->configIndexCount; ++i) {
snprintf(buf64, sizeof(buf64), "%ld", (pRaftCfg->configIndexArr)[i]);
cJSON *pIndexObj = cJSON_CreateObject();
cJSON_AddStringToObject(pIndexObj, "index", buf64);
cJSON_AddItemToArray(pIndexArr, pIndexObj);
}
cJSON *pJson = cJSON_CreateObject(); cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "RaftCfg", pRoot); cJSON_AddItemToObject(pJson, "RaftCfg", pRoot);
return pJson; return pJson;
...@@ -161,7 +166,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { ...@@ -161,7 +166,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
char *raftCfg2Str(SRaftCfg *pRaftCfg) { char *raftCfg2Str(SRaftCfg *pRaftCfg) {
cJSON *pJson = raftCfg2Json(pRaftCfg); cJSON *pJson = raftCfg2Json(pRaftCfg);
char * serialized = cJSON_Print(pJson); char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
...@@ -177,6 +182,9 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) { ...@@ -177,6 +182,9 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
raftCfg.isStandBy = meta.isStandBy; raftCfg.isStandBy = meta.isStandBy;
raftCfg.snapshotEnable = meta.snapshotEnable; raftCfg.snapshotEnable = meta.snapshotEnable;
raftCfg.lastConfigIndex = meta.lastConfigIndex; raftCfg.lastConfigIndex = meta.lastConfigIndex;
raftCfg.configIndexCount = 1;
memset(raftCfg.configIndexArr, 0, sizeof(raftCfg.configIndexArr));
raftCfg.configIndexArr[0] = -1;
char *s = raftCfg2Str(&raftCfg); char *s = raftCfg2Str(&raftCfg);
char buf[CONFIG_FILE_LEN] = {0}; char buf[CONFIG_FILE_LEN] = {0};
...@@ -207,7 +215,24 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { ...@@ -207,7 +215,24 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
cJSON *pJsonLastConfigIndex = cJSON_GetObjectItem(pJson, "lastConfigIndex"); cJSON *pJsonLastConfigIndex = cJSON_GetObjectItem(pJson, "lastConfigIndex");
pRaftCfg->lastConfigIndex = atoll(cJSON_GetStringValue(pJsonLastConfigIndex)); pRaftCfg->lastConfigIndex = atoll(cJSON_GetStringValue(pJsonLastConfigIndex));
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); cJSON *pJsonConfigIndexCount = cJSON_GetObjectItem(pJson, "configIndexCount");
pRaftCfg->configIndexCount = cJSON_GetNumberValue(pJsonConfigIndexCount);
cJSON *pIndexArr = cJSON_GetObjectItem(pJson, "configIndexArr");
int arraySize = cJSON_GetArraySize(pIndexArr);
assert(arraySize == pRaftCfg->configIndexCount);
memset(pRaftCfg->configIndexArr, 0, sizeof(pRaftCfg->configIndexArr));
for (int i = 0; i < arraySize; ++i) {
cJSON *pIndexObj = cJSON_GetArrayItem(pIndexArr, i);
assert(pIndexObj != NULL);
cJSON *pIndex = cJSON_GetObjectItem(pIndexObj, "index");
assert(cJSON_IsString(pIndex));
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
}
cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
ASSERT(code == 0); ASSERT(code == 0);
......
...@@ -410,20 +410,18 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { ...@@ -410,20 +410,18 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
} }
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
/* SSyncLogStoreData* pData = pLogStore->data;
SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal;
SWal* pWal = pData->pWal; // assert(walCommit(pWal, index) == 0);
// assert(walCommit(pWal, index) == 0); int32_t code = walCommit(pWal, index);
int32_t code = walCommit(pWal, index); if (code != 0) {
if (code != 0) { int32_t err = terrno;
int32_t err = terrno; const char* errStr = tstrerror(err);
const char* errStr = tstrerror(err); int32_t linuxErr = errno;
int32_t linuxErr = errno; const char* linuxErrMsg = strerror(errno);
const char* linuxErrMsg = strerror(errno); sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg);
sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, ASSERT(0);
linuxErrMsg); ASSERT(0); }
}
*/
return 0; return 0;
} }
......
...@@ -421,7 +421,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { ...@@ -421,7 +421,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender); cJSON *pJson = snapshotSender2Json(pSender);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
...@@ -542,7 +542,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { ...@@ -542,7 +542,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddStringToObject(pFromId, "addr", u64buf); cJSON_AddStringToObject(pFromId, "addr", u64buf);
{ {
uint64_t u64 = pReceiver->fromId.addr; uint64_t u64 = pReceiver->fromId.addr;
cJSON *pTmp = pFromId; cJSON * pTmp = pFromId;
char host[128] = {0}; char host[128] = {0};
uint16_t port; uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port); syncUtilU642Addr(u64, host, sizeof(host), &port);
...@@ -566,7 +566,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { ...@@ -566,7 +566,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver); cJSON *pJson = snapshotReceiver2Json(pReceiver);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
...@@ -671,8 +671,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -671,8 +671,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
char *newStr = syncCfg2Str(&newSyncCfg); char *newStr = syncCfg2Str(&newSyncCfg);
syncUtilJson2Line(oldStr); syncUtilJson2Line(oldStr);
syncUtilJson2Line(newStr); syncUtilJson2Line(newStr);
snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, %s --> %s", oldSyncCfg.replicaNum, snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, index:%ld, %s --> %s",
newSyncCfg.replicaNum, oldStr, newStr); oldSyncCfg.replicaNum, newSyncCfg.replicaNum, pMsg->lastConfigIndex, oldStr, newStr);
taosMemoryFree(oldStr); taosMemoryFree(oldStr);
taosMemoryFree(newStr); taosMemoryFree(newStr);
......
...@@ -27,6 +27,14 @@ SRaftCfg* createRaftCfg() { ...@@ -27,6 +27,14 @@ SRaftCfg* createRaftCfg() {
} }
pCfg->isStandBy = taosGetTimestampSec() % 100; pCfg->isStandBy = taosGetTimestampSec() % 100;
pCfg->configIndexCount = 5;
for (int i = 0; i < MAX_CONFIG_INDEX_COUNT; ++i) {
(pCfg->configIndexArr)[i] = -1;
}
for (int i = 0; i < pCfg->configIndexCount; ++i) {
(pCfg->configIndexArr)[i] = i * 100;
}
return pCfg; return pCfg;
} }
...@@ -100,6 +108,15 @@ void test5() { ...@@ -100,6 +108,15 @@ void test5() {
pCfg->isStandBy += 2; pCfg->isStandBy += 2;
pCfg->snapshotEnable += 3; pCfg->snapshotEnable += 3;
pCfg->lastConfigIndex += 1000; pCfg->lastConfigIndex += 1000;
pCfg->configIndexCount = 5;
for (int i = 0; i < MAX_CONFIG_INDEX_COUNT; ++i) {
(pCfg->configIndexArr)[i] = -1;
}
for (int i = 0; i < pCfg->configIndexCount; ++i) {
(pCfg->configIndexArr)[i] = i * 100;
}
raftCfgPersist(pCfg); raftCfgPersist(pCfg);
printf("%s update json file: %s myIndex->%d \n", (char*)__FUNCTION__, "./test3_raft_cfg.json", pCfg->cfg.myIndex); printf("%s update json file: %s myIndex->%d \n", (char*)__FUNCTION__, "./test3_raft_cfg.json", pCfg->cfg.myIndex);
...@@ -118,6 +135,6 @@ int main() { ...@@ -118,6 +135,6 @@ int main() {
test3(); test3();
test4(); test4();
test5(); test5();
return 0; return 0;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册