From c2a9c119688735fcaa11a54206f63c449ab820da Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 11 May 2020 19:46:33 +0800 Subject: [PATCH] [TD-271] fix bug while drop stable --- src/mnode/inc/mgmtDef.h | 3 +- src/mnode/src/mgmtTable.c | 148 +++++++++++++++++++------------------- 2 files changed, 75 insertions(+), 76 deletions(-) diff --git a/src/mnode/inc/mgmtDef.h b/src/mnode/inc/mgmtDef.h index ba71f9373b..3ac2efb83b 100644 --- a/src/mnode/inc/mgmtDef.h +++ b/src/mnode/inc/mgmtDef.h @@ -85,8 +85,7 @@ typedef struct SSuperTableObj { int32_t numOfTables; int16_t nextColId; SSchema * schema; - int32_t vgLen; - int32_t * vgList; + void * vgHash; } SSuperTableObj; typedef struct { diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index bfe357cf7c..d001114bf0 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -24,6 +24,7 @@ #include "tname.h" #include "tidpool.h" #include "tglobal.h" +#include "hash.h" #include "dnode.h" #include "mgmtDef.h" #include "mgmtInt.h" @@ -363,39 +364,35 @@ static void mgmtCleanUpChildTables() { } static void mgmtAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCtable) { - if (pStable->vgLen == 0) { - pStable->vgLen = 8; - pStable->vgList = calloc(pStable->vgLen, sizeof(int32_t)); - } - - bool find = false; - int32_t pos = 0; - for (pos = 0; pos < pStable->vgLen; ++pos) { - if (pStable->vgList[pos] == 0) break; - if (pStable->vgList[pos] == pCtable->vgId) { - find = true; - break; - } - } + pStable->numOfTables++; - if (!find) { - if (pos >= pStable->vgLen) { - pStable->vgLen *= 2; - pStable->vgList = realloc(pStable->vgList, pStable->vgLen * sizeof(int32_t)); - } - pStable->vgList[pos] = pCtable->vgId; + if (pStable->vgHash == NULL) { + pStable->vgHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); } - pStable->numOfTables++; + if (pStable->vgHash != NULL) { + taosHashPut(pStable->vgHash, (char *)&pCtable->vgId, sizeof(pCtable->vgId), &pCtable->vgId, sizeof(pCtable->vgId)); + } } static void mgmtRemoveTableFromStable(SSuperTableObj *pStable, SChildTableObj *pCtable) { pStable->numOfTables--; + + if (pStable->vgHash == NULL) return; + + SVgObj *pVgroup = mgmtGetVgroup(pCtable->vgId); + if (pVgroup != NULL) { + taosHashRemove(pStable->vgHash, (char *)&pCtable->vgId, sizeof(pCtable->vgId)); + } + mgmtDecVgroupRef(pVgroup); } static void mgmtDestroySuperTable(SSuperTableObj *pStable) { + if (pStable->vgHash != NULL) { + taosHashCleanup(pStable->vgHash); + pStable->vgHash = NULL; + } tfree(pStable->schema); - tfree(pStable->vgList) tfree(pStable); } @@ -434,7 +431,7 @@ static int32_t mgmtSuperTableActionUpdate(SSdbOper *pOper) { void *oldSchema = pTable->schema; memcpy(pTable, pNew, pOper->rowSize); pTable->schema = pNew->schema; - free(pNew->vgList); + free(pNew->vgHash); free(pNew); free(oldSchema); } @@ -797,26 +794,26 @@ static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg) { static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) { SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; if (pStable->numOfTables != 0) { - mgmtDropAllChildTablesInStable(pStable); - for (int32_t vg = 0; vg < pStable->vgLen; ++vg) { - int32_t vgId = pStable->vgList[vg]; - if (vgId == 0) break; - - SVgObj *pVgroup = mgmtGetVgroup(vgId); + SHashMutableIterator *pIter = taosHashCreateIter(pStable->vgHash); + while (taosHashIterNext(pIter)) { + int32_t *pVgId = taosHashIterGet(pIter); + SVgObj *pVgroup = mgmtGetVgroup(*pVgId); if (pVgroup == NULL) break; - + SMDDropSTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropSTableMsg)); pDrop->contLen = htonl(sizeof(SMDDropSTableMsg)); - pDrop->vgId = htonl(vgId); + pDrop->vgId = htonl(pVgroup->vgId); pDrop->uid = htobe64(pStable->uid); mgmtExtractTableName(pStable->info.tableId, pDrop->tableId); - mPrint("stable:%s, send drop stable msg to vgId:%d", pStable->info.tableId, vgId); + mPrint("stable:%s, send drop stable msg to vgId:%d", pStable->info.tableId, pVgroup->vgId); SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SMDDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE}; dnodeSendMsgToDnode(&ipSet, &rpcMsg); mgmtDecVgroupRef(pVgroup); } + + mgmtDropAllChildTablesInStable(pStable); } SSdbOper oper = { @@ -1243,59 +1240,62 @@ static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) { static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) { SCMSTableVgroupMsg *pInfo = pMsg->pCont; int32_t numOfTable = htonl(pInfo->numOfTables); - - char* name = (char*) pInfo + sizeof(struct SCMSTableVgroupMsg); + SCMSTableVgroupRspMsg *pRsp = NULL; - - // todo set the initial size to be 10, fix me - int32_t contLen = sizeof(SCMSTableVgroupRspMsg) + (sizeof(SCMVgroupInfo) * 10 + sizeof(SVgroupsInfo))*numOfTable; - + int32_t contLen = sizeof(SCMSTableVgroupRspMsg); + for (int32_t i = 0; i < numOfTable; ++i) { + char *stableName = (char*)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_ID_LEN) * i; + SSuperTableObj *pTable = mgmtGetSuperTable(stableName); + if (pTable != NULL) { + stableName = (char*)pTable; //hack way + } + + if (pTable->vgHash != NULL) { + contLen += (taosHashGetSize(pTable->vgHash) * sizeof(SCMVgroupInfo) + sizeof(SVgroupsInfo)); + } + mgmtDecTableRef(pTable); + } + pRsp = rpcMallocCont(contLen); if (pRsp == NULL) { mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); return; } - + pRsp->numOfTables = htonl(numOfTable); - char* msg = (char*) pRsp + sizeof(SCMSTableVgroupRspMsg); - - for(int32_t i = 0; i < numOfTable; ++i) { - SSuperTableObj *pTable = mgmtGetSuperTable(name); - - pMsg->pTable = (STableObj *)pTable; - if (pMsg->pTable == NULL) { - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); - return; - } - - SVgroupsInfo* pVgroup = (SVgroupsInfo*) msg; - - int32_t vg = 0; - for (; vg < pTable->vgLen; ++vg) { - int32_t vgId = pTable->vgList[vg]; - if (vgId == 0) break; - - SVgObj *vgItem = mgmtGetVgroup(vgId); - if (vgItem == NULL) break; - - pVgroup->vgroups[vg].vgId = htonl(vgId); - for (int32_t vn = 0; vn < vgItem->numOfVnodes; ++vn) { - SDnodeObj *pDnode = vgItem->vnodeGid[vn].pDnode; + char *msg = (char *)pRsp + sizeof(SCMSTableVgroupRspMsg); + + for (int32_t i = 0; i < numOfTable; ++i) { + SSuperTableObj *pTable = (SSuperTableObj *)((char *)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_ID_LEN)*i); + SVgroupsInfo * pVgroup = (SVgroupsInfo *)msg; + + SHashMutableIterator *pIter = taosHashCreateIter(pTable->vgHash); + int32_t vgSize = 0; + while (taosHashIterNext(pIter)) { + int32_t *pVgId = taosHashIterGet(pIter); + SVgObj * pVgItem = mgmtGetVgroup(*pVgId + ); + if (pVgItem == NULL) continue; + + pVgroup->vgroups[vgSize].vgId = htonl(pVgItem->vgId); + for (int32_t vn = 0; vn < pVgItem->numOfVnodes; ++vn) { + SDnodeObj *pDnode = pVgItem->vnodeGid[vn].pDnode; if (pDnode == NULL) break; - - strncpy(pVgroup->vgroups[vg].ipAddr[vn].fqdn, pDnode->dnodeFqdn, tListLen(pDnode->dnodeFqdn)); - pVgroup->vgroups[vg].ipAddr[vn].port = htons(tsDnodeShellPort); - - pVgroup->vgroups[vg].numOfIps++; + + strncpy(pVgroup->vgroups[vgSize].ipAddr[vn].fqdn, pDnode->dnodeFqdn, tListLen(pDnode->dnodeFqdn)); + pVgroup->vgroups[vgSize].ipAddr[vn].port = htons(tsDnodeShellPort); + + pVgroup->vgroups[vgSize].numOfIps++; } - - mgmtDecVgroupRef(vgItem); + + vgSize++; + mgmtDecVgroupRef(pVgItem); } - - pVgroup->numOfVgroups = htonl(vg); - + + pVgroup->numOfVgroups = htonl(vgSize); + // one table is done, try the next table - msg += sizeof(SVgroupsInfo) + vg * sizeof(SCMVgroupInfo); + msg += sizeof(SVgroupsInfo) + vgSize * sizeof(SCMVgroupInfo); } SRpcMsg rpcRsp = {0}; -- GitLab