提交 87e4d500 编写于 作者: S Shengliang Guan

[TD-437] stable refcount error while drop db

上级 488addaf
......@@ -27,7 +27,8 @@ void mnodeCleanupVgroups();
SVgObj *mnodeGetVgroup(int32_t vgId);
void mnodeIncVgroupRef(SVgObj *pVgroup);
void mnodeDecVgroupRef(SVgObj *pVgroup);
void mnodeDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg);
void mnodeDropAllDbVgroups(SDbObj *pDropDb);
void mnodeSendDropAllDbVgroupsMsg(SDbObj *pDropDb);
void mnodeDropAllDnodeVgroups(SDnodeObj *pDropDnode);
void mnodeUpdateAllDbVgroups(SDbObj *pAlterDb);
......
......@@ -81,10 +81,10 @@ static int32_t mnodeDbActionDelete(SSdbOper *pOper) {
SDbObj *pDb = pOper->pObj;
SAcctObj *pAcct = mnodeGetAcct(pDb->acct);
mnodeDropDbFromAcct(pAcct, pDb);
mnodeDropAllChildTables(pDb);
mnodeDropAllSuperTables(pDb);
mnodeDropAllDbVgroups(pDb, false);
mnodeDropAllDbVgroups(pDb);
mnodeDropDbFromAcct(pAcct, pDb);
mnodeDecAcctRef(pAcct);
return TSDB_CODE_SUCCESS;
......@@ -998,19 +998,7 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) {
return code;
}
#if 1
mnodeDropAllDbVgroups(pMsg->pDb, true);
#else
SVgObj *pVgroup = pMsg->pDb->pHead;
if (pVgroup != NULL) {
mPrint("vgId:%d, will be dropped", pVgroup->vgId);
SMnodeMsg *newMsg = mnodeCloneMsg(pMsg);
newMsg->ahandle = pVgroup;
newMsg->expected = pVgroup->numOfVnodes;
mnodeDropVgroup(pVgroup, newMsg);
return;
}
#endif
mnodeSendDropAllDbVgroupsMsg(pMsg->pDb);
mTrace("db:%s, all vgroups is dropped", pMsg->pDb->name);
return mnodeDropDb(pMsg);
......
......@@ -211,7 +211,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) {
pMsg->rpcRsp.rsp = pRsp;
pMsg->rpcRsp.len = size;
if (rowsToRead == 0 || (rowsRead == rowsToRead)) {
if (rowsToRead == 0 || rowsRead == rowsToRead) {
pRsp->completed = 1;
mnodeReleaseShowObj(pShow, true);
} else {
......
......@@ -148,37 +148,30 @@ static int32_t mnodeChildTableActionDelete(SSdbOper *pOper) {
return TSDB_CODE_INVALID_VGROUP_ID;
}
SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
return TSDB_CODE_INVALID_VGROUP_ID;
}
mnodeDecVgroupRef(pVgroup);
SDbObj *pDb = mnodeGetDb(pVgroup->dbName);
if (pDb == NULL) {
mError("ctable:%s, vgId:%d not in DB:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName);
return TSDB_CODE_INVALID_DB;
}
mnodeDecDbRef(pDb);
SVgObj *pVgroup = NULL;
SDbObj *pDb = NULL;
SAcctObj *pAcct = NULL;
SAcctObj *pAcct = mnodeGetAcct(pDb->acct);
if (pAcct == NULL) {
mError("ctable:%s, acct:%s not exists", pTable->info.tableId, pDb->acct);
return TSDB_CODE_INVALID_ACCT;
}
mnodeDecAcctRef(pAcct);
pVgroup = mnodeGetVgroup(pTable->vgId);
if (pVgroup != NULL) pDb = mnodeGetDb(pVgroup->dbName);
if (pDb != NULL) pAcct = mnodeGetAcct(pDb->acct);
if (pTable->info.type == TSDB_CHILD_TABLE) {
grantRestore(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1);
pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1);
if (pAcct != NULL) pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1);
mnodeRemoveTableFromStable(pTable->superTable, pTable);
mnodeDecTableRef(pTable->superTable);
} else {
grantRestore(TSDB_GRANT_TIMESERIES, pTable->numOfColumns - 1);
pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1);
if (pAcct != NULL) pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1);
}
mnodeRemoveTableFromDb(pDb);
mnodeRemoveTableFromVgroup(pVgroup, pTable);
if (pDb != NULL) mnodeRemoveTableFromDb(pDb);
if (pVgroup != NULL) mnodeRemoveTableFromVgroup(pVgroup, pTable);
mnodeDecVgroupRef(pVgroup);
mnodeDecDbRef(pDb);
mnodeDecAcctRef(pAcct);
return TSDB_CODE_SUCCESS;
}
......@@ -693,10 +686,10 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) {
}
if (pCreate->numOfTags != 0) {
mTrace("table:%s, create msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle);
mTrace("table:%s, create stable msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle);
return mnodeProcessCreateSuperTableMsg(pMsg);
} else {
mTrace("table:%s, create msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle);
mTrace("table:%s, create ctable msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle);
return mnodeProcessCreateChildTableMsg(pMsg);
}
}
......@@ -1288,7 +1281,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
}
pRsp->numOfTables = htonl(numOfTable);
char* msg = (char*) pRsp + sizeof(SCMSTableVgroupRspMsg);
char *msg = (char *)pRsp + sizeof(SCMSTableVgroupRspMsg);
for (int32_t i = 0; i < numOfTable; ++i) {
char *stableName = (char*)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_ID_LEN) * i;
......@@ -1318,6 +1311,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
}
taosHashDestroyIter(pIter);
mnodeDecTableRef(pTable);
pVgroupInfo->numOfVgroups = htonl(vgSize);
......@@ -1738,7 +1732,7 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) {
SCMTableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
STagData* pTag = (STagData*)pInfo->tags;
STagData *pTag = (STagData *)pInfo->tags;
int32_t contLen = sizeof(SCMCreateTableMsg) + offsetof(STagData, data) + ntohl(pTag->dataLen);
SCMCreateTableMsg *pCreateMsg = rpcMallocCont(contLen);
......@@ -1754,14 +1748,13 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) {
pCreateMsg->contLen = htonl(contLen);
memcpy(pCreateMsg->schema, pInfo->tags, contLen - sizeof(SCMCreateTableMsg));
mTrace("table:%s, start to create on demand, stable:%s", pInfo->tableId, pInfo->tags);
mTrace("table:%s, start to create on demand, stable:%s", pInfo->tableId, ((STagData *)(pCreateMsg->schema))->name);
rpcFreeCont(pMsg->rpcMsg.pCont);
pMsg->rpcMsg.msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
pMsg->rpcMsg.pCont = pCreateMsg;
pMsg->rpcMsg.contLen = contLen;
return TSDB_CODE_ACTION_NEED_REPROCESSED;
}
......
......@@ -784,7 +784,7 @@ void mnodeUpdateAllDbVgroups(SDbObj *pAlterDb) {
mPrint("db:%s, all vgroups is updated in sdb", pAlterDb->name);
}
void mnodeDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg) {
void mnodeDropAllDbVgroups(SDbObj *pDropDb) {
void * pIter = NULL;
int32_t numOfVgroups = 0;
SVgObj *pVgroup = NULL;
......@@ -802,10 +802,28 @@ void mnodeDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg) {
};
sdbDeleteRow(&oper);
numOfVgroups++;
}
if (sendMsg) {
mnodeSendDropVgroupMsg(pVgroup, NULL);
mnodeDecVgroupRef(pVgroup);
}
sdbFreeIter(pIter);
mPrint("db:%s, all vgroups:%d is dropped from sdb", pDropDb->name, numOfVgroups);
}
void mnodeSendDropAllDbVgroupsMsg(SDbObj *pDropDb) {
void * pIter = NULL;
int32_t numOfVgroups = 0;
SVgObj *pVgroup = NULL;
mPrint("db:%s, all vgroups will be dropped in dnode", pDropDb->name);
while (1) {
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break;
if (pVgroup->pDb == pDropDb) {
mnodeSendDropVgroupMsg(pVgroup, NULL);
}
mnodeDecVgroupRef(pVgroup);
......@@ -813,5 +831,5 @@ void mnodeDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg) {
sdbFreeIter(pIter);
mPrint("db:%s, all vgroups:%d is dropped from sdb", pDropDb->name, numOfVgroups);
mPrint("db:%s, all vgroups:%d drop msg is sent to dnode", pDropDb->name, numOfVgroups);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册