提交 68c7fc49 编写于 作者: L lichuang

[TD-3963]alter db rpc return when vnode response

上级 cd549303
...@@ -154,7 +154,6 @@ static SCreateVnodeMsg* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -154,7 +154,6 @@ static SCreateVnodeMsg* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
SCreateVnodeMsg *pCreate = dnodeParseVnodeMsg(rpcMsg); SCreateVnodeMsg *pCreate = dnodeParseVnodeMsg(rpcMsg);
void *pVnode = vnodeAcquire(pCreate->cfg.vgId); void *pVnode = vnodeAcquire(pCreate->cfg.vgId);
if (pVnode != NULL) { if (pVnode != NULL) {
dDebug("vgId:%d, already exist, return success", pCreate->cfg.vgId); dDebug("vgId:%d, already exist, return success", pCreate->cfg.vgId);
......
...@@ -49,7 +49,7 @@ int32_t mnodeAddTableIntoVgroup(SVgObj *pVgroup, SCTableObj *pTable, bool needCh ...@@ -49,7 +49,7 @@ int32_t mnodeAddTableIntoVgroup(SVgObj *pVgroup, SCTableObj *pTable, bool needCh
void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SCTableObj *pTable); void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SCTableObj *pTable);
void mnodeSendDropVnodeMsg(int32_t vgId, SRpcEpSet *epSet, void *ahandle); void mnodeSendDropVnodeMsg(int32_t vgId, SRpcEpSet *epSet, void *ahandle);
void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
void mnodeSendAlterVgroupMsg(SVgObj *pVgroup); void mnodeSendAlterVgroupMsg(SVgObj *pVgroup,SMnodeMsg *pMsg);
void mnodeSendSyncVgroupMsg(SVgObj *pVgroup); void mnodeSendSyncVgroupMsg(SVgObj *pVgroup);
SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup); SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup);
......
...@@ -24,12 +24,14 @@ ...@@ -24,12 +24,14 @@
#include "tdataformat.h" #include "tdataformat.h"
#include "tp.h" #include "tp.h"
#include "mnode.h" #include "mnode.h"
#include "dnode.h"
#include "mnodeDef.h" #include "mnodeDef.h"
#include "mnodeInt.h" #include "mnodeInt.h"
#include "mnodeAcct.h" #include "mnodeAcct.h"
#include "mnodeDb.h" #include "mnodeDb.h"
#include "mnodeDnode.h" #include "mnodeDnode.h"
#include "mnodeMnode.h" #include "mnodeMnode.h"
#include "mnodePeer.h"
#include "mnodeProfile.h" #include "mnodeProfile.h"
#include "mnodeWrite.h" #include "mnodeWrite.h"
#include "mnodeSdb.h" #include "mnodeSdb.h"
...@@ -43,6 +45,8 @@ int64_t tsDbRid = -1; ...@@ -43,6 +45,8 @@ int64_t tsDbRid = -1;
void * tsDbSdb = NULL; void * tsDbSdb = NULL;
static int32_t tsDbUpdateSize; static int32_t tsDbUpdateSize;
#define ALTER_CDB_RETRY_TIMES 3
static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *pMsg); static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *pMsg);
static int32_t mnodeDropDb(SMnodeMsg *newMsg); static int32_t mnodeDropDb(SMnodeMsg *newMsg);
static int32_t mnodeSetDbDropping(SDbObj *pDb); static int32_t mnodeSetDbDropping(SDbObj *pDb);
...@@ -51,6 +55,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void ...@@ -51,6 +55,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg);
static void mnodeProcessAlterDbRsp(SRpcMsg *rpcMsg);
int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg); int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg);
#ifndef _TOPIC #ifndef _TOPIC
...@@ -198,6 +203,7 @@ int32_t mnodeInitDbs() { ...@@ -198,6 +203,7 @@ int32_t mnodeInitDbs() {
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mnodeProcessAlterDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mnodeProcessAlterDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mnodeProcessDropDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mnodeProcessDropDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_SYNC_DB, mnodeProcessSyncDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_SYNC_DB, mnodeProcessSyncDbMsg);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_CM_ALTER_DB_RSP, mnodeProcessAlterDbRsp);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs);
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb); mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb);
...@@ -1070,27 +1076,30 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) { ...@@ -1070,27 +1076,30 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) {
return newCfg; return newCfg;
} }
static int32_t mnodeAlterDbCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeAlterDbFp(SMnodeMsg *pMsg) {
if (code != TSDB_CODE_SUCCESS) return code;
SDbObj *pDb = pMsg->pDb; SDbObj *pDb = pMsg->pDb;
void *pIter = NULL; void *pIter = NULL;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
while (1) { while (1) {
pIter = mnodeGetNextVgroup(pIter, &pVgroup); pIter = mnodeGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break; if (pVgroup == NULL) break;
if (pVgroup->pDb == pDb) { if (pVgroup->pDb == pDb) {
mnodeSendAlterVgroupMsg(pVgroup); mnodeSendAlterVgroupMsg(pVgroup,pMsg);
pMsg->expected += 1;
} }
mnodeDecVgroupRef(pVgroup); mnodeDecVgroupRef(pVgroup);
} }
// in case there is no vnode(no db in vnode)
if (pMsg->expected == 0) {
return TSDB_CODE_SUCCESS;
}
mDebug("db:%s, all vgroups is altered", pDb->name); mDebug("db:%s, all vgroups is altered", pDb->name);
mLInfo("db:%s, is alterd by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); mLInfo("db:%s, is alterd by %s", pDb->name, mnodeGetUserFromMsg(pMsg));
bnNotify(); //bnNotify();
return TSDB_CODE_SUCCESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) { static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) {
...@@ -1114,7 +1123,7 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) { ...@@ -1114,7 +1123,7 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) {
.pTable = tsDbSdb, .pTable = tsDbSdb,
.pObj = pDb, .pObj = pDb,
.pMsg = pMsg, .pMsg = pMsg,
.fpRsp = mnodeAlterDbCb .fpReq = mnodeAlterDbFp
}; };
code = sdbUpdateRow(&row); code = sdbUpdateRow(&row);
...@@ -1279,6 +1288,35 @@ void mnodeDropAllDbs(SAcctObj *pAcct) { ...@@ -1279,6 +1288,35 @@ void mnodeDropAllDbs(SAcctObj *pAcct) {
mInfo("acct:%s, all dbs:%d is dropped from sdb", pAcct->user, numOfDbs); mInfo("acct:%s, all dbs:%d is dropped from sdb", pAcct->user, numOfDbs);
} }
static void mnodeProcessAlterDbRsp(SRpcMsg *rpcMsg) {
if (rpcMsg->ahandle == NULL) return;
SMnodeMsg *pMsg = rpcMsg->ahandle;
pMsg->received++;
SDbObj *pDb = (SDbObj *)pMsg->pDb;
assert(pDb);
if (rpcMsg->code == TSDB_CODE_SUCCESS) {
mDebug("msg:%p, app:%p db:%s, altered in dnode, thandle:%p result:%s", pMsg, pMsg->rpcMsg.ahandle,
pDb->name, pMsg->rpcMsg.handle, tstrerror(rpcMsg->code));
dnodeSendRpcMWriteRsp(pMsg, TSDB_CODE_SUCCESS);
} else {
if (pMsg->retry++ < ALTER_CDB_RETRY_TIMES) {
mDebug("msg:%p, app:%p db:%s, alter table rsp received, need retry, times:%d result:%s thandle:%p",
pMsg->rpcMsg.ahandle, pMsg, pDb->name, pMsg->retry, tstrerror(rpcMsg->code),
pMsg->rpcMsg.handle);
dnodeDelayReprocessMWriteMsg(pMsg);
} else {
mError("msg:%p, app:%p db:%s, failed to alter in dnode, result:%s thandle:%p", pMsg, pMsg->rpcMsg.ahandle,
pDb->name, tstrerror(rpcMsg->code), pMsg->rpcMsg.handle);
dnodeSendRpcMWriteRsp(pMsg, rpcMsg->code);
}
}
}
int32_t mnodeCompactDbs() { int32_t mnodeCompactDbs() {
void *pIter = NULL; void *pIter = NULL;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
......
...@@ -60,7 +60,6 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p ...@@ -60,7 +60,6 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg); static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg);
static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg); static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg);
static void mnodeProcessSyncVnodeRsp(SRpcMsg *rpcMsg);
static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg); static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg);
static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) ; static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) ;
static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle); static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle);
...@@ -237,7 +236,6 @@ int32_t mnodeInitVgroups() { ...@@ -237,7 +236,6 @@ int32_t mnodeInitVgroups() {
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_VGROUP, mnodeCancelGetNextVgroup); mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_VGROUP, mnodeCancelGetNextVgroup);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mnodeProcessCreateVnodeRsp); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mnodeProcessCreateVnodeRsp);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessAlterVnodeRsp); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessAlterVnodeRsp);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessSyncVnodeRsp);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mnodeProcessDropVnodeRsp); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mnodeProcessDropVnodeRsp);
mnodeAddPeerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mnodeProcessVnodeCfgMsg); mnodeAddPeerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mnodeProcessVnodeCfgMsg);
...@@ -271,7 +269,7 @@ void mnodeUpdateVgroup(SVgObj *pVgroup) { ...@@ -271,7 +269,7 @@ void mnodeUpdateVgroup(SVgObj *pVgroup) {
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("vgId:%d, failed to update vgroup", pVgroup->vgId); mError("vgId:%d, failed to update vgroup", pVgroup->vgId);
} }
mnodeSendAlterVgroupMsg(pVgroup); mnodeSendAlterVgroupMsg(pVgroup,NULL);
} }
/* /*
...@@ -350,7 +348,7 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl ...@@ -350,7 +348,7 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl
mError("dnode:%d, vgId:%d, vnode cfgVersion:%d:%d repica:%d not match with mnode cfgVersion:%d:%d replica:%d", mError("dnode:%d, vgId:%d, vnode cfgVersion:%d:%d repica:%d not match with mnode cfgVersion:%d:%d replica:%d",
pDnode->dnodeId, pVload->vgId, pVload->dbCfgVersion, pVload->vgCfgVersion, pVload->replica, pDnode->dnodeId, pVload->vgId, pVload->dbCfgVersion, pVload->vgCfgVersion, pVload->replica,
pVgroup->pDb->dbCfgVersion, pVgroup->vgCfgVersion, pVgroup->numOfVnodes); pVgroup->pDb->dbCfgVersion, pVgroup->vgCfgVersion, pVgroup->numOfVnodes);
mnodeSendAlterVgroupMsg(pVgroup); mnodeSendAlterVgroupMsg(pVgroup,NULL);
} }
} }
...@@ -946,10 +944,10 @@ SRpcEpSet mnodeGetEpSetFromIp(char *ep) { ...@@ -946,10 +944,10 @@ SRpcEpSet mnodeGetEpSetFromIp(char *ep) {
return epSet; return epSet;
} }
static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) { static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet, SMnodeMsg *pMsg) {
SAlterVnodeMsg *pAlter = mnodeBuildVnodeMsg(pVgroup); SAlterVnodeMsg *pAlter = mnodeBuildVnodeMsg(pVgroup);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.ahandle = NULL, .ahandle = pMsg,
.pCont = pAlter, .pCont = pAlter,
.contLen = pAlter ? sizeof(SAlterVnodeMsg) : 0, .contLen = pAlter ? sizeof(SAlterVnodeMsg) : 0,
.code = 0, .code = 0,
...@@ -958,14 +956,17 @@ static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) { ...@@ -958,14 +956,17 @@ static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) {
dnodeSendMsgToDnode(epSet, &rpcMsg); dnodeSendMsgToDnode(epSet, &rpcMsg);
} }
void mnodeSendAlterVgroupMsg(SVgObj *pVgroup) { void mnodeSendAlterVgroupMsg(SVgObj *pVgroup,SMnodeMsg *pMsg) {
mDebug("vgId:%d, send alter all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes, mDebug("vgId:%d, send alter all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes,
pVgroup->dbName); pVgroup->dbName);
if (pMsg) {
pMsg->pVgroup = pVgroup;
}
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp);
mDebug("vgId:%d, index:%d, send alter vnode msg to dnode %s", pVgroup->vgId, i, mDebug("vgId:%d, index:%d, send alter vnode msg to dnode %s", pVgroup->vgId, i,
pVgroup->vnodeGid[i].pDnode->dnodeEp); pVgroup->vnodeGid[i].pDnode->dnodeEp);
mnodeSendAlterVnodeMsg(pVgroup, &epSet); mnodeSendAlterVnodeMsg(pVgroup, &epSet,pMsg);
} }
} }
...@@ -1026,11 +1027,24 @@ void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { ...@@ -1026,11 +1027,24 @@ void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
} }
static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg) { static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg) {
mDebug("alter vnode rsp received"); mDebug("alter vnode rsp is received, handle:%p", rpcMsg->ahandle);
} if (rpcMsg->ahandle == NULL) return;
SMnodeMsg *mnodeMsg = rpcMsg->ahandle;
mnodeMsg->received++;
if (rpcMsg->code == TSDB_CODE_SUCCESS) {
mnodeMsg->code = rpcMsg->code;
mnodeMsg->successed++;
}
static void mnodeProcessSyncVnodeRsp(SRpcMsg *rpcMsg) { SVgObj *pVgroup = mnodeMsg->pVgroup;
mDebug("sync vnode rsp received"); mDebug("vgId:%d, alter vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p",
pVgroup->vgId, tstrerror(rpcMsg->code), mnodeMsg->received, mnodeMsg->successed, mnodeMsg->expected,
mnodeMsg->rpcMsg.handle, rpcMsg->ahandle);
if (mnodeMsg->received != mnodeMsg->expected) return;
dnodeReprocessMWriteMsg(mnodeMsg);
} }
static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
......
...@@ -161,9 +161,9 @@ class TDTestCase: ...@@ -161,9 +161,9 @@ class TDTestCase:
# the following line should generate an error, but the insert was a success # the following line should generate an error, but the insert was a success
# the time now-15d is out of range of now -10d # the time now-15d is out of range of now -10d
tdSql.execute('insert into tb values (now-15d, 10)') tdSql.error('insert into tb values (now-15d, 10)')
tdSql.query('select * from tb') tdSql.query('select * from tb')
tdSql.checkRows(rowNum + 1) tdSql.checkRows(rowNum)
# tdSql.execute('alter database db keep 20,20,20') # tdSql.execute('alter database db keep 20,20,20')
# tdSql.query('show databases') # tdSql.query('show databases')
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册