diff --git a/src/mnode/inc/mnodeDb.h b/src/mnode/inc/mnodeDb.h index 8f014ae9c87a56b444a3d815a87919dedc824368..40c5afc2c84cd497095a508df42b9e89ad638d70 100644 --- a/src/mnode/inc/mnodeDb.h +++ b/src/mnode/inc/mnodeDb.h @@ -40,7 +40,7 @@ void mnodeIncDbRef(SDbObj *pDb); void mnodeDecDbRef(SDbObj *pDb); bool mnodeCheckIsMonitorDB(char *db, char *monitordb); void mnodeDropAllDbs(SAcctObj *pAcct); -int mnodeInsertAlterRow(SDbObj *pDb, void *pMsg); +int mnodeInsertAlterDbRow(SDbObj *pDb, void *pMsg); int32_t mnodeCompactDbs(); diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 0c03cbb5a06b68839d151a2e27b80f564c501ada..95a5532b42222fa4ad7725704cc62797e5c57c4e 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -55,7 +55,6 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg); -static void mnodeProcessAlterDbRsp(SRpcMsg *rpcMsg); int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg); #ifndef _TOPIC @@ -203,7 +202,6 @@ int32_t mnodeInitDbs() { mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mnodeProcessAlterDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mnodeProcessDropDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_SYNC_DB, mnodeProcessSyncDbMsg); - mnodeAddPeerRspHandle(TSDB_MSG_TYPE_CM_ALTER_DB_RSP, mnodeProcessAlterDbRsp); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs); mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb); @@ -1102,7 +1100,7 @@ static int32_t mnodeAlterDbFp(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -int mnodeInsertAlterRow(SDbObj *pDb, void *pMsg) { +int mnodeInsertAlterDbRow(SDbObj *pDb, void *pMsg) { SSdbRow desc = { .type = SDB_OPER_GLOBAL, .pTable = tsDbSdb, @@ -1110,7 +1108,7 @@ int mnodeInsertAlterRow(SDbObj *pDb, void *pMsg) { .pMsg = pMsg, }; - return sdbInsertRowToQueue(&desc); + return sdbUpdateRow(&desc); } static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) { @@ -1299,35 +1297,6 @@ void mnodeDropAllDbs(SAcctObj *pAcct) { mInfo("acct:%s, all dbs:%d is dropped from sdb", pAcct->user, numOfDbs); } -static void mnodeProcessAlterDbRsp(SRpcMsg *rpcMsg) { - if (rpcMsg->ahandle == NULL) return; - - SMnodeMsg *pMsg = rpcMsg->ahandle; - pMsg->received++; - - SDbObj *pDb = (SDbObj *)pMsg->pDb; - assert(pDb); - - if (rpcMsg->code == TSDB_CODE_SUCCESS) { - mDebug("msg:%p, app:%p db:%s, altered in dnode, thandle:%p result:%s", pMsg, pMsg->rpcMsg.ahandle, - pDb->name, pMsg->rpcMsg.handle, tstrerror(rpcMsg->code)); - - dnodeSendRpcMWriteRsp(pMsg, TSDB_CODE_SUCCESS); - } else { - if (pMsg->retry++ < ALTER_CDB_RETRY_TIMES) { - mDebug("msg:%p, app:%p db:%s, alter table rsp received, need retry, times:%d result:%s thandle:%p", - pMsg->rpcMsg.ahandle, pMsg, pDb->name, pMsg->retry, tstrerror(rpcMsg->code), - pMsg->rpcMsg.handle); - - dnodeDelayReprocessMWriteMsg(pMsg); - } else { - mError("msg:%p, app:%p db:%s, failed to alter in dnode, result:%s thandle:%p", pMsg, pMsg->rpcMsg.ahandle, - pDb->name, tstrerror(rpcMsg->code), pMsg->rpcMsg.handle); - dnodeSendRpcMWriteRsp(pMsg, rpcMsg->code); - } - } -} - int32_t mnodeCompactDbs() { void *pIter = NULL; SDbObj *pDb = NULL; diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 9eb736f0bb7279890fc1b056019e4dee926ef180..93b897cb1ea4b22c3d2a6c830d8e003902edb621 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -1044,10 +1044,15 @@ static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg) { mnodeMsg->rpcMsg.handle, rpcMsg->ahandle); if (mnodeMsg->received != mnodeMsg->expected) return; - - mnodeInsertAlterRow(pVgroup->pDb, mnodeMsg); - - dnodeSendRpcMWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS); + uint8_t msgType = mnodeMsg->rpcMsg.msgType; + if (msgType == TSDB_MSG_TYPE_CM_ALTER_DB || msgType == TSDB_MSG_TYPE_CM_CREATE_TP || msgType == TSDB_MSG_TYPE_CM_ALTER_TP) { + int32_t code = mnodeInsertAlterDbRow(pVgroup->pDb, mnodeMsg); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + dnodeSendRpcMWriteRsp(mnodeMsg, code); + } + } else { + dnodeSendRpcMWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS); + } } static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {