提交 e141eb96 编写于 作者: L lichuang

[TD-3963]fix alter vgroup bugs

上级 54a1cf26
...@@ -40,7 +40,7 @@ void mnodeIncDbRef(SDbObj *pDb); ...@@ -40,7 +40,7 @@ void mnodeIncDbRef(SDbObj *pDb);
void mnodeDecDbRef(SDbObj *pDb); void mnodeDecDbRef(SDbObj *pDb);
bool mnodeCheckIsMonitorDB(char *db, char *monitordb); bool mnodeCheckIsMonitorDB(char *db, char *monitordb);
void mnodeDropAllDbs(SAcctObj *pAcct); void mnodeDropAllDbs(SAcctObj *pAcct);
int mnodeInsertAlterRow(SDbObj *pDb, void *pMsg); int mnodeInsertAlterDbRow(SDbObj *pDb, void *pMsg);
int32_t mnodeCompactDbs(); int32_t mnodeCompactDbs();
......
...@@ -55,7 +55,6 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void ...@@ -55,7 +55,6 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg);
static void mnodeProcessAlterDbRsp(SRpcMsg *rpcMsg);
int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg); int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg);
#ifndef _TOPIC #ifndef _TOPIC
...@@ -203,7 +202,6 @@ int32_t mnodeInitDbs() { ...@@ -203,7 +202,6 @@ int32_t mnodeInitDbs() {
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mnodeProcessAlterDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mnodeProcessAlterDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mnodeProcessDropDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mnodeProcessDropDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_SYNC_DB, mnodeProcessSyncDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_SYNC_DB, mnodeProcessSyncDbMsg);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_CM_ALTER_DB_RSP, mnodeProcessAlterDbRsp);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs);
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb); mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb);
...@@ -1102,7 +1100,7 @@ static int32_t mnodeAlterDbFp(SMnodeMsg *pMsg) { ...@@ -1102,7 +1100,7 @@ static int32_t mnodeAlterDbFp(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
int mnodeInsertAlterRow(SDbObj *pDb, void *pMsg) { int mnodeInsertAlterDbRow(SDbObj *pDb, void *pMsg) {
SSdbRow desc = { SSdbRow desc = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.pTable = tsDbSdb, .pTable = tsDbSdb,
...@@ -1110,7 +1108,7 @@ int mnodeInsertAlterRow(SDbObj *pDb, void *pMsg) { ...@@ -1110,7 +1108,7 @@ int mnodeInsertAlterRow(SDbObj *pDb, void *pMsg) {
.pMsg = pMsg, .pMsg = pMsg,
}; };
return sdbInsertRowToQueue(&desc); return sdbUpdateRow(&desc);
} }
static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) { static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) {
...@@ -1299,35 +1297,6 @@ void mnodeDropAllDbs(SAcctObj *pAcct) { ...@@ -1299,35 +1297,6 @@ void mnodeDropAllDbs(SAcctObj *pAcct) {
mInfo("acct:%s, all dbs:%d is dropped from sdb", pAcct->user, numOfDbs); mInfo("acct:%s, all dbs:%d is dropped from sdb", pAcct->user, numOfDbs);
} }
static void mnodeProcessAlterDbRsp(SRpcMsg *rpcMsg) {
if (rpcMsg->ahandle == NULL) return;
SMnodeMsg *pMsg = rpcMsg->ahandle;
pMsg->received++;
SDbObj *pDb = (SDbObj *)pMsg->pDb;
assert(pDb);
if (rpcMsg->code == TSDB_CODE_SUCCESS) {
mDebug("msg:%p, app:%p db:%s, altered in dnode, thandle:%p result:%s", pMsg, pMsg->rpcMsg.ahandle,
pDb->name, pMsg->rpcMsg.handle, tstrerror(rpcMsg->code));
dnodeSendRpcMWriteRsp(pMsg, TSDB_CODE_SUCCESS);
} else {
if (pMsg->retry++ < ALTER_CDB_RETRY_TIMES) {
mDebug("msg:%p, app:%p db:%s, alter table rsp received, need retry, times:%d result:%s thandle:%p",
pMsg->rpcMsg.ahandle, pMsg, pDb->name, pMsg->retry, tstrerror(rpcMsg->code),
pMsg->rpcMsg.handle);
dnodeDelayReprocessMWriteMsg(pMsg);
} else {
mError("msg:%p, app:%p db:%s, failed to alter in dnode, result:%s thandle:%p", pMsg, pMsg->rpcMsg.ahandle,
pDb->name, tstrerror(rpcMsg->code), pMsg->rpcMsg.handle);
dnodeSendRpcMWriteRsp(pMsg, rpcMsg->code);
}
}
}
int32_t mnodeCompactDbs() { int32_t mnodeCompactDbs() {
void *pIter = NULL; void *pIter = NULL;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
......
...@@ -1044,10 +1044,15 @@ static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg) { ...@@ -1044,10 +1044,15 @@ static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg) {
mnodeMsg->rpcMsg.handle, rpcMsg->ahandle); mnodeMsg->rpcMsg.handle, rpcMsg->ahandle);
if (mnodeMsg->received != mnodeMsg->expected) return; if (mnodeMsg->received != mnodeMsg->expected) return;
uint8_t msgType = mnodeMsg->rpcMsg.msgType;
mnodeInsertAlterRow(pVgroup->pDb, mnodeMsg); if (msgType == TSDB_MSG_TYPE_CM_ALTER_DB || msgType == TSDB_MSG_TYPE_CM_CREATE_TP || msgType == TSDB_MSG_TYPE_CM_ALTER_TP) {
int32_t code = mnodeInsertAlterDbRow(pVgroup->pDb, mnodeMsg);
dnodeSendRpcMWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
dnodeSendRpcMWriteRsp(mnodeMsg, code);
}
} else {
dnodeSendRpcMWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS);
}
} }
static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册