diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bb1addf1b698479ca418882a1a16c3c59e5347cc..ad6077db098b18d2b10d95058831d4f8c25d046a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -68,7 +68,7 @@ typedef uint16_t tmsg_t; static inline bool vnodeIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || - (type == TDMT_VND_UPDATE_TAG_VAL); + (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM); } static inline bool syncUtilUserCommit(tmsg_t msgType) { diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 4a6f0d14daa177205ea7f6c528fc7a4bfbaed324..31ab1f3259cd45b0223c5962fbca5f27386da57a 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1126,8 +1126,12 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, } if (!force) { +#if 1 + { +#else if (newVg.replica == 1) { - mInfo("vgId:%d, will add 1 vnode, replca:1", pVgroup->vgId); +#endif + mInfo("vgId:%d, will add 1 vnode, replca:%d", pVgroup->vgId, newVg.replica); if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1; for (int32_t i = 0; i < newVg.replica - 1; ++i) { if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1; @@ -1155,6 +1159,9 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1; } if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1; +#if 1 + } +#else } else { // new replica == 3 mInfo("vgId:%d, will add 1 vnode, replca:3", pVgroup->vgId); if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1; @@ -1181,6 +1188,7 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]) != 0) return -1; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1; } +#endif } else { mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId); if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index d6ce77193a7c38a670cd704634f91f14c7f60d6a..6a545424fcee50e8a77affb4e92a13052afd27ae 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -233,7 +233,7 @@ int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) { rpcSendResponse(&rpcMsg); return 0; } else { - sInfo("no rpcinfo to send timeout response, seq:%" PRId64, seq); + sError("no message handle to send timeout response, seq:%" PRId64, seq); return -1; } } diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 049b02d73e6a89344aa0266fc086cc69db943cb5..79a38cad7a55fdcc0a587c48299d155d3f396931 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -35,11 +35,16 @@ SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) { pObj->seqNum = 0; taosThreadMutexInit(&(pObj->mutex), NULL); + SSyncNode *pNode = pObj->data; + sTrace("vgId:%d, create resp manager", pNode->vgId); return pObj; } void syncRespMgrDestroy(SSyncRespMgr *pObj) { if (pObj != NULL) { + SSyncNode *pNode = pObj->data; + sTrace("vgId:%d, destroy resp manager", pNode->vgId); + taosThreadMutexLock(&pObj->mutex); taosHashCleanup(pObj->pRespHash); taosThreadMutexUnlock(&pObj->mutex); @@ -81,6 +86,8 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t seq, SRespStub *pStub) { taosThreadMutexUnlock(&pObj->mutex); return 1; // get one object + } else { + sNError(pObj->data, "get message handle, no object of seq:%" PRIu64, seq); } taosThreadMutexUnlock(&pObj->mutex); @@ -99,6 +106,8 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t seq, SRpcHandleInfo *p taosThreadMutexUnlock(&pObj->mutex); return 1; // get one object + } else { + sNError(pObj->data, "get-and-del message handle, no object of seq:%" PRIu64, seq); } taosThreadMutexUnlock(&pObj->mutex); @@ -114,7 +123,7 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { SArray *delIndexArray = taosArrayInit(4, sizeof(uint64_t)); if (delIndexArray == NULL) return; - sDebug("vgId:%d, resp mgr begin clean by ttl", pSyncNode->vgId); + sDebug("vgId:%d, resp manager begin clean by ttl", pSyncNode->vgId); while (pStub) { size_t len; void *key = taosHashGetKey(pStub, &len); @@ -143,34 +152,39 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { // TODO: and make rpcMsg body, call commit cb // pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &pStub->rpcMsg, cbMeta); - - pStub->rpcMsg.code = TSDB_CODE_SYN_NOT_LEADER; - if (pStub->rpcMsg.info.handle != NULL) { - tmsgSendRsp(&pStub->rpcMsg); - } + SRpcMsg rpcMsg = {.info = pStub->rpcMsg.info, .code = TSDB_CODE_SYN_TIMEOUT}; + sInfo("vgId:%d, message handle:%p expired, type:%s ahandle:%p", pSyncNode->vgId, rpcMsg.info.handle, + TMSG_INFO(pStub->rpcMsg.msgType), rpcMsg.info.ahandle); + rpcSendResponse(&rpcMsg); } pStub = taosHashIterate(pObj->pRespHash, pStub); } int32_t arraySize = taosArrayGetSize(delIndexArray); - sDebug("vgId:%d, resp mgr end clean by ttl, sum:%d, cnt:%d, array-size:%d", pSyncNode->vgId, sum, cnt, arraySize); + sDebug("vgId:%d, resp manager end clean by ttl, sum:%d, cnt:%d, array-size:%d", pSyncNode->vgId, sum, cnt, arraySize); for (int32_t i = 0; i < arraySize; ++i) { uint64_t *pSeqNum = taosArrayGet(delIndexArray, i); taosHashRemove(pObj->pRespHash, pSeqNum, sizeof(uint64_t)); - sDebug("vgId:%d, resp mgr clean by ttl, seq:%" PRId64 "", pSyncNode->vgId, *pSeqNum); + sDebug("vgId:%d, resp manager clean by ttl, seq:%" PRId64, pSyncNode->vgId, *pSeqNum); } taosArrayDestroy(delIndexArray); } void syncRespCleanRsp(SSyncRespMgr *pObj) { + SSyncNode *pNode = pObj->data; + sTrace("vgId:%d, clean all rsp", pNode->vgId); + taosThreadMutexLock(&pObj->mutex); syncRespCleanByTTL(pObj, -1, true); taosThreadMutexUnlock(&pObj->mutex); } void syncRespClean(SSyncRespMgr *pObj) { + SSyncNode *pNode = pObj->data; + sTrace("vgId:%d, clean rsp by ttl", pNode->vgId); + taosThreadMutexLock(&pObj->mutex); syncRespCleanByTTL(pObj, pObj->ttl, false); taosThreadMutexUnlock(&pObj->mutex);