From 2a41cd2136bd668c31c6328177d63cac2128b3d4 Mon Sep 17 00:00:00 2001 From: slguan Date: Fri, 3 Apr 2020 22:33:57 +0800 Subject: [PATCH] [TD-93] add ref to sdb --- src/inc/mnode.h | 1 - src/mnode/inc/mgmtVgroup.h | 2 -- src/mnode/src/mgmtDb.c | 53 ++++++++----------------------------- src/mnode/src/mgmtDnode.c | 1 - src/mnode/src/mgmtProfile.c | 1 - src/mnode/src/mgmtSdb.c | 22 ++++++++------- src/mnode/src/mgmtTable.c | 28 ++++++++++---------- src/mnode/src/mgmtVgroup.c | 11 +------- 8 files changed, 39 insertions(+), 80 deletions(-) diff --git a/src/inc/mnode.h b/src/inc/mnode.h index ca93bf4661..3375867bab 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -252,7 +252,6 @@ typedef struct { void *pCont; SUserObj *pUser; SDbObj *pDb; - SVgObj *pVgroup; STableInfo *pTable; } SQueuedMsg; diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index d0b1e0de97..7743f0451b 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -27,8 +27,6 @@ extern "C" { int32_t mgmtInitVgroups(); void mgmtCleanUpVgroups(); SVgObj *mgmtGetVgroup(int32_t vgId); -void mgmtIncVgroupRef(SVgObj *pVgroup); -void mgmtDecVgroupRef(SVgObj *pVgroup); void mgmtDropAllVgroups(SDbObj *pDropDb); void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb); diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index ec1ff62862..4678247e69 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -27,6 +27,7 @@ #include "mgmtGrant.h" #include "mgmtShell.h" #include "mgmtMnode.h" +#include "mgmtProfile.h" #include "mgmtSdb.h" #include "mgmtTable.h" #include "mgmtUser.h" @@ -36,7 +37,7 @@ static void * tsDbSdb = NULL; static int32_t tsDbUpdateSize; static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate); -static void mgmtDropDb(void *handle, void *tmrId); +static void mgmtDropDb(SQueuedMsg *newMsg); static int32_t mgmtSetDbDirty(SDbObj *pDb); static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn); @@ -651,6 +652,7 @@ void mgmtRemoveSuperTableFromDb(SDbObj *pDb) { atomic_add_fetch_32(&pDb->numOfSuperTables, -1); mgmtDecDbRef(pDb); } + void mgmtAddTableIntoDb(SDbObj *pDb) { atomic_add_fetch_32(&pDb->numOfTables, 1); mgmtIncDbRef(pDb); @@ -769,8 +771,6 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) { } static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { - if (mgmtCheckRedirect(pMsg->thandle)) return; - SCMAlterDbMsg *pAlter = pMsg->pCont; mTrace("db:%s, alter db msg is received from thandle:%p", pAlter->db, pMsg->thandle); @@ -780,13 +780,7 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { return; } - if (!pMsg->pUser->writeAuth) { - mError("db:%s, failed to alter, no rights", pAlter->db); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); - return; - } - - SDbObj *pDb = mgmtGetDb(pAlter->db); + SDbObj *pDb = pMsg->pDb = mgmtGetDb(pAlter->db); if (pDb == NULL) { mError("db:%s, failed to alter, invalid db", pAlter->db); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB); @@ -797,16 +791,13 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { if (code != TSDB_CODE_SUCCESS) { mError("db:%s, failed to alter, invalid db option", pAlter->db); mgmtSendSimpleResp(pMsg->thandle, code); - mgmtDecDbRef(pDb); return; } SVgObj *pVgroup = pDb->pHead; if (pVgroup != NULL) { mPrint("vgroup:%d, will be altered", pVgroup->vgId); - SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg)); - memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); - memset(pMsg, 0, sizeof(SQueuedMsg)); + SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); newMsg->ahandle = pVgroup; newMsg->expected = pVgroup->numOfVnodes; mgmtAlterVgroup(pVgroup, newMsg); @@ -814,15 +805,11 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { } mTrace("db:%s, all vgroups is altered", pDb->name); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); - rpcFreeCont(pMsg->pCont); - mgmtDecDbRef(pDb); } -static void mgmtDropDb(void *handle, void *tmrId) { - SQueuedMsg *newMsg = handle; - SDbObj *pDb = newMsg->ahandle; +static void mgmtDropDb(SQueuedMsg *pMsg) { + SDbObj *pDb = pMsg->pDb; mPrint("db:%s, drop db from sdb", pDb->name); SSdbOperDesc oper = { @@ -835,14 +822,10 @@ static void mgmtDropDb(void *handle, void *tmrId) { code = TSDB_CODE_SDB_ERROR; } - mgmtSendSimpleResp(newMsg->thandle, code); - rpcFreeCont(newMsg->pCont); - free(newMsg); + mgmtSendSimpleResp(pMsg->thandle, code); } static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { - if (mgmtCheckRedirect(pMsg->thandle)) return; - SCMDropDbMsg *pDrop = pMsg->pCont; mTrace("db:%s, drop db msg is received from thandle:%p", pDrop->db, pMsg->thandle); @@ -852,13 +835,7 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { return; } - if (!pMsg->pUser->writeAuth) { - mError("db:%s, failed to drop, no rights", pDrop->db); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); - return; - } - - SDbObj *pDb = mgmtGetDb(pDrop->db); + SDbObj *pDb = pMsg->pDb = mgmtGetDb(pDrop->db); if (pDb == NULL) { if (pDrop->ignoreNotExists) { mTrace("db:%s, db is not exist, think drop success", pDrop->db); @@ -874,7 +851,6 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { mError("db:%s, can't drop monitor database", pDrop->db); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN); - mgmtDecDbRef(pDb); return; } @@ -882,17 +858,13 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { if (code != TSDB_CODE_SUCCESS) { mError("db:%s, failed to drop, reason:%s", pDrop->db, tstrerror(code)); mgmtSendSimpleResp(pMsg->thandle, code); - mgmtDecDbRef(pDb); return; } - SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg)); - memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); - pMsg->pCont = NULL; - SVgObj *pVgroup = pDb->pHead; if (pVgroup != NULL) { mPrint("vgroup:%d, will be dropped", pVgroup->vgId); + SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); newMsg->ahandle = pVgroup; newMsg->expected = pVgroup->numOfVnodes; mgmtDropVgroup(pVgroup, newMsg); @@ -900,10 +872,7 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { } mTrace("db:%s, all vgroups is dropped", pDb->name); - - void *tmpTmr; - newMsg->ahandle = pDb; - taosTmrReset(mgmtDropDb, 10, newMsg, tsMgmtTmr, &tmpTmr); + mgmtDropDb(pMsg); } void mgmtDropAllDbs(SAcctObj *pAcct) { diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 65531bd36d..53f0a668b4 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -215,7 +215,6 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { mPrint("dnode:%d, vgroup:%d not exist in mnode, drop it", pDnode->dnodeId, pDnode->vload[j].vgId); mgmtSendDropVnodeMsg(pDnode->vload[j].vgId, &ipSet, NULL); } - mgmtDecVgroupRef(pVgroup); } if (pDnode->status != TSDB_DN_STATUS_READY) { diff --git a/src/mnode/src/mgmtProfile.c b/src/mnode/src/mgmtProfile.c index ae921a0505..1549b5f37c 100644 --- a/src/mnode/src/mgmtProfile.c +++ b/src/mnode/src/mgmtProfile.c @@ -789,7 +789,6 @@ void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) { rpcFreeCont(pMsg->pCont); if (pMsg->pUser) mgmtDecUserRef(pMsg->pUser); if (pMsg->pDb) mgmtDecDbRef(pMsg->pDb); - if (pMsg->pVgroup) mgmtDecVgroupRef(pMsg->pVgroup); if (pMsg->pTable) mgmtDecTableRef(pMsg->pTable); free(pMsg); } diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index d6a1cf5260..b3b64ab664 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -433,21 +433,25 @@ static SRowMeta *sdbGetRowMeta(void *handle, void *key) { void sdbIncRef(void *handle, void *pRow) { if (pRow) { SSdbTable *pTable = handle; - int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos); - atomic_add_fetch_32(pRefCount, 1); - sdbTrace("add ref to record:%s:%s:%d", pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount); + if (pTable->refCountPos > 0) { + int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos); + atomic_add_fetch_32(pRefCount, 1); + sdbTrace("add ref to record:%s:%s:%d", pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount); + } } } void sdbDecRef(void *handle, void *pRow) { if (pRow) { SSdbTable *pTable = handle; - int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos); - int32_t refCount = atomic_sub_fetch_32(pRefCount, 1); - sdbTrace("def ref of record:%s:%s:%d", pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount); - if (refCount <= 0) { - SSdbOperDesc oper = {.pObj = pRow}; - (*pTable->destroyFp)(&oper); + if (pTable->refCountPos > 0) { + int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos); + int32_t refCount = atomic_sub_fetch_32(pRefCount, 1); + sdbTrace("def ref of record:%s:%s:%d", pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount); + if (refCount <= 0) { + SSdbOperDesc oper = {.pObj = pRow}; + (*pTable->destroyFp)(&oper); + } } } } diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 0238d829a0..37cdc36a62 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -1355,21 +1355,21 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { return; } - pMsg->pVgroup = mgmtGetAvailableVgroup(pMsg->pDb); - if (pMsg->pVgroup == NULL) { + SVgObj *pVgroup = mgmtGetAvailableVgroup(pMsg->pDb); + if (pVgroup == NULL) { mTrace("table:%s, start to create a new vgroup", pCreate->tableId); mgmtCreateVgroup(mgmtCloneQueuedMsg(pMsg), pMsg->pDb); return; } - int32_t sid = taosAllocateId(pMsg->pVgroup->idPool); + int32_t sid = taosAllocateId(pVgroup->idPool); if (sid < 0) { - mTrace("tables:%s, no enough sid in vgroup:%d", pMsg->pVgroup->vgId); + mTrace("tables:%s, no enough sid in vgroup:%d", pVgroup->vgId); mgmtCreateVgroup(mgmtCloneQueuedMsg(pMsg), pMsg->pDb); return; } - pMsg->pTable = (STableInfo *)mgmtDoCreateChildTable(pCreate, pMsg->pVgroup, sid); + pMsg->pTable = (STableInfo *)mgmtDoCreateChildTable(pCreate, pVgroup, sid); if (pMsg->pTable == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); return; @@ -1381,7 +1381,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { return; } - SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pMsg->pVgroup); + SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); newMsg->ahandle = pMsg->pTable; SRpcMsg rpcMsg = { @@ -1397,8 +1397,8 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) { SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; - pMsg->pVgroup = mgmtGetVgroup(pTable->vgId); - if (pMsg->pVgroup == NULL) { + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { mError("table:%s, failed to drop ctable, vgroup not exist", pTable->info.tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OTHERS); return; @@ -1417,7 +1417,7 @@ void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) { pDrop->sid = htonl(pTable->sid); pDrop->uid = htobe64(pTable->uid); - SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pMsg->pVgroup); + SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); mTrace("table:%s, send drop ctable msg", pDrop->tableId); SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); @@ -1817,8 +1817,8 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { return; } - queueMsg->pVgroup = mgmtGetVgroup(pTable->vgId); - if (queueMsg->pVgroup == NULL) { + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { mError("table:%s, failed to get vgroup", pTable->info.tableId); mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_INVALID_VGROUP_ID); return; @@ -1837,9 +1837,9 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { return; } - if (queueMsg->pVgroup->numOfTables <= 0) { - mPrint("vgroup:%d, all tables is dropped, drop vgroup", queueMsg->pVgroup->vgId); - mgmtDropVgroup(queueMsg->pVgroup, NULL); + if (pVgroup->numOfTables <= 0) { + mPrint("vgroup:%d, all tables is dropped, drop vgroup", pVgroup->vgId); + mgmtDropVgroup(pVgroup, NULL); } mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SUCCESS); diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 32dd6e5096..36e8431570 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -149,7 +149,6 @@ int32_t mgmtInitVgroups() { .tableName = "vgroups", .hashSessions = TSDB_MAX_VGROUPS, .maxRowSize = tsVgUpdateSize, - .refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, .keyType = SDB_KEY_TYPE_AUTO, .insertFp = mgmtVgroupActionInsert, .deleteFp = mgmtVgroupActionDelete, @@ -175,14 +174,6 @@ int32_t mgmtInitVgroups() { return 0; } -void mgmtIncVgroupRef(SVgObj *pVgroup) { - return sdbIncRef(tsVgroupSdb, pVgroup); -} - -void mgmtDecVgroupRef(SVgObj *pVgroup) { - return sdbDecRef(tsVgroupSdb, pVgroup); -} - SVgObj *mgmtGetVgroup(int32_t vgId) { return (SVgObj *)sdbGetRow(tsVgroupSdb, &vgId); } @@ -436,7 +427,7 @@ void mgmtAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable) { taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid); pVgroup->numOfTables++; } - + if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxSessions) mgmtAddVgroupIntoDbTail(pVgroup); } -- GitLab