diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 012bc40c8a36fedee1c4d3ffffa1a9e663563fdd..73f8515f22df69b8a2a4bf2f2cfff2170331a903 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -193,13 +193,11 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_GET_TSMA_EXP_WNDS, "vnode-get-tsma-expired-windows", SVGetTsmaExpWndsReq, SVGetTsmaExpWndsRsp) + TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_START_WRITE, "start-write", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_STOP_WRITE, "stop-write", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_CONFIRM_WRITE, "confirm-write", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIRM, "alter-confirm", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp) TD_NEW_MSG_SEG(TDMT_QND_MSG) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 78ab8509640d9d24304d8c55df9408756bf28f69..27f5e6f7af0fbbdee169b6ab252798ffa3f4449a 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -213,6 +213,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 0819f79cf92139e6e9eaa734237df15de4340d1e..77109ab52f1f9e6067ff48ac6b70a40a221b37ab 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -361,6 +361,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index 3f4f3f2053bd4fd633488eaf4a4fac71d642df51..2119320de5cf97e620f36a1d8c806e5d6efa2d4d 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -34,7 +34,7 @@ int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup); int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups); SArray *mndBuildDnodesArray(SMnode *pMnode); int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray); -int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *del1, SVnodeGid *del2); +int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *pVgId); void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, bool standby); void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); diff --git a/source/dnode/mnode/impl/src/mndBnode.c b/source/dnode/mnode/impl/src/mndBnode.c index 801f335a8056757c2cbe2d7f1ca6d65a4501003f..38a166900da9e1f18c3b3afeb665e736f3a4a2c4 100644 --- a/source/dnode/mnode/impl/src/mndBnode.c +++ b/source/dnode/mnode/impl/src/mndBnode.c @@ -30,25 +30,25 @@ static int32_t mndBnodeActionInsert(SSdb *pSdb, SBnodeObj *pObj); static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOld, SBnodeObj *pNew); static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj); static int32_t mndProcessCreateBnodeReq(SRpcMsg *pReq); -static int32_t mndProcessCreateBnodeRsp(SRpcMsg *pRsp); static int32_t mndProcessDropBnodeReq(SRpcMsg *pReq); -static int32_t mndProcessDropBnodeRsp(SRpcMsg *pRsp); static int32_t mndRetrieveBnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextBnode(SMnode *pMnode, void *pIter); int32_t mndInitBnode(SMnode *pMnode) { - SSdbTable table = {.sdbType = SDB_BNODE, - .keyType = SDB_KEY_INT32, - .encodeFp = (SdbEncodeFp)mndBnodeActionEncode, - .decodeFp = (SdbDecodeFp)mndBnodeActionDecode, - .insertFp = (SdbInsertFp)mndBnodeActionInsert, - .updateFp = (SdbUpdateFp)mndBnodeActionUpdate, - .deleteFp = (SdbDeleteFp)mndBnodeActionDelete}; + SSdbTable table = { + .sdbType = SDB_BNODE, + .keyType = SDB_KEY_INT32, + .encodeFp = (SdbEncodeFp)mndBnodeActionEncode, + .decodeFp = (SdbDecodeFp)mndBnodeActionDecode, + .insertFp = (SdbInsertFp)mndBnodeActionInsert, + .updateFp = (SdbUpdateFp)mndBnodeActionUpdate, + .deleteFp = (SdbDeleteFp)mndBnodeActionDelete, + }; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_BNODE, mndProcessCreateBnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_BNODE, mndProcessDropBnodeReq); - mndSetMsgHandle(pMnode, TDMT_DND_CREATE_BNODE_RSP, mndProcessCreateBnodeRsp); - mndSetMsgHandle(pMnode, TDMT_DND_DROP_BNODE_RSP, mndProcessDropBnodeRsp); + mndSetMsgHandle(pMnode, TDMT_DND_CREATE_BNODE_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_DND_DROP_BNODE_RSP, mndTransProcessRsp); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_BNODE, mndRetrieveBnodes); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_BNODE, mndCancelGetNextBnode); @@ -427,16 +427,6 @@ _OVER: return code; } -static int32_t mndProcessCreateBnodeRsp(SRpcMsg *pRsp) { - mndTransProcessRsp(pRsp); - return 0; -} - -static int32_t mndProcessDropBnodeRsp(SRpcMsg *pRsp) { - mndTransProcessRsp(pRsp); - return 0; -} - static int32_t mndRetrieveBnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index bbd80d765a876c671e2131fe3a17e22233acac02..1ce1a2590a2a8948b996fab059e5fb7b76b08314 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -263,7 +263,8 @@ void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) { sdbRelease(pSdb, pDb); } -static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool standby) { +static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, + bool standby) { STransAction action = {0}; SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); @@ -288,6 +289,32 @@ static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *p return 0; } +static int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + + int32_t contLen = sizeof(SMsgHead); + SMsgHead *pHead = taosMemoryMalloc(contLen); + if (pHead == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pHead->contLen = htonl(contLen); + pHead->vgId = htonl(pVgroup->vgId); + + action.pCont = pHead; + action.contLen = contLen; + action.msgType = TDMT_VND_ALTER_CONFIRM; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pHead); + return -1; + } + + return 0; +} + static int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType) { STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgroup); @@ -415,7 +442,6 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW; if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0; if (pCfg->schemaless < 0) pCfg->schemaless = TSDB_DB_SCHEMALESS_OFF; - } static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { @@ -726,30 +752,32 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj mndTransSetSerial(pTrans); if (newVgroup.replica < pDb->cfg.replications) { - mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId, + mInfo("db:%s, vgId:%d, vn:0 dnode:%d, will add 2 vnodes", pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId); - if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) { - mError("db:%s, failed to add vnode to vgId:%d since %s", pDb->name, newVgroup.vgId, terrstr()); - return -1; - } - newVgroup.replica = pDb->cfg.replications; + if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1; if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[1], true) != 0) return -1; + if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1; + if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1; + + if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1; if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[2], true) != 0) return -1; if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1; + if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1; } else { mInfo("db:%s, vgId:%d, will remove 2 vnodes", pVgroup->dbName, pVgroup->vgId); SVnodeGid del1 = {0}; - SVnodeGid del2 = {0}; - if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1, &del2) != 0) { - mError("db:%s, failed to remove vnode from vgId:%d since %s", pDb->name, newVgroup.vgId, terrstr()); - return -1; - } - newVgroup.replica = pDb->cfg.replications; + if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1) != 0) return -1; if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1; if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del1, true) != 0) return -1; + if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1; + + SVnodeGid del2 = {0}; + if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del2) != 0) return -1; + if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1; if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del2, true) != 0) return -1; + if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1; } SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup); @@ -1341,7 +1369,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, int32_t numOfTable = mndGetDBTableNum(pDb, pMnode); if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable) { - mDebug("db:%s, version & numOfTable not changed", pDbVgVersion->dbFName); + mDebug("db:%s, version and numOfTable not changed", pDbVgVersion->dbFName); mndReleaseDb(pMnode, pDb); continue; } else { @@ -1433,12 +1461,22 @@ const char *mndGetDbStr(const char *src) { int64_t getValOfDiffPrecision(int8_t unit, int64_t val) { int64_t v = 0; - switch(unit) { - case 's': v = val / 1000; break; - case 'm': v = val / tsTickPerMin[TSDB_TIME_PRECISION_MILLI]; break; - case 'h': v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 60); break; - case 'd': v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 24 * 60); break; - case 'w': v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 24 * 60 * 7); break; + switch (unit) { + case 's': + v = val / 1000; + break; + case 'm': + v = val / tsTickPerMin[TSDB_TIME_PRECISION_MILLI]; + break; + case 'h': + v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 60); + break; + case 'd': + v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 24 * 60); + break; + case 'w': + v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 24 * 60 * 7); + break; default: break; } @@ -1446,32 +1484,32 @@ int64_t getValOfDiffPrecision(int8_t unit, int64_t val) { return v; } -char* buildRetension(SArray* pRetension) { +char *buildRetension(SArray *pRetension) { size_t size = taosArrayGetSize(pRetension); if (size == 0) { return NULL; } - char* p1 = taosMemoryCalloc(1, 100); - SRetention* p = taosArrayGet(pRetension, 0); + char *p1 = taosMemoryCalloc(1, 100); + SRetention *p = taosArrayGet(pRetension, 0); int32_t len = 2; int64_t v1 = getValOfDiffPrecision(p->freqUnit, p->freq); int64_t v2 = getValOfDiffPrecision(p->keepUnit, p->keep); - len += sprintf(p1 + len, "%"PRId64"%c:%"PRId64"%c,", v1, p->freqUnit, v2, p->keepUnit); + len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c,", v1, p->freqUnit, v2, p->keepUnit); p = taosArrayGet(pRetension, 1); v1 = getValOfDiffPrecision(p->freqUnit, p->freq); v2 = getValOfDiffPrecision(p->keepUnit, p->keep); - len += sprintf(p1 + len, "%"PRId64"%c:%"PRId64"%c,", v1, p->freqUnit, v2, p->keepUnit); + len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c,", v1, p->freqUnit, v2, p->keepUnit); p = taosArrayGet(pRetension, 2); v1 = getValOfDiffPrecision(p->freqUnit, p->freq); v2 = getValOfDiffPrecision(p->keepUnit, p->keep); - len += sprintf(p1 + len, "%"PRId64"%c:%"PRId64"%c", v1, p->freqUnit, v2, p->keepUnit); + len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit); varDataSetLen(p1, len); return p1; @@ -1586,7 +1624,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in } colDataAppend(pColInfo, rows, cacheModel, null); #endif - colDataAppend(pColInfo, rows, (const char*) &pDb->cfg.cacheLastRow, false); + colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.cacheLastRow, false); char *prec = NULL; switch (pDb->cfg.precision) { @@ -1618,7 +1656,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false); - char* p = buildRetension(pDb->cfg.pRetensions); + char *p = buildRetension(pDb->cfg.pRetensions); pColInfo = taosArrayGet(pBlock->pDataBlock, cols); if (p == NULL) { diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index 27881865af11913b4a04c4fc84df115e98823fd1..4fb9226144a16f1d44bdddb8826bc91888647f0c 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -30,26 +30,26 @@ static int32_t mndQnodeActionInsert(SSdb *pSdb, SQnodeObj *pObj); static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOld, SQnodeObj *pNew); static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj); static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq); -static int32_t mndProcessCreateQnodeRsp(SRpcMsg *pRsp); static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq); -static int32_t mndProcessDropQnodeRsp(SRpcMsg *pRsp); static int32_t mndProcessQnodeListReq(SRpcMsg *pReq); static int32_t mndRetrieveQnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextQnode(SMnode *pMnode, void *pIter); int32_t mndInitQnode(SMnode *pMnode) { - SSdbTable table = {.sdbType = SDB_QNODE, - .keyType = SDB_KEY_INT32, - .encodeFp = (SdbEncodeFp)mndQnodeActionEncode, - .decodeFp = (SdbDecodeFp)mndQnodeActionDecode, - .insertFp = (SdbInsertFp)mndQnodeActionInsert, - .updateFp = (SdbUpdateFp)mndQnodeActionUpdate, - .deleteFp = (SdbDeleteFp)mndQnodeActionDelete}; + SSdbTable table = { + .sdbType = SDB_QNODE, + .keyType = SDB_KEY_INT32, + .encodeFp = (SdbEncodeFp)mndQnodeActionEncode, + .decodeFp = (SdbDecodeFp)mndQnodeActionDecode, + .insertFp = (SdbInsertFp)mndQnodeActionInsert, + .updateFp = (SdbUpdateFp)mndQnodeActionUpdate, + .deleteFp = (SdbDeleteFp)mndQnodeActionDelete, + }; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_QNODE, mndProcessCreateQnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_QNODE, mndProcessDropQnodeReq); - mndSetMsgHandle(pMnode, TDMT_DND_CREATE_QNODE_RSP, mndProcessCreateQnodeRsp); - mndSetMsgHandle(pMnode, TDMT_DND_DROP_QNODE_RSP, mndProcessDropQnodeRsp); + mndSetMsgHandle(pMnode, TDMT_DND_CREATE_QNODE_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_DND_DROP_QNODE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_QNODE_LIST, mndProcessQnodeListReq); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QNODE, mndRetrieveQnodes); @@ -503,16 +503,6 @@ _OVER: return code; } -static int32_t mndProcessCreateQnodeRsp(SRpcMsg *pRsp) { - mndTransProcessRsp(pRsp); - return 0; -} - -static int32_t mndProcessDropQnodeRsp(SRpcMsg *pRsp) { - mndTransProcessRsp(pRsp); - return 0; -} - static int32_t mndRetrieveQnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 6cb70d1f27895cd64f08fdb383f4072739e03f52..e8e23ccc2a6593619b6d5586406840d6494540b0 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -38,25 +38,25 @@ static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb); static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew); static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq); static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq); -static int32_t mndProcessVCreateSmaRsp(SRpcMsg *pRsp); -static int32_t mndProcessVDropSmaRsp(SRpcMsg *pRsp); static int32_t mndProcessGetSmaReq(SRpcMsg *pReq); static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextSma(SMnode *pMnode, void *pIter); int32_t mndInitSma(SMnode *pMnode) { - SSdbTable table = {.sdbType = SDB_SMA, - .keyType = SDB_KEY_BINARY, - .encodeFp = (SdbEncodeFp)mndSmaActionEncode, - .decodeFp = (SdbDecodeFp)mndSmaActionDecode, - .insertFp = (SdbInsertFp)mndSmaActionInsert, - .updateFp = (SdbUpdateFp)mndSmaActionUpdate, - .deleteFp = (SdbDeleteFp)mndSmaActionDelete}; + SSdbTable table = { + .sdbType = SDB_SMA, + .keyType = SDB_KEY_BINARY, + .encodeFp = (SdbEncodeFp)mndSmaActionEncode, + .decodeFp = (SdbDecodeFp)mndSmaActionDecode, + .insertFp = (SdbInsertFp)mndSmaActionInsert, + .updateFp = (SdbUpdateFp)mndSmaActionUpdate, + .deleteFp = (SdbDeleteFp)mndSmaActionDelete, + }; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SMA, mndProcessMCreateSmaReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessMDropSmaReq); - mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndProcessVCreateSmaRsp); - mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndProcessVDropSmaRsp); + mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveSma); @@ -637,11 +637,6 @@ _OVER: return code; } -static int32_t mndProcessVCreateSmaRsp(SRpcMsg *pRsp) { - mndTransProcessRsp(pRsp); - return 0; -} - static int32_t mndSetDropSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) { SSdbRaw *pRedoRaw = mndSmaActionEncode(pSma); if (pRedoRaw == NULL) return -1; @@ -910,11 +905,6 @@ _OVER: return code; } -static int32_t mndProcessVDropSmaRsp(SRpcMsg *pRsp) { - mndTransProcessRsp(pRsp); - return 0; -} - static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndSnode.c b/source/dnode/mnode/impl/src/mndSnode.c index c6acb4fef4a09ef78c561178f11428cb3004b4f3..f5a1e76723f11623644b30a093a0fea0cd7fd1b7 100644 --- a/source/dnode/mnode/impl/src/mndSnode.c +++ b/source/dnode/mnode/impl/src/mndSnode.c @@ -30,25 +30,25 @@ static int32_t mndSnodeActionInsert(SSdb *pSdb, SSnodeObj *pObj); static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew); static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj); static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq); -static int32_t mndProcessCreateSnodeRsp(SRpcMsg *pRsp); static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq); -static int32_t mndProcessDropSnodeRsp(SRpcMsg *pRsp); static int32_t mndRetrieveSnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextSnode(SMnode *pMnode, void *pIter); int32_t mndInitSnode(SMnode *pMnode) { - SSdbTable table = {.sdbType = SDB_SNODE, - .keyType = SDB_KEY_INT32, - .encodeFp = (SdbEncodeFp)mndSnodeActionEncode, - .decodeFp = (SdbDecodeFp)mndSnodeActionDecode, - .insertFp = (SdbInsertFp)mndSnodeActionInsert, - .updateFp = (SdbUpdateFp)mndSnodeActionUpdate, - .deleteFp = (SdbDeleteFp)mndSnodeActionDelete}; + SSdbTable table = { + .sdbType = SDB_SNODE, + .keyType = SDB_KEY_INT32, + .encodeFp = (SdbEncodeFp)mndSnodeActionEncode, + .decodeFp = (SdbDecodeFp)mndSnodeActionDecode, + .insertFp = (SdbInsertFp)mndSnodeActionInsert, + .updateFp = (SdbUpdateFp)mndSnodeActionUpdate, + .deleteFp = (SdbDeleteFp)mndSnodeActionDelete, + }; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SNODE, mndProcessCreateSnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_SNODE, mndProcessDropSnodeReq); - mndSetMsgHandle(pMnode, TDMT_DND_CREATE_SNODE_RSP, mndProcessCreateSnodeRsp); - mndSetMsgHandle(pMnode, TDMT_DND_DROP_SNODE_RSP, mndProcessDropSnodeRsp); + mndSetMsgHandle(pMnode, TDMT_DND_CREATE_SNODE_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_DND_DROP_SNODE_RSP, mndTransProcessRsp); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndRetrieveSnodes); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndCancelGetNextSnode); @@ -437,16 +437,6 @@ _OVER: return code; } -static int32_t mndProcessCreateSnodeRsp(SRpcMsg *pRsp) { - mndTransProcessRsp(pRsp); - return 0; -} - -static int32_t mndProcessDropSnodeRsp(SRpcMsg *pRsp) { - mndTransProcessRsp(pRsp); - return 0; -} - static int32_t mndRetrieveSnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 556837f397e0d9c09546d5fa9400654b44f39401..175878959dd99a038e27c215893522afad4574ae 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -38,9 +38,6 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew); static int32_t mndProcessMCreateStbReq(SRpcMsg *pReq); static int32_t mndProcessMAlterStbReq(SRpcMsg *pReq); static int32_t mndProcessMDropStbReq(SRpcMsg *pReq); -static int32_t mndProcessVCreateStbRsp(SRpcMsg *pRsp); -static int32_t mndProcessVAlterStbRsp(SRpcMsg *pRsp); -static int32_t mndProcessVDropStbRsp(SRpcMsg *pRsp); static int32_t mndProcessTableMetaReq(SRpcMsg *pReq); static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextStb(SMnode *pMnode, void *pIter); @@ -59,9 +56,9 @@ int32_t mndInitStb(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq); mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMAlterStbReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessMDropStbReq); - mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessVCreateStbRsp); - mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessVAlterStbRsp); - mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessVDropStbRsp); + mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb); @@ -837,11 +834,6 @@ _OVER: return code; } -static int32_t mndProcessVCreateStbRsp(SRpcMsg *pRsp) { - mndTransProcessRsp(pRsp); - return 0; -} - static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) { if (pAlter->commentLen != 0 || pAlter->ttl != 0) return 0; @@ -1459,11 +1451,6 @@ _OVER: return code; } -static int32_t mndProcessVAlterStbRsp(SRpcMsg *pRsp) { - mndTransProcessRsp(pRsp); - return 0; -} - static int32_t mndSetDropStbRedoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { SSdbRaw *pRedoRaw = mndStbActionEncode(pStb); if (pRedoRaw == NULL) return -1; @@ -1599,11 +1586,6 @@ _OVER: return code; } -static int32_t mndProcessVDropStbRsp(SRpcMsg *pRsp) { - mndTransProcessRsp(pRsp); - return 0; -} - static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 5ee5b06a578f7c31ab18f66f2de1cdef2aa85a04..a0aa2067d3fa76f2ef7dec332c3dda35c0d29349 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -35,7 +35,6 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream); static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq); -static int32_t mndProcessTaskDeployInternalRsp(SRpcMsg *pRsp); /*static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);*/ /*static int32_t mndProcessDropStreamInRsp(SRpcMsg *pRsp);*/ static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq); @@ -44,17 +43,19 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB static void mndCancelGetNextStream(SMnode *pMnode, void *pIter); int32_t mndInitStream(SMnode *pMnode) { - SSdbTable table = {.sdbType = SDB_STREAM, - .keyType = SDB_KEY_BINARY, - .encodeFp = (SdbEncodeFp)mndStreamActionEncode, - .decodeFp = (SdbDecodeFp)mndStreamActionDecode, - .insertFp = (SdbInsertFp)mndStreamActionInsert, - .updateFp = (SdbUpdateFp)mndStreamActionUpdate, - .deleteFp = (SdbDeleteFp)mndStreamActionDelete}; + SSdbTable table = { + .sdbType = SDB_STREAM, + .keyType = SDB_KEY_BINARY, + .encodeFp = (SdbEncodeFp)mndStreamActionEncode, + .decodeFp = (SdbDecodeFp)mndStreamActionDecode, + .insertFp = (SdbInsertFp)mndStreamActionInsert, + .updateFp = (SdbUpdateFp)mndStreamActionUpdate, + .deleteFp = (SdbDeleteFp)mndStreamActionDelete, + }; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); - mndSetMsgHandle(pMnode, TDMT_VND_TASK_DEPLOY_RSP, mndProcessTaskDeployInternalRsp); - mndSetMsgHandle(pMnode, TDMT_SND_TASK_DEPLOY_RSP, mndProcessTaskDeployInternalRsp); + mndSetMsgHandle(pMnode, TDMT_VND_TASK_DEPLOY_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_SND_TASK_DEPLOY_RSP, mndTransProcessRsp); /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/ /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/ @@ -195,11 +196,6 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) { sdbRelease(pSdb, pStream); } -static int32_t mndProcessTaskDeployInternalRsp(SRpcMsg *pRsp) { - mndTransProcessRsp(pRsp); - return 0; -} - static SDbObj *mndAcquireDbByStream(SMnode *pMnode, char *streamName) { SName name = {0}; tNameFromString(&name, streamName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index c7f8415b65db611500d3df1907405a1d07b4b3c2..cbaf8bc1d15b13f59440b06797048e10ddd55b07 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -43,7 +43,6 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubs static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg); static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg); -static int32_t mndProcessSubscribeInternalRsp(SRpcMsg *pMsg); static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter); @@ -65,20 +64,22 @@ static int32_t mndSetSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeO } int32_t mndInitSubscribe(SMnode *pMnode) { - SSdbTable table = {.sdbType = SDB_SUBSCRIBE, - .keyType = SDB_KEY_BINARY, - .encodeFp = (SdbEncodeFp)mndSubActionEncode, - .decodeFp = (SdbDecodeFp)mndSubActionDecode, - .insertFp = (SdbInsertFp)mndSubActionInsert, - .updateFp = (SdbUpdateFp)mndSubActionUpdate, - .deleteFp = (SdbDeleteFp)mndSubActionDelete}; - - mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp); - mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_DELETE_RSP, mndProcessSubscribeInternalRsp); + SSdbTable table = { + .sdbType = SDB_SUBSCRIBE, + .keyType = SDB_KEY_BINARY, + .encodeFp = (SdbEncodeFp)mndSubActionEncode, + .decodeFp = (SdbDecodeFp)mndSubActionDecode, + .insertFp = (SdbInsertFp)mndSubActionInsert, + .updateFp = (SdbUpdateFp)mndSubActionUpdate, + .deleteFp = (SdbDeleteFp)mndSubActionDelete, + }; + + mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_DELETE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq); mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq); mndSetMsgHandle(pMnode, TDMT_MND_MQ_DROP_CGROUP, mndProcessDropCgroupReq); - mndSetMsgHandle(pMnode, TDMT_MND_MQ_DROP_CGROUP_RSP, mndProcessSubscribeInternalRsp); + mndSetMsgHandle(pMnode, TDMT_MND_MQ_DROP_CGROUP_RSP, mndTransProcessRsp); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe); @@ -789,11 +790,6 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) { sdbRelease(pSdb, pSub); } -static int32_t mndProcessSubscribeInternalRsp(SRpcMsg *pRsp) { - mndTransProcessRsp(pRsp); - return 0; -} - static int32_t mndSetDropSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { SSdbRaw *pRedoRaw = mndSubActionEncode(pSub); if (pRedoRaw == NULL) return -1; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index e0d565c9afb0fafe46c5ff3dd96a83ee9c76372d..95f22a33db962b5cdea9a3b5d2e317161aa48c69 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -37,7 +37,6 @@ static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic); static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic); static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq); static int32_t mndProcessDropTopicReq(SRpcMsg *pReq); -static int32_t mndProcessDropTopicInRsp(SRpcMsg *pRsp); static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); @@ -45,17 +44,19 @@ static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic); int32_t mndInitTopic(SMnode *pMnode) { - SSdbTable table = {.sdbType = SDB_TOPIC, - .keyType = SDB_KEY_BINARY, - .encodeFp = (SdbEncodeFp)mndTopicActionEncode, - .decodeFp = (SdbDecodeFp)mndTopicActionDecode, - .insertFp = (SdbInsertFp)mndTopicActionInsert, - .updateFp = (SdbUpdateFp)mndTopicActionUpdate, - .deleteFp = (SdbDeleteFp)mndTopicActionDelete}; + SSdbTable table = { + .sdbType = SDB_TOPIC, + .keyType = SDB_KEY_BINARY, + .encodeFp = (SdbEncodeFp)mndTopicActionEncode, + .decodeFp = (SdbDecodeFp)mndTopicActionDecode, + .insertFp = (SdbInsertFp)mndTopicActionInsert, + .updateFp = (SdbUpdateFp)mndTopicActionUpdate, + .deleteFp = (SdbDeleteFp)mndTopicActionDelete, + }; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq); - mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp); + mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic); @@ -607,11 +608,6 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } -static int32_t mndProcessDropTopicInRsp(SRpcMsg *pRsp) { - mndTransProcessRsp(pRsp); - return 0; -} - static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) { SSdb *pSdb = pMnode->pSdb; SDbObj *pDb = mndAcquireDb(pMnode, dbName); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 2577febf6611ffbdbeb8d4021df3292e99c7873b..4f09eda733f44cc65e6d4cc7b3e65211a7d55cec 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -29,11 +29,6 @@ static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew); -static int32_t mndProcessCreateVnodeRsp(SRpcMsg *pRsp); -static int32_t mndProcessAlterVnodeRsp(SRpcMsg *pRsp); -static int32_t mndProcessDropVnodeRsp(SRpcMsg *pRsp); -static int32_t mndProcessCompactVnodeRsp(SRpcMsg *pRsp); - static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter); static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); @@ -50,11 +45,12 @@ int32_t mndInitVgroup(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndVgroupActionDelete, }; - mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndProcessCreateVnodeRsp); - mndSetMsgHandle(pMnode, TDMT_VND_ALTER_REPLICA_RSP, mndProcessAlterVnodeRsp); - mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIG_RSP, mndProcessAlterVnodeRsp); - mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndProcessDropVnodeRsp); - mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndProcessCompactVnodeRsp); + mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_ALTER_REPLICA_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIG_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIRM_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup); @@ -512,12 +508,12 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) { mDebug("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes); } - int32_t maxPos = 1; + SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica]; for (int32_t d = 0; d < taosArrayGetSize(pArray); ++d) { SDnodeObj *pDnode = taosArrayGet(pArray, d); bool used = false; - for (int32_t vn = 0; vn < maxPos; ++vn) { + for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) { used = true; break; @@ -530,59 +526,58 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) { return -1; } - SVnodeGid *pVgid = &pVgroup->vnodeGid[maxPos]; pVgid->dnodeId = pDnode->id; pVgid->role = TAOS_SYNC_STATE_ERROR; - pDnode->numOfVnodes++; + mInfo("db:%s, vgId:%d, vn:%d dnode:%d, is added", pVgroup->dbName, pVgroup->vgId, pVgroup->replica, + pVgid->dnodeId); - mInfo("db:%s, vgId:%d, vnode_index:%d dnode:%d is added", pVgroup->dbName, pVgroup->vgId, maxPos, pVgid->dnodeId); - maxPos++; - if (maxPos == 3) return 0; + pVgroup->replica++; + pDnode->numOfVnodes++; + return 0; } terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; + mError("db:%s, failed to add vnode to vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, terrstr()); return -1; } -int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *del1, SVnodeGid *del2) { +int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *pDelVgid) { taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes); for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { SDnodeObj *pDnode = taosArrayGet(pArray, i); mDebug("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes); } - int32_t removedNum = 0; + int32_t code = -1; for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) { SDnodeObj *pDnode = taosArrayGet(pArray, d); - for (int32_t vn = 0; vn < TSDB_MAX_REPLICA; ++vn) { + for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { SVnodeGid *pVgid = &pVgroup->vnodeGid[vn]; if (pVgid->dnodeId == pDnode->id) { - if (removedNum == 0) *del1 = *pVgid; - if (removedNum == 1) *del2 = *pVgid; - - mInfo("db:%s, vgId:%d, vn:%d dnode:%d is removed", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId); - memset(pVgid, 0, sizeof(SVnodeGid)); - removedNum++; + mInfo("db:%s, vgId:%d, vn:%d dnode:%d, is removed", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId); pDnode->numOfVnodes--; - - if (removedNum == 2) goto _OVER; + pVgroup->replica--; + *pDelVgid = *pVgid; + *pVgid = pVgroup->vnodeGid[pVgroup->replica]; + memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid)); + code = 0; + goto _OVER; } } } _OVER: - if (removedNum != 2) return -1; + if (code != 0) { + terrno = TSDB_CODE_APP_ERROR; + mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, terrstr()); + return -1; + } - for (int32_t vn = 1; vn < TSDB_MAX_REPLICA; ++vn) { + for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { SVnodeGid *pVgid = &pVgroup->vnodeGid[vn]; - if (pVgid->dnodeId != 0) { - memcpy(&pVgroup->vnodeGid[0], pVgid, sizeof(SVnodeGid)); - memset(pVgid, 0, sizeof(SVnodeGid)); - } + mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId); } - - mInfo("db:%s, vgId:%d, dnode:%d is keeped", pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId); return 0; } @@ -605,23 +600,6 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) { return epset; } -static int32_t mndProcessCreateVnodeRsp(SRpcMsg *pRsp) { - mndTransProcessRsp(pRsp); - return 0; -} - -static int32_t mndProcessAlterVnodeRsp(SRpcMsg *pRsp) { - mndTransProcessRsp(pRsp); - return 0; -} - -static int32_t mndProcessDropVnodeRsp(SRpcMsg *pRsp) { - mndTransProcessRsp(pRsp); - return 0; -} - -static int32_t mndProcessCompactVnodeRsp(SRpcMsg *pRsp) { return 0; } - static bool mndGetVgroupMaxReplicaFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { SVgObj *pVgroup = pObj; int64_t uid = *(int64_t *)p1; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 537f2e09643cb70e16b66f87ceb8e28d9500eb3c..4bcd5fc095a5e71f5e87e7ee374993f557bf3000 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -15,14 +15,15 @@ #include "vnd.h" -static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); -static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); -static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; @@ -99,11 +100,11 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) { return code; } -int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) { - void *ptr = NULL; - void *pReq; - int len; - int ret; +int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) { + void *ptr = NULL; + void *pReq; + int32_t len; + int32_t ret; vTrace("vgId:%d, start to process write request %s, version %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); @@ -158,6 +159,9 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg pMsg->contLen - sizeof(SMsgHead)) < 0) { } } break; + case TDMT_VND_ALTER_CONFIRM: + vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp); + break; case TDMT_VND_ALTER_CONFIG: break; default: @@ -190,7 +194,7 @@ _err: return -1; } -int32_t vnodePreprocessQueryMsg(SVnode * pVnode, SRpcMsg * pMsg) { +int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { if (TDMT_VND_QUERY != pMsg->msgType) { return 0; } @@ -212,7 +216,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { } } -int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { +int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("message in fetch queue is processing"); char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -267,7 +271,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) { pMetaRsp->precision = pVnode->config.tsdbCfg.precision; } -int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { +int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; if (syncEnvIsStart()) { @@ -357,7 +361,7 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { return ret; } -static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) { +static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { SVCreateStbReq req = {0}; SDecoder coder; @@ -389,9 +393,9 @@ _err: return -1; } -static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) { +static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { SDecoder decoder = {0}; - int rcode = 0; + int32_t rcode = 0; SVCreateTbBatchReq req = {0}; SVCreateTbReq *pCreateReq; SVCreateTbBatchRsp rsp = {0}; @@ -422,7 +426,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, } // loop to create table - for (int iReq = 0; iReq < req.nReqs; iReq++) { + for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; // validate hash @@ -477,7 +481,7 @@ _exit: return rcode; } -static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { SVCreateStbReq req = {0}; SDecoder dc = {0}; @@ -506,9 +510,9 @@ static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, return 0; } -static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { SVDropStbReq req = {0}; - int rcode = TSDB_CODE_SUCCESS; + int32_t rcode = TSDB_CODE_SUCCESS; SDecoder decoder = {0}; pRsp->msgType = TDMT_VND_CREATE_STB_RSP; @@ -535,12 +539,12 @@ _exit: return 0; } -static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { SVAlterTbReq vAlterTbReq = {0}; SVAlterTbRsp vAlterTbRsp = {0}; SDecoder dc = {0}; - int rcode = 0; - int ret; + int32_t rcode = 0; + int32_t ret; SEncoder ec = {0}; STableMetaRsp vMetaRsp = {0}; @@ -582,12 +586,12 @@ _exit: return 0; } -static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { SVDropTbBatchReq req = {0}; SVDropTbBatchRsp rsp = {0}; SDecoder decoder = {0}; SEncoder encoder = {0}; - int ret; + int32_t ret; SArray *tbUids = NULL; pRsp->msgType = TDMT_VND_DROP_TABLE_RSP; @@ -609,7 +613,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp)); if (tbUids == NULL || rsp.pArray == NULL) goto _exit; - for (int iReq = 0; iReq < req.nReqs; iReq++) { + for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { SVDropTbReq *pDropTbReq = req.pReqs + iReq; SVDropTbRsp dropTbRsp = {0}; @@ -641,7 +645,8 @@ _exit: return 0; } -static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) { +static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, + const char *tags) { SSubmitBlkIter blkIter = {0}; STSchema *pSchema = NULL; tb_uid_t suid = 0; @@ -675,7 +680,7 @@ static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSub return TSDB_CODE_SUCCESS; } -static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) { +static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) { ASSERT(pMsg != NULL); SSubmitMsgIter msgIter = {0}; SMeta *pMeta = pVnode->pMeta; @@ -692,7 +697,7 @@ static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char return 0; } -static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; SSubmitRsp submitRsp = {0}; SSubmitMsgIter msgIter = {0}; @@ -808,7 +813,7 @@ _exit: return 0; } -static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { SVCreateTSmaReq req = {0}; SDecoder coder; @@ -859,3 +864,13 @@ _err: int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) { return vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, NULL); } + +static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { + vInfo("vgId:%d, alter replica confim msg is processed", TD_VID(pVnode)); + pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP; + pRsp->code = TSDB_CODE_SUCCESS; + pRsp->pCont = NULL; + pRsp->contLen = 0; + + return 0; +} \ No newline at end of file