diff --git a/docs/en/14-reference/03-connector/rust.mdx b/docs/en/14-reference/03-connector/rust.mdx index a5cbaeac8077cda42690d9cc232062a685a51f41..56ca586c7e8ada6e4422596906e01887d4726fd0 100644 --- a/docs/en/14-reference/03-connector/rust.mdx +++ b/docs/en/14-reference/03-connector/rust.mdx @@ -250,7 +250,7 @@ The [Taos] structure is the connection manager in [libtaos] and provides two mai Column information is stored using [ColumnMeta]. - ``rust + ```rust let cols = &q.column_meta; for col in cols { println!("name: {}, type: {:?} , bytes: {}", col.name, col.type_, col.bytes); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index b7124dfaa51a247f0af296e48bed300a38e20ba3..012e61d23964300df42b79fd52ed48c3af6ee4ce 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -148,6 +148,10 @@ static int32_t mmStart(SMnodeMgmt *pMgmt) { static void mmStop(SMnodeMgmt *pMgmt) { dDebug("mnode-mgmt start to stop"); + taosThreadRwlockWrlock(&pMgmt->lock); + pMgmt->stopped = 1; + taosThreadRwlockUnlock(&pMgmt->lock); + mndStop(pMgmt->pMnode); } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 7cd7da1aa93825f5e94484a8def66821511c4a0c..493f4ab85f379ad7b40b85529b26a9444b7fbd5e 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -220,9 +220,6 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { } void mmStopWorker(SMnodeMgmt *pMgmt) { - taosThreadRwlockWrlock(&pMgmt->lock); - pMgmt->stopped = 1; - taosThreadRwlockUnlock(&pMgmt->lock); while (pMgmt->refCount > 0) taosMsleep(10); tSingleWorkerCleanup(&pMgmt->monitorWorker); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 9946185da6d9a65642dbf725a9456dfb03acbacc..f112727c0863849119468708af1ce2d2ffbf605c 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -228,7 +228,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { vmReleaseVnode(pMgmt, pVnode); terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED; code = terrno; - goto _OVER; + return 0; } snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index f277a5169d81007f0ad1d2fb60d05193ed24d7bb..7b5025a986397a5124546b5210f7196740dc763e 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -505,6 +505,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { } static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { + if (!IsReq(pMsg)) return 0; if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0; if (IsReq(pMsg) && pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER && diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index e82c479cdd416ea87fdfc10cf506a3f33e2aec72..a636f34f3d199c672bf4a6be9cdbbe8151a1d036 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -742,7 +742,7 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) { pMgmt->errCode = 0; pMgmt->transId = -1; tsem_wait(&pMgmt->syncSem); - mInfo("alter mnode sync result:%s", tstrerror(pMgmt->errCode)); + mInfo("alter mnode sync result:0x%x %s", pMgmt->errCode, tstrerror(pMgmt->errCode)); terrno = pMgmt->errCode; return pMgmt->errCode; } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 5d28181f00454a6febfad05e9615ea2c14177cce..ee5a291fbb31255a2bba8805a74c583362f5eb2e 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -59,6 +59,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM if (pMgmt->errCode != 0) { mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode)); } + pMgmt->transId = 0; tsem_post(&pMgmt->syncSem); } else { STrans *pTrans = mndAcquireTrans(pMnode, transId); @@ -122,6 +123,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM if (pMgmt->errCode != 0) { mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode)); } + pMgmt->transId = 0; tsem_post(&pMgmt->syncSem); } } @@ -258,13 +260,17 @@ void mndSyncStart(SMnode *pMnode) { mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby); } -void mndSyncStop(SMnode *pMnode) {} +void mndSyncStop(SMnode *pMnode) { + if (pMnode->syncMgmt.transId != 0) { + pMnode->syncMgmt.transId = 0; + tsem_post(&pMnode->syncMgmt.syncSem); + } +} bool mndIsMaster(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - ESyncState state = syncGetMyRole(pMgmt->sync); - if (state != TAOS_SYNC_STATE_LEADER) { + if (!syncIsReady(pMgmt->sync)) { terrno = TSDB_CODE_SYN_NOT_LEADER; return false; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index d170931e7ea66850b665719454e2f9068c4e5e8b..da1a126a765289fa8520df33519ecd57d3f7c1d9 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -466,10 +466,10 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe } // process request - // if (metaDropSTable(pVnode->pMeta, version, &req) < 0) { - // rcode = terrno; - // goto _exit; - // } + if (metaDropSTable(pVnode->pMeta, version, &req) < 0) { + rcode = terrno; + goto _exit; + } // return rsp _exit: diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 73291a11232ee062060e8c67b0a22772bd293823..c3cd75ebae29aec78053c96d2fac25d7fac735be 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3507,8 +3507,7 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock) // check for the limitation in each group if (pProjectInfo->limit.limit > 0 && pProjectInfo->curOutput + pRes->info.rows >= pProjectInfo->limit.limit) { pRes->info.rows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput); - - if (pProjectInfo->slimit.limit == -1 || pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput) { + if (pProjectInfo->slimit.limit > 0 && pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput) { pOperator->status = OP_EXEC_DONE; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 23226a01347b39b152b1787d0e5c068821fa1673..4fc25688c467effa96da8035a5225be07ce39198 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -331,7 +331,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { } pOperator->status = OP_RES_TO_RETURN; - closeAllResultRows(&pInfo->binfo.resultRowInfo); #if 0 if(pOperator->fpSet.encodeResultRow){ diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9c69b739b7608d58e07acd4a5ede19e1477c9641..be186c46f399cc1631392973a1f14402a09516da 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2309,14 +2309,14 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order); pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); int32_t rowSize = pInfo->pResBlock->info.rowSize; - pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2; + int32_t blockMetaSize = (int32_t)blockDataGetSerialMetaSize(pInfo->pResBlock->info.numOfCols); + pInfo->bufPageSize = (rowSize * 2 + blockMetaSize) < 1024 ? 1024 : (rowSize * 2 + blockMetaSize); pInfo->sortBufSize = pInfo->bufPageSize * 16; pInfo->hasGroupId = false; pInfo->prefetchedTuple = NULL; pOperator->name = "TableMergeScanOperator"; - // TODO : change it - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN; pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 90700580bd71956239a083e990ad8de7c4bb85bb..55900c7c7ecbd5cdfd3449636547a040ea7d6d7d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1384,8 +1384,9 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, - int32_t rowIndex); +static void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex); + +static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rIndex); int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); @@ -1407,11 +1408,23 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { if (pEntryInfo->numOfRes > 0) { setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow); + } else { + setNullSelectivityValue(pCtx, pBlock, currentRow); } return pEntryInfo->numOfRes; } +void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex) { + for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { + SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; + int32_t dstSlotId = pc->pExpr->base.resSchema.slotId; + + SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); + colDataAppendNULL(pDstCol, rowIndex); + } +} + void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rowIndex) { int32_t pageId = pTuplePos->pageId; int32_t offset = pTuplePos->offset; @@ -4627,8 +4640,6 @@ int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) { STailItem* pItem = pInfo->pItems[i]; colDataAppend(pCol, currentRow, pItem->data, false); - - // setSelectivityValue(pCtx, pBlock, &pInfo->pItems[i].tuplePos, currentRow); currentRow += 1; } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 89b4761212bd4ea50451fed0821adb862df1dc98..7b2f79e24c5b4dbc7ba4c7a757716dd231928b22 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -713,8 +713,8 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { // delete confict entries code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); ASSERT(code == 0); - sDebug("vgId:%d sync event currentTerm:%lu log truncate, from %ld to %ld", ths->vgId, ths->pRaftStore->currentTerm, - delBegin, delEnd); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu log truncate, from %ld to %ld", ths->vgId, + syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, delBegin, delEnd); logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); return code; @@ -880,6 +880,72 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs } } while (0); + // fake match2 + // + // condition1: + // preIndex <= my commit index + // + // operation: + // if hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex, append entry + // match my-commit-index or my-commit-index + 1 + // no operation on log + do { + bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && + (pMsg->prevLogIndex <= ths->commitIndex); + if (condition) { + sTrace("recv SyncAppendEntries, fake match2, msg-prevLogIndex:%ld, my-commitIndex:%ld", pMsg->prevLogIndex, + ths->commitIndex); + + SyncIndex matchIndex = ths->commitIndex; + bool hasAppendEntries = pMsg->dataLen > 0; + if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) { + // append entry + SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); + ASSERT(pAppendEntry != NULL); + + { + // has extra entries (> preIndex) in local log + SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); + bool hasExtraEntries = logLastIndex > pMsg->prevLogIndex; + + if (hasExtraEntries) { + // make log same, rollback deleted entries + code = syncNodeMakeLogSame(ths, pMsg); + ASSERT(code == 0); + } + } + + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); + ASSERT(code == 0); + + // pre commit + code = syncNodePreCommit(ths, pAppendEntry); + ASSERT(code == 0); + + matchIndex = pMsg->prevLogIndex + 1; + + syncEntryDestory(pAppendEntry); + } + + // prepare response msg + SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); + pReply->srcId = ths->myRaftId; + pReply->destId = pMsg->srcId; + pReply->term = ths->pRaftStore->currentTerm; + pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; + pReply->success = true; + pReply->matchIndex = matchIndex; + + // send response + SRpcMsg rpcMsg; + syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); + syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + syncAppendEntriesReplyDestroy(pReply); + + return ret; + } + } while (0); + // calculate logOK here, before will coredump, due to fake match bool logOK = syncNodeOnAppendEntriesLogOK(ths, pMsg); @@ -995,8 +1061,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs SyncIndex commitEnd = snapshot.lastApplyIndex; ths->commitIndex = snapshot.lastApplyIndex; - sDebug("vgId:%d sync event currentTerm:%lu commit by snapshot from index:%ld to index:%ld, %s", ths->vgId, - ths->pRaftStore->currentTerm, commitBegin, commitEnd, syncUtilState2String(ths->state)); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld", + ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, + commitBegin, commitEnd); } SyncIndex beginIndex = ths->commitIndex + 1; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 5290d7d28eb57728531bcc1dd63ba6a808d82563..f934f9a268d68635e0ca51bdeb49a6904856aa22 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -190,19 +190,23 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries if (gRaftDetailLog) { char* s = snapshotSender2Str(pSender); sDebug( - "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d start sender first time, lastApplyIndex:%ld " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, " + "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu " "sender:%s", - ths->vgId, ths->pRaftStore->currentTerm, host, port, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, s); + ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, host, port, + pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, + pSender->privateTerm, s); taosMemoryFree(s); } else { sDebug( - "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d start sender first time, lastApplyIndex:%ld " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, " + "lastApplyIndex:%ld " "lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu", - ths->vgId, ths->pRaftStore->currentTerm, host, port, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); + ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, host, port, + pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, + pSender->privateTerm); } } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index efdb5b24ce6497890508c62881deccb6b408410c..d010728c78210c3c81d1b40ad4dac3adfd707f38 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -56,9 +56,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SyncIndex commitEnd = snapshot.lastApplyIndex; pSyncNode->commitIndex = snapshot.lastApplyIndex; - sDebug("vgId:%d sync event currentTerm:%lu commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId, - pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, snapshot.lastApplyIndex, - syncUtilState2String(pSyncNode->state)); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, snapshot.lastApplyIndex); } // update commit index diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 557342be1cf46ec656c515436404348ce835f740..343a0e55977ea63e8657fcf487eba32982e14912 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -575,8 +575,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { return -1; } assert(rid == pSyncNode->rid); - sDebug("vgId:%d sync event currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, - pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), pMsg->msgType); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, + syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, + TMSG_INFO(pMsg->msgType), pMsg->msgType); ret = syncNodePropose(pSyncNode, pMsg, isWeak); taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -585,8 +586,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) { int32_t ret = 0; - sDebug("vgId:%d sync event currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, - pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), pMsg->msgType); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, + syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, + TMSG_INFO(pMsg->msgType), pMsg->msgType); if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { SRespStub stub; @@ -832,7 +834,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // snapshot meta // pSyncNode->sMeta.lastConfigIndex = -1; - sDebug("vgId:%d sync event currentTerm:%lu sync open", pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync open", pSyncNode->vgId, + syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm); return pSyncNode; } @@ -879,7 +882,8 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { } void syncNodeClose(SSyncNode* pSyncNode) { - sDebug("vgId:%d sync event currentTerm:%lu sync close", pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync close", pSyncNode->vgId, + syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm); int32_t ret; assert(pSyncNode != NULL); @@ -1318,7 +1322,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA]; for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { oldSenders[i] = (pSyncNode->senders)[i]; - sDebug("vgId:%d sync event currentTerm:%lu save senders %d, %p, privateTerm:%lu", pSyncNode->vgId, + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu save senders %d, %p, privateTerm:%lu", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], oldSenders[i]->privateTerm); if (gRaftDetailLog) { ; @@ -1371,9 +1376,12 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex char host[128]; uint16_t port; syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port); - sDebug("vgId:%d sync event currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, privateTerm:%lu", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, (pSyncNode->replicasId)[i].addr, i, host, port, - oldSenders[j], oldSenders[j]->privateTerm); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, " + "privateTerm:%lu", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j], + oldSenders[j]->privateTerm); (pSyncNode->senders)[i] = oldSenders[j]; oldSenders[j] = NULL; reset = true; @@ -1381,9 +1389,11 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex // reset replicaIndex int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex; (pSyncNode->senders)[i]->replicaIndex = i; - sDebug("vgId:%d sync event currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, oldreplicaIndex, i, host, port, - (pSyncNode->senders)[i], reset); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, " + "reset:%d", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset); } } } @@ -1392,9 +1402,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { if ((pSyncNode->senders)[i] == NULL) { (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i); - sDebug("vgId:%d sync event currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, (pSyncNode->senders)[i], i, - (pSyncNode->senders)[i]->privateTerm); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, (pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm); } } @@ -1402,7 +1413,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { if (oldSenders[i] != NULL) { snapshotSenderDestroy(oldSenders[i]); - sDebug("vgId:%d sync event currentTerm:%lu delete old sender %p replicaIndex:%d", pSyncNode->vgId, + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu delete old sender %p replicaIndex:%d", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, oldSenders[i], i); oldSenders[i] = NULL; } @@ -1474,9 +1486,10 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { sDebug( - "vgId:%d sync event currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, " "restoreFinish:%d, %s", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->restoreFinish, debugStr); // maybe clear leader cache @@ -1514,9 +1527,12 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { // reset restoreFinish pSyncNode->restoreFinish = false; - sDebug("vgId:%d sync event currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, restoreFinish:%d, %s", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, - pSyncNode->restoreFinish, debugStr); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, " + "restoreFinish:%d, %s", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, + pSyncNode->restoreFinish, debugStr); // state change pSyncNode->state = TAOS_SYNC_STATE_LEADER; @@ -2090,13 +2106,15 @@ const char* syncStr(ESyncState state) { static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg); - sDebug("vgId:%d sync event currentTerm:%lu begin leader transfer", ths->vgId, ths->pRaftStore->currentTerm); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu begin leader transfer", ths->vgId, + syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm); if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 && pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) { - sDebug("vgId:%d sync event currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId, - ths->pRaftStore->currentTerm, pSyncLeaderTransfer->newNodeInfo.nodeFqdn, - pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId, + syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, + pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort, + pSyncLeaderTransfer->newLeaderId.addr); // reset elect timer now! int32_t electMS = 1; @@ -2174,8 +2192,16 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE // change isStandBy to normal if (!isDrop) { - char tmpbuf[128]; - snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d", oldSyncCfg.replicaNum, newSyncCfg.replicaNum); + char tmpbuf[512]; + char* oldStr = syncCfg2Str(&oldSyncCfg); + char* newStr = syncCfg2Str(&newSyncCfg); + syncUtilJson2Line(oldStr); + syncUtilJson2Line(newStr); + snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, %s --> %s", oldSyncCfg.replicaNum, + newSyncCfg.replicaNum, oldStr, newStr); + taosMemoryFree(oldStr); + taosMemoryFree(newStr); + if (ths->state == TAOS_SYNC_STATE_LEADER) { syncNodeBecomeLeader(ths, tmpbuf); } else { @@ -2183,8 +2209,16 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE } } } else { - char tmpbuf[128]; - snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d", oldSyncCfg.replicaNum, newSyncCfg.replicaNum); + char tmpbuf[512]; + char* oldStr = syncCfg2Str(&oldSyncCfg); + char* newStr = syncCfg2Str(&newSyncCfg); + syncUtilJson2Line(oldStr); + syncUtilJson2Line(newStr); + snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d, %s --> %s", oldSyncCfg.replicaNum, + newSyncCfg.replicaNum, oldStr, newStr); + taosMemoryFree(oldStr); + taosMemoryFree(newStr); + syncNodeBecomeFollower(ths, tmpbuf); } @@ -2218,8 +2252,10 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { int32_t code = 0; ESyncState state = flag; - sDebug("vgId:%d sync event currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, - ths->pRaftStore->currentTerm, beginIndex, endIndex, syncUtilState2String(state)); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64 + ", %s", + ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, beginIndex, + endIndex, syncUtilState2String(state)); // execute fsm if (ths->pFsm != NULL) { @@ -2267,8 +2303,9 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ths->pFsm->FpRestoreFinishCb(ths->pFsm); } ths->restoreFinish = true; - sDebug("vgId:%d sync event currentTerm:%lu restore finish, %s, index:%ld", ths->vgId, - ths->pRaftStore->currentTerm, syncUtilState2String(ths->state), pEntry->index); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu restore finish, %s, index:%ld", ths->vgId, + syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, + syncUtilState2String(ths->state), pEntry->index); } } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 31ac44fa81b91a7bd13af621f1e291b5ac60ca7a..26da607c8ef5c5bde5f18e8824968cf4e114d434 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -163,10 +163,12 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr walFsync(pWal, true); - sDebug("vgId:%d sync event currentTerm:%lu write index:%ld, %s, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d", - pData->pSyncNode->vgId, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, - syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy, - TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, " + "originalRpcType:%s,%d", + pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex, + pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy, + TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); return code; } @@ -323,10 +325,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { walFsync(pWal, true); sDebug( - "vgId:%d sync event currentTerm:%lu old write index:%ld, %s, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d", - pData->pSyncNode->vgId, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, - syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy, TMSG_INFO(pEntry->msgType), - pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, " + "originalRpcType:%s,%d", + pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex, + pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy, + TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); return code; } @@ -407,18 +410,20 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { } int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { - SSyncLogStoreData* pData = pLogStore->data; - SWal* pWal = pData->pWal; - // assert(walCommit(pWal, index) == 0); - int32_t code = walCommit(pWal, index); - if (code != 0) { - int32_t err = terrno; - const char* errStr = tstrerror(err); - int32_t linuxErr = errno; - const char* linuxErrMsg = strerror(errno); - sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); - ASSERT(0); - } + + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + // assert(walCommit(pWal, index) == 0); + int32_t code = walCommit(pWal, index); + if (code != 0) { + int32_t err = terrno; + const char* errStr = tstrerror(err); + int32_t linuxErr = errno; + const char* linuxErrMsg = strerror(errno); + sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, + linuxErrMsg); ASSERT(0); + } + return 0; } diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 5b127793d644a2387d13e53fc855455bb0266a07..551c2611b06cc8a93cb5bad09b23ccbd5687fb44 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -46,9 +46,11 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) { taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub)); SSyncNode *pSyncNode = pObj->data; - sDebug("vgId:%d sync event currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId, - pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, - pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, + pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); taosThreadMutexUnlock(&(pObj->mutex)); return keyCode; @@ -71,9 +73,12 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) { memcpy(pStub, pTmp, sizeof(SRespStub)); SSyncNode *pSyncNode = pObj->data; - sDebug("vgId:%d sync event currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p ahandle:%p", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, - index, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p " + "ahandle:%p", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, + pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); taosThreadMutexUnlock(&(pObj->mutex)); return 1; // get one object @@ -90,9 +95,12 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu memcpy(pStub, pTmp, sizeof(SRespStub)); SSyncNode *pSyncNode = pObj->data; - sDebug("vgId:%d sync event currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p ahandle:%p", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, - index, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p " + "ahandle:%p", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, + pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); taosHashRemove(pObj->pRespHash, &index, sizeof(index)); taosThreadMutexUnlock(&(pObj->mutex)); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index d3785060d4acae4f5a7f64b063fdbe9844cc85ed..ef2011cc2620f44e8caf149b9488f14b5bad5bca 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -141,20 +141,24 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d " + "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu send " "msg:%s", - pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, + pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d " + "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu", - pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, + pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); } @@ -283,29 +287,35 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d " + "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu send " "msg:%s", - pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, + pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d " + "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu", - pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, + pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); } } else { sDebug( - "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d " + "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu", - pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, + pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); } @@ -339,14 +349,19 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu send msg:%s", - pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, - pSender->privateTerm, msgStr); + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d " + "privateTerm:%lu send " + "msg:%s", + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, + pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm, + msgStr); taosMemoryFree(msgStr); } else { - sDebug("vgId:%d sync event currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu", - pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, - pSender->ack, pSender->privateTerm); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d " + "privateTerm:%lu", + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, + pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm); } syncSnapshotSendDestroy(pMsg); @@ -579,17 +594,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, " + "lastIndex:%ld, " + "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, - pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, + pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, " + "lastIndex:%ld, " + "lastTerm:%lu, " "lastConfigIndex:%ld privateTerm:%lu", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, - pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, + pMsg->lastConfigIndex, pReceiver->privateTerm); } } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { @@ -602,7 +623,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // maybe update lastconfig if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) { - int32_t oldReplicaNum = pSyncNode->replicaNum; + // int32_t oldReplicaNum = pSyncNode->replicaNum; + SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg; // update new config myIndex SSyncCfg newSyncCfg = pMsg->lastConfig; @@ -626,24 +648,34 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { bool isDrop; if (IamInNew) { sDebug( - "vgId:%d sync event currentTerm:%lu update config by snapshot, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu update config by snapshot, lastIndex:%ld, " + "lastTerm:%lu, " "lastConfigIndex:%ld ", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, - pMsg->lastConfigIndex); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop); } else { sDebug( - "vgId:%d sync event currentTerm:%lu do not update config by snapshot, I am not in newCfg, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu do not update config by snapshot, I am not in " + "newCfg, " "lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld ", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, - pMsg->lastConfigIndex); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); } // change isStandBy to normal if (!isDrop) { - char tmpbuf[128]; - snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d", oldReplicaNum, newSyncCfg.replicaNum); + char tmpbuf[512]; + char *oldStr = syncCfg2Str(&oldSyncCfg); + char *newStr = syncCfg2Str(&newSyncCfg); + syncUtilJson2Line(oldStr); + syncUtilJson2Line(newStr); + snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, %s --> %s", oldSyncCfg.replicaNum, + newSyncCfg.replicaNum, oldStr, newStr); + taosMemoryFree(oldStr); + taosMemoryFree(newStr); + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { syncNodeBecomeLeader(pSyncNode, tmpbuf); } else { @@ -662,20 +694,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore); sDebug( - "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin " + "index:%ld, " "snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, - snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm, - logSimpleStr); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, + snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm, logSimpleStr); taosMemoryFree(logSimpleStr); } else { sDebug( - "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin " + "index:%ld, " "snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, - snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, + snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm); } pReceiver->pWriter = NULL; @@ -686,17 +721,21 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, " + "lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", - pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, - pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); + pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, + pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, " + "lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu", - pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, - pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); + pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, + pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); } } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { @@ -711,20 +750,24 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, " + "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv " "msg:%s", - pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, - pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); + pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, + pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, " + "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu", - pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, - pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); + pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, + pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); } } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { @@ -744,19 +787,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, " + "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, - pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, + pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, " + "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, - pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, + pMsg->lastConfigIndex, pReceiver->privateTerm); } } else { diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 40265965481654736e8d8b7448c369bd1918e573..cbc1298113ca630101205ea8da8eeadebb54c744 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -168,14 +168,26 @@ char* syncUtilRaftId2Str(const SRaftId* p) { } const char* syncUtilState2String(ESyncState state) { + /* + if (state == TAOS_SYNC_STATE_FOLLOWER) { + return "TAOS_SYNC_STATE_FOLLOWER"; + } else if (state == TAOS_SYNC_STATE_CANDIDATE) { + return "TAOS_SYNC_STATE_CANDIDATE"; + } else if (state == TAOS_SYNC_STATE_LEADER) { + return "TAOS_SYNC_STATE_LEADER"; + } else { + return "TAOS_SYNC_STATE_UNKNOWN"; + } + */ + if (state == TAOS_SYNC_STATE_FOLLOWER) { - return "TAOS_SYNC_STATE_FOLLOWER"; + return "follower"; } else if (state == TAOS_SYNC_STATE_CANDIDATE) { - return "TAOS_SYNC_STATE_CANDIDATE"; + return "candidate"; } else if (state == TAOS_SYNC_STATE_LEADER) { - return "TAOS_SYNC_STATE_LEADER"; + return "leader"; } else { - return "TAOS_SYNC_STATE_UNKNOWN"; + return "state_error"; } } diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index 92beeffa0cb7246986bb10ab508fdc4d4688a562..2d3d41de64b7939c0a3f543811adf75715b59811 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -282,6 +282,8 @@ int32_t tfsMkdir(STfs *pTfs, const char *rname) { } int32_t tfsRmdir(STfs *pTfs, const char *rname) { + ASSERT(rname[0] != 0); + char aname[TMPNAME_LEN] = "\0"; for (int32_t level = 0; level < pTfs->nlevel; level++) { @@ -289,6 +291,7 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname) { for (int32_t id = 0; id < pTier->ndisk; id++) { STfsDisk *pDisk = pTier->disks[id]; snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname); + uInfo("====> tfs remove dir : path:%s aname:%s rname:[%s]", pDisk->path, aname, rname); taosRemoveDir(aname); } } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index d9f5cbadb29ee0c6aaa11bf3e28b9d6fa3070be1..e41b704156b9c02cc70a583e7ce93b8689426759 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -277,9 +277,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC, "Invalid topic") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_QUERY, "Topic with invalid query") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_OPTION, "Topic with invalid option") TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_EXIST, "Consumer not exist") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being used by some consumer") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED, "Consumer unchanged") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST, "Subcribe not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_OFFSET_NOT_EXIST, "Offset not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_READY, "Consumer not ready") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_SUBSCRIBED, "Topic subscribed cannot be dropped") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being used by some consumer") +// mnode-stream TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_ALREADY_EXIST, "Stream already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_NOT_EXIST, "Stream not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_OPTION, "Invalid stream option") diff --git a/tests/script/tsim/mnode/basic5.sim b/tests/script/tsim/mnode/basic5.sim index 399dced16909586443a2bcfc151d5b525904c161..fc591aa25d3677ebfaf6e982555ab09512badad8 100644 --- a/tests/script/tsim/mnode/basic5.sim +++ b/tests/script/tsim/mnode/basic5.sim @@ -3,6 +3,10 @@ system sh/deploy.sh -n dnode1 -i 1 system sh/deploy.sh -n dnode2 -i 2 system sh/deploy.sh -n dnode3 -i 3 system sh/deploy.sh -n dnode4 -i 4 +system sh/cfg.sh -n dnode1 -c transPullupInterval -v 1 +system sh/cfg.sh -n dnode2 -c transPullupInterval -v 1 +system sh/cfg.sh -n dnode3 -c transPullupInterval -v 1 +system sh/cfg.sh -n dnode4 -c transPullupInterval -v 1 system sh/exec.sh -n dnode1 -s start sql connect @@ -260,7 +264,7 @@ $x = 0 step92: $x = $x + 1 sleep 1000 - if $x == 10 then + if $x == 20 then return -1 endi sql show mnodes diff --git a/tests/system-test/test.py b/tests/system-test/test.py index 5a68f548da04a33f325fa549e044a8f2940e51ba..55bfd8c945082ef53bdb66de8c080c25f6208621 100644 --- a/tests/system-test/test.py +++ b/tests/system-test/test.py @@ -33,8 +33,16 @@ import taos def checkRunTimeError(): import win32gui + timeCount = 0 while 1: time.sleep(1) + timeCount = timeCount + 1 + if (timeCount>900): + os.system("TASKKILL /F /IM taosd.exe") + os.system("TASKKILL /F /IM taos.exe") + os.system("TASKKILL /F /IM tmq_sim.exe") + os.system("TASKKILL /F /IM mintty.exe") + quit(0) hwnd = win32gui.FindWindow(None, "Microsoft Visual C++ Runtime Library") if hwnd: os.system("TASKKILL /F /IM taosd.exe")