提交 2a41cd21 编写于 作者: S slguan

[TD-93] add ref to sdb

上级 75b15760
......@@ -252,7 +252,6 @@ typedef struct {
void *pCont;
SUserObj *pUser;
SDbObj *pDb;
SVgObj *pVgroup;
STableInfo *pTable;
} SQueuedMsg;
......
......@@ -27,8 +27,6 @@ extern "C" {
int32_t mgmtInitVgroups();
void mgmtCleanUpVgroups();
SVgObj *mgmtGetVgroup(int32_t vgId);
void mgmtIncVgroupRef(SVgObj *pVgroup);
void mgmtDecVgroupRef(SVgObj *pVgroup);
void mgmtDropAllVgroups(SDbObj *pDropDb);
void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb);
......
......@@ -27,6 +27,7 @@
#include "mgmtGrant.h"
#include "mgmtShell.h"
#include "mgmtMnode.h"
#include "mgmtProfile.h"
#include "mgmtSdb.h"
#include "mgmtTable.h"
#include "mgmtUser.h"
......@@ -36,7 +37,7 @@ static void * tsDbSdb = NULL;
static int32_t tsDbUpdateSize;
static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate);
static void mgmtDropDb(void *handle, void *tmrId);
static void mgmtDropDb(SQueuedMsg *newMsg);
static int32_t mgmtSetDbDirty(SDbObj *pDb);
static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
......@@ -651,6 +652,7 @@ void mgmtRemoveSuperTableFromDb(SDbObj *pDb) {
atomic_add_fetch_32(&pDb->numOfSuperTables, -1);
mgmtDecDbRef(pDb);
}
void mgmtAddTableIntoDb(SDbObj *pDb) {
atomic_add_fetch_32(&pDb->numOfTables, 1);
mgmtIncDbRef(pDb);
......@@ -769,8 +771,6 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
}
static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) {
if (mgmtCheckRedirect(pMsg->thandle)) return;
SCMAlterDbMsg *pAlter = pMsg->pCont;
mTrace("db:%s, alter db msg is received from thandle:%p", pAlter->db, pMsg->thandle);
......@@ -780,13 +780,7 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) {
return;
}
if (!pMsg->pUser->writeAuth) {
mError("db:%s, failed to alter, no rights", pAlter->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
return;
}
SDbObj *pDb = mgmtGetDb(pAlter->db);
SDbObj *pDb = pMsg->pDb = mgmtGetDb(pAlter->db);
if (pDb == NULL) {
mError("db:%s, failed to alter, invalid db", pAlter->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB);
......@@ -797,16 +791,13 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) {
if (code != TSDB_CODE_SUCCESS) {
mError("db:%s, failed to alter, invalid db option", pAlter->db);
mgmtSendSimpleResp(pMsg->thandle, code);
mgmtDecDbRef(pDb);
return;
}
SVgObj *pVgroup = pDb->pHead;
if (pVgroup != NULL) {
mPrint("vgroup:%d, will be altered", pVgroup->vgId);
SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg));
memcpy(newMsg, pMsg, sizeof(SQueuedMsg));
memset(pMsg, 0, sizeof(SQueuedMsg));
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
newMsg->ahandle = pVgroup;
newMsg->expected = pVgroup->numOfVnodes;
mgmtAlterVgroup(pVgroup, newMsg);
......@@ -814,15 +805,11 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) {
}
mTrace("db:%s, all vgroups is altered", pDb->name);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS);
rpcFreeCont(pMsg->pCont);
mgmtDecDbRef(pDb);
}
static void mgmtDropDb(void *handle, void *tmrId) {
SQueuedMsg *newMsg = handle;
SDbObj *pDb = newMsg->ahandle;
static void mgmtDropDb(SQueuedMsg *pMsg) {
SDbObj *pDb = pMsg->pDb;
mPrint("db:%s, drop db from sdb", pDb->name);
SSdbOperDesc oper = {
......@@ -835,14 +822,10 @@ static void mgmtDropDb(void *handle, void *tmrId) {
code = TSDB_CODE_SDB_ERROR;
}
mgmtSendSimpleResp(newMsg->thandle, code);
rpcFreeCont(newMsg->pCont);
free(newMsg);
mgmtSendSimpleResp(pMsg->thandle, code);
}
static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
if (mgmtCheckRedirect(pMsg->thandle)) return;
SCMDropDbMsg *pDrop = pMsg->pCont;
mTrace("db:%s, drop db msg is received from thandle:%p", pDrop->db, pMsg->thandle);
......@@ -852,13 +835,7 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
return;
}
if (!pMsg->pUser->writeAuth) {
mError("db:%s, failed to drop, no rights", pDrop->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
return;
}
SDbObj *pDb = mgmtGetDb(pDrop->db);
SDbObj *pDb = pMsg->pDb = mgmtGetDb(pDrop->db);
if (pDb == NULL) {
if (pDrop->ignoreNotExists) {
mTrace("db:%s, db is not exist, think drop success", pDrop->db);
......@@ -874,7 +851,6 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
mError("db:%s, can't drop monitor database", pDrop->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN);
mgmtDecDbRef(pDb);
return;
}
......@@ -882,17 +858,13 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
if (code != TSDB_CODE_SUCCESS) {
mError("db:%s, failed to drop, reason:%s", pDrop->db, tstrerror(code));
mgmtSendSimpleResp(pMsg->thandle, code);
mgmtDecDbRef(pDb);
return;
}
SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg));
memcpy(newMsg, pMsg, sizeof(SQueuedMsg));
pMsg->pCont = NULL;
SVgObj *pVgroup = pDb->pHead;
if (pVgroup != NULL) {
mPrint("vgroup:%d, will be dropped", pVgroup->vgId);
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
newMsg->ahandle = pVgroup;
newMsg->expected = pVgroup->numOfVnodes;
mgmtDropVgroup(pVgroup, newMsg);
......@@ -900,10 +872,7 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
}
mTrace("db:%s, all vgroups is dropped", pDb->name);
void *tmpTmr;
newMsg->ahandle = pDb;
taosTmrReset(mgmtDropDb, 10, newMsg, tsMgmtTmr, &tmpTmr);
mgmtDropDb(pMsg);
}
void mgmtDropAllDbs(SAcctObj *pAcct) {
......
......@@ -215,7 +215,6 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
mPrint("dnode:%d, vgroup:%d not exist in mnode, drop it", pDnode->dnodeId, pDnode->vload[j].vgId);
mgmtSendDropVnodeMsg(pDnode->vload[j].vgId, &ipSet, NULL);
}
mgmtDecVgroupRef(pVgroup);
}
if (pDnode->status != TSDB_DN_STATUS_READY) {
......
......@@ -789,7 +789,6 @@ void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) {
rpcFreeCont(pMsg->pCont);
if (pMsg->pUser) mgmtDecUserRef(pMsg->pUser);
if (pMsg->pDb) mgmtDecDbRef(pMsg->pDb);
if (pMsg->pVgroup) mgmtDecVgroupRef(pMsg->pVgroup);
if (pMsg->pTable) mgmtDecTableRef(pMsg->pTable);
free(pMsg);
}
......
......@@ -433,21 +433,25 @@ static SRowMeta *sdbGetRowMeta(void *handle, void *key) {
void sdbIncRef(void *handle, void *pRow) {
if (pRow) {
SSdbTable *pTable = handle;
int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos);
atomic_add_fetch_32(pRefCount, 1);
sdbTrace("add ref to record:%s:%s:%d", pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount);
if (pTable->refCountPos > 0) {
int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos);
atomic_add_fetch_32(pRefCount, 1);
sdbTrace("add ref to record:%s:%s:%d", pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount);
}
}
}
void sdbDecRef(void *handle, void *pRow) {
if (pRow) {
SSdbTable *pTable = handle;
int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos);
int32_t refCount = atomic_sub_fetch_32(pRefCount, 1);
sdbTrace("def ref of record:%s:%s:%d", pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount);
if (refCount <= 0) {
SSdbOperDesc oper = {.pObj = pRow};
(*pTable->destroyFp)(&oper);
if (pTable->refCountPos > 0) {
int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos);
int32_t refCount = atomic_sub_fetch_32(pRefCount, 1);
sdbTrace("def ref of record:%s:%s:%d", pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount);
if (refCount <= 0) {
SSdbOperDesc oper = {.pObj = pRow};
(*pTable->destroyFp)(&oper);
}
}
}
}
......
......@@ -1355,21 +1355,21 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
return;
}
pMsg->pVgroup = mgmtGetAvailableVgroup(pMsg->pDb);
if (pMsg->pVgroup == NULL) {
SVgObj *pVgroup = mgmtGetAvailableVgroup(pMsg->pDb);
if (pVgroup == NULL) {
mTrace("table:%s, start to create a new vgroup", pCreate->tableId);
mgmtCreateVgroup(mgmtCloneQueuedMsg(pMsg), pMsg->pDb);
return;
}
int32_t sid = taosAllocateId(pMsg->pVgroup->idPool);
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid < 0) {
mTrace("tables:%s, no enough sid in vgroup:%d", pMsg->pVgroup->vgId);
mTrace("tables:%s, no enough sid in vgroup:%d", pVgroup->vgId);
mgmtCreateVgroup(mgmtCloneQueuedMsg(pMsg), pMsg->pDb);
return;
}
pMsg->pTable = (STableInfo *)mgmtDoCreateChildTable(pCreate, pMsg->pVgroup, sid);
pMsg->pTable = (STableInfo *)mgmtDoCreateChildTable(pCreate, pVgroup, sid);
if (pMsg->pTable == NULL) {
mgmtSendSimpleResp(pMsg->thandle, terrno);
return;
......@@ -1381,7 +1381,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
return;
}
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pMsg->pVgroup);
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
newMsg->ahandle = pMsg->pTable;
SRpcMsg rpcMsg = {
......@@ -1397,8 +1397,8 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
pMsg->pVgroup = mgmtGetVgroup(pTable->vgId);
if (pMsg->pVgroup == NULL) {
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("table:%s, failed to drop ctable, vgroup not exist", pTable->info.tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OTHERS);
return;
......@@ -1417,7 +1417,7 @@ void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) {
pDrop->sid = htonl(pTable->sid);
pDrop->uid = htobe64(pTable->uid);
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pMsg->pVgroup);
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
mTrace("table:%s, send drop ctable msg", pDrop->tableId);
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
......@@ -1817,8 +1817,8 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) {
return;
}
queueMsg->pVgroup = mgmtGetVgroup(pTable->vgId);
if (queueMsg->pVgroup == NULL) {
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("table:%s, failed to get vgroup", pTable->info.tableId);
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_INVALID_VGROUP_ID);
return;
......@@ -1837,9 +1837,9 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) {
return;
}
if (queueMsg->pVgroup->numOfTables <= 0) {
mPrint("vgroup:%d, all tables is dropped, drop vgroup", queueMsg->pVgroup->vgId);
mgmtDropVgroup(queueMsg->pVgroup, NULL);
if (pVgroup->numOfTables <= 0) {
mPrint("vgroup:%d, all tables is dropped, drop vgroup", pVgroup->vgId);
mgmtDropVgroup(pVgroup, NULL);
}
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SUCCESS);
......
......@@ -149,7 +149,6 @@ int32_t mgmtInitVgroups() {
.tableName = "vgroups",
.hashSessions = TSDB_MAX_VGROUPS,
.maxRowSize = tsVgUpdateSize,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
.keyType = SDB_KEY_TYPE_AUTO,
.insertFp = mgmtVgroupActionInsert,
.deleteFp = mgmtVgroupActionDelete,
......@@ -175,14 +174,6 @@ int32_t mgmtInitVgroups() {
return 0;
}
void mgmtIncVgroupRef(SVgObj *pVgroup) {
return sdbIncRef(tsVgroupSdb, pVgroup);
}
void mgmtDecVgroupRef(SVgObj *pVgroup) {
return sdbDecRef(tsVgroupSdb, pVgroup);
}
SVgObj *mgmtGetVgroup(int32_t vgId) {
return (SVgObj *)sdbGetRow(tsVgroupSdb, &vgId);
}
......@@ -436,7 +427,7 @@ void mgmtAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable) {
taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid);
pVgroup->numOfTables++;
}
if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxSessions)
mgmtAddVgroupIntoDbTail(pVgroup);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册