diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index c7e3085e69285570e7b6700507c4b41a62c926e4..48acc6787c0d601a1efceb182ea0da48da407a13 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -458,6 +458,7 @@ void mnodeRemoveVgroupFromDb(SVgObj *pVgroup) { pDb->vgList[v2] = pDb->vgList[v2 + 1]; } pDb->numOfVgroups--; + pDb->vgList[pDb->numOfVgroups] = NULL; break; } } diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index a4c2c60aa34f513550e64c2a6c83d4589dbc512a..301dd41cbc5e08917b5adf27643822da574a5abe 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -487,7 +487,13 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) { sdbDebug("table:%s, insert record:%s to hash, rowSize:%d numOfRows:%" PRId64 ", msg:%p", pTable->tableName, sdbGetKeyStrFromObj(pTable, pOper->pObj), pOper->rowSize, pTable->numOfRows, pOper->pMsg); - (*pTable->insertFp)(pOper); + int32_t code = (*pTable->insertFp)(pOper); + if (code != TSDB_CODE_SUCCESS) { + sdbError("table:%s, failed to insert record:%s to hash, remove it", pTable->tableName, + sdbGetKeyStrFromObj(pTable, pOper->pObj)); + sdbDeleteHash(pTable, pOper); + } + return TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 1d14ec030714f5f2c6ab9703fbdc67cc654155e8..5837aed25908751e7925276ed87ba6af9291af29 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -2351,14 +2351,16 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { // if the vgroup is already dropped from hash, it can't be accquired by pTable->vgId // so the refCount of vgroup can not be decreased - SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId); - if (pVgroup == NULL) { - mnodeRemoveTableFromVgroup(pVgroup, pTable); - } - mnodeDecVgroupRef(pVgroup); + // SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId); + // if (pVgroup == NULL) { + // mnodeRemoveTableFromVgroup(mnodeMsg->pVgroup, pTable); + // } + // mnodeDecVgroupRef(pVgroup); mnodeSendDropChildTableMsg(mnodeMsg, false); rpcMsg->code = TSDB_CODE_SUCCESS; + dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code); + return; } if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index e46da1d8920c12a2a6b8ead97fa625df43180efe..942372024124791c1cd9a8242ce381cc03dedd09 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -83,11 +83,12 @@ static int32_t mnodeVgroupActionInsert(SSdbOper *pOper) { // refer to db SDbObj *pDb = mnodeGetDb(pVgroup->dbName); if (pDb == NULL) { + mError("vgId:%d, db:%s is not exist while insert into hash", pVgroup->vgId, pVgroup->dbName); return TSDB_CODE_MND_INVALID_DB; } if (pDb->status != TSDB_DB_STATUS_READY) { - mError("db:%s, status:%d, in dropping", pDb->name, pDb->status); + mError("vgId:%d, db:%s status:%d, in dropping", pVgroup->vgId, pDb->name, pDb->status); return TSDB_CODE_MND_DB_IN_DROPPING; } @@ -116,10 +117,12 @@ static int32_t mnodeVgroupActionInsert(SSdbOper *pOper) { static int32_t mnodeVgroupActionDelete(SSdbOper *pOper) { SVgObj *pVgroup = pOper->pObj; - if (pVgroup->pDb != NULL) { - mnodeRemoveVgroupFromDb(pVgroup); + if (pVgroup->pDb == NULL) { + mError("vgId:%d, db:%s is not exist while insert into hash", pVgroup->vgId, pVgroup->dbName); + return TSDB_CODE_MND_VGROUP_NOT_EXIST; } + mnodeRemoveVgroupFromDb(pVgroup); mnodeDecDbRef(pVgroup->pDb); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { @@ -446,6 +449,12 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi } } + if (pDb->numOfVgroups < 1) { + mDebug("app:%p:%p, db:%s, failed create new vgroup since:%s, numOfVgroups:%d maxVgroupsPerDb:%d ", + pMsg->rpcMsg.ahandle, pMsg, pDb->name, tstrerror(code), pDb->numOfVgroups, maxVgroupsPerDb); + return code; + } + SVgObj *pVgroup = pDb->vgList[0]; if (pVgroup == NULL) { pthread_mutex_unlock(&pDb->mutex); @@ -517,6 +526,19 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) { dnodeReprocessMnodeWriteMsg(pMsg); return TSDB_CODE_MND_ACTION_IN_PROGRESS; + // if (pVgroup->status == TAOS_VG_STATUS_CREATING || pVgroup->status == TAOS_VG_STATUS_READY) { + // mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, + // pDb->name, pVgroup->numOfVnodes); + // pVgroup->status = TAOS_VG_STATUS_READY; + // SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb}; + // (void)sdbUpdateRow(&desc); + // dnodeReprocessMnodeWriteMsg(pMsg); + // return TSDB_CODE_MND_ACTION_IN_PROGRESS; + // } else { + // mError("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d, but vgroup is dropping", pMsg->rpcMsg.ahandle, + // pMsg, pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); + // return TSDB_CODE_MND_VGROUP_NOT_EXIST; + // } } } @@ -955,7 +977,7 @@ void mnodeSendDropVnodeMsg(int32_t vgId, SRpcEpSet *epSet, void *ahandle) { static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) { pVgroup->status = TAOS_VG_STATUS_DROPPING; // deleting - mDebug("vgId:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle); + mDebug("vgId:%d, send drop all vnodes msg, ahandle:%p db:%s", pVgroup->vgId, ahandle, pVgroup->dbName); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); mDebug("vgId:%d, send drop vnode msg to dnode:%d, ahandle:%p", pVgroup->vgId, pVgroup->vnodeGid[i].dnodeId, ahandle); @@ -1117,6 +1139,7 @@ void mnodeSendDropAllDbVgroupsMsg(SDbObj *pDropDb) { } mnodeDecVgroupRef(pVgroup); + numOfVgroups++; } sdbFreeIter(pIter); diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c index 8c6d6243eb4f677e9a5436b4449e07df011d2b52..e2f061445571a41ad2e9ce9aa60070cf9a41fcee 100644 --- a/src/util/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -241,6 +241,7 @@ void taosCloseQset(taos_qset param) { if (param == NULL) return; STaosQset *qset = (STaosQset *)param; +#if 0 // remove all the queues from qset pthread_mutex_lock(&qset->mutex); while (qset->head) { @@ -251,6 +252,7 @@ void taosCloseQset(taos_qset param) { queue->next = NULL; } pthread_mutex_unlock(&qset->mutex); +#endif pthread_mutex_destroy(&qset->mutex); tsem_destroy(&qset->sem);