提交 ff9bdcfd 编写于 作者: S Shengliang Guan

[TD-149] replace sdb hash

上级 93260417
......@@ -25,7 +25,7 @@ extern "C" {
int32_t mgmtInitAccts();
void mgmtCleanUpAccts();
void * mgmtGetAcct(char *acctName);
void * mgmtGetNextAcct(void *pNode, SAcctObj **pAcct);
void * mgmtGetNextAcct(void *pIter, SAcctObj **pAcct);
void mgmtIncAcctRef(SAcctObj *pAcct);
void mgmtDecAcctRef(SAcctObj *pAcct);
void mgmtAddDbToAcct(SAcctObj *pAcct, SDbObj *pDb);
......
......@@ -32,7 +32,7 @@ int32_t mgmtInitDbs();
void mgmtCleanUpDbs();
SDbObj *mgmtGetDb(char *db);
SDbObj *mgmtGetDbByTableId(char *db);
void * mgmtGetNextDb(void *pNode, SDbObj **pDb);
void * mgmtGetNextDb(void *pIter, SDbObj **pDb);
void mgmtIncDbRef(SDbObj *pDb);
void mgmtDecDbRef(SDbObj *pDb);
bool mgmtCheckIsMonitorDB(char *db, char *monitordb);
......
......@@ -223,7 +223,7 @@ typedef struct SAcctObj {
typedef struct {
int8_t type;
char db[TSDB_DB_NAME_LEN + 1];
void * pNode;
void * pIter;
int16_t numOfColumns;
int32_t rowSize;
int32_t numOfRows;
......
......@@ -34,7 +34,7 @@ char* mgmtGetDnodeStatusStr(int32_t dnodeStatus);
void mgmtMonitorDnodeModule();
int32_t mgmtGetDnodesNum();
void * mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode);
void * mgmtGetNextDnode(void *pIter, SDnodeObj **pDnode);
void mgmtIncDnodeRef(SDnodeObj *pDnode);
void mgmtDecDnodeRef(SDnodeObj *pDnode);
void * mgmtGetDnode(int32_t dnodeId);
......
......@@ -37,7 +37,7 @@ void mgmtDropMnodeLocal(int32_t dnodeId);
void * mgmtGetMnode(int32_t mnodeId);
int32_t mgmtGetMnodesNum();
void * mgmtGetNextMnode(void *pNode, struct SMnodeObj **pMnode);
void * mgmtGetNextMnode(void *pIter, struct SMnodeObj **pMnode);
void mgmtIncMnodeRef(struct SMnodeObj *pMnode);
void mgmtDecMnodeRef(struct SMnodeObj *pMnode);
......
......@@ -80,7 +80,8 @@ int32_t sdbDeleteRow(SSdbOper *pOper);
int32_t sdbUpdateRow(SSdbOper *pOper);
void *sdbGetRow(void *handle, void *key);
void *sdbFetchRow(void *handle, void *pNode, void **ppRow);
void *sdbFetchRow(void *handle, void *pIter, void **ppRow);
void sdbFreeIter(void *pIter);
void sdbIncRef(void *thandle, void *pRow);
void sdbDecRef(void *thandle, void *pRow);
int64_t sdbGetNumOfRows(void *handle);
......
......@@ -27,8 +27,8 @@ void mgmtCleanUpTables();
void * mgmtGetTable(char *tableId);
void mgmtIncTableRef(void *pTable);
void mgmtDecTableRef(void *pTable);
void * mgmtGetNextChildTable(void *pNode, SChildTableObj **pTable);
void * mgmtGetNextSuperTable(void *pNode, SSuperTableObj **pTable);
void * mgmtGetNextChildTable(void *pIter, SChildTableObj **pTable);
void * mgmtGetNextSuperTable(void *pIter, SSuperTableObj **pTable);
void mgmtDropAllChildTables(SDbObj *pDropDb);
void mgmtDropAllSuperTables(SDbObj *pDropDb);
......
......@@ -24,7 +24,7 @@ extern "C" {
int32_t mgmtInitUsers();
void mgmtCleanUpUsers();
SUserObj *mgmtGetUser(char *name);
void * mgmtGetNextUser(void *pNode, SUserObj **pUser);
void * mgmtGetNextUser(void *pIter, SUserObj **pUser);
void mgmtIncUserRef(SUserObj *pUser);
void mgmtDecUserRef(SUserObj *pUser);
SUserObj *mgmtGetUserFromConn(void *pConn);
......
......@@ -35,7 +35,7 @@ void mgmtDecVgroupRef(SVgObj *pVgroup);
void mgmtDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg);
void mgmtDropAllDnodeVgroups(SDnodeObj *pDropDnode);
void * mgmtGetNextVgroup(void *pNode, SVgObj **pVgroup);
void * mgmtGetNextVgroup(void *pIter, SVgObj **pVgroup);
void mgmtUpdateVgroup(SVgObj *pVgroup);
void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *dnodeId, SVnodeLoad *pVload);
......
......@@ -126,8 +126,8 @@ void *mgmtGetAcct(char *name) {
return sdbGetRow(tsAcctSdb, name);
}
void *mgmtGetNextAcct(void *pNode, SAcctObj **pAcct) {
return sdbFetchRow(tsAcctSdb, pNode, (void **)pAcct);
void *mgmtGetNextAcct(void *pIter, SAcctObj **pAcct) {
return sdbFetchRow(tsAcctSdb, pIter, (void **)pAcct);
}
void mgmtIncAcctRef(SAcctObj *pAcct) {
......
......@@ -22,6 +22,7 @@
#include "mgmtInt.h"
#include "mgmtMnode.h"
#include "mgmtDnode.h"
#include "mgmtSdb.h"
#include "mgmtVgroup.h"
#ifndef _SYNC
......@@ -33,13 +34,13 @@ void balanceUpdateMgmt() {}
void balanceReset() {}
int32_t balanceAllocVnodes(SVgObj *pVgroup) {
void * pNode = NULL;
void * pIter = NULL;
SDnodeObj *pDnode = NULL;
SDnodeObj *pSelDnode = NULL;
float vnodeUsage = 1000.0;
while (1) {
pNode = mgmtGetNextDnode(pNode, &pDnode);
pIter = mgmtGetNextDnode(pIter, &pDnode);
if (pDnode == NULL) break;
if (pDnode->totalVnodes > 0 && pDnode->openVnodes < pDnode->totalVnodes) {
......@@ -55,6 +56,8 @@ int32_t balanceAllocVnodes(SVgObj *pVgroup) {
mgmtDecDnodeRef(pDnode);
}
sdbFreeIter(pIter);
if (pSelDnode == NULL) {
mError("failed to alloc vnode to vgroup");
return TSDB_CODE_NO_ENOUGH_DNODES;
......
......@@ -156,8 +156,8 @@ int32_t mgmtInitDbs() {
return 0;
}
void *mgmtGetNextDb(void *pNode, SDbObj **pDb) {
return sdbFetchRow(tsDbSdb, pNode, (void **)pDb);
void *mgmtGetNextDb(void *pIter, SDbObj **pDb) {
return sdbFetchRow(tsDbSdb, pIter, (void **)pDb);
}
SDbObj *mgmtGetDb(char *db) {
......@@ -583,7 +583,7 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
if (pUser == NULL) return 0;
while (numOfRows < rows) {
pShow->pNode = mgmtGetNextDb(pShow->pNode, &pDb);
pShow->pIter = mgmtGetNextDb(pShow->pIter, &pDb);
if (pDb == NULL) break;
cols = 0;
......@@ -865,14 +865,15 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
}
}
void *pNode = NULL;
void *pIter = NULL;
while (1) {
SVgObj *pVgroup = NULL;
pNode = mgmtGetNextVgroup(pNode, &pVgroup);
pIter = mgmtGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break;
mgmtSendCreateVgroupMsg(pVgroup, NULL);
mgmtDecVgroupRef(pVgroup);
}
sdbFreeIter(pIter);
if (oldReplica != pDb->cfg.replications) {
balanceNotify();
......@@ -983,12 +984,12 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
void mgmtDropAllDbs(SAcctObj *pAcct) {
int32_t numOfDbs = 0;
SDbObj *pDb = NULL;
void * pNode = NULL;
void * pIter = NULL;
mPrint("acct:%s, all dbs will be dropped from sdb", pAcct->user);
while (1) {
pNode = mgmtGetNextDb(pNode, &pDb);
pIter = mgmtGetNextDb(pIter, &pDb);
if (pDb == NULL) break;
if (pDb->pAcct == pAcct) {
......@@ -1005,5 +1006,7 @@ void mgmtDropAllDbs(SAcctObj *pAcct) {
mgmtDecDbRef(pDb);
}
sdbFreeIter(pIter);
mPrint("acct:%s, all dbs:%d is dropped from sdb", pAcct->user, numOfDbs);
}
......@@ -170,8 +170,8 @@ void mgmtCleanupDnodes() {
sdbCloseTable(tsDnodeSdb);
}
void *mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode) {
return sdbFetchRow(tsDnodeSdb, pNode, (void **)pDnode);
void *mgmtGetNextDnode(void *pIter, SDnodeObj **pDnode) {
return sdbFetchRow(tsDnodeSdb, pIter, (void **)pDnode);
}
int32_t mgmtGetDnodesNum() {
......@@ -184,17 +184,20 @@ void *mgmtGetDnode(int32_t dnodeId) {
void *mgmtGetDnodeByEp(char *ep) {
SDnodeObj *pDnode = NULL;
void * pNode = NULL;
void * pIter = NULL;
while (1) {
pNode = mgmtGetNextDnode(pNode, &pDnode);
pIter = mgmtGetNextDnode(pIter, &pDnode);
if (pDnode == NULL) break;
if (strcmp(ep, pDnode->dnodeEp) == 0) {
sdbFreeIter(pIter);
return pDnode;
}
mgmtDecDnodeRef(pDnode);
}
sdbFreeIter(pIter);
return NULL;
}
......@@ -530,7 +533,7 @@ static int32_t mgmtGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pShow->numOfRows = mgmtGetDnodesNum();
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL;
pShow->pIter = NULL;
mgmtDecUserRef(pUser);
......@@ -544,7 +547,7 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi
char *pWrite;
while (numOfRows < rows) {
pShow->pNode = mgmtGetNextDnode(pShow->pNode, &pDnode);
pShow->pIter = mgmtGetNextDnode(pShow->pIter, &pDnode);
if (pDnode == NULL) break;
cols = 0;
......@@ -636,7 +639,7 @@ static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pShow->numOfRows = mgmtGetDnodesNum() * TSDB_MOD_MAX;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL;
pShow->pIter = NULL;
mgmtDecUserRef(pUser);
return 0;
......@@ -648,7 +651,7 @@ int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pCo
while (numOfRows < rows) {
SDnodeObj *pDnode = NULL;
pShow->pNode = mgmtGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode);
pShow->pIter = mgmtGetNextDnode(pShow->pIter, (SDnodeObj **)&pDnode);
if (pDnode == NULL) break;
for (int32_t moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) {
......@@ -738,7 +741,7 @@ static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
}
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL;
pShow->pIter = NULL;
mgmtDecUserRef(pUser);
return 0;
......@@ -821,7 +824,8 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
if (pShow->payloadLen > 0 ) {
pDnode = mgmtGetDnodeByEp(pShow->payload);
} else {
mgmtGetNextDnode(NULL, (SDnodeObj **)&pDnode);
void *pIter = mgmtGetNextDnode(NULL, (SDnodeObj **)&pDnode);
sdbFreeIter(pIter);
}
if (pDnode != NULL) {
......@@ -830,7 +834,7 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
}
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = pDnode;
pShow->pIter = pDnode;
mgmtDecUserRef(pUser);
return 0;
......@@ -844,12 +848,12 @@ static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, voi
if (0 == rows) return 0;
pDnode = (SDnodeObj *)(pShow->pNode);
pDnode = (SDnodeObj *)(pShow->pIter);
if (pDnode != NULL) {
void *pNode = NULL;
void *pIter = NULL;
SVgObj *pVgroup;
while (1) {
pNode = mgmtGetNextVgroup(pNode, &pVgroup);
pIter = mgmtGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break;
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
......@@ -869,6 +873,7 @@ static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, voi
mgmtDecVgroupRef(pVgroup);
}
sdbFreeIter(pIter);
} else {
numOfRows = 0;
}
......
......@@ -95,11 +95,12 @@ static int32_t mgmtMnodeActionDecode(SSdbOper *pOper) {
static int32_t mgmtMnodeActionRestored() {
if (mgmtGetMnodesNum() == 1) {
SMnodeObj *pMnode = NULL;
mgmtGetNextMnode(NULL, &pMnode);
void *pIter = mgmtGetNextMnode(NULL, &pMnode);
if (pMnode != NULL) {
pMnode->role = TAOS_SYNC_ROLE_MASTER;
mgmtDecMnodeRef(pMnode);
}
sdbFreeIter(pIter);
}
return TSDB_CODE_SUCCESS;
}
......@@ -157,8 +158,8 @@ void mgmtDecMnodeRef(SMnodeObj *pMnode) {
sdbDecRef(tsMnodeSdb, pMnode);
}
void *mgmtGetNextMnode(void *pNode, SMnodeObj **pMnode) {
return sdbFetchRow(tsMnodeSdb, pNode, (void **)pMnode);
void *mgmtGetNextMnode(void *pIter, SMnodeObj **pMnode) {
return sdbFetchRow(tsMnodeSdb, pIter, (void **)pMnode);
}
char *mgmtGetMnodeRoleStr(int32_t role) {
......@@ -177,10 +178,10 @@ char *mgmtGetMnodeRoleStr(int32_t role) {
}
void mgmtGetMnodeIpSet(SRpcIpSet *ipSet) {
void *pNode = NULL;
void *pIter = NULL;
while (1) {
SMnodeObj *pMnode = NULL;
pNode = mgmtGetNextMnode(pNode, &pMnode);
pIter = mgmtGetNextMnode(pIter, &pMnode);
if (pMnode == NULL) break;
strcpy(ipSet->fqdn[ipSet->numOfIps], pMnode->pDnode->dnodeFqdn);
......@@ -194,6 +195,7 @@ void mgmtGetMnodeIpSet(SRpcIpSet *ipSet) {
mgmtDecMnodeRef(pMnode);
}
sdbFreeIter(pIter);
}
void mgmtGetMnodeInfos(void *param) {
......@@ -201,10 +203,10 @@ void mgmtGetMnodeInfos(void *param) {
mnodes->inUse = 0;
int32_t index = 0;
void *pNode = NULL;
void *pIter = NULL;
while (1) {
SMnodeObj *pMnode = NULL;
pNode = mgmtGetNextMnode(pNode, &pMnode);
pIter = mgmtGetNextMnode(pIter, &pMnode);
if (pMnode == NULL) break;
mnodes->nodeInfos[index].nodeId = htonl(pMnode->mnodeId);
......@@ -216,6 +218,7 @@ void mgmtGetMnodeInfos(void *param) {
index++;
mgmtDecMnodeRef(pMnode);
}
sdbFreeIter(pIter);
mnodes->nodeNum = index;
}
......@@ -317,7 +320,7 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pShow->numOfRows = mgmtGetMnodesNum();
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL;
pShow->pIter = NULL;
mgmtDecUserRef(pUser);
return 0;
......@@ -330,7 +333,7 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi
char *pWrite;
while (numOfRows < rows) {
pShow->pNode = mgmtGetNextMnode(pShow->pNode, &pMnode);
pShow->pIter = mgmtGetNextMnode(pShow->pIter, &pMnode);
if (pMnode == NULL) break;
cols = 0;
......
......@@ -140,7 +140,7 @@ int32_t mgmtGetQueries(SShowObj *pShow, void *pConn) {
//
// // sorting based on useconds
//
// pShow->pNode = pQueryShow;
// pShow->pIter = pQueryShow;
return 0;
}
......@@ -187,7 +187,7 @@ int32_t mgmtGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
pShow->numOfRows = 1000000;
pShow->pNode = NULL;
pShow->pIter = NULL;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
mgmtGetQueries(pShow, pConn);
......@@ -252,7 +252,7 @@ int32_t mgmtRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pCo
char *pWrite;
int32_t cols = 0;
SQueryShow *pQueryShow = (SQueryShow *)pShow->pNode;
SQueryShow *pQueryShow = (SQueryShow *)pShow->pIter;
if (rows > pQueryShow->numOfQueries - pQueryShow->index) rows = pQueryShow->numOfQueries - pQueryShow->index;
......@@ -339,7 +339,7 @@ int32_t mgmtGetStreams(SShowObj *pShow, void *pConn) {
//
// // sorting based on useconds
//
// pShow->pNode = pStreamShow;
// pShow->pIter = pStreamShow;
return 0;
}
......@@ -397,7 +397,7 @@ int32_t mgmtGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
pShow->numOfRows = 1000000;
pShow->pNode = NULL;
pShow->pIter = NULL;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
mgmtGetStreams(pShow, pConn);
......@@ -409,7 +409,7 @@ int32_t mgmtRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pCo
char *pWrite;
int32_t cols = 0;
SStreamShow *pStreamShow = (SStreamShow *)pShow->pNode;
SStreamShow *pStreamShow = (SStreamShow *)pShow->pIter;
if (rows > pStreamShow->numOfStreams - pStreamShow->index) rows = pStreamShow->numOfStreams - pStreamShow->index;
......@@ -592,7 +592,7 @@ int mgmtGetConns(SShowObj *pShow, void *pConn) {
//
// // sorting based on useconds
//
// pShow->pNode = pConnShow;
// pShow->pIter = pConnShow;
return 0;
}
......@@ -627,7 +627,7 @@ int32_t mgmtGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
pShow->numOfRows = 1000000;
pShow->pNode = NULL;
pShow->pIter = NULL;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
mgmtGetConns(pShow, pConn);
......@@ -639,7 +639,7 @@ int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn
char *pWrite;
int32_t cols = 0;
SConnShow *pConnShow = (SConnShow *)pShow->pNode;
SConnShow *pConnShow = (SConnShow *)pShow->pIter;
if (rows > pConnShow->numOfConns - pConnShow->index) rows = pConnShow->numOfConns - pConnShow->index;
......
......@@ -239,10 +239,10 @@ void sdbUpdateSync() {
}
if (index == 0) {
void *pNode = NULL;
void *pIter = NULL;
while (1) {
SMnodeObj *pMnode = NULL;
pNode = mgmtGetNextMnode(pNode, &pMnode);
pIter = mgmtGetNextMnode(pIter, &pMnode);
if (pMnode == NULL) break;
syncCfg.nodeInfo[index].nodeId = pMnode->mnodeId;
......@@ -252,6 +252,7 @@ void sdbUpdateSync() {
mgmtDecMnodeRef(pMnode);
}
sdbFreeIter(pIter);
}
syncCfg.replica = index;
......@@ -666,7 +667,10 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow) {
}
SSdbRow *pMeta = taosHashIterGet(pIter);
if (pMeta == NULL) return NULL;
if (pMeta == NULL) {
taosHashDestroyIter(pIter);
return NULL;
}
*ppRow = pMeta->row;
sdbIncRef(handle, pMeta->row);
......@@ -674,6 +678,12 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow) {
return pIter;
}
void sdbFreeIter(void *pIter) {
if (pIter != NULL) {
taosHashDestroyIter(pIter);
}
}
void *sdbOpenTable(SSdbTableDesc *pDesc) {
SSdbTable *pTable = (SSdbTable *)calloc(1, sizeof(SSdbTable));
......
......@@ -48,6 +48,7 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *queuedMsg);
static void mgmtProcessHeartBeatMsg(SQueuedMsg *queuedMsg);
static void mgmtProcessConnectMsg(SQueuedMsg *queuedMsg);
static void mgmtProcessUseMsg(SQueuedMsg *queuedMsg);
static void mgmtFreeShowObj(void *data);
void *tsMgmtTmr;
static void *tsMgmtTranQhandle = NULL;
......@@ -65,7 +66,7 @@ int32_t mgmtInitShell() {
tsMgmtTmr = taosTmrInit((tsMaxShellConns) * 3, 200, 3600000, "MND");
tsMgmtTranQhandle = taosInitScheduler(tsMaxShellConns, 1, "mnodeT");
tsQhandleCache = taosCacheInit(tsMgmtTmr, 10);
tsQhandleCache = taosCacheInitWithCb(tsMgmtTmr, 10, mgmtFreeShowObj);
return 0;
}
......@@ -479,7 +480,7 @@ bool mgmtCheckQhandle(uint64_t qhandle) {
mTrace("qhandle:%p is retrived", qhandle);
return true;
} else {
mTrace("qhandle:%p is already freed", qhandle);
mTrace("qhandle:%p is already released", qhandle);
return false;
}
}
......@@ -498,8 +499,14 @@ void* mgmtSaveQhandle(void *qhandle, int32_t size) {
return NULL;
}
static void mgmtFreeShowObj(void *data) {
SShowObj *pShow = data;
//sdbFreeIter(pShow->pIter);
mTrace("qhandle:%p is destroyed", pShow);
}
void mgmtFreeQhandle(void *qhandle, bool forceRemove) {
mTrace("qhandle:%p is freed", qhandle);
mTrace("qhandle:%p is released", qhandle);
taosCacheRelease(tsQhandleCache, &qhandle, forceRemove);
}
......
......@@ -247,25 +247,19 @@ static int32_t mgmtChildTableActionDecode(SSdbOper *pOper) {
}
static int32_t mgmtChildTableActionRestored() {
void *pNode = NULL;
void *pLastNode = NULL;
void *pIter = NULL;
SChildTableObj *pTable = NULL;
while (1) {
pLastNode = pNode;
mgmtDecTableRef(pTable);
pNode = mgmtGetNextChildTable(pNode, &pTable);
pIter = mgmtGetNextChildTable(pIter, &pTable);
if (pTable == NULL) break;
SDbObj *pDb = mgmtGetDbByTableId(pTable->info.tableId);
if (pDb == NULL) {
mError("ctable:%s, failed to get db, discard it", pTable->info.tableId);
SSdbOper desc = {0};
desc.type = SDB_OPER_LOCAL;
desc.pObj = pTable;
desc.table = tsChildTableSdb;
SSdbOper desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb};
sdbDeleteRow(&desc);
pNode = pLastNode;
continue;
}
mgmtDecDbRef(pDb);
......@@ -274,12 +268,8 @@ static int32_t mgmtChildTableActionRestored() {
if (pVgroup == NULL) {
mError("ctable:%s, failed to get vgId:%d sid:%d, discard it", pTable->info.tableId, pTable->vgId, pTable->sid);
pTable->vgId = 0;
SSdbOper desc = {0};
desc.type = SDB_OPER_LOCAL;
desc.pObj = pTable;
desc.table = tsChildTableSdb;
SSdbOper desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb};
sdbDeleteRow(&desc);
pNode = pLastNode;
continue;
}
mgmtDecVgroupRef(pVgroup);
......@@ -288,24 +278,16 @@ static int32_t mgmtChildTableActionRestored() {
mError("ctable:%s, db:%s not match with vgId:%d db:%s sid:%d, discard it",
pTable->info.tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid);
pTable->vgId = 0;
SSdbOper desc = {0};
desc.type = SDB_OPER_LOCAL;
desc.pObj = pTable;
desc.table = tsChildTableSdb;
SSdbOper desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb};
sdbDeleteRow(&desc);
pNode = pLastNode;
continue;
}
if (pVgroup->tableList == NULL) {
mError("ctable:%s, vgId:%d tableList is null", pTable->info.tableId, pTable->vgId);
pTable->vgId = 0;
SSdbOper desc = {0};
desc.type = SDB_OPER_LOCAL;
desc.pObj = pTable;
desc.table = tsChildTableSdb;
SSdbOper desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb};
sdbDeleteRow(&desc);
pNode = pLastNode;
continue;
}
......@@ -314,18 +296,16 @@ static int32_t mgmtChildTableActionRestored() {
if (pSuperTable == NULL) {
mError("ctable:%s, stable:%" PRIu64 " not exist", pTable->info.tableId, pTable->suid);
pTable->vgId = 0;
SSdbOper desc = {0};
desc.type = SDB_OPER_LOCAL;
desc.pObj = pTable;
desc.table = tsChildTableSdb;
SSdbOper desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb};
sdbDeleteRow(&desc);
pNode = pLastNode;
continue;
}
mgmtDecTableRef(pSuperTable);
}
}
sdbFreeIter(pIter);
return 0;
}
......@@ -560,10 +540,10 @@ static void *mgmtGetSuperTable(char *tableId) {
static void *mgmtGetSuperTableByUid(uint64_t uid) {
SSuperTableObj *pStable = NULL;
void * pNode = NULL;
void *pIter = NULL;
while (1) {
pNode = mgmtGetNextSuperTable(pNode, &pStable);
pIter = mgmtGetNextSuperTable(pIter, &pStable);
if (pStable == NULL) break;
if (pStable->uid == uid) {
return pStable;
......@@ -571,6 +551,8 @@ static void *mgmtGetSuperTableByUid(uint64_t uid) {
mgmtDecTableRef(pStable);
}
sdbFreeIter(pIter);
return NULL;
}
......@@ -588,12 +570,12 @@ void *mgmtGetTable(char *tableId) {
return NULL;
}
void *mgmtGetNextChildTable(void *pNode, SChildTableObj **pTable) {
return sdbFetchRow(tsChildTableSdb, pNode, (void **)pTable);
void *mgmtGetNextChildTable(void *pIter, SChildTableObj **pTable) {
return sdbFetchRow(tsChildTableSdb, pIter, (void **)pTable);
}
void *mgmtGetNextSuperTable(void *pNode, SSuperTableObj **pTable) {
return sdbFetchRow(tsSuperTableSdb, pNode, (void **)pTable);
void *mgmtGetNextSuperTable(void *pIter, SSuperTableObj **pTable) {
return sdbFetchRow(tsSuperTableSdb, pIter, (void **)pTable);
}
void mgmtIncTableRef(void *p1) {
......@@ -1122,7 +1104,7 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v
while (numOfRows < rows) {
mgmtDecTableRef(pTable);
pShow->pNode = mgmtGetNextSuperTable(pShow->pNode, &pTable);
pShow->pIter = mgmtGetNextSuperTable(pShow->pIter, &pTable);
if (pTable == NULL) break;
if (strncmp(pTable->info.tableId, prefix, prefixLen)) {
continue;
......@@ -1172,8 +1154,7 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v
}
void mgmtDropAllSuperTables(SDbObj *pDropDb) {
void *pNode = NULL;
void *pLastNode = NULL;
void * pIter= NULL;
int32_t numOfTables = 0;
int32_t dbNameLen = strlen(pDropDb->name);
SSuperTableObj *pTable = NULL;
......@@ -1181,8 +1162,7 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) {
mPrint("db:%s, all super tables will be dropped from sdb", pDropDb->name);
while (1) {
pLastNode = pNode;
pNode = mgmtGetNextSuperTable(pNode, &pTable);
pIter = mgmtGetNextSuperTable(pIter, &pTable);
if (pTable == NULL) break;
if (strncmp(pDropDb->name, pTable->info.tableId, dbNameLen) == 0) {
......@@ -1192,13 +1172,14 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) {
.pObj = pTable,
};
sdbDeleteRow(&oper);
pNode = pLastNode;
numOfTables ++;
}
mgmtDecTableRef(pTable);
}
sdbFreeIter(pIter);
mPrint("db:%s, all super tables:%d is dropped from sdb", pDropDb->name, numOfTables);
}
......@@ -1753,8 +1734,7 @@ static void mgmtGetChildTableMeta(SQueuedMsg *pMsg) {
}
void mgmtDropAllChildTables(SDbObj *pDropDb) {
void *pNode = NULL;
void *pLastNode = NULL;
void * pIter = NULL;
int32_t numOfTables = 0;
int32_t dbNameLen = strlen(pDropDb->name);
SChildTableObj *pTable = NULL;
......@@ -1762,8 +1742,7 @@ void mgmtDropAllChildTables(SDbObj *pDropDb) {
mPrint("db:%s, all child tables will be dropped from sdb", pDropDb->name);
while (1) {
pLastNode = pNode;
pNode = mgmtGetNextChildTable(pNode, &pTable);
pIter = mgmtGetNextChildTable(pIter, &pTable);
if (pTable == NULL) break;
if (strncmp(pDropDb->name, pTable->info.tableId, dbNameLen) == 0) {
......@@ -1773,26 +1752,25 @@ void mgmtDropAllChildTables(SDbObj *pDropDb) {
.pObj = pTable,
};
sdbDeleteRow(&oper);
pNode = pLastNode;
numOfTables++;
}
mgmtDecTableRef(pTable);
}
sdbFreeIter(pIter);
mPrint("db:%s, all child tables:%d is dropped from sdb", pDropDb->name, numOfTables);
}
static void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) {
void *pNode = NULL;
void *pLastNode = NULL;
void * pIter = NULL;
int32_t numOfTables = 0;
SChildTableObj *pTable = NULL;
mPrint("stable:%s, all child tables will dropped from sdb", pStable->info.tableId, numOfTables);
while (1) {
pLastNode = pNode;
pNode = mgmtGetNextChildTable(pNode, &pTable);
pIter = mgmtGetNextChildTable(pIter, &pTable);
if (pTable == NULL) break;
if (pTable->superTable == pStable) {
......@@ -1802,13 +1780,14 @@ static void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) {
.pObj = pTable,
};
sdbDeleteRow(&oper);
pNode = pLastNode;
numOfTables++;
}
mgmtDecTableRef(pTable);
}
sdbFreeIter(pIter);
mPrint("stable:%s, all child tables:%d is dropped from sdb", pStable->info.tableId, numOfTables);
}
......@@ -2079,7 +2058,7 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows,
while (numOfRows < rows) {
mgmtDecTableRef(pTable);
pShow->pNode = mgmtGetNextChildTable(pShow->pNode, &pTable);
pShow->pIter = mgmtGetNextChildTable(pShow->pIter, &pTable);
if (pTable == NULL) break;
// not belong to current db
......
......@@ -155,8 +155,8 @@ SUserObj *mgmtGetUser(char *name) {
return (SUserObj *)sdbGetRow(tsUserSdb, name);
}
void *mgmtGetNextUser(void *pNode, SUserObj **pUser) {
return sdbFetchRow(tsUserSdb, pNode, (void **)pUser);
void *mgmtGetNextUser(void *pIter, SUserObj **pUser) {
return sdbFetchRow(tsUserSdb, pIter, (void **)pUser);
}
void mgmtIncUserRef(SUserObj *pUser) {
......@@ -300,7 +300,7 @@ static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void
char *pWrite;
while (numOfRows < rows) {
pShow->pNode = mgmtGetNextUser(pShow->pNode, &pUser);
pShow->pIter = mgmtGetNextUser(pShow->pIter, &pUser);
if (pUser == NULL) break;
cols = 0;
......@@ -504,15 +504,13 @@ static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg) {
}
void mgmtDropAllUsers(SAcctObj *pAcct) {
void * pNode = NULL;
void * pLastNode = NULL;
void * pIter = NULL;
int32_t numOfUsers = 0;
int32_t acctNameLen = strlen(pAcct->user);
SUserObj *pUser = NULL;
while (1) {
pLastNode = pNode;
pNode = mgmtGetNextUser(pNode, &pUser);
pIter = mgmtGetNextUser(pIter, &pUser);
if (pUser == NULL) break;
if (strncmp(pUser->acct, pAcct->user, acctNameLen) == 0) {
......@@ -522,13 +520,14 @@ void mgmtDropAllUsers(SAcctObj *pAcct) {
.pObj = pUser,
};
sdbDeleteRow(&oper);
pNode = pLastNode;
numOfUsers++;
}
mgmtDecUserRef(pUser);
}
sdbFreeIter(pIter);
mTrace("acct:%s, all users:%d is dropped from sdb", pAcct->user, numOfUsers);
}
......
......@@ -288,8 +288,8 @@ SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) {
return pDb->pHead;
}
void *mgmtGetNextVgroup(void *pNode, SVgObj **pVgroup) {
return sdbFetchRow(tsVgroupSdb, pNode, (void **)pVgroup);
void *mgmtGetNextVgroup(void *pIter, SVgObj **pVgroup) {
return sdbFetchRow(tsVgroupSdb, pIter, (void **)pVgroup);
}
void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) {
......@@ -429,10 +429,10 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
if (NULL == pTable) {
pShow->numOfRows = pDb->numOfVgroups;
pShow->pNode = pDb->pHead;
pShow->pIter = pDb->pHead;
} else {
pShow->numOfRows = 1;
pShow->pNode = pVgroup;
pShow->pIter = pVgroup;
}
mgmtDecDbRef(pDb);
......@@ -457,9 +457,9 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
}
while (numOfRows < rows) {
pVgroup = (SVgObj *) pShow->pNode;
pVgroup = (SVgObj *) pShow->pIter;
if (pVgroup == NULL) break;
pShow->pNode = (void *) pVgroup->next;
pShow->pIter = (void *) pVgroup->next;
cols = 0;
......@@ -749,14 +749,12 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) {
}
void mgmtDropAllDnodeVgroups(SDnodeObj *pDropDnode) {
void * pNode = NULL;
void * pLastNode = NULL;
void * pIter = NULL;
SVgObj *pVgroup = NULL;
int32_t numOfVgroups = 0;
while (1) {
pLastNode = pNode;
pNode = mgmtGetNextVgroup(pNode, &pVgroup);
pIter = mgmtGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break;
if (pVgroup->vnodeGid[0].dnodeId == pDropDnode->dnodeId) {
......@@ -766,24 +764,23 @@ void mgmtDropAllDnodeVgroups(SDnodeObj *pDropDnode) {
.pObj = pVgroup,
};
sdbDeleteRow(&oper);
pNode = pLastNode;
numOfVgroups++;
continue;
}
mgmtDecVgroupRef(pVgroup);
}
sdbFreeIter(pIter);
}
void mgmtDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg) {
void *pNode = NULL;
void *pLastNode = NULL;
void * pIter = NULL;
int32_t numOfVgroups = 0;
SVgObj *pVgroup = NULL;
mPrint("db:%s, all vgroups will be dropped from sdb", pDropDb->name);
while (1) {
pLastNode = pNode;
pNode = mgmtGetNextVgroup(pNode, &pVgroup);
pIter = mgmtGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break;
if (pVgroup->pDb == pDropDb) {
......@@ -793,7 +790,6 @@ void mgmtDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg) {
.pObj = pVgroup,
};
sdbDeleteRow(&oper);
pNode = pLastNode;
numOfVgroups++;
if (sendMsg) {
......@@ -804,5 +800,7 @@ void mgmtDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg) {
mgmtDecVgroupRef(pVgroup);
}
sdbFreeIter(pIter);
mPrint("db:%s, all vgroups:%d is dropped from sdb", pDropDb->name, numOfVgroups);
}
......@@ -67,6 +67,7 @@ typedef struct {
void * pTimer;
SCacheStatis statistics;
SHashObj * pHashTable;
_hash_free_fn_t freeFp;
int numOfElemsInTrash; // number of element in trash
int16_t deleting; // set the deleting flag to stop refreshing ASAP.
T_REF_DECLARE()
......@@ -88,6 +89,7 @@ typedef struct {
* @return
*/
SCacheObj *taosCacheInit(void *tmrCtrl, int64_t refreshTimeInSeconds);
SCacheObj *taosCacheInitWithCb(void *tmrCtrl, int64_t refreshTimeInSeconds, void (*freeCb)(void *data));
/**
* add data into cache
......
......@@ -149,6 +149,7 @@ static void taosRemoveFromTrash(SCacheObj *pCacheObj, STrashElem *pElem) {
}
pElem->pData->signature = 0;
if (pCacheObj->freeFp) pCacheObj->freeFp(pElem->pData->data);
free(pElem->pData);
free(pElem);
}
......@@ -211,6 +212,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
uTrace("key:%s is removed from cache,total:%d,size:%ldbytes", pNode->key, pCacheObj->totalSize, size);
if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data);
free(pNode);
}
......@@ -380,7 +382,7 @@ static void taosCacheRefresh(void *handle, void *tmrId) {
}
}
SCacheObj *taosCacheInit(void *tmrCtrl, int64_t refreshTime) {
SCacheObj *taosCacheInitWithCb(void *tmrCtrl, int64_t refreshTime, void (*freeCb)(void *data)) {
if (tmrCtrl == NULL || refreshTime <= 0) {
return NULL;
}
......@@ -399,8 +401,9 @@ SCacheObj *taosCacheInit(void *tmrCtrl, int64_t refreshTime) {
}
// set free cache node callback function for hash table
taosHashSetFreecb(pCacheObj->pHashTable, taosFreeNode);
// taosHashSetFreecb(pCacheObj->pHashTable, taosFreeNode);
pCacheObj->freeFp = freeCb;
pCacheObj->refreshTime = refreshTime * 1000;
pCacheObj->tmrCtrl = tmrCtrl;
......@@ -419,6 +422,10 @@ SCacheObj *taosCacheInit(void *tmrCtrl, int64_t refreshTime) {
return pCacheObj;
}
SCacheObj *taosCacheInit(void *tmrCtrl, int64_t refreshTime) {
return taosCacheInitWithCb(tmrCtrl, refreshTime, NULL);
}
void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, size_t dataSize, int duration) {
SCacheDataNode *pNode;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册