提交 02d15837 编写于 作者: H Haojun Liao

[td-1523] fix memory leaks and refactor some functions.

上级 2779a011
......@@ -258,6 +258,8 @@ void tscDoQuery(SSqlObj* pSql);
*/
SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd);
void registerSqlObj(SSqlObj* pSql);
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, int32_t cmd, SSqlObj* pPrevSql);
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex);
......
......@@ -51,10 +51,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
pSql->fp = fp;
pSql->fetchFp = fp;
uint64_t handle = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
registerSqlObj(pSql);
pSql->sqlstr = calloc(1, sqlLen + 1);
if (pSql->sqlstr == NULL) {
......
......@@ -545,10 +545,8 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
pSql->cmd.numOfParams = 0;
pSql->cmd.batchSize = 0;
uint64_t handle = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
registerSqlObj(pSql);
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
......
......@@ -1987,15 +1987,11 @@ static void createHBObj(STscObj* pObj) {
pSql->param = pObj;
pSql->pTscObj = pObj;
pSql->signature = pSql;
pObj->pHb = pSql;
tscAddSubqueryInfo(&pObj->pHb->cmd);
int64_t ad = (int64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &ad, sizeof(int64_t), &pSql, sizeof(int64_t), 2 * 60 * 1000);
T_REF_INC(pObj);
registerSqlObj(pSql);
tscDebug("%p HB is allocated, pObj:%p", pSql, pObj);
tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj);
pObj->pHb = pSql;
}
int tscProcessConnectRsp(SSqlObj *pSql) {
......@@ -2170,11 +2166,7 @@ static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInf
pNew->signature = pNew;
pNew->cmd.command = TSDB_SQL_META;
T_REF_INC(pNew->pTscObj);
// TODO add test case on x86 platform
uint64_t adr = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &adr, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2*60*1000);
registerSqlObj(pNew);
tscAddSubqueryInfo(&pNew->cmd);
......@@ -2301,10 +2293,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
}
pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
T_REF_INC(pNew->pTscObj);
uint64_t p = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000);
registerSqlObj(pNew);
tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);
pNew->fp = tscTableMetaCallBack;
......
......@@ -156,10 +156,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
*taos = pObj;
}
T_REF_INC(pSql->pTscObj);
uint64_t key = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &key, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
registerSqlObj(pSql);
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
return pSql;
......@@ -270,7 +267,7 @@ void taos_close(TAOS *taos) {
pHb->pRpcCtx = NULL;
}
tscDebug("%p, HB is freed", pHb);
tscDebug("%p HB is freed", pHb);
taos_free_result(pHb);
}
......
......@@ -510,9 +510,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
return;
}
uint64_t handle = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
registerSqlObj(pSql);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......
......@@ -152,9 +152,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
goto fail;
}
uint64_t handle = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
registerSqlObj(pSql);
code = tsParseSql(pSql, false);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
......
......@@ -122,11 +122,8 @@ void taos_init_imp(void) {
tscInitMsgsFp();
int queueSize = tsMaxConnections*2;
if (tscEmbedded == 0) {
tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / 2.0);
} else {
tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / 4.0);
}
double factor = (tscEmbedded == 0)? 2.0:4.0;
tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor);
if (tscNumOfThreads < 2) tscNumOfThreads = 2;
......@@ -140,11 +137,8 @@ void taos_init_imp(void) {
if(0 == tscEmbedded){
taosTmrReset(tscCheckDiskUsage, 10, NULL, tscTmr, &tscCheckDiskUsageTmr);
}
int64_t refreshTime = tsTableMetaKeepTimer;
refreshTime = refreshTime > 10 ? 10 : refreshTime;
refreshTime = refreshTime < 10 ? 10 : refreshTime;
int64_t refreshTime = 10; // 10 seconds by default
if (tscMetaCache == NULL) {
tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta");
tscObjCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, refreshTime/2, false, tscFreeSqlObjInCache, "sqlObj");
......
......@@ -424,12 +424,12 @@ void tscFreeSqlObj(SSqlObj* pSql) {
free(pSql);
tscDebug("%p free sqlObj completed", p);
int32_t ref = T_REF_DEC(pTscObj);
assert(ref >= 0);
tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", p, pTscObj, ref);
if (ref == 0) {
tscDebug("%p all sqlObj freed, free tscObj:%p", p, pTscObj);
tscCloseTscObj(pTscObj);
}
}
......@@ -1783,6 +1783,16 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
pRes->numOfRows = 0;
}
void registerSqlObj(SSqlObj* pSql) {
int64_t DEFAULT_LIFE_TIME = 2 * 600 * 1000; // 1200 sec
int32_t ref = T_REF_INC(pSql->pTscObj);
tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref);
uint64_t p = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &p, sizeof(uint64_t), DEFAULT_LIFE_TIME);
}
SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) {
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
if (pNew == NULL) {
......@@ -1822,10 +1832,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL);
T_REF_INC(pNew->pTscObj);
uint64_t p = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000);
registerSqlObj(pNew);
return pNew;
}
......@@ -2063,10 +2070,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
tscDebug("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vgroupIndex);
}
T_REF_INC(pNew->pTscObj);
uint64_t p = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 10);
registerSqlObj(pNew);
return pNew;
_error:
......
......@@ -97,7 +97,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable);
assert(size > 0);
uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, num:%d size:%" PRId64 "bytes",
uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes",
pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize);
if (pCacheObj->freeFp) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册