提交 aeac5f9e 编写于 作者: S slguan

[TD-73] fix bug in refcount

上级 1d9e4a45
...@@ -435,7 +435,7 @@ void sdbIncRef(void *handle, void *pRow) { ...@@ -435,7 +435,7 @@ void sdbIncRef(void *handle, void *pRow) {
SSdbTable *pTable = handle; SSdbTable *pTable = handle;
int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos); int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos);
atomic_add_fetch_32(pRefCount, 1); atomic_add_fetch_32(pRefCount, 1);
sdbTrace("add ref to record:%s:%s:%d", pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount); sdbTrace("table:%s, add ref to record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount);
} }
} }
...@@ -444,8 +444,11 @@ void sdbDecRef(void *handle, void *pRow) { ...@@ -444,8 +444,11 @@ void sdbDecRef(void *handle, void *pRow) {
SSdbTable *pTable = handle; SSdbTable *pTable = handle;
int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos); int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos);
int32_t refCount = atomic_sub_fetch_32(pRefCount, 1); int32_t refCount = atomic_sub_fetch_32(pRefCount, 1);
sdbTrace("def ref of record:%s:%s:%d", pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount); sdbTrace("table:%s, def ref of record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount);
if (refCount <= 0) {
int8_t* updateEnd = pRow + pTable->refCountPos - 1;
if (refCount <= 0 && *updateEnd) {
sdbTrace("table:%s, record:%s:%s:%d is destroyed", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount);
SSdbOperDesc oper = {.pObj = pRow}; SSdbOperDesc oper = {.pObj = pRow};
(*pTable->destroyFp)(&oper); (*pTable->destroyFp)(&oper);
} }
...@@ -648,6 +651,8 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) { ...@@ -648,6 +651,8 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) {
pthread_mutex_unlock(&pTable->mutex); pthread_mutex_unlock(&pTable->mutex);
(*pTable->deleteFp)(pOper); (*pTable->deleteFp)(pOper);
int8_t* updateEnd = pOper->pObj + pTable->refCountPos - 1;
*updateEnd = 1;
sdbDecRef(pTable, pOper->pObj); sdbDecRef(pTable, pOper->pObj);
return 0; return 0;
} }
......
...@@ -53,7 +53,7 @@ static int32_t tsSuperTableUpdateSize; ...@@ -53,7 +53,7 @@ static int32_t tsSuperTableUpdateSize;
static void * mgmtGetChildTable(char *tableId); static void * mgmtGetChildTable(char *tableId);
static void * mgmtGetSuperTable(char *tableId); static void * mgmtGetSuperTable(char *tableId);
void mgmtGetChildTableMeta(SQueuedMsg *pMsg, SChildTableObj *pTable); void mgmtGetChildTableMeta(SQueuedMsg *pMsg);
void mgmtAlterChildTable(SQueuedMsg *pMsg, SChildTableObj *pTable); void mgmtAlterChildTable(SQueuedMsg *pMsg, SChildTableObj *pTable);
void mgmtDropAllChildTables(SDbObj *pDropDb); void mgmtDropAllChildTables(SDbObj *pDropDb);
void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable); void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable);
...@@ -69,7 +69,7 @@ static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg); ...@@ -69,7 +69,7 @@ static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg);
static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg); static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg);
static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg); static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg);
void mgmtGetSuperTableMeta(SQueuedMsg *pMsg, SSuperTableObj *pTable, SDbObj *pDb); void mgmtGetSuperTableMeta(SQueuedMsg *pMsg);
void mgmtAlterSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable); void mgmtAlterSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable);
void mgmtDropAllSuperTables(SDbObj *pDropDb); void mgmtDropAllSuperTables(SDbObj *pDropDb);
int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable); int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable);
...@@ -107,18 +107,21 @@ static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) { ...@@ -107,18 +107,21 @@ static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) {
mError("ctable:%s, not in vgroup:%d", pTable->info.tableId, pTable->vgId); mError("ctable:%s, not in vgroup:%d", pTable->info.tableId, pTable->vgId);
return TSDB_CODE_INVALID_VGROUP_ID; return TSDB_CODE_INVALID_VGROUP_ID;
} }
mgmtDecVgroupRef(pVgroup);
SDbObj *pDb = mgmtGetDb(pVgroup->dbName); SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) { if (pDb == NULL) {
mError("ctable:%s, vgroup:%d not in db:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName); mError("ctable:%s, vgroup:%d not in db:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName);
return TSDB_CODE_INVALID_DB; return TSDB_CODE_INVALID_DB;
} }
mgmtDecDbRef(pDb);
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct == NULL) { if (pAcct == NULL) {
mError("ctable:%s, account:%s not exists", pTable->info.tableId, pDb->cfg.acct); mError("ctable:%s, account:%s not exists", pTable->info.tableId, pDb->cfg.acct);
return TSDB_CODE_INVALID_ACCT; return TSDB_CODE_INVALID_ACCT;
} }
acctDecRef(pAcct);
if (pTable->info.type == TSDB_CHILD_TABLE) { if (pTable->info.type == TSDB_CHILD_TABLE) {
pTable->superTable = mgmtGetSuperTable(pTable->superTableId); pTable->superTable = mgmtGetSuperTable(pTable->superTableId);
...@@ -129,6 +132,7 @@ static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) { ...@@ -129,6 +132,7 @@ static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) {
grantAdd(TSDB_GRANT_TIMESERIES, pTable->numOfColumns - 1); grantAdd(TSDB_GRANT_TIMESERIES, pTable->numOfColumns - 1);
pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1); pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1);
} }
mgmtAddTableIntoDb(pDb); mgmtAddTableIntoDb(pDb);
mgmtAddTableIntoVgroup(pVgroup, pTable); mgmtAddTableIntoVgroup(pVgroup, pTable);
...@@ -145,23 +149,27 @@ static int32_t mgmtChildTableActionDelete(SSdbOperDesc *pOper) { ...@@ -145,23 +149,27 @@ static int32_t mgmtChildTableActionDelete(SSdbOperDesc *pOper) {
if (pVgroup == NULL) { if (pVgroup == NULL) {
return TSDB_CODE_INVALID_VGROUP_ID; return TSDB_CODE_INVALID_VGROUP_ID;
} }
mgmtDecVgroupRef(pVgroup);
SDbObj *pDb = mgmtGetDb(pVgroup->dbName); SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) { if (pDb == NULL) {
mError("ctable:%s, vgroup:%d not in DB:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName); mError("ctable:%s, vgroup:%d not in DB:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName);
return TSDB_CODE_INVALID_DB; return TSDB_CODE_INVALID_DB;
} }
mgmtDecDbRef(pDb);
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct == NULL) { if (pAcct == NULL) {
mError("ctable:%s, account:%s not exists", pTable->info.tableId, pDb->cfg.acct); mError("ctable:%s, account:%s not exists", pTable->info.tableId, pDb->cfg.acct);
return TSDB_CODE_INVALID_ACCT; return TSDB_CODE_INVALID_ACCT;
} }
acctDecRef(pAcct);
if (pTable->info.type == TSDB_CHILD_TABLE) { if (pTable->info.type == TSDB_CHILD_TABLE) {
grantRestore(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1); grantRestore(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1);
pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1); pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1);
pTable->superTable->numOfTables--; pTable->superTable->numOfTables--;
mgmtDecTableRef(pTable->superTable);
} else { } else {
grantRestore(TSDB_GRANT_TIMESERIES, pTable->numOfColumns - 1); grantRestore(TSDB_GRANT_TIMESERIES, pTable->numOfColumns - 1);
pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1); pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1);
...@@ -258,6 +266,7 @@ static int32_t mgmtInitChildTables() { ...@@ -258,6 +266,7 @@ static int32_t mgmtInitChildTables() {
pNode = NULL; pNode = NULL;
while (1) { while (1) {
pLastNode = pNode; pLastNode = pNode;
mgmtDecTableRef(pTable);
pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable); pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable);
if (pTable == NULL) break; if (pTable == NULL) break;
...@@ -272,6 +281,7 @@ static int32_t mgmtInitChildTables() { ...@@ -272,6 +281,7 @@ static int32_t mgmtInitChildTables() {
pNode = pLastNode; pNode = pLastNode;
continue; continue;
} }
mgmtDecDbRef(pDb);
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
...@@ -285,6 +295,7 @@ static int32_t mgmtInitChildTables() { ...@@ -285,6 +295,7 @@ static int32_t mgmtInitChildTables() {
pNode = pLastNode; pNode = pLastNode;
continue; continue;
} }
mgmtDecVgroupRef(pVgroup);
if (strcmp(pVgroup->dbName, pDb->name) != 0) { if (strcmp(pVgroup->dbName, pDb->name) != 0) {
mError("ctable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it", mError("ctable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it",
...@@ -351,6 +362,7 @@ static int32_t mgmtSuperTableActionInsert(SSdbOperDesc *pOper) { ...@@ -351,6 +362,7 @@ static int32_t mgmtSuperTableActionInsert(SSdbOperDesc *pOper) {
if (pDb != NULL) { if (pDb != NULL) {
mgmtAddSuperTableIntoDb(pDb); mgmtAddSuperTableIntoDb(pDb);
} }
mgmtDecDbRef(pDb);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -362,6 +374,8 @@ static int32_t mgmtSuperTableActionDelete(SSdbOperDesc *pOper) { ...@@ -362,6 +374,8 @@ static int32_t mgmtSuperTableActionDelete(SSdbOperDesc *pOper) {
mgmtRemoveSuperTableFromDb(pDb); mgmtRemoveSuperTableFromDb(pDb);
mgmtDropAllChildTablesInStable((SSuperTableObj *)pStable); mgmtDropAllChildTablesInStable((SSuperTableObj *)pStable);
} }
mgmtDecDbRef(pDb);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -503,6 +517,8 @@ void mgmtIncTableRef(void *p1) { ...@@ -503,6 +517,8 @@ void mgmtIncTableRef(void *p1) {
} }
void mgmtDecTableRef(void *p1) { void mgmtDecTableRef(void *p1) {
if (p1 == NULL) return;
STableInfo *pTable = (STableInfo *)p1; STableInfo *pTable = (STableInfo *)p1;
if (pTable->type == TSDB_SUPER_TABLE) { if (pTable->type == TSDB_SUPER_TABLE) {
sdbDecRef(tsSuperTableSdb, pTable); sdbDecRef(tsSuperTableSdb, pTable);
...@@ -591,7 +607,7 @@ static void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { ...@@ -591,7 +607,7 @@ static void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) {
} }
if (pMsg->pTable->type == TSDB_SUPER_TABLE) { if (pMsg->pTable->type == TSDB_SUPER_TABLE) {
mTrace("table:%s, start to drop ctable", pDrop->tableId); mTrace("table:%s, start to drop stable", pDrop->tableId);
mgmtProcessDropSuperTableMsg(pMsg); mgmtProcessDropSuperTableMsg(pMsg);
} else { } else {
mTrace("table:%s, start to drop ctable", pDrop->tableId); mTrace("table:%s, start to drop ctable", pDrop->tableId);
...@@ -656,21 +672,21 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) { ...@@ -656,21 +672,21 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) {
SCMTableInfoMsg *pInfo = pMsg->pCont; SCMTableInfoMsg *pInfo = pMsg->pCont;
mTrace("table:%s, table meta msg is received from thandle:%p", pInfo->tableId, pMsg->thandle); mTrace("table:%s, table meta msg is received from thandle:%p", pInfo->tableId, pMsg->thandle);
SDbObj *pDb = mgmtGetDbByTableId(pInfo->tableId); pMsg->pDb = mgmtGetDbByTableId(pInfo->tableId);
if (pDb == NULL || pDb->dirty) { if (pMsg->pDb == NULL || pMsg->pDb->dirty) {
mError("table:%s, failed to get table meta, db not selected", pInfo->tableId); mError("table:%s, failed to get table meta, db not selected", pInfo->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED);
return; return;
} }
STableInfo *pTable = mgmtGetTable(pInfo->tableId); pMsg->pTable = mgmtGetTable(pInfo->tableId);
if (pTable == NULL) { if (pMsg->pTable == NULL) {
mgmtGetChildTableMeta(pMsg, NULL); mgmtGetChildTableMeta(pMsg);
} else { } else {
if (pTable->type != TSDB_SUPER_TABLE) { if (pMsg->pTable->type != TSDB_SUPER_TABLE) {
mgmtGetChildTableMeta(pMsg, (SChildTableObj *)pTable); mgmtGetChildTableMeta(pMsg);
} else { } else {
mgmtGetSuperTableMeta(pMsg, (SSuperTableObj *)pTable, pDb); mgmtGetSuperTableMeta(pMsg);
} }
} }
} }
...@@ -1078,6 +1094,7 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v ...@@ -1078,6 +1094,7 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v
cols++; cols++;
numOfRows++; numOfRows++;
mgmtDecTableRef(pTable);
} }
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
...@@ -1094,6 +1111,7 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) { ...@@ -1094,6 +1111,7 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) {
SSuperTableObj *pTable = NULL; SSuperTableObj *pTable = NULL;
while (1) { while (1) {
mgmtDecTableRef(pTable);
pNode = sdbFetchRow(tsSuperTableSdb, pNode, (void **)&pTable); pNode = sdbFetchRow(tsSuperTableSdb, pNode, (void **)&pTable);
if (pTable == NULL) { if (pTable == NULL) {
break; break;
...@@ -1111,7 +1129,6 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) { ...@@ -1111,7 +1129,6 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) {
continue; continue;
} }
} }
mTrace("db:%s, all super tables:%d is dropped from sdb", pDropDb->name, numOfTables); mTrace("db:%s, all super tables:%d is dropped from sdb", pDropDb->name, numOfTables);
} }
...@@ -1128,11 +1145,12 @@ int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) { ...@@ -1128,11 +1145,12 @@ int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) {
return (pTable->numOfColumns + pTable->numOfTags) * sizeof(SSchema); return (pTable->numOfColumns + pTable->numOfTags) * sizeof(SSchema);
} }
void mgmtGetSuperTableMeta(SQueuedMsg *pMsg, SSuperTableObj *pTable, SDbObj *pDb) { void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) {
STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS); SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable;
STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS);
pMeta->uid = htobe64(pTable->uid); pMeta->uid = htobe64(pTable->uid);
pMeta->sversion = htons(pTable->sversion); pMeta->sversion = htons(pTable->sversion);
pMeta->precision = pDb->cfg.precision; pMeta->precision = pMsg->pDb->cfg.precision;
pMeta->numOfTags = (uint8_t)pTable->numOfTags; pMeta->numOfTags = (uint8_t)pTable->numOfTags;
pMeta->numOfColumns = htons((int16_t)pTable->numOfColumns); pMeta->numOfColumns = htons((int16_t)pTable->numOfColumns);
pMeta->tableType = pTable->info.type; pMeta->tableType = pTable->info.type;
...@@ -1148,7 +1166,6 @@ void mgmtGetSuperTableMeta(SQueuedMsg *pMsg, SSuperTableObj *pTable, SDbObj *pDb ...@@ -1148,7 +1166,6 @@ void mgmtGetSuperTableMeta(SQueuedMsg *pMsg, SSuperTableObj *pTable, SDbObj *pDb
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
mTrace("stable:%%s, uid:%" PRIu64 " table meta is retrieved", pTable->info.tableId, pTable->uid); mTrace("stable:%%s, uid:%" PRIu64 " table meta is retrieved", pTable->info.tableId, pTable->uid);
mgmtDecTableRef(pTable);
} }
static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) { static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) {
...@@ -1288,6 +1305,7 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj ...@@ -1288,6 +1305,7 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj
terrno = TSDB_CODE_INVALID_TABLE; terrno = TSDB_CODE_INVALID_TABLE;
return NULL; return NULL;
} }
mgmtDecTableRef(pSuperTable);
strcpy(pTable->superTableId, pSuperTable->info.tableId); strcpy(pTable->superTableId, pSuperTable->info.tableId);
pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) + pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) +
...@@ -1355,7 +1373,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { ...@@ -1355,7 +1373,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
return; return;
} }
SVgObj *pVgroup = pMsg->pVgroup = mgmtGetAvailableVgroup(pMsg->pDb); SVgObj *pVgroup = mgmtGetAvailableVgroup(pMsg->pDb);
if (pVgroup == NULL) { if (pVgroup == NULL) {
mTrace("table:%s, start to create a new vgroup", pCreate->tableId); mTrace("table:%s, start to create a new vgroup", pCreate->tableId);
mgmtCreateVgroup(mgmtCloneQueuedMsg(pMsg), pMsg->pDb); mgmtCreateVgroup(mgmtCloneQueuedMsg(pMsg), pMsg->pDb);
...@@ -1384,6 +1402,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { ...@@ -1384,6 +1402,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
newMsg->ahandle = pMsg->pTable; newMsg->ahandle = pMsg->pTable;
mgmtIncTableRef(pMsg->pTable);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.handle = newMsg, .handle = newMsg,
.pCont = pMDCreate, .pCont = pMDCreate,
...@@ -1592,7 +1611,11 @@ static int32_t mgmtSetSchemaFromNormalTable(SSchema *pSchema, SChildTableObj *pT ...@@ -1592,7 +1611,11 @@ static int32_t mgmtSetSchemaFromNormalTable(SSchema *pSchema, SChildTableObj *pT
return numOfCols * sizeof(SSchema); return numOfCols * sizeof(SSchema);
} }
static int32_t mgmtDoGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp) { static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
SDbObj *pDb = pMsg->pDb;
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
int8_t usePublicIp = pMsg->usePublicIp;
pMeta->uid = htobe64(pTable->uid); pMeta->uid = htobe64(pTable->uid);
pMeta->sid = htonl(pTable->sid); pMeta->sid = htonl(pTable->sid);
pMeta->vgId = htonl(pTable->vgId); pMeta->vgId = htonl(pTable->vgId);
...@@ -1613,7 +1636,7 @@ static int32_t mgmtDoGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STab ...@@ -1613,7 +1636,7 @@ static int32_t mgmtDoGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STab
pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromNormalTable(pMeta->schema, pTable); pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromNormalTable(pMeta->schema, pTable);
} }
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); SVgObj *pVgroup = pMsg->pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
mError("table:%s, failed to get table meta, db not selected", pTable->info.tableId); mError("table:%s, failed to get table meta, db not selected", pTable->info.tableId);
return TSDB_CODE_INVALID_VGROUP_ID; return TSDB_CODE_INVALID_VGROUP_ID;
...@@ -1634,13 +1657,13 @@ static int32_t mgmtDoGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STab ...@@ -1634,13 +1657,13 @@ static int32_t mgmtDoGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STab
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void mgmtGetChildTableMeta(SQueuedMsg *pMsg, SChildTableObj *pTable) { void mgmtGetChildTableMeta(SQueuedMsg *pMsg) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
SCMTableInfoMsg *pInfo = pMsg->pCont; SCMTableInfoMsg *pInfo = pMsg->pCont;
SDbObj *pDb = mgmtGetDbByTableId(pInfo->tableId); SDbObj *pDb = pMsg->pDb;
if (pDb == NULL || pDb->dirty) { if (pDb == NULL || pDb->dirty) {
mError("table:%s, failed to get table meta, db not selected", pInfo->tableId); mError("table:%s, failed to get table meta, db not selected", pInfo->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED);
mgmtDecTableRef(pTable);
return; return;
} }
...@@ -1677,11 +1700,10 @@ void mgmtGetChildTableMeta(SQueuedMsg *pMsg, SChildTableObj *pTable) { ...@@ -1677,11 +1700,10 @@ void mgmtGetChildTableMeta(SQueuedMsg *pMsg, SChildTableObj *pTable) {
if (pMeta == NULL) { if (pMeta == NULL) {
mError("table:%s, failed to get table meta, no enough memory", pTable->info.tableId); mError("table:%s, failed to get table meta, no enough memory", pTable->info.tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY);
mgmtDecTableRef(pTable);
return; return;
} }
mgmtDoGetChildTableMeta(pDb, pTable, pMeta, pMsg->usePublicIp); mgmtDoGetChildTableMeta(pMsg, pMeta);
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pMsg->thandle, .handle = pMsg->thandle,
...@@ -1690,7 +1712,6 @@ void mgmtGetChildTableMeta(SQueuedMsg *pMsg, SChildTableObj *pTable) { ...@@ -1690,7 +1712,6 @@ void mgmtGetChildTableMeta(SQueuedMsg *pMsg, SChildTableObj *pTable) {
}; };
pMeta->contLen = htons(pMeta->contLen); pMeta->contLen = htons(pMeta->contLen);
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
mgmtDecTableRef(pTable);
} }
void mgmtDropAllChildTables(SDbObj *pDropDb) { void mgmtDropAllChildTables(SDbObj *pDropDb) {
...@@ -1701,6 +1722,7 @@ void mgmtDropAllChildTables(SDbObj *pDropDb) { ...@@ -1701,6 +1722,7 @@ void mgmtDropAllChildTables(SDbObj *pDropDb) {
SChildTableObj *pTable = NULL; SChildTableObj *pTable = NULL;
while (1) { while (1) {
mgmtDecTableRef(pTable);
pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable); pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable);
if (pTable == NULL) { if (pTable == NULL) {
break; break;
...@@ -1729,6 +1751,7 @@ void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) { ...@@ -1729,6 +1751,7 @@ void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) {
SChildTableObj *pTable = NULL; SChildTableObj *pTable = NULL;
while (1) { while (1) {
mgmtDecTableRef(pTable);
pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable); pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable);
if (pTable == NULL) { if (pTable == NULL) {
break; break;
...@@ -1760,6 +1783,7 @@ static SChildTableObj* mgmtGetTableByPos(uint32_t dnodeId, int32_t vnode, int32_ ...@@ -1760,6 +1783,7 @@ static SChildTableObj* mgmtGetTableByPos(uint32_t dnodeId, int32_t vnode, int32_
SChildTableObj *pTable = pVgroup->tableList[sid]; SChildTableObj *pTable = pVgroup->tableList[sid];
mgmtIncTableRef((STableInfo *)pTable); mgmtIncTableRef((STableInfo *)pTable);
mgmtDecVgroupRef(pVgroup);
return pTable; return pTable;
} }
...@@ -1817,7 +1841,7 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { ...@@ -1817,7 +1841,7 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) {
return; return;
} }
SVgObj *pVgroup = pMsg->pVgroup = mgmtGetVgroup(pTable->vgId); SVgObj *pVgroup = queueMsg->pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
mError("table:%s, failed to get vgroup", pTable->info.tableId); mError("table:%s, failed to get vgroup", pTable->info.tableId);
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_INVALID_VGROUP_ID); mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_INVALID_VGROUP_ID);
...@@ -1933,7 +1957,7 @@ static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) { ...@@ -1933,7 +1957,7 @@ static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) {
} }
STableMetaMsg *pMeta = (STableMetaMsg *)(pMultiMeta->metas + pMultiMeta->contLen); STableMetaMsg *pMeta = (STableMetaMsg *)(pMultiMeta->metas + pMultiMeta->contLen);
int32_t code = mgmtDoGetChildTableMeta(pDb, pTable, pMeta, usePublicIp); int32_t code = mgmtDoGetChildTableMeta(pMsg, pMeta);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
pMultiMeta->numOfTables ++; pMultiMeta->numOfTables ++;
pMultiMeta->contLen += pMeta->contLen; pMultiMeta->contLen += pMeta->contLen;
...@@ -2017,6 +2041,7 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, ...@@ -2017,6 +2041,7 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows,
int32_t prefixLen = strlen(prefix); int32_t prefixLen = strlen(prefix);
while (numOfRows < rows) { while (numOfRows < rows) {
mgmtDecTableRef(pTable);
pShow->pNode = sdbFetchRow(tsChildTableSdb, pShow->pNode, (void **) &pTable); pShow->pNode = sdbFetchRow(tsChildTableSdb, pShow->pNode, (void **) &pTable);
if (pTable == NULL) break; if (pTable == NULL) break;
...@@ -2062,8 +2087,6 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, ...@@ -2062,8 +2087,6 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows,
cols++; cols++;
numOfRows++; numOfRows++;
mgmtDecTableRef(pTable);
} }
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
......
...@@ -62,6 +62,7 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) { ...@@ -62,6 +62,7 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) {
if (pDb == NULL) { if (pDb == NULL) {
return TSDB_CODE_INVALID_DB; return TSDB_CODE_INVALID_DB;
} }
mgmtDecDbRef(pDb);
pVgroup->pDb = pDb; pVgroup->pDb = pDb;
pVgroup->prev = NULL; pVgroup->prev = NULL;
...@@ -100,6 +101,7 @@ static int32_t mgmtVgroupActionDelete(SSdbOperDesc *pOper) { ...@@ -100,6 +101,7 @@ static int32_t mgmtVgroupActionDelete(SSdbOperDesc *pOper) {
mgmtRemoveVgroupFromDb(pVgroup); mgmtRemoveVgroupFromDb(pVgroup);
} }
mgmtDecDbRef(pVgroup->pDb);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -437,6 +439,7 @@ void mgmtAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable) { ...@@ -437,6 +439,7 @@ void mgmtAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable) {
pVgroup->numOfTables++; pVgroup->numOfTables++;
} }
mgmtIncVgroupRef(pVgroup);
if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxSessions) if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxSessions)
mgmtAddVgroupIntoDbTail(pVgroup); mgmtAddVgroupIntoDbTail(pVgroup);
} }
...@@ -448,6 +451,7 @@ void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable) { ...@@ -448,6 +451,7 @@ void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable) {
pVgroup->numOfTables--; pVgroup->numOfTables--;
} }
mgmtDecVgroupRef(pVgroup);
if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxSessions) if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxSessions)
mgmtAddVgroupIntoDbTail(pVgroup); mgmtAddVgroupIntoDbTail(pVgroup);
} }
...@@ -545,14 +549,7 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { ...@@ -545,14 +549,7 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
if (queueMsg->received != queueMsg->expected) return; if (queueMsg->received != queueMsg->expected) return;
if (queueMsg->received == queueMsg->successed) { if (queueMsg->received == queueMsg->successed) {
SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg)); SQueuedMsg *newMsg = mgmtCloneQueuedMsg(queueMsg);
newMsg->msgType = queueMsg->msgType;
newMsg->thandle = queueMsg->thandle;
newMsg->pUser = queueMsg->pUser;
newMsg->contLen = queueMsg->contLen;
newMsg->pCont = rpcMallocCont(newMsg->contLen);
memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen);
queueMsg->pCont = NULL;
mgmtAddToShellQueue(newMsg); mgmtAddToShellQueue(newMsg);
} else { } else {
SSdbOperDesc oper = { SSdbOperDesc oper = {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册