提交 eb6f21bd 编写于 作者: S slguan

[TD-93] add ref to sdb

上级 07fe4def
...@@ -59,6 +59,7 @@ typedef struct { ...@@ -59,6 +59,7 @@ typedef struct {
char mnodeName[TSDB_DNODE_NAME_LEN + 1]; char mnodeName[TSDB_DNODE_NAME_LEN + 1];
int8_t reserved[15]; int8_t reserved[15];
int8_t updateEnd[1]; int8_t updateEnd[1];
int32_t refCount;
int syncFd; int syncFd;
void *hbTimer; void *hbTimer;
void *pSync; void *pSync;
...@@ -84,6 +85,7 @@ typedef struct { ...@@ -84,6 +85,7 @@ typedef struct {
char dnodeName[TSDB_DNODE_NAME_LEN + 1]; char dnodeName[TSDB_DNODE_NAME_LEN + 1];
int8_t reserved[15]; int8_t reserved[15];
int8_t updateEnd[1]; int8_t updateEnd[1];
int32_t refCount;
SVnodeLoad vload[TSDB_MAX_VNODES]; SVnodeLoad vload[TSDB_MAX_VNODES];
int32_t status; int32_t status;
uint32_t lastReboot; // time stamp for last reboot uint32_t lastReboot; // time stamp for last reboot
...@@ -115,6 +117,7 @@ typedef struct SSuperTableObj { ...@@ -115,6 +117,7 @@ typedef struct SSuperTableObj {
int32_t numOfTags; int32_t numOfTags;
int8_t reserved[15]; int8_t reserved[15];
int8_t updateEnd[1]; int8_t updateEnd[1];
int32_t refCount;
int32_t numOfTables; int32_t numOfTables;
int16_t nextColId; int16_t nextColId;
SSchema * schema; SSchema * schema;
...@@ -133,6 +136,7 @@ typedef struct { ...@@ -133,6 +136,7 @@ typedef struct {
int8_t reserved[1]; int8_t reserved[1];
int8_t updateEnd[1]; int8_t updateEnd[1];
int16_t nextColId; //used by normal table int16_t nextColId; //used by normal table
int32_t refCount;
char* sql; //used by normal table char* sql; //used by normal table
SSchema* schema; //used by normal table SSchema* schema; //used by normal table
SSuperTableObj *superTable; SSuperTableObj *superTable;
...@@ -149,6 +153,7 @@ typedef struct _vg_obj { ...@@ -149,6 +153,7 @@ typedef struct _vg_obj {
int8_t lbStatus; int8_t lbStatus;
int8_t reserved[14]; int8_t reserved[14];
int8_t updateEnd[1]; int8_t updateEnd[1];
int32_t refCount;
struct _vg_obj *prev, *next; struct _vg_obj *prev, *next;
struct _db_obj *pDb; struct _db_obj *pDb;
int32_t numOfTables; int32_t numOfTables;
...@@ -163,6 +168,7 @@ typedef struct _db_obj { ...@@ -163,6 +168,7 @@ typedef struct _db_obj {
SDbCfg cfg; SDbCfg cfg;
int8_t reserved[15]; int8_t reserved[15];
int8_t updateEnd[1]; int8_t updateEnd[1];
int32_t refCount;
struct _db_obj *prev, *next; struct _db_obj *prev, *next;
int32_t numOfVgroups; int32_t numOfVgroups;
int32_t numOfTables; int32_t numOfTables;
...@@ -181,6 +187,7 @@ typedef struct _user_obj { ...@@ -181,6 +187,7 @@ typedef struct _user_obj {
int8_t writeAuth; int8_t writeAuth;
int8_t reserved[13]; int8_t reserved[13];
int8_t updateEnd[1]; int8_t updateEnd[1];
int32_t refCount;
struct _acctObj * pAcct; struct _acctObj * pAcct;
SQqueryList * pQList; // query list SQqueryList * pQList; // query list
SStreamList * pSList; // stream list SStreamList * pSList; // stream list
...@@ -213,6 +220,7 @@ typedef struct _acctObj { ...@@ -213,6 +220,7 @@ typedef struct _acctObj {
int8_t dirty; int8_t dirty;
int8_t reserved[14]; int8_t reserved[14];
int8_t updateEnd[1]; int8_t updateEnd[1];
int32_t refCount;
SAcctInfo acctInfo; SAcctInfo acctInfo;
SDbObj * pHead; SDbObj * pHead;
pthread_mutex_t mutex; pthread_mutex_t mutex;
......
...@@ -44,6 +44,7 @@ typedef struct { ...@@ -44,6 +44,7 @@ typedef struct {
char *tableName; char *tableName;
int32_t hashSessions; int32_t hashSessions;
int32_t maxRowSize; int32_t maxRowSize;
int32_t refCountPos;
ESdbKeyType keyType; ESdbKeyType keyType;
int32_t (*insertFp)(SSdbOperDesc *pOper); int32_t (*insertFp)(SSdbOperDesc *pOper);
int32_t (*deleteFp)(SSdbOperDesc *pOper); int32_t (*deleteFp)(SSdbOperDesc *pOper);
......
...@@ -51,7 +51,7 @@ void acctAddDb(SAcctObj *pAcct, SDbObj *pDb) { ...@@ -51,7 +51,7 @@ void acctAddDb(SAcctObj *pAcct, SDbObj *pDb) {
pAcct->acctInfo.numOfDbs++; pAcct->acctInfo.numOfDbs++;
pthread_mutex_unlock(&pAcct->mutex); pthread_mutex_unlock(&pAcct->mutex);
mgmtIncDbRef(pDb); acctIncRef(pAcct);
} }
void acctRemoveDb(SAcctObj *pAcct, SDbObj *pDb) { void acctRemoveDb(SAcctObj *pAcct, SDbObj *pDb) {
...@@ -71,7 +71,7 @@ void acctRemoveDb(SAcctObj *pAcct, SDbObj *pDb) { ...@@ -71,7 +71,7 @@ void acctRemoveDb(SAcctObj *pAcct, SDbObj *pDb) {
pAcct->acctInfo.numOfDbs--; pAcct->acctInfo.numOfDbs--;
pthread_mutex_unlock(&pAcct->mutex); pthread_mutex_unlock(&pAcct->mutex);
mgmtDecDbRef(pDb); acctDecRef(pAcct);
} }
void acctAddUser(SAcctObj *pAcct, SUserObj *pUser) { void acctAddUser(SAcctObj *pAcct, SUserObj *pUser) {
...@@ -80,7 +80,7 @@ void acctAddUser(SAcctObj *pAcct, SUserObj *pUser) { ...@@ -80,7 +80,7 @@ void acctAddUser(SAcctObj *pAcct, SUserObj *pUser) {
pUser->pAcct = pAcct; pUser->pAcct = pAcct;
pthread_mutex_unlock(&pAcct->mutex); pthread_mutex_unlock(&pAcct->mutex);
mgmtIncUserRef(pUser); acctIncRef(pAcct);
} }
void acctRemoveUser(SAcctObj *pAcct, SUserObj *pUser) { void acctRemoveUser(SAcctObj *pAcct, SUserObj *pUser) {
...@@ -89,5 +89,5 @@ void acctRemoveUser(SAcctObj *pAcct, SUserObj *pUser) { ...@@ -89,5 +89,5 @@ void acctRemoveUser(SAcctObj *pAcct, SUserObj *pUser) {
pUser->pAcct = NULL; pUser->pAcct = NULL;
pthread_mutex_unlock(&pAcct->mutex); pthread_mutex_unlock(&pAcct->mutex);
mgmtDecUserRef(pUser); acctDecRef(pAcct);
} }
\ No newline at end of file
...@@ -117,6 +117,7 @@ int32_t mgmtInitDbs() { ...@@ -117,6 +117,7 @@ int32_t mgmtInitDbs() {
.tableName = "dbs", .tableName = "dbs",
.hashSessions = TSDB_MAX_DBS, .hashSessions = TSDB_MAX_DBS,
.maxRowSize = tsDbUpdateSize, .maxRowSize = tsDbUpdateSize,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
.keyType = SDB_KEY_TYPE_STRING, .keyType = SDB_KEY_TYPE_STRING,
.insertFp = mgmtDbActionInsert, .insertFp = mgmtDbActionInsert,
.deleteFp = mgmtDbActionDelete, .deleteFp = mgmtDbActionDelete,
......
...@@ -47,6 +47,7 @@ typedef struct _SSdbTable { ...@@ -47,6 +47,7 @@ typedef struct _SSdbTable {
int32_t tableId; int32_t tableId;
int32_t hashSessions; int32_t hashSessions;
int32_t maxRowSize; int32_t maxRowSize;
int32_t refCountPos;
int32_t autoIndex; int32_t autoIndex;
int32_t fd; int32_t fd;
int64_t numOfRows; int64_t numOfRows;
...@@ -66,7 +67,6 @@ typedef struct { ...@@ -66,7 +67,6 @@ typedef struct {
int64_t version; int64_t version;
int64_t offset; int64_t offset;
int32_t rowSize; int32_t rowSize;
int32_t refCount;
void * row; void * row;
} SRowMeta; } SRowMeta;
...@@ -320,11 +320,6 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) { ...@@ -320,11 +320,6 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
} }
} else { } else {
if (rowHead->version < 0) { if (rowHead->version < 0) {
SSdbOperDesc oper = {
.table = pTable,
.pObj = pMetaRow
};
sdbDecRef(pTable, pMetaRow);
(*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, rowHead->data); (*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, rowHead->data);
pTable->numOfRows--; pTable->numOfRows--;
sdbTrace("table:%s, version:%" PRId64 " numOfRows:%d, read deleted record:%s", sdbTrace("table:%s, version:%" PRId64 " numOfRows:%d, read deleted record:%s",
...@@ -340,13 +335,11 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) { ...@@ -340,13 +335,11 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
.rowSize = rowHead->rowSize, .rowSize = rowHead->rowSize,
.pObj = pMetaRow .pObj = pMetaRow
}; };
sdbDecRef(pTable, pMetaRow);
(*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, rowHead->data); (*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, rowHead->data);
int32_t code = (*pTable->decodeFp)(&oper); int32_t code = (*pTable->decodeFp)(&oper);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
rowMeta.row = oper.pObj; rowMeta.row = oper.pObj;
sdbIncRef(pTable, pMetaRow);
(*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, rowMeta.row, &rowMeta); (*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, rowMeta.row, &rowMeta);
sdbTrace("table:%s, version:%" PRId64 " numOfRows:%d, read updated record:%s", sdbTrace("table:%s, version:%" PRId64 " numOfRows:%d, read updated record:%s",
pTable->tableName, pTable->version, pTable->numOfRows, sdbGetkeyStr(pTable, rowHead->data)); pTable->tableName, pTable->version, pTable->numOfRows, sdbGetkeyStr(pTable, rowHead->data));
...@@ -375,6 +368,7 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) { ...@@ -375,6 +368,7 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
.version = pMeta->version, .version = pMeta->version,
}; };
sdbIncRef(pTable, oper.pObj);
int32_t code = (*pTable->insertFp)(&oper); int32_t code = (*pTable->insertFp)(&oper);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
sdbError("table:%s, failed to insert record:%s", pTable->tableName, sdbGetkeyStr(pTable, rowHead->data)); sdbError("table:%s, failed to insert record:%s", pTable->tableName, sdbGetkeyStr(pTable, rowHead->data));
...@@ -398,6 +392,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) { ...@@ -398,6 +392,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
pTable->keyType = pDesc->keyType; pTable->keyType = pDesc->keyType;
pTable->hashSessions = pDesc->hashSessions; pTable->hashSessions = pDesc->hashSessions;
pTable->maxRowSize = pDesc->maxRowSize; pTable->maxRowSize = pDesc->maxRowSize;
pTable->refCountPos = pDesc->refCountPos;
pTable->insertFp = pDesc->insertFp; pTable->insertFp = pDesc->insertFp;
pTable->deleteFp = pDesc->deleteFp; pTable->deleteFp = pDesc->deleteFp;
pTable->updateFp = pDesc->updateFp; pTable->updateFp = pDesc->updateFp;
...@@ -438,18 +433,18 @@ static SRowMeta *sdbGetRowMeta(void *handle, void *key) { ...@@ -438,18 +433,18 @@ static SRowMeta *sdbGetRowMeta(void *handle, void *key) {
void sdbIncRef(void *handle, void *pRow) { void sdbIncRef(void *handle, void *pRow) {
if (pRow) { if (pRow) {
SSdbTable *pTable = handle; SSdbTable *pTable = handle;
SRowMeta *pMeta = (pRow - 4); int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos);
atomic_add_fetch_32(&pMeta->refCount, 1); atomic_add_fetch_32(pRefCount, 1);
sdbTrace("table:%s, add ref:%d to record:%s", pTable->tableName, pMeta->refCount, sdbGetkeyStr(pTable, pRow)); sdbTrace("table:%s, add ref:%d to record:%s", pTable->tableName, *pRefCount, sdbGetkeyStr(pTable, pRow));
} }
} }
void sdbDecRef(void *handle, void *pRow) { void sdbDecRef(void *handle, void *pRow) {
if (pRow) { if (pRow) {
SSdbTable *pTable = handle; SSdbTable *pTable = handle;
SRowMeta * pMeta = (pRow - 4); int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos);
int32_t refCount = atomic_sub_fetch_32(&pMeta->refCount, 1); int32_t refCount = atomic_sub_fetch_32(pRefCount, 1);
sdbTrace("table:%s, def ref:%d from record:%s", pTable->tableName, pMeta->refCount, sdbGetkeyStr(pTable, pRow)); sdbTrace("table:%s, def ref:%d from record:%s", pTable->tableName, *pRefCount, sdbGetkeyStr(pTable, pRow));
if (refCount <= 0) { if (refCount <= 0) {
SSdbOperDesc oper = {.pObj = pRow}; SSdbOperDesc oper = {.pObj = pRow};
(*pTable->destroyFp)(&oper); (*pTable->destroyFp)(&oper);
......
...@@ -236,6 +236,7 @@ static int32_t mgmtInitChildTables() { ...@@ -236,6 +236,7 @@ static int32_t mgmtInitChildTables() {
.tableName = "ctables", .tableName = "ctables",
.hashSessions = tsMaxTables, .hashSessions = tsMaxTables,
.maxRowSize = sizeof(SChildTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS, .maxRowSize = sizeof(SChildTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
.keyType = SDB_KEY_TYPE_STRING, .keyType = SDB_KEY_TYPE_STRING,
.insertFp = mgmtChildTableActionInsert, .insertFp = mgmtChildTableActionInsert,
.deleteFp = mgmtChildTableActionDelete, .deleteFp = mgmtChildTableActionDelete,
...@@ -411,6 +412,7 @@ static int32_t mgmtInitSuperTables() { ...@@ -411,6 +412,7 @@ static int32_t mgmtInitSuperTables() {
.tableName = "stables", .tableName = "stables",
.hashSessions = TSDB_MAX_SUPER_TABLES, .hashSessions = TSDB_MAX_SUPER_TABLES,
.maxRowSize = tsSuperTableUpdateSize + sizeof(SSchema) * TSDB_MAX_COLUMNS, .maxRowSize = tsSuperTableUpdateSize + sizeof(SSchema) * TSDB_MAX_COLUMNS,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
.keyType = SDB_KEY_TYPE_STRING, .keyType = SDB_KEY_TYPE_STRING,
.insertFp = mgmtSuperTableActionInsert, .insertFp = mgmtSuperTableActionInsert,
.deleteFp = mgmtSuperTableActionDelete, .deleteFp = mgmtSuperTableActionDelete,
......
...@@ -99,6 +99,7 @@ int32_t mgmtInitUsers() { ...@@ -99,6 +99,7 @@ int32_t mgmtInitUsers() {
.tableName = "users", .tableName = "users",
.hashSessions = TSDB_MAX_USERS, .hashSessions = TSDB_MAX_USERS,
.maxRowSize = tsUserUpdateSize, .maxRowSize = tsUserUpdateSize,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
.keyType = SDB_KEY_TYPE_STRING, .keyType = SDB_KEY_TYPE_STRING,
.insertFp = mgmtUserActionInsert, .insertFp = mgmtUserActionInsert,
.deleteFp = mgmtUserActionDelete, .deleteFp = mgmtUserActionDelete,
......
...@@ -149,6 +149,7 @@ int32_t mgmtInitVgroups() { ...@@ -149,6 +149,7 @@ int32_t mgmtInitVgroups() {
.tableName = "vgroups", .tableName = "vgroups",
.hashSessions = TSDB_MAX_VGROUPS, .hashSessions = TSDB_MAX_VGROUPS,
.maxRowSize = tsVgUpdateSize, .maxRowSize = tsVgUpdateSize,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
.keyType = SDB_KEY_TYPE_AUTO, .keyType = SDB_KEY_TYPE_AUTO,
.insertFp = mgmtVgroupActionInsert, .insertFp = mgmtVgroupActionInsert,
.deleteFp = mgmtVgroupActionDelete, .deleteFp = mgmtVgroupActionDelete,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册