From aeac5f9ea8891361daa5146e47936c0edfaa7fdf Mon Sep 17 00:00:00 2001 From: slguan Date: Sat, 4 Apr 2020 12:12:06 +0800 Subject: [PATCH] [TD-73] fix bug in refcount --- src/mnode/src/mgmtSdb.c | 11 ++++-- src/mnode/src/mgmtTable.c | 81 ++++++++++++++++++++++++-------------- src/mnode/src/mgmtVgroup.c | 13 +++--- 3 files changed, 65 insertions(+), 40 deletions(-) diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index d98ed623a4..ecace420ef 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -435,7 +435,7 @@ void sdbIncRef(void *handle, void *pRow) { SSdbTable *pTable = handle; int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos); atomic_add_fetch_32(pRefCount, 1); - sdbTrace("add ref to record:%s:%s:%d", pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount); + sdbTrace("table:%s, add ref to record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount); } } @@ -444,8 +444,11 @@ void sdbDecRef(void *handle, void *pRow) { SSdbTable *pTable = handle; int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos); int32_t refCount = atomic_sub_fetch_32(pRefCount, 1); - sdbTrace("def ref of record:%s:%s:%d", pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount); - if (refCount <= 0) { + sdbTrace("table:%s, def ref of record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount); + + int8_t* updateEnd = pRow + pTable->refCountPos - 1; + if (refCount <= 0 && *updateEnd) { + sdbTrace("table:%s, record:%s:%s:%d is destroyed", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount); SSdbOperDesc oper = {.pObj = pRow}; (*pTable->destroyFp)(&oper); } @@ -648,6 +651,8 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) { pthread_mutex_unlock(&pTable->mutex); (*pTable->deleteFp)(pOper); + int8_t* updateEnd = pOper->pObj + pTable->refCountPos - 1; + *updateEnd = 1; sdbDecRef(pTable, pOper->pObj); return 0; } diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index e2ed023fd2..08eba1434d 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -53,7 +53,7 @@ static int32_t tsSuperTableUpdateSize; static void * mgmtGetChildTable(char *tableId); static void * mgmtGetSuperTable(char *tableId); -void mgmtGetChildTableMeta(SQueuedMsg *pMsg, SChildTableObj *pTable); +void mgmtGetChildTableMeta(SQueuedMsg *pMsg); void mgmtAlterChildTable(SQueuedMsg *pMsg, SChildTableObj *pTable); void mgmtDropAllChildTables(SDbObj *pDropDb); void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable); @@ -69,7 +69,7 @@ static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg); static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg); static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg); -void mgmtGetSuperTableMeta(SQueuedMsg *pMsg, SSuperTableObj *pTable, SDbObj *pDb); +void mgmtGetSuperTableMeta(SQueuedMsg *pMsg); void mgmtAlterSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable); void mgmtDropAllSuperTables(SDbObj *pDropDb); int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable); @@ -107,18 +107,21 @@ static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) { mError("ctable:%s, not in vgroup:%d", pTable->info.tableId, pTable->vgId); return TSDB_CODE_INVALID_VGROUP_ID; } + mgmtDecVgroupRef(pVgroup); SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb == NULL) { mError("ctable:%s, vgroup:%d not in db:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName); return TSDB_CODE_INVALID_DB; } + mgmtDecDbRef(pDb); SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); if (pAcct == NULL) { mError("ctable:%s, account:%s not exists", pTable->info.tableId, pDb->cfg.acct); return TSDB_CODE_INVALID_ACCT; } + acctDecRef(pAcct); if (pTable->info.type == TSDB_CHILD_TABLE) { pTable->superTable = mgmtGetSuperTable(pTable->superTableId); @@ -129,6 +132,7 @@ static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) { grantAdd(TSDB_GRANT_TIMESERIES, pTable->numOfColumns - 1); pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1); } + mgmtAddTableIntoDb(pDb); mgmtAddTableIntoVgroup(pVgroup, pTable); @@ -145,23 +149,27 @@ static int32_t mgmtChildTableActionDelete(SSdbOperDesc *pOper) { if (pVgroup == NULL) { return TSDB_CODE_INVALID_VGROUP_ID; } + mgmtDecVgroupRef(pVgroup); SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb == NULL) { mError("ctable:%s, vgroup:%d not in DB:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName); return TSDB_CODE_INVALID_DB; } + mgmtDecDbRef(pDb); SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); if (pAcct == NULL) { mError("ctable:%s, account:%s not exists", pTable->info.tableId, pDb->cfg.acct); return TSDB_CODE_INVALID_ACCT; } + acctDecRef(pAcct); if (pTable->info.type == TSDB_CHILD_TABLE) { grantRestore(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1); pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1); pTable->superTable->numOfTables--; + mgmtDecTableRef(pTable->superTable); } else { grantRestore(TSDB_GRANT_TIMESERIES, pTable->numOfColumns - 1); pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1); @@ -258,6 +266,7 @@ static int32_t mgmtInitChildTables() { pNode = NULL; while (1) { pLastNode = pNode; + mgmtDecTableRef(pTable); pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable); if (pTable == NULL) break; @@ -272,6 +281,7 @@ static int32_t mgmtInitChildTables() { pNode = pLastNode; continue; } + mgmtDecDbRef(pDb); SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { @@ -285,6 +295,7 @@ static int32_t mgmtInitChildTables() { pNode = pLastNode; continue; } + mgmtDecVgroupRef(pVgroup); if (strcmp(pVgroup->dbName, pDb->name) != 0) { mError("ctable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it", @@ -351,6 +362,7 @@ static int32_t mgmtSuperTableActionInsert(SSdbOperDesc *pOper) { if (pDb != NULL) { mgmtAddSuperTableIntoDb(pDb); } + mgmtDecDbRef(pDb); return TSDB_CODE_SUCCESS; } @@ -362,6 +374,8 @@ static int32_t mgmtSuperTableActionDelete(SSdbOperDesc *pOper) { mgmtRemoveSuperTableFromDb(pDb); mgmtDropAllChildTablesInStable((SSuperTableObj *)pStable); } + mgmtDecDbRef(pDb); + return TSDB_CODE_SUCCESS; } @@ -503,6 +517,8 @@ void mgmtIncTableRef(void *p1) { } void mgmtDecTableRef(void *p1) { + if (p1 == NULL) return; + STableInfo *pTable = (STableInfo *)p1; if (pTable->type == TSDB_SUPER_TABLE) { sdbDecRef(tsSuperTableSdb, pTable); @@ -591,7 +607,7 @@ static void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { } if (pMsg->pTable->type == TSDB_SUPER_TABLE) { - mTrace("table:%s, start to drop ctable", pDrop->tableId); + mTrace("table:%s, start to drop stable", pDrop->tableId); mgmtProcessDropSuperTableMsg(pMsg); } else { mTrace("table:%s, start to drop ctable", pDrop->tableId); @@ -656,21 +672,21 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) { SCMTableInfoMsg *pInfo = pMsg->pCont; mTrace("table:%s, table meta msg is received from thandle:%p", pInfo->tableId, pMsg->thandle); - SDbObj *pDb = mgmtGetDbByTableId(pInfo->tableId); - if (pDb == NULL || pDb->dirty) { + pMsg->pDb = mgmtGetDbByTableId(pInfo->tableId); + if (pMsg->pDb == NULL || pMsg->pDb->dirty) { mError("table:%s, failed to get table meta, db not selected", pInfo->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); return; } - STableInfo *pTable = mgmtGetTable(pInfo->tableId); - if (pTable == NULL) { - mgmtGetChildTableMeta(pMsg, NULL); + pMsg->pTable = mgmtGetTable(pInfo->tableId); + if (pMsg->pTable == NULL) { + mgmtGetChildTableMeta(pMsg); } else { - if (pTable->type != TSDB_SUPER_TABLE) { - mgmtGetChildTableMeta(pMsg, (SChildTableObj *)pTable); + if (pMsg->pTable->type != TSDB_SUPER_TABLE) { + mgmtGetChildTableMeta(pMsg); } else { - mgmtGetSuperTableMeta(pMsg, (SSuperTableObj *)pTable, pDb); + mgmtGetSuperTableMeta(pMsg); } } } @@ -1078,6 +1094,7 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v cols++; numOfRows++; + mgmtDecTableRef(pTable); } pShow->numOfReads += numOfRows; @@ -1094,6 +1111,7 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) { SSuperTableObj *pTable = NULL; while (1) { + mgmtDecTableRef(pTable); pNode = sdbFetchRow(tsSuperTableSdb, pNode, (void **)&pTable); if (pTable == NULL) { break; @@ -1111,7 +1129,6 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) { continue; } } - mTrace("db:%s, all super tables:%d is dropped from sdb", pDropDb->name, numOfTables); } @@ -1128,11 +1145,12 @@ int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) { return (pTable->numOfColumns + pTable->numOfTags) * sizeof(SSchema); } -void mgmtGetSuperTableMeta(SQueuedMsg *pMsg, SSuperTableObj *pTable, SDbObj *pDb) { - STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS); +void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) { + SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; + STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS); pMeta->uid = htobe64(pTable->uid); pMeta->sversion = htons(pTable->sversion); - pMeta->precision = pDb->cfg.precision; + pMeta->precision = pMsg->pDb->cfg.precision; pMeta->numOfTags = (uint8_t)pTable->numOfTags; pMeta->numOfColumns = htons((int16_t)pTable->numOfColumns); pMeta->tableType = pTable->info.type; @@ -1148,7 +1166,6 @@ void mgmtGetSuperTableMeta(SQueuedMsg *pMsg, SSuperTableObj *pTable, SDbObj *pDb rpcSendResponse(&rpcRsp); mTrace("stable:%%s, uid:%" PRIu64 " table meta is retrieved", pTable->info.tableId, pTable->uid); - mgmtDecTableRef(pTable); } static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) { @@ -1288,6 +1305,7 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj terrno = TSDB_CODE_INVALID_TABLE; return NULL; } + mgmtDecTableRef(pSuperTable); strcpy(pTable->superTableId, pSuperTable->info.tableId); pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) + @@ -1355,7 +1373,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { return; } - SVgObj *pVgroup = pMsg->pVgroup = mgmtGetAvailableVgroup(pMsg->pDb); + SVgObj *pVgroup = mgmtGetAvailableVgroup(pMsg->pDb); if (pVgroup == NULL) { mTrace("table:%s, start to create a new vgroup", pCreate->tableId); mgmtCreateVgroup(mgmtCloneQueuedMsg(pMsg), pMsg->pDb); @@ -1384,6 +1402,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); newMsg->ahandle = pMsg->pTable; + mgmtIncTableRef(pMsg->pTable); SRpcMsg rpcMsg = { .handle = newMsg, .pCont = pMDCreate, @@ -1592,7 +1611,11 @@ static int32_t mgmtSetSchemaFromNormalTable(SSchema *pSchema, SChildTableObj *pT return numOfCols * sizeof(SSchema); } -static int32_t mgmtDoGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp) { +static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) { + SDbObj *pDb = pMsg->pDb; + SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; + int8_t usePublicIp = pMsg->usePublicIp; + pMeta->uid = htobe64(pTable->uid); pMeta->sid = htonl(pTable->sid); pMeta->vgId = htonl(pTable->vgId); @@ -1613,7 +1636,7 @@ static int32_t mgmtDoGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STab pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromNormalTable(pMeta->schema, pTable); } - SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); + SVgObj *pVgroup = pMsg->pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { mError("table:%s, failed to get table meta, db not selected", pTable->info.tableId); return TSDB_CODE_INVALID_VGROUP_ID; @@ -1634,13 +1657,13 @@ static int32_t mgmtDoGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STab return TSDB_CODE_SUCCESS; } -void mgmtGetChildTableMeta(SQueuedMsg *pMsg, SChildTableObj *pTable) { +void mgmtGetChildTableMeta(SQueuedMsg *pMsg) { + SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; SCMTableInfoMsg *pInfo = pMsg->pCont; - SDbObj *pDb = mgmtGetDbByTableId(pInfo->tableId); + SDbObj *pDb = pMsg->pDb; if (pDb == NULL || pDb->dirty) { mError("table:%s, failed to get table meta, db not selected", pInfo->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); - mgmtDecTableRef(pTable); return; } @@ -1677,11 +1700,10 @@ void mgmtGetChildTableMeta(SQueuedMsg *pMsg, SChildTableObj *pTable) { if (pMeta == NULL) { mError("table:%s, failed to get table meta, no enough memory", pTable->info.tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); - mgmtDecTableRef(pTable); return; } - mgmtDoGetChildTableMeta(pDb, pTable, pMeta, pMsg->usePublicIp); + mgmtDoGetChildTableMeta(pMsg, pMeta); SRpcMsg rpcRsp = { .handle = pMsg->thandle, @@ -1690,7 +1712,6 @@ void mgmtGetChildTableMeta(SQueuedMsg *pMsg, SChildTableObj *pTable) { }; pMeta->contLen = htons(pMeta->contLen); rpcSendResponse(&rpcRsp); - mgmtDecTableRef(pTable); } void mgmtDropAllChildTables(SDbObj *pDropDb) { @@ -1701,6 +1722,7 @@ void mgmtDropAllChildTables(SDbObj *pDropDb) { SChildTableObj *pTable = NULL; while (1) { + mgmtDecTableRef(pTable); pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable); if (pTable == NULL) { break; @@ -1729,6 +1751,7 @@ void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) { SChildTableObj *pTable = NULL; while (1) { + mgmtDecTableRef(pTable); pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable); if (pTable == NULL) { break; @@ -1760,6 +1783,7 @@ static SChildTableObj* mgmtGetTableByPos(uint32_t dnodeId, int32_t vnode, int32_ SChildTableObj *pTable = pVgroup->tableList[sid]; mgmtIncTableRef((STableInfo *)pTable); + mgmtDecVgroupRef(pVgroup); return pTable; } @@ -1817,7 +1841,7 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { return; } - SVgObj *pVgroup = pMsg->pVgroup = mgmtGetVgroup(pTable->vgId); + SVgObj *pVgroup = queueMsg->pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { mError("table:%s, failed to get vgroup", pTable->info.tableId); mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_INVALID_VGROUP_ID); @@ -1933,7 +1957,7 @@ static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) { } STableMetaMsg *pMeta = (STableMetaMsg *)(pMultiMeta->metas + pMultiMeta->contLen); - int32_t code = mgmtDoGetChildTableMeta(pDb, pTable, pMeta, usePublicIp); + int32_t code = mgmtDoGetChildTableMeta(pMsg, pMeta); if (code == TSDB_CODE_SUCCESS) { pMultiMeta->numOfTables ++; pMultiMeta->contLen += pMeta->contLen; @@ -2017,6 +2041,7 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, int32_t prefixLen = strlen(prefix); while (numOfRows < rows) { + mgmtDecTableRef(pTable); pShow->pNode = sdbFetchRow(tsChildTableSdb, pShow->pNode, (void **) &pTable); if (pTable == NULL) break; @@ -2062,8 +2087,6 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, cols++; numOfRows++; - - mgmtDecTableRef(pTable); } pShow->numOfReads += numOfRows; diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index fbaa0a376b..23840c0b1d 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -62,6 +62,7 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) { if (pDb == NULL) { return TSDB_CODE_INVALID_DB; } + mgmtDecDbRef(pDb); pVgroup->pDb = pDb; pVgroup->prev = NULL; @@ -100,6 +101,7 @@ static int32_t mgmtVgroupActionDelete(SSdbOperDesc *pOper) { mgmtRemoveVgroupFromDb(pVgroup); } + mgmtDecDbRef(pVgroup->pDb); return TSDB_CODE_SUCCESS; } @@ -437,6 +439,7 @@ void mgmtAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable) { pVgroup->numOfTables++; } + mgmtIncVgroupRef(pVgroup); if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxSessions) mgmtAddVgroupIntoDbTail(pVgroup); } @@ -448,6 +451,7 @@ void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable) { pVgroup->numOfTables--; } + mgmtDecVgroupRef(pVgroup); if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxSessions) mgmtAddVgroupIntoDbTail(pVgroup); } @@ -545,14 +549,7 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { if (queueMsg->received != queueMsg->expected) return; if (queueMsg->received == queueMsg->successed) { - SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg)); - newMsg->msgType = queueMsg->msgType; - newMsg->thandle = queueMsg->thandle; - newMsg->pUser = queueMsg->pUser; - newMsg->contLen = queueMsg->contLen; - newMsg->pCont = rpcMallocCont(newMsg->contLen); - memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen); - queueMsg->pCont = NULL; + SQueuedMsg *newMsg = mgmtCloneQueuedMsg(queueMsg); mgmtAddToShellQueue(newMsg); } else { SSdbOperDesc oper = { -- GitLab