diff --git a/docs/en/14-reference/03-connector/rust.mdx b/docs/en/14-reference/03-connector/rust.mdx index a5cbaeac8077cda42690d9cc232062a685a51f41..56ca586c7e8ada6e4422596906e01887d4726fd0 100644 --- a/docs/en/14-reference/03-connector/rust.mdx +++ b/docs/en/14-reference/03-connector/rust.mdx @@ -250,7 +250,7 @@ The [Taos] structure is the connection manager in [libtaos] and provides two mai Column information is stored using [ColumnMeta]. - ``rust + ```rust let cols = &q.column_meta; for col in cols { println!("name: {}, type: {:?} , bytes: {}", col.name, col.type_, col.bytes); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 7b5025a986397a5124546b5210f7196740dc763e..bc418e0f534a1e8183c43ad76e5cc4c4dd222299 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -510,8 +510,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { if (IsReq(pMsg) && pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER && pMsg->msgType != TDMT_MND_TRANS_TIMER) { - mError("msg:%p, failed to check mnode state since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, - TMSG_INFO(pMsg->msgType)); + mError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); SEpSet epSet = {0}; mndGetMnodeEpSet(pMsg->info.node, &epSet); @@ -534,7 +533,8 @@ static int32_t mndCheckMsgContent(SRpcMsg *pMsg) { if (!IsReq(pMsg)) return 0; if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0; - mError("msg:%p, failed to check msg content, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); + mError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen, + pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_INVALID_MSG_LEN; return -1; } @@ -558,7 +558,7 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) { mTrace("msg:%p, won't response immediately since in progress", pMsg); } else if (code == 0) { - mTrace("msg:%p, successfully processed and response", pMsg); + mTrace("msg:%p, successfully processed", pMsg); } else { mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 4ffa601889b25a9e1a1ce2f71acf95c5f1b48664..0cd1408b4ab82e242f724e8c3ead4b9a9bcd16fb 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -670,7 +670,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { } sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mDebug("trans:%d, sync to other mnodes", pTrans->id); + mDebug("trans:%d, sync to other mnodes, stage:%s", pTrans->id, mndTransStr(pTrans->stage)); int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id); if (code != 0) { mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index cec83d1af5919984bfb29197efd37200614130ea..94ddbcd409c8ef4b1ad6bf8d97bda63edb63ea92 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1058,7 +1058,7 @@ int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t del static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t newDnodeId) { - mDebug("vgId:%d, will add 1 vnode, replica:%d, dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId); + mDebug("vgId:%d, will add 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId); SVnodeGid *pGid = &pVgroup->vnodeGid[pVgroup->replica]; pVgroup->replica++; @@ -1074,7 +1074,7 @@ static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDb static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t delDnodeId) { - mDebug("vgId:%d, will remove 1 vnode, replica:%d, dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId); + mDebug("vgId:%d, will remove 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId); SVnodeGid *pGid = NULL; SVnodeGid delGid = {0}; @@ -1116,7 +1116,8 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, memcpy(&newVg, pVgroup, sizeof(SVgObj)); mInfo("vgId:%d, vgroup info before redistribute, replica:%d", newVg.vgId, newVg.replica); for (int32_t i = 0; i < newVg.replica; ++i) { - mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId); + mInfo("vgId:%d, vnode:%d dnode:%d role:%s", newVg.vgId, i, newVg.vnodeGid[i].dnodeId, + syncStr(newVg.vnodeGid[i].role)); } if (pNew1 != NULL && pOld1 != NULL) { @@ -1198,7 +1199,7 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) { goto _OVER; } - mInfo("vgId:%d, start to redistribute to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3); + mInfo("vgId:%d, start to redistribute vgroup to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3); if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_REDISTRIBUTE_VGROUP) != 0) goto _OVER; diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index 8536c451b7d9c33bfc9fc9abfbe499af60718cde..ad1bf584d0ad9d1b64b0405edcf2abcaf7a7dacd 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -169,11 +169,12 @@ typedef struct SSdb { SWal *pWal; char *currDir; char *tmpDir; - int64_t lastCommitVer; - int64_t lastCommitTerm; - int64_t curVer; - int64_t curTerm; - int64_t curConfig; + int64_t commitIndex; + int64_t commitTerm; + int64_t commitConfig; + int64_t applyIndex; + int64_t applyTerm; + int64_t applyConfig; int64_t tableVer[SDB_MAX]; int64_t maxId[SDB_MAX]; EKeyType keyTypes[SDB_MAX]; diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 39e9c7588833c0185129ef0dea51f8bc8d665df4..61809aa93b073276315d8ab4ded34438f540223c 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -53,11 +53,12 @@ SSdb *sdbInit(SSdbOpt *pOption) { } pSdb->pWal = pOption->pWal; - pSdb->curVer = -1; - pSdb->curTerm = -1; - pSdb->lastCommitVer = -1; - pSdb->lastCommitTerm = -1; - pSdb->curConfig = -1; + pSdb->applyIndex = -1; + pSdb->applyTerm = -1; + pSdb->applyConfig = -1; + pSdb->commitIndex = -1; + pSdb->commitTerm = -1; + pSdb->commitConfig = -1; pSdb->pMnode = pOption->pMnode; taosThreadMutexInit(&pSdb->filelock, NULL); mDebug("sdb init successfully"); @@ -159,23 +160,23 @@ static int32_t sdbCreateDir(SSdb *pSdb) { return 0; } -void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->curVer = index; } +void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->applyIndex = index; } -void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->curTerm = term; } +void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->applyTerm = term; } void sdbSetCurConfig(SSdb *pSdb, int64_t config) { - if (pSdb->curConfig != config) { - mDebug("mnode sync config set from %" PRId64 " to %" PRId64, pSdb->curConfig, config); - pSdb->curConfig = config; + if (pSdb->applyConfig != config) { + mDebug("mnode sync config set from %" PRId64 " to %" PRId64, pSdb->applyConfig, config); + pSdb->applyConfig = config; } } -int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->curVer; } +int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->applyIndex; } -int64_t sdbGetApplyTerm(SSdb *pSdb) { return pSdb->curTerm; } +int64_t sdbGetApplyTerm(SSdb *pSdb) { return pSdb->applyTerm; } -int64_t sdbGetCommitIndex(SSdb *pSdb) { return pSdb->lastCommitVer; } +int64_t sdbGetCommitIndex(SSdb *pSdb) { return pSdb->commitIndex; } -int64_t sdbGetCommitTerm(SSdb *pSdb) { return pSdb->lastCommitTerm; } +int64_t sdbGetCommitTerm(SSdb *pSdb) { return pSdb->commitTerm; } -int64_t sdbGetCurConfig(SSdb *pSdb) { return pSdb->curConfig; } \ No newline at end of file +int64_t sdbGetCurConfig(SSdb *pSdb) { return pSdb->commitConfig; } \ No newline at end of file diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 34f5d6f23d541710cbc475f433d1c8d5ff8b4c3d..2e8e9325728c9f846cbf452c01a75e096b0e3faa 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -67,10 +67,12 @@ static void sdbResetData(SSdb *pSdb) { mDebug("sdb:%s is reset", sdbTableName(i)); } - pSdb->curVer = -1; - pSdb->curTerm = -1; - pSdb->lastCommitVer = -1; - pSdb->lastCommitTerm = -1; + pSdb->applyIndex = -1; + pSdb->applyTerm = -1; + pSdb->applyConfig = -1; + pSdb->commitIndex = -1; + pSdb->commitTerm = -1; + pSdb->commitConfig = -1; mDebug("sdb reset successfully"); } @@ -90,7 +92,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) { return -1; } - ret = taosReadFile(pFile, &pSdb->curVer, sizeof(int64_t)); + ret = taosReadFile(pFile, &pSdb->applyIndex, sizeof(int64_t)); if (ret < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -100,7 +102,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) { return -1; } - ret = taosReadFile(pFile, &pSdb->curTerm, sizeof(int64_t)); + ret = taosReadFile(pFile, &pSdb->applyTerm, sizeof(int64_t)); if (ret < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -110,7 +112,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) { return -1; } - ret = taosReadFile(pFile, &pSdb->curConfig, sizeof(int64_t)); + ret = taosReadFile(pFile, &pSdb->applyConfig, sizeof(int64_t)); if (ret < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -173,17 +175,17 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) { return -1; } - if (taosWriteFile(pFile, &pSdb->curVer, sizeof(int64_t)) != sizeof(int64_t)) { + if (taosWriteFile(pFile, &pSdb->applyIndex, sizeof(int64_t)) != sizeof(int64_t)) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - if (taosWriteFile(pFile, &pSdb->curTerm, sizeof(int64_t)) != sizeof(int64_t)) { + if (taosWriteFile(pFile, &pSdb->applyTerm, sizeof(int64_t)) != sizeof(int64_t)) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - if (taosWriteFile(pFile, &pSdb->curConfig, sizeof(int64_t)) != sizeof(int64_t)) { + if (taosWriteFile(pFile, &pSdb->applyConfig, sizeof(int64_t)) != sizeof(int64_t)) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -300,11 +302,12 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { } code = 0; - pSdb->lastCommitVer = pSdb->curVer; - pSdb->lastCommitTerm = pSdb->curTerm; + pSdb->commitIndex = pSdb->applyIndex; + pSdb->commitTerm = pSdb->applyTerm; + pSdb->commitConfig = pSdb->applyConfig; memcpy(pSdb->tableVer, tableVer, sizeof(tableVer)); - mDebug("read sdb file:%s successfully, index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file, pSdb->lastCommitVer, - pSdb->lastCommitTerm, pSdb->curConfig); + mDebug("read sdb file:%s successfully, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file, + pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig); _OVER: taosCloseFile(&pFile); @@ -336,9 +339,10 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { char curfile[PATH_MAX] = {0}; snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); - mDebug("start to write sdb file, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64 " term:%" PRId64 - " file:%s", - pSdb->curVer, pSdb->curTerm, pSdb->lastCommitVer, pSdb->lastCommitTerm, curfile); + mDebug("start to write sdb file, apply index:%" PRId64 " term:%" PRId64 " config:%" PRId64 ", commit index:%" PRId64 + " term:%" PRId64 " config:%" PRId64 ", file:%s", + pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig, + curfile); TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { @@ -430,10 +434,11 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { if (code != 0) { mError("failed to write sdb file:%s since %s", curfile, tstrerror(code)); } else { - pSdb->lastCommitVer = pSdb->curVer; - pSdb->lastCommitTerm = pSdb->curTerm; - mDebug("write sdb file successfully, index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s", - pSdb->lastCommitVer, pSdb->lastCommitTerm, pSdb->curConfig, curfile); + pSdb->commitIndex = pSdb->applyIndex; + pSdb->commitTerm = pSdb->applyTerm; + pSdb->commitConfig = pSdb->applyConfig; + mDebug("write sdb file successfully, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s", + pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig, curfile); } terrno = code; @@ -442,13 +447,13 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { int32_t sdbWriteFile(SSdb *pSdb) { int32_t code = 0; - if (pSdb->curVer == pSdb->lastCommitVer) { + if (pSdb->applyIndex == pSdb->commitIndex) { return 0; } taosThreadMutexLock(&pSdb->filelock); if (pSdb->pWal != NULL) { - code = walBeginSnapshot(pSdb->pWal, pSdb->curVer); + code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex); } if (code == 0) { code = sdbWriteFileImp(pSdb); @@ -522,9 +527,9 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) { snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); taosThreadMutexLock(&pSdb->filelock); - int64_t commitIndex = pSdb->lastCommitVer; - int64_t commitTerm = pSdb->lastCommitTerm; - int64_t curConfig = pSdb->curConfig; + int64_t commitIndex = pSdb->commitIndex; + int64_t commitTerm = pSdb->commitTerm; + int64_t commitConfig = pSdb->commitConfig; if (taosCopyFile(datafile, pIter->name) < 0) { taosThreadMutexUnlock(&pSdb->filelock); terrno = TAOS_SYSTEM_ERROR(errno); @@ -543,8 +548,8 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) { } *ppIter = pIter; - mInfo("sdbiter:%p, is created to read snapshot, index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s", pIter, - commitIndex, commitTerm, curConfig, pIter->name); + mInfo("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s", + pIter, commitIndex, commitTerm, commitConfig, pIter->name); return 0; } diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index a6339125c42e873a62b231ab3932736023f2204f..8cfebe1b98c8e81f462f2243f9c9a9eebadb78a8 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -628,7 +628,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { void * tagData = NULL; if (param->val == NULL) { - metaError("vgId:%d failed to filter NULL data", TD_VID(pMeta->pVnode)); + metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode)); return -1; } else { if (IS_VAR_DATA_TYPE(param->type)) { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index f46114746e76bcd7b745037c1339e459e7054371..fded723f1aaac5a38632fa71d5a630eb35ea37b1 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -61,6 +61,7 @@ static int32_t vnodeSetStandBy(SVnode *pVnode) { return -1; } + vInfo("vgId:%d, start to transfer leader", TD_VID(pVnode)); if (syncLeaderTransfer(pVnode->sync) != 0) { vError("vgId:%d, failed to transfer leader since:%s", TD_VID(pVnode), terrstr()); return -1; @@ -297,7 +298,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ret = -1; } - if (ret != 0) { + if (ret != 0 && terrno == 0) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; } return ret; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a8aad8fe688fc4b363674a1dd35c58ecddfe98c6..80276007de2720ce2354f79f23d2a8066e2cb1b6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2170,25 +2170,17 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa blockDataEnsureCapacity(p, capacity); while (1) { - STupleHandle* pTupleHandle = NULL; - if (pInfo->prefetchedTuple == NULL) { - pTupleHandle = tsortNextTuple(pHandle); - } else { - pTupleHandle = pInfo->prefetchedTuple; - } - + STupleHandle* pTupleHandle = tsortNextTuple(pHandle); if (pTupleHandle == NULL) { break; } appendOneRowToDataBlock(p, pTupleHandle); - if (p->info.rows >= capacity) { break; } } - qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), p->info.rows); return (p->info.rows > 0) ? p : NULL; } @@ -2261,7 +2253,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL) { goto _error; } @@ -2284,17 +2275,16 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; - pInfo->readHandle = *readHandle; - pInfo->interval = extractIntervalInfo(pTableScanNode); + pInfo->readHandle = *readHandle; + pInfo->interval = extractIntervalInfo(pTableScanNode); pInfo->sample.sampleRatio = pTableScanNode->ratio; - pInfo->sample.seed = taosGetTimestampSec(); - - pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; - pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; - pInfo->dataReaders = dataReaders; - pInfo->scanFlag = MAIN_SCAN; - pInfo->pColMatchInfo = pColList; - pInfo->curTWinIdx = 0; + pInfo->sample.seed = taosGetTimestampSec(); + pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; + pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; + pInfo->dataReaders = dataReaders; + pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColList; + pInfo->curTWinIdx = 0; pInfo->pResBlock = createResDataBlock(pDescNode); @@ -2307,22 +2297,27 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN taosArrayPush(pInfo->sortSourceParams, param); taosMemoryFree(param); } + pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order); pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); + int32_t rowSize = pInfo->pResBlock->info.rowSize; - int32_t blockMetaSize = (int32_t)blockDataGetSerialMetaSize(pInfo->pResBlock->info.numOfCols); - pInfo->bufPageSize = (rowSize * 2 + blockMetaSize) < 1024 ? 1024 : (rowSize * 2 + blockMetaSize); - pInfo->sortBufSize = pInfo->bufPageSize * 16; - pInfo->hasGroupId = false; + pInfo->bufPageSize = getProperSortPageSize(rowSize); + + // todo the total available buffer should be determined by total capacity of buffer of this task. + // the additional one is reserved for merge result + pInfo->sortBufSize = pInfo->bufPageSize * (taosArrayGetSize(dataReaders) + 1); + pInfo->hasGroupId = false; pInfo->prefetchedTuple = NULL; - pOperator->name = "TableMergeScanOperator"; + pOperator->name = "TableMergeScanOperator"; + // TODO : change it pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = numOfCols; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = numOfCols; + pOperator->pTaskInfo = pTaskInfo; initResultSizeInfo(pOperator, 1024); pOperator->fpSet = diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 16350a8f40aea545d192f58cdd5035c77c204390..2c17d0f3edbc03b3040d5b290d336d9b2e7e2269 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -717,7 +717,7 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { // delete confict entries code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); ASSERT(code == 0); - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu log truncate, from %ld to %ld", ths->vgId, + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu log truncate, from %ld to %ld", ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, delBegin, delEnd); logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); @@ -1067,7 +1067,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ths->commitIndex = snapshot.lastApplyIndex; sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld", + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld", ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, commitBegin, commitEnd); } diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index f934f9a268d68635e0ca51bdeb49a6904856aa22..eabd14bf28891f51360499476dd041810069b372 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -190,7 +190,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries if (gRaftDetailLog) { char* s = snapshotSender2Str(pSender); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, " "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu " @@ -201,7 +201,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries taosMemoryFree(s); } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, " "lastApplyIndex:%ld " "lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu", ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, host, port, diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index d010728c78210c3c81d1b40ad4dac3adfd707f38..d563e5aacccb8156044ee4da8f9ea6bfe03b5e1d 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -56,7 +56,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SyncIndex commitEnd = snapshot.lastApplyIndex; pSyncNode->commitIndex = snapshot.lastApplyIndex; - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld", + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, snapshot.lastApplyIndex); } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 3b0fe28fad930496283aa2c9da282fbe1a5dbea5..5abe98b425117368f2c53630bf73c5c3329dc9ec 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -414,7 +414,7 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) { assert(rid == pSyncNode->rid); sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex; - sTrace("sync get snapshot meta: lastConfigIndex:%ld", pSyncNode->pRaftCfg->lastConfigIndex); + sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->pRaftCfg->lastConfigIndex); taosReleaseRef(tsNodeRefId, pSyncNode->rid); return 0; @@ -437,7 +437,8 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct } } sMeta->lastConfigIndex = lastIndex; - sTrace("sync get snapshot meta by index:%ld lastConfigIndex:%ld", snapshotIndex, sMeta->lastConfigIndex); + sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lastConfigIndex:%" PRId64, pSyncNode->vgId, snapshotIndex, + sMeta->lastConfigIndex); taosReleaseRef(tsNodeRefId, pSyncNode->rid); return 0; @@ -613,7 +614,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { return -1; } assert(rid == pSyncNode->rid); - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), pMsg->msgType); ret = syncNodePropose(pSyncNode, pMsg, isWeak); @@ -624,7 +625,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) { int32_t ret = 0; - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), pMsg->msgType); @@ -872,7 +873,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // snapshot meta // pSyncNode->sMeta.lastConfigIndex = -1; - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync open", pSyncNode->vgId, + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu sync open", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm); return pSyncNode; @@ -920,7 +921,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { } void syncNodeClose(SSyncNode* pSyncNode) { - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync close", pSyncNode->vgId, + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu sync close", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm); int32_t ret; @@ -1309,7 +1310,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { int len = 256; char* s = (char*)taosMemoryMalloc(len); snprintf(s, len, - "syncNode: vgId:%d currentTerm:%lu, commitIndex:%ld, state:%d %s, isStandBy:%d, " + "syncNode: vgId:%d, currentTerm:%lu, commitIndex:%ld, state:%d %s, isStandBy:%d, " "electTimerLogicClock:%lu, " "electTimerLogicClockUser:%lu, " "electTimerMS:%d, replicaNum:%d", @@ -1360,7 +1361,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA]; for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { oldSenders[i] = (pSyncNode->senders)[i]; - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu save senders %d, %p, privateTerm:%lu", + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu save senders %d, %p, privateTerm:%lu", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], oldSenders[i]->privateTerm); if (gRaftDetailLog) { @@ -1415,7 +1416,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex uint16_t port; syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, " "privateTerm:%lu", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j], @@ -1428,7 +1429,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex; (pSyncNode->senders)[i]->replicaIndex = i; sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, " "reset:%d", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset); @@ -1441,7 +1442,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex if ((pSyncNode->senders)[i] == NULL) { (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu", + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, (pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm); } @@ -1451,7 +1452,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { if (oldSenders[i] != NULL) { snapshotSenderDestroy(oldSenders[i]); - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu delete old sender %p replicaIndex:%d", + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu delete old sender %p replicaIndex:%d", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, oldSenders[i], i); oldSenders[i] = NULL; @@ -1524,7 +1525,7 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, " "restoreFinish:%d, %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, @@ -1566,7 +1567,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { pSyncNode->restoreFinish = false; sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, " "restoreFinish:%d, %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, @@ -1872,7 +1873,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { if (pSyncNode->FpEqMsg != NULL) { int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); if (code != 0) { - sError("vgId:%d sync enqueue ping msg error, code:%d", pSyncNode->vgId, code); + sError("vgId:%d, sync enqueue ping msg error, code:%d", pSyncNode->vgId, code); rpcFreeCont(rpcMsg.pCont); syncTimeoutDestroy(pSyncMsg); return; @@ -1906,7 +1907,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { if (pSyncNode->FpEqMsg != NULL) { int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); if (code != 0) { - sError("vgId:%d sync enqueue elect msg error, code:%d", pSyncNode->vgId, code); + sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code); rpcFreeCont(rpcMsg.pCont); syncTimeoutDestroy(pSyncMsg); return; @@ -1944,7 +1945,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { if (pSyncNode->FpEqMsg != NULL) { int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); if (code != 0) { - sError("vgId:%d sync enqueue timer msg error, code:%d", pSyncNode->vgId, code); + sError("vgId:%d, sync enqueue timer msg error, code:%d", pSyncNode->vgId, code); rpcFreeCont(rpcMsg.pCont); syncTimeoutDestroy(pSyncMsg); return; @@ -2146,12 +2147,12 @@ const char* syncStr(ESyncState state) { static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg); - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu begin leader transfer", ths->vgId, + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu begin leader transfer", ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm); if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 && pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) { - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId, + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr); @@ -2294,7 +2295,7 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { int32_t code = 0; ESyncState state = flag; - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64 + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, beginIndex, endIndex, syncUtilState2String(state)); @@ -2349,7 +2350,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ths->pFsm->FpRestoreFinishCb(ths->pFsm); } ths->restoreFinish = true; - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu restore finish, %s, index:%ld", ths->vgId, + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu restore finish, %s, index:%ld", ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, syncUtilState2String(ths->state), pEntry->index); } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 87dfef5fdd1181b232ef7b0f51292f07c70378d5..8fff7ae6deb4a23e0c88f068632435bca89b6216 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -164,7 +164,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr walFsync(pWal, true); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, " "originalRpcType:%s,%d", pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy, @@ -325,7 +325,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { walFsync(pWal, true); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, " "originalRpcType:%s,%d", pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy, diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 551c2611b06cc8a93cb5bad09b23ccbd5687fb44..831e286d76e8a55e9101b67cdf134d7cb5585552 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -47,7 +47,7 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) { SSyncNode *pSyncNode = pObj->data; sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); @@ -74,7 +74,7 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) { SSyncNode *pSyncNode = pObj->data; sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p " "ahandle:%p", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, @@ -96,7 +96,7 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu SSyncNode *pSyncNode = pObj->data; sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p " "ahandle:%p", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 4973d3629571284bb8ef5fc67aa1041e88c268f3..7c8abfe494901972b92eccadc9b14f7193259294 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -141,7 +141,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d " "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu send " @@ -153,7 +153,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d " "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu", @@ -287,7 +287,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d " "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu send " @@ -299,7 +299,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d " "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu", @@ -310,7 +310,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { } } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d " "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu", @@ -349,7 +349,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d " "privateTerm:%lu send " "msg:%s", pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, @@ -358,7 +358,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d " "privateTerm:%lu", pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm); @@ -594,7 +594,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, " "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", @@ -604,7 +604,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, " "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld privateTerm:%lu", @@ -648,7 +648,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { bool isDrop; if (IamInNew) { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu update config by snapshot, lastIndex:%ld, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu update config by snapshot, lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld ", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, @@ -656,7 +656,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop); } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu do not update config by snapshot, I am not in " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu do not update config by snapshot, I am not in " "newCfg, " "lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld ", @@ -694,7 +694,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin " "index:%ld, " "snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s", @@ -704,7 +704,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { taosMemoryFree(logSimpleStr); } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin " "index:%ld, " "snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu", @@ -721,7 +721,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, " "lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, @@ -730,7 +730,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, " "lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu", pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, @@ -750,7 +750,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, " "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv " @@ -761,7 +761,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, " "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu", @@ -787,7 +787,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, " "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", @@ -797,7 +797,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, " "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu", diff --git a/tests/test/c/sdbDump.c b/tests/test/c/sdbDump.c index 0f0f7e8d101036c686dc413e59e76fdca7c6124e..612b870b7e0590713c373379b0c0053abeeeef02 100644 --- a/tests/test/c/sdbDump.c +++ b/tests/test/c/sdbDump.c @@ -295,8 +295,9 @@ void dumpTrans(SSdb *pSdb, SJson *json) { void dumpHeader(SSdb *pSdb, SJson *json) { tjsonAddIntegerToObject(json, "sver", 1); - tjsonAddStringToObject(json, "curVer", i642str(pSdb->curVer)); - tjsonAddStringToObject(json, "curTerm", i642str(pSdb->curTerm)); + tjsonAddStringToObject(json, "applyIndex", i642str(pSdb->applyIndex)); + tjsonAddStringToObject(json, "applyTerm", i642str(pSdb->applyTerm)); + tjsonAddStringToObject(json, "applyConfig", i642str(pSdb->applyConfig)); SJson *maxIdsJson = tjsonCreateObject(); tjsonAddItemToObject(json, "maxIds", maxIdsJson);