From 00e18b70d7e18c2aab8394c08ba9e869a66eacf9 Mon Sep 17 00:00:00 2001 From: slguan Date: Mon, 23 Mar 2020 19:07:57 +0800 Subject: [PATCH] [TD-15] refact code of tidpool --- src/inc/mnode.h | 3 + src/mnode/inc/mgmtDb.h | 10 +- src/mnode/inc/mgmtProfile.h | 2 + src/mnode/inc/mgmtVgroup.h | 5 +- src/mnode/src/mgmtAcct.c | 6 +- src/mnode/src/mgmtDb.c | 39 ++--- src/mnode/src/mgmtDnode.c | 5 - src/mnode/src/mgmtProfile.c | 10 ++ src/mnode/src/mgmtVgroup.c | 339 +++++++++++++++++++----------------- src/util/inc/tidpool.h | 6 +- src/util/src/tidpool.c | 135 ++++++-------- 11 files changed, 281 insertions(+), 279 deletions(-) diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 1452e1f53a..cde26dad4a 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -159,6 +159,8 @@ typedef struct { SSchema* schema; } SNormalTableObj; +struct _db_obj; + typedef struct _vg_obj { uint32_t vgId; char dbName[TSDB_DB_NAME_LEN + 1]; @@ -172,6 +174,7 @@ typedef struct _vg_obj { int8_t reserved[14]; int8_t updateEnd[1]; struct _vg_obj *prev, *next; + struct _db_obj *pDb; void * idPool; STableInfo ** tableList; } SVgObj; diff --git a/src/mnode/inc/mgmtDb.h b/src/mnode/inc/mgmtDb.h index 32bb9d9ec6..f2049f836c 100644 --- a/src/mnode/inc/mgmtDb.h +++ b/src/mnode/inc/mgmtDb.h @@ -22,11 +22,11 @@ extern "C" { #include "mnode.h" -int32_t mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup); -int32_t mgmtAddVgroupIntoDbTail(SDbObj *pDb, SVgObj *pVgroup); -int32_t mgmtRemoveVgroupFromDb(SDbObj *pDb, SVgObj *pVgroup); -int32_t mgmtMoveVgroupToTail(SDbObj *pDb, SVgObj *pVgroup); -int32_t mgmtMoveVgroupToHead(SDbObj *pDb, SVgObj *pVgroup); +void mgmtAddVgroupIntoDb(SVgObj *pVgroup); +void mgmtAddVgroupIntoDbTail(SVgObj *pVgroup); +void mgmtRemoveVgroupFromDb(SVgObj *pVgroup); +void mgmtMoveVgroupToTail(SVgObj *pVgroup); +void mgmtMoveVgroupToHead(SVgObj *pVgroup); int32_t mgmtInitDbs(); void mgmtCleanUpDbs(); diff --git a/src/mnode/inc/mgmtProfile.h b/src/mnode/inc/mgmtProfile.h index 7f9fd9622c..8c2e073c2d 100644 --- a/src/mnode/inc/mgmtProfile.h +++ b/src/mnode/inc/mgmtProfile.h @@ -28,6 +28,8 @@ bool mgmtCheckQhandle(uint64_t qhandle); void mgmtSaveQhandle(void *qhandle); void mgmtFreeQhandle(void *qhandle); +void mgmtFreeQueuedMsg(SQueuedMsg *pMsg); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index b7c68b5f80..3379a93f28 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -28,13 +28,12 @@ int32_t mgmtInitVgroups(); void mgmtCleanUpVgroups(); SVgObj *mgmtGetVgroup(int32_t vgId); SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode); +void mgmtDropAllVgroups(SDbObj *pDropDb); void mgmtCreateVgroup(SQueuedMsg *pMsg); void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle); void mgmtUpdateVgroup(SVgObj *pVgroup); -void mgmtUpdateVgroupIp(SDnodeObj *pDnode); - -void mgmtSetVgroupIdPool(); +void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle); SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb); void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable); diff --git a/src/mnode/src/mgmtAcct.c b/src/mnode/src/mgmtAcct.c index b1ee72386e..0075f53319 100644 --- a/src/mnode/src/mgmtAcct.c +++ b/src/mnode/src/mgmtAcct.c @@ -85,8 +85,10 @@ int32_t mgmtRemoveUserFromAcct(SAcctObj *pAcct, SUserObj *pUser) { pUser->prev->next = pUser->next; } - if (pUser->next) pUser->next->prev = pUser->prev; - + if (pUser->next) { + pUser->next->prev = pUser->prev; + } + if (pUser->prev == NULL) { pAcct->pUser = pUser->next; } diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index a4eeb88d91..e8ab5aaa8c 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -333,7 +333,8 @@ bool mgmtCheckIsMonitorDB(char *db, char *monitordb) { return (strncasecmp(dbName, monitordb, len) == 0 && len == strlen(monitordb)); } -int32_t mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup) { +void mgmtAddVgroupIntoDb(SVgObj *pVgroup) { + SDbObj *pDb = pVgroup->pDb; pVgroup->next = pDb->pHead; pVgroup->prev = NULL; @@ -342,11 +343,10 @@ int32_t mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup) { pDb->pHead = pVgroup; pDb->numOfVgroups++; - - return 0; } -int32_t mgmtAddVgroupIntoDbTail(SDbObj *pDb, SVgObj *pVgroup) { +void mgmtAddVgroupIntoDbTail(SVgObj *pVgroup) { + SDbObj *pDb = pVgroup->pDb; pVgroup->next = NULL; pVgroup->prev = pDb->pTail; @@ -355,32 +355,25 @@ int32_t mgmtAddVgroupIntoDbTail(SDbObj *pDb, SVgObj *pVgroup) { pDb->pTail = pVgroup; pDb->numOfVgroups++; - - return 0; } -int32_t mgmtRemoveVgroupFromDb(SDbObj *pDb, SVgObj *pVgroup) { +void mgmtRemoveVgroupFromDb(SVgObj *pVgroup) { + SDbObj *pDb = pVgroup->pDb; if (pVgroup->prev) pVgroup->prev->next = pVgroup->next; if (pVgroup->next) pVgroup->next->prev = pVgroup->prev; if (pVgroup->prev == NULL) pDb->pHead = pVgroup->next; if (pVgroup->next == NULL) pDb->pTail = pVgroup->prev; pDb->numOfVgroups--; - - return 0; } -int32_t mgmtMoveVgroupToTail(SDbObj *pDb, SVgObj *pVgroup) { - mgmtRemoveVgroupFromDb(pDb, pVgroup); - mgmtAddVgroupIntoDbTail(pDb, pVgroup); - - return 0; +void mgmtMoveVgroupToTail(SVgObj *pVgroup) { + mgmtRemoveVgroupFromDb(pVgroup); + mgmtAddVgroupIntoDbTail(pVgroup); } -int32_t mgmtMoveVgroupToHead(SDbObj *pDb, SVgObj *pVgroup) { - mgmtRemoveVgroupFromDb(pDb, pVgroup); - mgmtAddVgroupIntoDb(pDb, pVgroup); - - return 0; +void mgmtMoveVgroupToHead(SVgObj *pVgroup) { + mgmtRemoveVgroupFromDb(pVgroup); + mgmtAddVgroupIntoDb(pVgroup); } void mgmtCleanUpDbs() { @@ -600,7 +593,7 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void * if (strcmp(pUser->user, "root") == 0) { #endif pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pDb->cfg.maxSessions - 1; // table num can be created should minus 1 + *(int32_t *)pWrite = pDb->cfg.maxSessions; // table num can be created should minus 1 cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; @@ -613,9 +606,9 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void * pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; #ifdef _TD_ARM_32_ - *(int32_t *)pWrite = (pDb->cfg.cacheNumOfBlocks.totalBlocks * 1.0 / (pDb->cfg.maxSessions - 1)); + *(int32_t *)pWrite = (pDb->cfg.cacheNumOfBlocks.totalBlocks * 1.0 / (pDb->cfg.maxSessions)); #else - *(float *)pWrite = (pDb->cfg.cacheNumOfBlocks.totalBlocks * 1.0 / (pDb->cfg.maxSessions - 1)); + *(float *)pWrite = (pDb->cfg.cacheNumOfBlocks.totalBlocks * 1.0 / (pDb->cfg.maxSessions)); #endif cols++; @@ -721,7 +714,7 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) { static SDbCfg mgmtGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) { SDbCfg newCfg = pDb->cfg; int32_t daysToKeep = htonl(pAlter->daysToKeep); - int32_t maxSessions = htonl(pAlter->maxSessions) + 1; + int32_t maxSessions = htonl(pAlter->maxSessions); int8_t replications = pAlter->replications; terrno = TSDB_CODE_SUCCESS; diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index f088ac75a4..0d83e262ae 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -566,11 +566,6 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { mgmtSetDnodeMaxVnodes(pDnode); } - if (lastPrivateIp != pDnode->privateIp || lastPublicIp != pDnode->publicIp) { - mgmtUpdateVgroupIp(pDnode); - //mgmtUpdateMnodeIp(); - } - int32_t openVnodes = htons(pStatus->openVnodes); for (int32_t j = 0; j < openVnodes; ++j) { pDnode->vload[j].vgId = htonl(pStatus->load[j].vgId); diff --git a/src/mnode/src/mgmtProfile.c b/src/mnode/src/mgmtProfile.c index b20e66dd87..1b98ac9596 100644 --- a/src/mnode/src/mgmtProfile.c +++ b/src/mnode/src/mgmtProfile.c @@ -762,3 +762,13 @@ int32_t mgmtInitProfile() { void mgmtCleanUpProfile() { } + +void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) { + if (pMsg != NULL) { + if (pMsg->pCont != NULL) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + } + free(pMsg); + } +} \ No newline at end of file diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 5b1968259f..b37b363077 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -23,62 +23,90 @@ #include "mgmtDb.h" #include "mgmtDClient.h" #include "mgmtDnode.h" +#include "mgmtDServer.h" #include "mgmtProfile.h" #include "mgmtSdb.h" #include "mgmtShell.h" #include "mgmtTable.h" #include "mgmtVgroup.h" -static void *tsVgroupSdb = NULL; +static void *tsVgroupSdb = NULL; static int32_t tsVgUpdateSize = 0; static int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg); static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg); +static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) ; static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle); static void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); -static int32_t mgmtVgroupActionDestroy(void *pObj) { - SVgObj *pVgroup = (SVgObj *) pObj; +static int32_t mgmtVgroupActionDestroy(SSdbOperDesc *pOper) { + SVgObj *pVgroup = pOper->pObj; if (pVgroup->idPool) { taosIdPoolCleanUp(pVgroup->idPool); pVgroup->idPool = NULL; } - if (pVgroup->tableList) tfree(pVgroup->tableList); - tfree(pObj); + if (pVgroup->tableList) { + tfree(pVgroup->tableList); + } + tfree(pOper->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mgmtVgroupActionInsert(void *pObj) { - SVgObj *pVgroup = pObj; +static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) { + SVgObj *pVgroup = pOper->pObj; + SDbObj *pDb = mgmtGetDb(pVgroup->dbName); + if (pDb == NULL) { + return TSDB_CODE_INVALID_DB; + } + + pVgroup->pDb = pDb; + pVgroup->prev = NULL; + pVgroup->next = NULL; + + int32_t size = sizeof(STableInfo *) * pDb->cfg.maxSessions; + pVgroup->tableList = (STableInfo **)calloc(pDb->cfg.maxSessions, sizeof(STableInfo *)); + if (pVgroup->tableList == NULL) { + mError("vgroup:%d, failed to malloc(size:%d) for the tableList of vgroups", pVgroup->vgId, size); + return -1; + } + + pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions); + if (pVgroup->idPool == NULL) { + mError("vgroup:%d, failed to taosInitIdPool for vgroups", pVgroup->vgId); + tfree(pVgroup->tableList); + return -1; + } + for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { + SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId); + pVgroup->vnodeGid[i].privateIp = pDnode->privateIp; + pVgroup->vnodeGid[i].publicIp = pDnode->publicIp; pVgroup->vnodeGid[i].vnode = pVgroup->vgId; } + mgmtAddVgroupIntoDb(pVgroup); + return TSDB_CODE_SUCCESS; } -static int32_t mgmtVgroupActionDelete(void *pObj) { - SVgObj *pVgroup = pObj; - SDbObj *pDb = mgmtGetDb(pVgroup->dbName); - - if (pDb != NULL) { - mgmtRemoveVgroupFromDb(pDb, pVgroup); +static int32_t mgmtVgroupActionDelete(SSdbOperDesc *pOper) { + SVgObj *pVgroup = pOper->pObj; + + if (pVgroup->pDb != NULL) { + mgmtRemoveVgroupFromDb(pVgroup); } - // mgmtUnSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes); - tfree(pVgroup->tableList); - return TSDB_CODE_SUCCESS; } -static int32_t mgmtVgroupActionUpdate(void *pObj) { - SVgObj *pVgroup = (SVgObj *) pObj; +static int32_t mgmtVgroupActionUpdate(SSdbOperDesc *pOper) { + SVgObj *pVgroup = pOper->pObj; int32_t oldTables = taosIdPoolMaxSize(pVgroup->idPool); - SDbObj *pDb = mgmtGetDb(pVgroup->dbName); + SDbObj *pDb = pVgroup->pDb; if (pDb != NULL) { if (pDb->cfg.maxSessions != oldTables) { mPrint("vgroup:%d tables change from %d to %d", pVgroup->vgId, oldTables, pDb->cfg.maxSessions); @@ -88,33 +116,31 @@ static int32_t mgmtVgroupActionUpdate(void *pObj) { } } - mTrace("vgroup:%d update, numOfVnode:%d", pVgroup->vgId, pVgroup->numOfVnodes); + mTrace("vgroup:%d, is updated, tables:%d numOfVnode:%d", pVgroup->vgId, pDb->cfg.maxSessions, pVgroup->numOfVnodes); return TSDB_CODE_SUCCESS; } -static int32_t mgmtVgroupActionEncode(void *pObj, void *pData, int32_t maxRowSize) { - SVgObj *pVgroup = (SVgObj *) pObj; - if (maxRowSize < tsVgUpdateSize) { +static int32_t mgmtVgroupActionEncode(SSdbOperDesc *pOper) { + SVgObj *pVgroup = pOper->pObj; + if (pOper->maxRowSize < tsVgUpdateSize) { return -1; } else { - memcpy(pData, pVgroup, tsVgUpdateSize); - return tsVgUpdateSize; + memcpy(pOper->rowData, pVgroup, tsVgUpdateSize); + pOper->rowSize = tsVgUpdateSize; + return TSDB_CODE_SUCCESS; } } -static void *mgmtVgroupActionDecode(void *pObj) { - SVgObj *pVgroup = (SVgObj *) malloc(sizeof(SVgObj)); - if (pVgroup == NULL) return NULL; - memset(pVgroup, 0, sizeof(SVgObj)); - memcpy(pVgroup, pObj, tsVgUpdateSize); +static int32_t mgmtVgroupActionDecode(SSdbOperDesc *pOper) { + SVgObj *pVgroup = (SVgObj *) calloc(1, sizeof(SVgObj)); + if (pVgroup == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; - return pVgroup; + memcpy(pVgroup, pOper->rowData, tsVgUpdateSize); + pOper->pObj = pVgroup; + return TSDB_CODE_SUCCESS; } int32_t mgmtInitVgroups() { - void *pNode = NULL; - SVgObj *pVgroup = NULL; - SVgObj tObj; tsVgUpdateSize = tObj.updateEnd - (int8_t *)&tObj; @@ -122,7 +148,7 @@ int32_t mgmtInitVgroups() { .tableName = "vgroups", .hashSessions = TSDB_MAX_VGROUPS, .maxRowSize = tsVgUpdateSize, - .keyType = SDB_KEYTYPE_AUTO, + .keyType = SDB_KEY_TYPE_AUTO, .insertFp = mgmtVgroupActionInsert, .deleteFp = mgmtVgroupActionDelete, .updateFp = mgmtVgroupActionUpdate, @@ -137,47 +163,11 @@ int32_t mgmtInitVgroups() { return -1; } - while (1) { - pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup); - if (pVgroup == NULL) break; - - SDbObj *pDb = mgmtGetDb(pVgroup->dbName); - if (pDb == NULL) continue; - - pVgroup->prev = NULL; - pVgroup->next = NULL; - - int32_t size = sizeof(STableInfo *) * pDb->cfg.maxSessions; - pVgroup->tableList = (STableInfo **)malloc(size); - if (pVgroup->tableList == NULL) { - mError("failed to malloc(size:%d) for the tableList of vgroups", size); - return -1; - } - - memset(pVgroup->tableList, 0, size); - - pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions); - if (pVgroup->idPool == NULL) { - mError("failed to taosInitIdPool for vgroups"); - free(pVgroup->tableList); - return -1; - } - - taosIdPoolReinit(pVgroup->idPool); - - if (tsIsCluster && pVgroup->vnodeGid[0].publicIp == 0) { - pVgroup->vnodeGid[0].publicIp = inet_addr(tsPublicIp); - pVgroup->vnodeGid[0].privateIp = inet_addr(tsPrivateIp); - sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, SDB_OPER_GLOBAL); - } - - // mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId); - } - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp); + mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mgmtProcessVnodeCfgMsg); mTrace("vgroup is initialized"); return 0; @@ -196,32 +186,41 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) { if (pDb == NULL) { mError("failed to create vgroup, db not found"); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB); + mgmtFreeQueuedMsg(pMsg); return; } - SVgObj *pVgroup = (SVgObj *)calloc(sizeof(SVgObj), 1); + SVgObj *pVgroup = (SVgObj *)calloc(1, sizeof(SVgObj)); strcpy(pVgroup->dbName, pDb->name); pVgroup->numOfVnodes = pDb->cfg.replications; + pVgroup->createdTime = taosGetTimestampMs(); if (mgmtAllocVnodes(pVgroup) != 0) { mError("db:%s, no enough dnode to alloc %d vnodes to vgroup", pDb->name, pVgroup->numOfVnodes); free(pVgroup); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_ENOUGH_DNODES); + mgmtFreeQueuedMsg(pMsg); return; } - pVgroup->createdTime = taosGetTimestampMs(); - pVgroup->tableList = (STableInfo **) calloc(sizeof(STableInfo *), pDb->cfg.maxSessions); - pVgroup->numOfTables = 0; - pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions); - - mgmtAddVgroupIntoDb(pDb, pVgroup); - // mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId); + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsVgroupSdb, + .pObj = pVgroup, + .rowSize = sizeof(SVgObj) + }; - sdbInsertRow(tsVgroupSdb, pVgroup, SDB_OPER_GLOBAL); + int32_t code = sdbInsertRow(&oper); + if (code != TSDB_CODE_SUCCESS) { + tfree(pVgroup); + code = TSDB_CODE_SDB_ERROR; + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SDB_ERROR); + mgmtFreeQueuedMsg(pMsg); + return; + } mPrint("vgroup:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - mPrint("vgroup:%d, dnode:%d vnode:%d", pVgroup->vgId, pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].vnode); + mPrint("vgroup:%d, index:%d, dnode:%d vnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].vnode); } pMsg->ahandle = pVgroup; @@ -235,28 +234,12 @@ void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle) { } else { mTrace("vgroup:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes); mgmtSendDropVgroupMsg(pVgroup, NULL); - sdbDeleteRow(tsVgroupSdb, pVgroup, SDB_OPER_GLOBAL); - } -} - -void mgmtSetVgroupIdPool() { - void * pNode = NULL; - SVgObj *pVgroup = NULL; - SDbObj *pDb; - - while (1) { - pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup); - if (pVgroup == NULL || pVgroup->idPool == 0) break; - - taosIdPoolSetFreeList(pVgroup->idPool); - pVgroup->numOfTables = taosIdPoolNumOfUsed(pVgroup->idPool); - - pDb = mgmtGetDb(pVgroup->dbName); - pDb->numOfTables += pVgroup->numOfTables; - if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1) - mgmtAddVgroupIntoDbTail(pDb, pVgroup); - else - mgmtAddVgroupIntoDb(pDb, pVgroup); + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsVgroupSdb, + .pObj = pVgroup + }; + sdbDeleteRow(&oper); } } @@ -287,7 +270,7 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { pShow->bytes[cols] = 9; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "vgroup_status"); + strcpy(pSchema[cols].name, "vgroup status"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; @@ -327,13 +310,13 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { pShow->bytes[cols] = 9; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "vnode_status"); + strcpy(pSchema[cols].name, "vnode status"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 16; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "public_ip"); + strcpy(pSchema[cols].name, "public ip"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; } @@ -445,25 +428,36 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo return numOfRows; } -void mgmtUpdateVgroup(SVgObj *pVgroup) { - sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, SDB_OPER_LOCAL); -} - void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable) { - pVgroup->numOfTables++; - if (pTable->sid >= 0) + if (pTable->sid >= 0 && pVgroup->tableList[pTable->sid] == NULL) { pVgroup->tableList[pTable->sid] = pTable; + taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid); + pVgroup->numOfTables++; + pVgroup->pDb->numOfTables++; + } + + if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxSessions) + mgmtAddVgroupIntoDbTail(pVgroup); + else + mgmtAddVgroupIntoDb(pVgroup); } void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable) { - pVgroup->numOfTables--; - if (pTable->sid >= 0) + if (pTable->sid >= 0 && pVgroup->tableList[pTable->sid] != NULL) { pVgroup->tableList[pTable->sid] = NULL; - taosFreeId(pVgroup->idPool, pTable->sid); + taosFreeId(pVgroup->idPool, pTable->sid); + pVgroup->numOfTables--; + pVgroup->pDb->numOfTables--; + } + + if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxSessions) + mgmtAddVgroupIntoDbTail(pVgroup); + else + mgmtAddVgroupIntoDb(pVgroup); } SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) { - SDbObj *pDb = mgmtGetDb(pVgroup->dbName); + SDbObj *pDb = pVgroup->pDb; if (pDb == NULL) return NULL; SMDCreateVnodeMsg *pVnode = rpcMallocCont(sizeof(SMDCreateVnodeMsg)); @@ -487,6 +481,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) { SVnodeDesc *vpeerDesc = pVnode->vpeerDesc; for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { + vpeerDesc[j].vnode = htonl(pVgroup->vnodeGid[j].vnode); vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].privateIp); } @@ -576,13 +571,23 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { newMsg->contLen = queueMsg->contLen; newMsg->pCont = rpcMallocCont(newMsg->contLen); memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen); + queueMsg->pCont = NULL; mgmtAddToShellQueue(newMsg); } else { - sdbDeleteRow(tsVgroupSdb, pVgroup, SDB_OPER_GLOBAL); + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsVgroupSdb, + .pObj = pVgroup + }; + int32_t code = sdbDeleteRow(&oper); + if (code != 0) { + code = TSDB_CODE_SDB_ERROR; + } + mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); } - free(queueMsg); + mgmtFreeQueuedMsg(queueMsg); } static SMDDropVnodeMsg *mgmtBuildDropVnodeMsg(int32_t vgId) { @@ -632,7 +637,15 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { if (queueMsg->received != queueMsg->expected) return; - sdbDeleteRow(tsVgroupSdb, pVgroup, SDB_OPER_GLOBAL); + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsVgroupSdb, + .pObj = pVgroup + }; + int32_t code = sdbDeleteRow(&oper); + if (code != 0) { + code = TSDB_CODE_SDB_ERROR; + } SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg)); newMsg->msgType = queueMsg->msgType; @@ -644,50 +657,64 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen); mgmtAddToShellQueue(newMsg); - free(queueMsg); + queueMsg->pCont = NULL; + mgmtFreeQueuedMsg(queueMsg); +} + +static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) { + if (mgmtCheckRedirect(rpcMsg->handle)) return; + + SDMConfigVnodeMsg *pCfg = (SDMConfigVnodeMsg *) rpcMsg->pCont; + pCfg->dnode = htonl(pCfg->dnode); + pCfg->vnode = htonl(pCfg->vnode); + + SVgObj *pVgroup = mgmtGetVgroupByVnode(pCfg->dnode, pCfg->vnode); + if (pVgroup == NULL) { + mTrace("dnode:%s, vnode:%d, no vgroup info", taosIpStr(pCfg->dnode), pCfg->vnode); + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_VNODE); + return; + } + + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS); + + SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode); + mgmtSendCreateVnodeMsg(pVgroup, &ipSet, NULL); } -void mgmtUpdateVgroupIp(SDnodeObj *pDnode) { - void * pNode = NULL; +void mgmtDropAllVgroups(SDbObj *pDropDb) { + void *pNode = NULL; + void *pLastNode = NULL; + int32_t numOfTables = 0; + int32_t dbNameLen = strlen(pDropDb->name); SVgObj *pVgroup = NULL; + while (1) { pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup); if (pVgroup == NULL) break; - for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - SVnodeGid *vnodeGid = pVgroup->vnodeGid + i; - if (vnodeGid->dnodeId == pDnode->dnodeId) { - mPrint("vgroup:%d, dnode:%d, privateIp:%s change to %s, publicIp:%s change to %s", - pVgroup->vgId, vnodeGid->dnodeId, pDnode->privateIp, taosIpStr(vnodeGid->privateIp), - pDnode->publicIp, taosIpStr(vnodeGid->publicIp)); - vnodeGid->publicIp = pDnode->publicIp; - vnodeGid->privateIp = pDnode->privateIp; - sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, SDB_OPER_GLOBAL); - } + if (strncmp(pDropDb->name, pVgroup->dbName, dbNameLen) == 0) { + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_LOCAL, + .table = tsVgroupSdb, + .pObj = pVgroup, + }; + sdbDeleteRow(&oper); + pNode = pLastNode; + numOfTables++; + continue; } } + + mTrace("db:%s, all vgroups is dropped from sdb", pDropDb->name, numOfTables); } -//static void mgmtProcessVnodeCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *pConn) { -// if (!sdbMaster) { -// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0); -// return; -// } -// -// SDMConfigVnodeMsg *pCfg = (SDMConfigVnodeMsg *) pCont; -// pCfg->dnode = htonl(pCfg->dnode); -// pCfg->vnode = htonl(pCfg->vnode); -// -// SVgObj *pVgroup = mgmtGetVgroupByVnode(pCfg->dnode, pCfg->vnode); -// if (pVgroup == NULL) { -// mTrace("dnode:%s, vnode:%d, no vgroup info", taosIpStr(pCfg->dnode), pCfg->vnode); -// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_NOT_ACTIVE_VNODE, NULL, 0); -// return; -// } -// -// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0); -// -// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode); -// mgmtSendCreateVnodeMsg(pVgroup, pCfg->vnode, &ipSet, NULL); -//} -// \ No newline at end of file +void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle) { + assert(ahandle != NULL); + + if (pVgroup->numOfVnodes != pVgroup->pDb->cfg.replications) { + // TODO: + // mgmtSendAlterVgroupMsg(pVgroup, NULL); + } else { + mgmtAddToShellQueue(ahandle); + } +} \ No newline at end of file diff --git a/src/util/inc/tidpool.h b/src/util/inc/tidpool.h index 08f9d36467..bf35251631 100644 --- a/src/util/inc/tidpool.h +++ b/src/util/inc/tidpool.h @@ -34,11 +34,7 @@ void taosIdPoolCleanUp(void *handle); int taosIdPoolNumOfUsed(void *handle); -void taosIdPoolReinit(void *handle); - -void taosIdPoolMarkStatus(void *handle, int id, int status); - -void taosIdPoolSetFreeList(void *handle); +void taosIdPoolMarkStatus(void *handle, int id); #ifdef __cplusplus } diff --git a/src/util/src/tidpool.c b/src/util/src/tidpool.c index c50c38aa3c..c3f429ac43 100644 --- a/src/util/src/tidpool.c +++ b/src/util/src/tidpool.c @@ -25,78 +25,74 @@ typedef struct { } id_pool_t; void *taosInitIdPool(int maxId) { - id_pool_t *pIdPool; - int * idList, i; - - if (maxId < 3) maxId = 3; - - pIdPool = (id_pool_t *)malloc(sizeof(id_pool_t)); + id_pool_t *pIdPool = calloc(1, sizeof(id_pool_t)); if (pIdPool == NULL) return NULL; - idList = (int *)malloc(sizeof(int) * (size_t)maxId); - if (idList == NULL) { + pIdPool->freeList = malloc(sizeof(int) * (size_t)maxId); + if (pIdPool->freeList == NULL) { free(pIdPool); return NULL; } - memset(pIdPool, 0, sizeof(id_pool_t)); pIdPool->maxId = maxId; - pIdPool->numOfFree = maxId - 1; + pIdPool->numOfFree = maxId; pIdPool->freeSlot = 0; - pIdPool->freeList = idList; pthread_mutex_init(&pIdPool->mutex, NULL); - for (i = 1; i < maxId; ++i) idList[i - 1] = i; + for (int i = 0; i < maxId; ++i) { + pIdPool->freeList[i] = -1; + } pTrace("pool:%p is setup, maxId:%d", pIdPool, pIdPool->maxId); - return (void *)pIdPool; + return pIdPool; } int taosAllocateId(void *handle) { - id_pool_t *pIdPool; - int id = -1; - if (handle == NULL) return id; - - pIdPool = (id_pool_t *)handle; - - if (pIdPool->maxId < 3) pError("pool:%p is messed up, maxId:%d", pIdPool, pIdPool->maxId); + id_pool_t *pIdPool = handle; + if (handle == NULL) { + return -1; + } - if (pthread_mutex_lock(&pIdPool->mutex) != 0) perror("lock pIdPool Mutex"); + int slot = -1; + pthread_mutex_lock(&pIdPool->mutex); if (pIdPool->numOfFree > 0) { - id = pIdPool->freeList[pIdPool->freeSlot]; - pIdPool->freeSlot = (pIdPool->freeSlot + 1) % pIdPool->maxId; - pIdPool->numOfFree--; + for (int i = 0; i < pIdPool->maxId; ++i) { + slot = (i + pIdPool->freeSlot) % pIdPool->maxId; + if (pIdPool->freeList[slot] == -1) { + pIdPool->freeList[slot] = slot; + pIdPool->freeSlot = slot + 1; + pIdPool->numOfFree--; + break; + } + } } - if (pthread_mutex_unlock(&pIdPool->mutex) != 0) perror("unlock pIdPool Mutex"); - - return id; + pthread_mutex_unlock(&pIdPool->mutex); + return slot; } void taosFreeId(void *handle, int id) { - id_pool_t *pIdPool; - int slot; + id_pool_t *pIdPool = handle; + if (handle == NULL) return; - pIdPool = (id_pool_t *)handle; - if (pIdPool->freeList == NULL || pIdPool->maxId == 0) return; - if (id <= 0 || id >= pIdPool->maxId) return; - if (pthread_mutex_lock(&pIdPool->mutex) != 0) perror("lock pIdPool Mutex"); + pthread_mutex_lock(&pIdPool->mutex); - slot = (pIdPool->freeSlot + pIdPool->numOfFree) % pIdPool->maxId; - pIdPool->freeList[slot] = id; - pIdPool->numOfFree++; + int slot = id % pIdPool->maxId; + if (pIdPool->freeList[slot] != -1) { + pIdPool->freeList[slot] = -1; + pIdPool->numOfFree++; + } - if (pthread_mutex_unlock(&pIdPool->mutex) != 0) perror("unlock pIdPool Mutex"); + pthread_mutex_unlock(&pIdPool->mutex); } void taosIdPoolCleanUp(void *handle) { - id_pool_t *pIdPool; + id_pool_t *pIdPool = handle; - if (handle == NULL) return; - pIdPool = (id_pool_t *)handle; + if (pIdPool == NULL) return; pTrace("pool:%p is cleaned", pIdPool); @@ -110,42 +106,21 @@ void taosIdPoolCleanUp(void *handle) { } int taosIdPoolNumOfUsed(void *handle) { - id_pool_t *pIdPool = (id_pool_t *)handle; - - return pIdPool->maxId - pIdPool->numOfFree - 1; + id_pool_t *pIdPool = handle; + return pIdPool->maxId - pIdPool->numOfFree; } -void taosIdPoolReinit(void *handle) { - id_pool_t *pIdPool; +void taosIdPoolMarkStatus(void *handle, int id) { + id_pool_t *pIdPool = handle; + pthread_mutex_lock(&pIdPool->mutex); - pIdPool = (id_pool_t *)handle; - pIdPool->numOfFree = 0; - pIdPool->freeSlot = 0; - - for (int i = 0; i < pIdPool->maxId; ++i) pIdPool->freeList[i] = 0; -} - -void taosIdPoolMarkStatus(void *handle, int id, int status) { - id_pool_t *pIdPool = (id_pool_t *)handle; - - pIdPool->freeList[id] = status; -} - -void taosIdPoolSetFreeList(void *handle) { - id_pool_t *pIdPool; - int pos = 0; - - pIdPool = (id_pool_t *)handle; - pIdPool->numOfFree = 0; - pIdPool->freeSlot = 0; - - for (int i = 1; i < pIdPool->maxId; ++i) { - if (pIdPool->freeList[i] == 0) { - pIdPool->freeList[pos] = i; - pIdPool->numOfFree++; - pos++; - } + int slot = id % pIdPool->maxId; + if (pIdPool->freeList[slot] == -1) { + pIdPool->freeList[slot] = slot; + pIdPool->numOfFree--; } + + pthread_mutex_unlock(&pIdPool->mutex); } int taosUpdateIdPool(id_pool_t *handle, int maxId) { @@ -154,18 +129,18 @@ int taosUpdateIdPool(id_pool_t *handle, int maxId) { return -1; } - int *idList, i; - idList = (int *)malloc(sizeof(int) * (size_t)maxId); + int *idList = malloc(sizeof(int) * maxId); if (idList == NULL) { return -1; } - for (i = 1; i < maxId; ++i) { - idList[i - 1] = i; - } - if (pthread_mutex_lock(&pIdPool->mutex) != 0) perror("lock pIdPool Mutex"); + pthread_mutex_lock(&pIdPool->mutex); + + for (int i = 0; i < maxId; ++i) { + idList[i] = -1; + } - memcpy(idList, pIdPool->freeList, sizeof(int) * (size_t)pIdPool->maxId); + memcpy(idList, pIdPool->freeList, sizeof(int) * pIdPool->maxId); pIdPool->numOfFree += (maxId - pIdPool->maxId); pIdPool->maxId = maxId; @@ -173,7 +148,7 @@ int taosUpdateIdPool(id_pool_t *handle, int maxId) { pIdPool->freeList = idList; free(oldIdList); - if (pthread_mutex_unlock(&pIdPool->mutex) != 0) perror("unlock pIdPool Mutex"); + pthread_mutex_unlock(&pIdPool->mutex); return 0; } -- GitLab