提交 ab2f2686 编写于 作者: S Shengliang Guan

Merge branch 'fix/tsim' into fix/dnode

...@@ -250,7 +250,7 @@ The [Taos] structure is the connection manager in [libtaos] and provides two mai ...@@ -250,7 +250,7 @@ The [Taos] structure is the connection manager in [libtaos] and provides two mai
Column information is stored using [ColumnMeta]. Column information is stored using [ColumnMeta].
``rust ```rust
let cols = &q.column_meta; let cols = &q.column_meta;
for col in cols { for col in cols {
println!("name: {}, type: {:?} , bytes: {}", col.name, col.type_, col.bytes); println!("name: {}, type: {:?} , bytes: {}", col.name, col.type_, col.bytes);
......
...@@ -510,8 +510,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { ...@@ -510,8 +510,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
if (IsReq(pMsg) && pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER && if (IsReq(pMsg) && pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER &&
pMsg->msgType != TDMT_MND_TRANS_TIMER) { pMsg->msgType != TDMT_MND_TRANS_TIMER) {
mError("msg:%p, failed to check mnode state since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, mError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType));
TMSG_INFO(pMsg->msgType));
SEpSet epSet = {0}; SEpSet epSet = {0};
mndGetMnodeEpSet(pMsg->info.node, &epSet); mndGetMnodeEpSet(pMsg->info.node, &epSet);
...@@ -534,7 +533,8 @@ static int32_t mndCheckMsgContent(SRpcMsg *pMsg) { ...@@ -534,7 +533,8 @@ static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0; if (!IsReq(pMsg)) return 0;
if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0; if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0;
mError("msg:%p, failed to check msg content, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); mError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen,
pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_INVALID_MSG_LEN; terrno = TSDB_CODE_INVALID_MSG_LEN;
return -1; return -1;
} }
...@@ -558,7 +558,7 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { ...@@ -558,7 +558,7 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
mTrace("msg:%p, won't response immediately since in progress", pMsg); mTrace("msg:%p, won't response immediately since in progress", pMsg);
} else if (code == 0) { } else if (code == 0) {
mTrace("msg:%p, successfully processed and response", pMsg); mTrace("msg:%p, successfully processed", pMsg);
} else { } else {
mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
TMSG_INFO(pMsg->msgType)); TMSG_INFO(pMsg->msgType));
......
...@@ -670,7 +670,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { ...@@ -670,7 +670,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
} }
sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("trans:%d, sync to other mnodes", pTrans->id); mDebug("trans:%d, sync to other mnodes, stage:%s", pTrans->id, mndTransStr(pTrans->stage));
int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id); int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id);
if (code != 0) { if (code != 0) {
mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
......
...@@ -1058,7 +1058,7 @@ int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t del ...@@ -1058,7 +1058,7 @@ int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t del
static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
int32_t newDnodeId) { int32_t newDnodeId) {
mDebug("vgId:%d, will add 1 vnode, replica:%d, dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId); mDebug("vgId:%d, will add 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId);
SVnodeGid *pGid = &pVgroup->vnodeGid[pVgroup->replica]; SVnodeGid *pGid = &pVgroup->vnodeGid[pVgroup->replica];
pVgroup->replica++; pVgroup->replica++;
...@@ -1074,7 +1074,7 @@ static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDb ...@@ -1074,7 +1074,7 @@ static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDb
static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
int32_t delDnodeId) { int32_t delDnodeId) {
mDebug("vgId:%d, will remove 1 vnode, replica:%d, dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId); mDebug("vgId:%d, will remove 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId);
SVnodeGid *pGid = NULL; SVnodeGid *pGid = NULL;
SVnodeGid delGid = {0}; SVnodeGid delGid = {0};
...@@ -1116,7 +1116,8 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, ...@@ -1116,7 +1116,8 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb,
memcpy(&newVg, pVgroup, sizeof(SVgObj)); memcpy(&newVg, pVgroup, sizeof(SVgObj));
mInfo("vgId:%d, vgroup info before redistribute, replica:%d", newVg.vgId, newVg.replica); mInfo("vgId:%d, vgroup info before redistribute, replica:%d", newVg.vgId, newVg.replica);
for (int32_t i = 0; i < newVg.replica; ++i) { for (int32_t i = 0; i < newVg.replica; ++i) {
mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId); mInfo("vgId:%d, vnode:%d dnode:%d role:%s", newVg.vgId, i, newVg.vnodeGid[i].dnodeId,
syncStr(newVg.vnodeGid[i].role));
} }
if (pNew1 != NULL && pOld1 != NULL) { if (pNew1 != NULL && pOld1 != NULL) {
...@@ -1198,7 +1199,7 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) { ...@@ -1198,7 +1199,7 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
mInfo("vgId:%d, start to redistribute to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3); mInfo("vgId:%d, start to redistribute vgroup to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3);
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_REDISTRIBUTE_VGROUP) != 0) goto _OVER; if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_REDISTRIBUTE_VGROUP) != 0) goto _OVER;
......
...@@ -169,11 +169,12 @@ typedef struct SSdb { ...@@ -169,11 +169,12 @@ typedef struct SSdb {
SWal *pWal; SWal *pWal;
char *currDir; char *currDir;
char *tmpDir; char *tmpDir;
int64_t lastCommitVer; int64_t commitIndex;
int64_t lastCommitTerm; int64_t commitTerm;
int64_t curVer; int64_t commitConfig;
int64_t curTerm; int64_t applyIndex;
int64_t curConfig; int64_t applyTerm;
int64_t applyConfig;
int64_t tableVer[SDB_MAX]; int64_t tableVer[SDB_MAX];
int64_t maxId[SDB_MAX]; int64_t maxId[SDB_MAX];
EKeyType keyTypes[SDB_MAX]; EKeyType keyTypes[SDB_MAX];
......
...@@ -53,11 +53,12 @@ SSdb *sdbInit(SSdbOpt *pOption) { ...@@ -53,11 +53,12 @@ SSdb *sdbInit(SSdbOpt *pOption) {
} }
pSdb->pWal = pOption->pWal; pSdb->pWal = pOption->pWal;
pSdb->curVer = -1; pSdb->applyIndex = -1;
pSdb->curTerm = -1; pSdb->applyTerm = -1;
pSdb->lastCommitVer = -1; pSdb->applyConfig = -1;
pSdb->lastCommitTerm = -1; pSdb->commitIndex = -1;
pSdb->curConfig = -1; pSdb->commitTerm = -1;
pSdb->commitConfig = -1;
pSdb->pMnode = pOption->pMnode; pSdb->pMnode = pOption->pMnode;
taosThreadMutexInit(&pSdb->filelock, NULL); taosThreadMutexInit(&pSdb->filelock, NULL);
mDebug("sdb init successfully"); mDebug("sdb init successfully");
...@@ -159,23 +160,23 @@ static int32_t sdbCreateDir(SSdb *pSdb) { ...@@ -159,23 +160,23 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
return 0; return 0;
} }
void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->curVer = index; } void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->applyIndex = index; }
void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->curTerm = term; } void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->applyTerm = term; }
void sdbSetCurConfig(SSdb *pSdb, int64_t config) { void sdbSetCurConfig(SSdb *pSdb, int64_t config) {
if (pSdb->curConfig != config) { if (pSdb->applyConfig != config) {
mDebug("mnode sync config set from %" PRId64 " to %" PRId64, pSdb->curConfig, config); mDebug("mnode sync config set from %" PRId64 " to %" PRId64, pSdb->applyConfig, config);
pSdb->curConfig = config; pSdb->applyConfig = config;
} }
} }
int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->curVer; } int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->applyIndex; }
int64_t sdbGetApplyTerm(SSdb *pSdb) { return pSdb->curTerm; } int64_t sdbGetApplyTerm(SSdb *pSdb) { return pSdb->applyTerm; }
int64_t sdbGetCommitIndex(SSdb *pSdb) { return pSdb->lastCommitVer; } int64_t sdbGetCommitIndex(SSdb *pSdb) { return pSdb->commitIndex; }
int64_t sdbGetCommitTerm(SSdb *pSdb) { return pSdb->lastCommitTerm; } int64_t sdbGetCommitTerm(SSdb *pSdb) { return pSdb->commitTerm; }
int64_t sdbGetCurConfig(SSdb *pSdb) { return pSdb->curConfig; } int64_t sdbGetCurConfig(SSdb *pSdb) { return pSdb->commitConfig; }
\ No newline at end of file \ No newline at end of file
...@@ -67,10 +67,12 @@ static void sdbResetData(SSdb *pSdb) { ...@@ -67,10 +67,12 @@ static void sdbResetData(SSdb *pSdb) {
mDebug("sdb:%s is reset", sdbTableName(i)); mDebug("sdb:%s is reset", sdbTableName(i));
} }
pSdb->curVer = -1; pSdb->applyIndex = -1;
pSdb->curTerm = -1; pSdb->applyTerm = -1;
pSdb->lastCommitVer = -1; pSdb->applyConfig = -1;
pSdb->lastCommitTerm = -1; pSdb->commitIndex = -1;
pSdb->commitTerm = -1;
pSdb->commitConfig = -1;
mDebug("sdb reset successfully"); mDebug("sdb reset successfully");
} }
...@@ -90,7 +92,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) { ...@@ -90,7 +92,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
return -1; return -1;
} }
ret = taosReadFile(pFile, &pSdb->curVer, sizeof(int64_t)); ret = taosReadFile(pFile, &pSdb->applyIndex, sizeof(int64_t));
if (ret < 0) { if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
...@@ -100,7 +102,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) { ...@@ -100,7 +102,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
return -1; return -1;
} }
ret = taosReadFile(pFile, &pSdb->curTerm, sizeof(int64_t)); ret = taosReadFile(pFile, &pSdb->applyTerm, sizeof(int64_t));
if (ret < 0) { if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
...@@ -110,7 +112,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) { ...@@ -110,7 +112,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
return -1; return -1;
} }
ret = taosReadFile(pFile, &pSdb->curConfig, sizeof(int64_t)); ret = taosReadFile(pFile, &pSdb->applyConfig, sizeof(int64_t));
if (ret < 0) { if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
...@@ -173,17 +175,17 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) { ...@@ -173,17 +175,17 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
return -1; return -1;
} }
if (taosWriteFile(pFile, &pSdb->curVer, sizeof(int64_t)) != sizeof(int64_t)) { if (taosWriteFile(pFile, &pSdb->applyIndex, sizeof(int64_t)) != sizeof(int64_t)) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
if (taosWriteFile(pFile, &pSdb->curTerm, sizeof(int64_t)) != sizeof(int64_t)) { if (taosWriteFile(pFile, &pSdb->applyTerm, sizeof(int64_t)) != sizeof(int64_t)) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
if (taosWriteFile(pFile, &pSdb->curConfig, sizeof(int64_t)) != sizeof(int64_t)) { if (taosWriteFile(pFile, &pSdb->applyConfig, sizeof(int64_t)) != sizeof(int64_t)) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
...@@ -300,11 +302,12 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { ...@@ -300,11 +302,12 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
} }
code = 0; code = 0;
pSdb->lastCommitVer = pSdb->curVer; pSdb->commitIndex = pSdb->applyIndex;
pSdb->lastCommitTerm = pSdb->curTerm; pSdb->commitTerm = pSdb->applyTerm;
pSdb->commitConfig = pSdb->applyConfig;
memcpy(pSdb->tableVer, tableVer, sizeof(tableVer)); memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
mDebug("read sdb file:%s successfully, index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file, pSdb->lastCommitVer, mDebug("read sdb file:%s successfully, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file,
pSdb->lastCommitTerm, pSdb->curConfig); pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig);
_OVER: _OVER:
taosCloseFile(&pFile); taosCloseFile(&pFile);
...@@ -336,9 +339,10 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { ...@@ -336,9 +339,10 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
char curfile[PATH_MAX] = {0}; char curfile[PATH_MAX] = {0};
snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
mDebug("start to write sdb file, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64 " term:%" PRId64 mDebug("start to write sdb file, apply index:%" PRId64 " term:%" PRId64 " config:%" PRId64 ", commit index:%" PRId64
" file:%s", " term:%" PRId64 " config:%" PRId64 ", file:%s",
pSdb->curVer, pSdb->curTerm, pSdb->lastCommitVer, pSdb->lastCommitTerm, curfile); pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig,
curfile);
TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) { if (pFile == NULL) {
...@@ -430,10 +434,11 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { ...@@ -430,10 +434,11 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
if (code != 0) { if (code != 0) {
mError("failed to write sdb file:%s since %s", curfile, tstrerror(code)); mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
} else { } else {
pSdb->lastCommitVer = pSdb->curVer; pSdb->commitIndex = pSdb->applyIndex;
pSdb->lastCommitTerm = pSdb->curTerm; pSdb->commitTerm = pSdb->applyTerm;
mDebug("write sdb file successfully, index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s", pSdb->commitConfig = pSdb->applyConfig;
pSdb->lastCommitVer, pSdb->lastCommitTerm, pSdb->curConfig, curfile); mDebug("write sdb file successfully, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig, curfile);
} }
terrno = code; terrno = code;
...@@ -442,13 +447,13 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { ...@@ -442,13 +447,13 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
int32_t sdbWriteFile(SSdb *pSdb) { int32_t sdbWriteFile(SSdb *pSdb) {
int32_t code = 0; int32_t code = 0;
if (pSdb->curVer == pSdb->lastCommitVer) { if (pSdb->applyIndex == pSdb->commitIndex) {
return 0; return 0;
} }
taosThreadMutexLock(&pSdb->filelock); taosThreadMutexLock(&pSdb->filelock);
if (pSdb->pWal != NULL) { if (pSdb->pWal != NULL) {
code = walBeginSnapshot(pSdb->pWal, pSdb->curVer); code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex);
} }
if (code == 0) { if (code == 0) {
code = sdbWriteFileImp(pSdb); code = sdbWriteFileImp(pSdb);
...@@ -522,9 +527,9 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) { ...@@ -522,9 +527,9 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) {
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
taosThreadMutexLock(&pSdb->filelock); taosThreadMutexLock(&pSdb->filelock);
int64_t commitIndex = pSdb->lastCommitVer; int64_t commitIndex = pSdb->commitIndex;
int64_t commitTerm = pSdb->lastCommitTerm; int64_t commitTerm = pSdb->commitTerm;
int64_t curConfig = pSdb->curConfig; int64_t commitConfig = pSdb->commitConfig;
if (taosCopyFile(datafile, pIter->name) < 0) { if (taosCopyFile(datafile, pIter->name) < 0) {
taosThreadMutexUnlock(&pSdb->filelock); taosThreadMutexUnlock(&pSdb->filelock);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -543,8 +548,8 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) { ...@@ -543,8 +548,8 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) {
} }
*ppIter = pIter; *ppIter = pIter;
mInfo("sdbiter:%p, is created to read snapshot, index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s", pIter, mInfo("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
commitIndex, commitTerm, curConfig, pIter->name); pIter, commitIndex, commitTerm, commitConfig, pIter->name);
return 0; return 0;
} }
......
...@@ -628,7 +628,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { ...@@ -628,7 +628,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
void * tagData = NULL; void * tagData = NULL;
if (param->val == NULL) { if (param->val == NULL) {
metaError("vgId:%d failed to filter NULL data", TD_VID(pMeta->pVnode)); metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode));
return -1; return -1;
} else { } else {
if (IS_VAR_DATA_TYPE(param->type)) { if (IS_VAR_DATA_TYPE(param->type)) {
......
...@@ -61,6 +61,7 @@ static int32_t vnodeSetStandBy(SVnode *pVnode) { ...@@ -61,6 +61,7 @@ static int32_t vnodeSetStandBy(SVnode *pVnode) {
return -1; return -1;
} }
vInfo("vgId:%d, start to transfer leader", TD_VID(pVnode));
if (syncLeaderTransfer(pVnode->sync) != 0) { if (syncLeaderTransfer(pVnode->sync) != 0) {
vError("vgId:%d, failed to transfer leader since:%s", TD_VID(pVnode), terrstr()); vError("vgId:%d, failed to transfer leader since:%s", TD_VID(pVnode), terrstr());
return -1; return -1;
...@@ -297,7 +298,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -297,7 +298,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
ret = -1; ret = -1;
} }
if (ret != 0) { if (ret != 0 && terrno == 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
} }
return ret; return ret;
......
...@@ -2170,25 +2170,17 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa ...@@ -2170,25 +2170,17 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa
blockDataEnsureCapacity(p, capacity); blockDataEnsureCapacity(p, capacity);
while (1) { while (1) {
STupleHandle* pTupleHandle = NULL; STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pInfo->prefetchedTuple == NULL) {
pTupleHandle = tsortNextTuple(pHandle);
} else {
pTupleHandle = pInfo->prefetchedTuple;
}
if (pTupleHandle == NULL) { if (pTupleHandle == NULL) {
break; break;
} }
appendOneRowToDataBlock(p, pTupleHandle); appendOneRowToDataBlock(p, pTupleHandle);
if (p->info.rows >= capacity) { if (p->info.rows >= capacity) {
break; break;
} }
} }
qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), p->info.rows); qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), p->info.rows);
return (p->info.rows > 0) ? p : NULL; return (p->info.rows > 0) ? p : NULL;
} }
...@@ -2261,7 +2253,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -2261,7 +2253,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
...@@ -2288,7 +2279,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -2288,7 +2279,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->interval = extractIntervalInfo(pTableScanNode); pInfo->interval = extractIntervalInfo(pTableScanNode);
pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.sampleRatio = pTableScanNode->ratio;
pInfo->sample.seed = taosGetTimestampSec(); pInfo->sample.seed = taosGetTimestampSec();
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
pInfo->dataReaders = dataReaders; pInfo->dataReaders = dataReaders;
...@@ -2307,16 +2297,21 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -2307,16 +2297,21 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
taosArrayPush(pInfo->sortSourceParams, param); taosArrayPush(pInfo->sortSourceParams, param);
taosMemoryFree(param); taosMemoryFree(param);
} }
pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order); pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order);
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
int32_t rowSize = pInfo->pResBlock->info.rowSize; int32_t rowSize = pInfo->pResBlock->info.rowSize;
int32_t blockMetaSize = (int32_t)blockDataGetSerialMetaSize(pInfo->pResBlock->info.numOfCols); pInfo->bufPageSize = getProperSortPageSize(rowSize);
pInfo->bufPageSize = (rowSize * 2 + blockMetaSize) < 1024 ? 1024 : (rowSize * 2 + blockMetaSize);
pInfo->sortBufSize = pInfo->bufPageSize * 16; // todo the total available buffer should be determined by total capacity of buffer of this task.
// the additional one is reserved for merge result
pInfo->sortBufSize = pInfo->bufPageSize * (taosArrayGetSize(dataReaders) + 1);
pInfo->hasGroupId = false; pInfo->hasGroupId = false;
pInfo->prefetchedTuple = NULL; pInfo->prefetchedTuple = NULL;
pOperator->name = "TableMergeScanOperator"; pOperator->name = "TableMergeScanOperator";
// TODO : change it
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
......
...@@ -717,7 +717,7 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -717,7 +717,7 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
// delete confict entries // delete confict entries
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
ASSERT(code == 0); ASSERT(code == 0);
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu log truncate, from %ld to %ld", ths->vgId, sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu log truncate, from %ld to %ld", ths->vgId,
syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, delBegin, delEnd); syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, delBegin, delEnd);
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore);
...@@ -1067,7 +1067,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -1067,7 +1067,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
ths->commitIndex = snapshot.lastApplyIndex; ths->commitIndex = snapshot.lastApplyIndex;
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld", "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld",
ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm,
commitBegin, commitEnd); commitBegin, commitEnd);
} }
......
...@@ -190,7 +190,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -190,7 +190,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
if (gRaftDetailLog) { if (gRaftDetailLog) {
char* s = snapshotSender2Str(pSender); char* s = snapshotSender2Str(pSender);
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
"lastApplyIndex:%ld " "lastApplyIndex:%ld "
"lastApplyTerm:%lu " "lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu "
...@@ -201,7 +201,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -201,7 +201,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
taosMemoryFree(s); taosMemoryFree(s);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
"lastApplyIndex:%ld " "lastApplyIndex:%ld "
"lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu", "lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu",
ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, host, port, ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, host, port,
......
...@@ -56,7 +56,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -56,7 +56,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SyncIndex commitEnd = snapshot.lastApplyIndex; SyncIndex commitEnd = snapshot.lastApplyIndex;
pSyncNode->commitIndex = snapshot.lastApplyIndex; pSyncNode->commitIndex = snapshot.lastApplyIndex;
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld", sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, snapshot.lastApplyIndex); pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, snapshot.lastApplyIndex);
} }
......
...@@ -414,7 +414,7 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) { ...@@ -414,7 +414,7 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
assert(rid == pSyncNode->rid); assert(rid == pSyncNode->rid);
sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex; sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex;
sTrace("sync get snapshot meta: lastConfigIndex:%ld", pSyncNode->pRaftCfg->lastConfigIndex); sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->pRaftCfg->lastConfigIndex);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return 0; return 0;
...@@ -437,7 +437,8 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct ...@@ -437,7 +437,8 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct
} }
} }
sMeta->lastConfigIndex = lastIndex; sMeta->lastConfigIndex = lastIndex;
sTrace("sync get snapshot meta by index:%ld lastConfigIndex:%ld", snapshotIndex, sMeta->lastConfigIndex); sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lastConfigIndex:%" PRId64, pSyncNode->vgId, snapshotIndex,
sMeta->lastConfigIndex);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return 0; return 0;
...@@ -613,7 +614,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { ...@@ -613,7 +614,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
return -1; return -1;
} }
assert(rid == pSyncNode->rid); assert(rid == pSyncNode->rid);
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm,
TMSG_INFO(pMsg->msgType), pMsg->msgType); TMSG_INFO(pMsg->msgType), pMsg->msgType);
ret = syncNodePropose(pSyncNode, pMsg, isWeak); ret = syncNodePropose(pSyncNode, pMsg, isWeak);
...@@ -624,7 +625,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { ...@@ -624,7 +625,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) { int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0; int32_t ret = 0;
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm,
TMSG_INFO(pMsg->msgType), pMsg->msgType); TMSG_INFO(pMsg->msgType), pMsg->msgType);
...@@ -872,7 +873,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { ...@@ -872,7 +873,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// snapshot meta // snapshot meta
// pSyncNode->sMeta.lastConfigIndex = -1; // pSyncNode->sMeta.lastConfigIndex = -1;
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync open", pSyncNode->vgId, sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu sync open", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm); syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm);
return pSyncNode; return pSyncNode;
...@@ -920,7 +921,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { ...@@ -920,7 +921,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
} }
void syncNodeClose(SSyncNode* pSyncNode) { void syncNodeClose(SSyncNode* pSyncNode) {
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync close", pSyncNode->vgId, sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu sync close", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm); syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm);
int32_t ret; int32_t ret;
...@@ -1309,7 +1310,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { ...@@ -1309,7 +1310,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
int len = 256; int len = 256;
char* s = (char*)taosMemoryMalloc(len); char* s = (char*)taosMemoryMalloc(len);
snprintf(s, len, snprintf(s, len,
"syncNode: vgId:%d currentTerm:%lu, commitIndex:%ld, state:%d %s, isStandBy:%d, " "syncNode: vgId:%d, currentTerm:%lu, commitIndex:%ld, state:%d %s, isStandBy:%d, "
"electTimerLogicClock:%lu, " "electTimerLogicClock:%lu, "
"electTimerLogicClockUser:%lu, " "electTimerLogicClockUser:%lu, "
"electTimerMS:%d, replicaNum:%d", "electTimerMS:%d, replicaNum:%d",
...@@ -1360,7 +1361,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex ...@@ -1360,7 +1361,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA]; SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
oldSenders[i] = (pSyncNode->senders)[i]; oldSenders[i] = (pSyncNode->senders)[i];
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu save senders %d, %p, privateTerm:%lu", sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu save senders %d, %p, privateTerm:%lu",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], oldSenders[i]->privateTerm); pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], oldSenders[i]->privateTerm);
if (gRaftDetailLog) { if (gRaftDetailLog) {
...@@ -1415,7 +1416,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex ...@@ -1415,7 +1416,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
uint16_t port; uint16_t port;
syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port); syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, "
"privateTerm:%lu", "privateTerm:%lu",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j], pSyncNode->pRaftStore->currentTerm, (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j],
...@@ -1428,7 +1429,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex ...@@ -1428,7 +1429,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex; int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
(pSyncNode->senders)[i]->replicaIndex = i; (pSyncNode->senders)[i]->replicaIndex = i;
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, "
"reset:%d", "reset:%d",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset); pSyncNode->pRaftStore->currentTerm, oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset);
...@@ -1441,7 +1442,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex ...@@ -1441,7 +1442,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
if ((pSyncNode->senders)[i] == NULL) { if ((pSyncNode->senders)[i] == NULL) {
(pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i); (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu", "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, (pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm); pSyncNode->pRaftStore->currentTerm, (pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm);
} }
...@@ -1451,7 +1452,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex ...@@ -1451,7 +1452,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if (oldSenders[i] != NULL) { if (oldSenders[i] != NULL) {
snapshotSenderDestroy(oldSenders[i]); snapshotSenderDestroy(oldSenders[i]);
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu delete old sender %p replicaIndex:%d", sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu delete old sender %p replicaIndex:%d",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, oldSenders[i], i); pSyncNode->pRaftStore->currentTerm, oldSenders[i], i);
oldSenders[i] = NULL; oldSenders[i] = NULL;
...@@ -1524,7 +1525,7 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { ...@@ -1524,7 +1525,7 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, "
"restoreFinish:%d, %s", "restoreFinish:%d, %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
...@@ -1566,7 +1567,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { ...@@ -1566,7 +1567,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->restoreFinish = false; pSyncNode->restoreFinish = false;
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, "
"restoreFinish:%d, %s", "restoreFinish:%d, %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
...@@ -1872,7 +1873,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { ...@@ -1872,7 +1873,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
if (pSyncNode->FpEqMsg != NULL) { if (pSyncNode->FpEqMsg != NULL) {
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sError("vgId:%d sync enqueue ping msg error, code:%d", pSyncNode->vgId, code); sError("vgId:%d, sync enqueue ping msg error, code:%d", pSyncNode->vgId, code);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
return; return;
...@@ -1906,7 +1907,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { ...@@ -1906,7 +1907,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
if (pSyncNode->FpEqMsg != NULL) { if (pSyncNode->FpEqMsg != NULL) {
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sError("vgId:%d sync enqueue elect msg error, code:%d", pSyncNode->vgId, code); sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
return; return;
...@@ -1944,7 +1945,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { ...@@ -1944,7 +1945,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
if (pSyncNode->FpEqMsg != NULL) { if (pSyncNode->FpEqMsg != NULL) {
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sError("vgId:%d sync enqueue timer msg error, code:%d", pSyncNode->vgId, code); sError("vgId:%d, sync enqueue timer msg error, code:%d", pSyncNode->vgId, code);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
return; return;
...@@ -2146,12 +2147,12 @@ const char* syncStr(ESyncState state) { ...@@ -2146,12 +2147,12 @@ const char* syncStr(ESyncState state) {
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg); SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu begin leader transfer", ths->vgId, sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu begin leader transfer", ths->vgId,
syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm); syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm);
if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 && if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) { pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) {
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId, sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId,
syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm,
pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort,
pSyncLeaderTransfer->newLeaderId.addr); pSyncLeaderTransfer->newLeaderId.addr);
...@@ -2294,7 +2295,7 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE ...@@ -2294,7 +2295,7 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
int32_t code = 0; int32_t code = 0;
ESyncState state = flag; ESyncState state = flag;
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64 sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64
", %s", ", %s",
ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, beginIndex, ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, beginIndex,
endIndex, syncUtilState2String(state)); endIndex, syncUtilState2String(state));
...@@ -2349,7 +2350,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ...@@ -2349,7 +2350,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ths->pFsm->FpRestoreFinishCb(ths->pFsm); ths->pFsm->FpRestoreFinishCb(ths->pFsm);
} }
ths->restoreFinish = true; ths->restoreFinish = true;
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu restore finish, %s, index:%ld", ths->vgId, sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu restore finish, %s, index:%ld", ths->vgId,
syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm,
syncUtilState2String(ths->state), pEntry->index); syncUtilState2String(ths->state), pEntry->index);
} }
......
...@@ -164,7 +164,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -164,7 +164,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
walFsync(pWal, true); walFsync(pWal, true);
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, "
"originalRpcType:%s,%d", "originalRpcType:%s,%d",
pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex, pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex,
pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy,
...@@ -325,7 +325,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { ...@@ -325,7 +325,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
walFsync(pWal, true); walFsync(pWal, true);
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, "
"originalRpcType:%s,%d", "originalRpcType:%s,%d",
pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex, pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex,
pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy,
......
...@@ -47,7 +47,7 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) { ...@@ -47,7 +47,7 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
SSyncNode *pSyncNode = pObj->data; SSyncNode *pSyncNode = pObj->data;
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode,
pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
...@@ -74,7 +74,7 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) { ...@@ -74,7 +74,7 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
SSyncNode *pSyncNode = pObj->data; SSyncNode *pSyncNode = pObj->data;
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p "
"ahandle:%p", "ahandle:%p",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index,
...@@ -96,7 +96,7 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu ...@@ -96,7 +96,7 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
SSyncNode *pSyncNode = pObj->data; SSyncNode *pSyncNode = pObj->data;
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p "
"ahandle:%p", "ahandle:%p",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index,
......
...@@ -141,7 +141,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { ...@@ -141,7 +141,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
"lastApplyIndex:%ld " "lastApplyIndex:%ld "
"lastApplyTerm:%lu " "lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu send " "lastConfigIndex:%ld privateTerm:%lu send "
...@@ -153,7 +153,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { ...@@ -153,7 +153,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
"lastApplyIndex:%ld " "lastApplyIndex:%ld "
"lastApplyTerm:%lu " "lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu", "lastConfigIndex:%ld privateTerm:%lu",
...@@ -287,7 +287,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { ...@@ -287,7 +287,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
"lastApplyIndex:%ld " "lastApplyIndex:%ld "
"lastApplyTerm:%lu " "lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu send " "lastConfigIndex:%ld privateTerm:%lu send "
...@@ -299,7 +299,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { ...@@ -299,7 +299,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
"lastApplyIndex:%ld " "lastApplyIndex:%ld "
"lastApplyTerm:%lu " "lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu", "lastConfigIndex:%ld privateTerm:%lu",
...@@ -310,7 +310,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { ...@@ -310,7 +310,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
} }
} else { } else {
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d "
"lastApplyIndex:%ld " "lastApplyIndex:%ld "
"lastApplyTerm:%lu " "lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu", "lastConfigIndex:%ld privateTerm:%lu",
...@@ -349,7 +349,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { ...@@ -349,7 +349,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
"privateTerm:%lu send " "privateTerm:%lu send "
"msg:%s", "msg:%s",
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
...@@ -358,7 +358,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { ...@@ -358,7 +358,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
"privateTerm:%lu", "privateTerm:%lu",
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm); pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm);
...@@ -594,7 +594,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -594,7 +594,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
"lastIndex:%ld, " "lastIndex:%ld, "
"lastTerm:%lu, " "lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
...@@ -604,7 +604,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -604,7 +604,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
"lastIndex:%ld, " "lastIndex:%ld, "
"lastTerm:%lu, " "lastTerm:%lu, "
"lastConfigIndex:%ld privateTerm:%lu", "lastConfigIndex:%ld privateTerm:%lu",
...@@ -648,7 +648,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -648,7 +648,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
bool isDrop; bool isDrop;
if (IamInNew) { if (IamInNew) {
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu update config by snapshot, lastIndex:%ld, " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu update config by snapshot, lastIndex:%ld, "
"lastTerm:%lu, " "lastTerm:%lu, "
"lastConfigIndex:%ld ", "lastConfigIndex:%ld ",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
...@@ -656,7 +656,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -656,7 +656,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop); syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu do not update config by snapshot, I am not in " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu do not update config by snapshot, I am not in "
"newCfg, " "newCfg, "
"lastIndex:%ld, lastTerm:%lu, " "lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld ", "lastConfigIndex:%ld ",
...@@ -694,7 +694,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -694,7 +694,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore); char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
"index:%ld, " "index:%ld, "
"snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s", "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s",
...@@ -704,7 +704,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -704,7 +704,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
taosMemoryFree(logSimpleStr); taosMemoryFree(logSimpleStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
"index:%ld, " "index:%ld, "
"snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu", "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu",
...@@ -721,7 +721,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -721,7 +721,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
"lastIndex:%ld, lastTerm:%lu, " "lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
...@@ -730,7 +730,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -730,7 +730,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
"lastIndex:%ld, lastTerm:%lu, " "lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu", "lastConfigIndex:%ld, privateTerm:%lu",
pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
...@@ -750,7 +750,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -750,7 +750,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
"lastIndex:%ld, " "lastIndex:%ld, "
"lastTerm:%lu, " "lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv " "lastConfigIndex:%ld, privateTerm:%lu, recv "
...@@ -761,7 +761,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -761,7 +761,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
"lastIndex:%ld, " "lastIndex:%ld, "
"lastTerm:%lu, " "lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu", "lastConfigIndex:%ld, privateTerm:%lu",
...@@ -787,7 +787,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -787,7 +787,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
"lastIndex:%ld, " "lastIndex:%ld, "
"lastTerm:%lu, " "lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
...@@ -797,7 +797,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -797,7 +797,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, " "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
"lastIndex:%ld, " "lastIndex:%ld, "
"lastTerm:%lu, " "lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu", "lastConfigIndex:%ld, privateTerm:%lu",
......
...@@ -295,8 +295,9 @@ void dumpTrans(SSdb *pSdb, SJson *json) { ...@@ -295,8 +295,9 @@ void dumpTrans(SSdb *pSdb, SJson *json) {
void dumpHeader(SSdb *pSdb, SJson *json) { void dumpHeader(SSdb *pSdb, SJson *json) {
tjsonAddIntegerToObject(json, "sver", 1); tjsonAddIntegerToObject(json, "sver", 1);
tjsonAddStringToObject(json, "curVer", i642str(pSdb->curVer)); tjsonAddStringToObject(json, "applyIndex", i642str(pSdb->applyIndex));
tjsonAddStringToObject(json, "curTerm", i642str(pSdb->curTerm)); tjsonAddStringToObject(json, "applyTerm", i642str(pSdb->applyTerm));
tjsonAddStringToObject(json, "applyConfig", i642str(pSdb->applyConfig));
SJson *maxIdsJson = tjsonCreateObject(); SJson *maxIdsJson = tjsonCreateObject();
tjsonAddItemToObject(json, "maxIds", maxIdsJson); tjsonAddItemToObject(json, "maxIds", maxIdsJson);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册