提交 6e3affb9 编写于 作者: H Haojun Liao

[td-1319]

上级 5e8ff510
......@@ -248,6 +248,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
assert(*pSql->self == pSql);
if (pObj->signature != pObj) {
tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
......@@ -263,6 +265,9 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
void** p1 = p;
taosCacheRelease(tscObjCache, (void**) &p1, false);
taosCacheRelease(tscObjCache, (void**) &p, true);
rpcFreeCont(rpcMsg->pCont);
return;
......@@ -368,21 +373,20 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
}
bool shouldFree = false;
bool shouldFree = tscShouldBeFreed(pSql);;
if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
shouldFree = tscShouldBeFreed(pSql);
(*pSql->fp)(pSql->param, pSql, rpcMsg->code);
if (shouldFree) {
void** p1 = p;
taosCacheRelease(tscObjCache, (void **)&p1, true);
tscDebug("%p sqlObj is automatically freed", pSql);
}
}
void** p1 = p;
taosCacheRelease(tscObjCache, (void**) &p, false);
if (shouldFree) {
taosCacheRelease(tscObjCache, (void **)&p1, true);
tscDebug("%p sqlObj is automatically freed", pSql);
}
rpcFreeCont(rpcMsg->pCont);
}
......
......@@ -26,6 +26,7 @@
#include "tsclient.h"
#include "ttokendef.h"
#include "tutil.h"
#include "tscProfile.h"
static bool validImpl(const char* str, size_t maxsize) {
if (str == NULL) {
......@@ -536,39 +537,53 @@ int taos_select_db(TAOS *taos, const char *db) {
}
// send free message to vnode to free qhandle and corresponding resources in vnode
//static bool tscKillQueryInVnode(SSqlObj* pSql) {
// SSqlCmd* pCmd = &pSql->cmd;
// SSqlRes* pRes = &pSql->res;
//
// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
// STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
//
// if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
// return false;
// }
//
// if (pRes->code == TSDB_CODE_SUCCESS && pRes->completed == false && (pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL) &&
// (pCmd->command == TSDB_SQL_SELECT ||
// pCmd->command == TSDB_SQL_SHOW ||
// pCmd->command == TSDB_SQL_RETRIEVE ||
// pCmd->command == TSDB_SQL_FETCH)) {
//
// pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
// tscDebug("%p send msg to dnode to free qhandle ASAP, command:%s, ", pSql, sqlCmd[pCmd->command]);
// tscProcessSql(pSql);
// return true;
// }
//
// return false;
//}
static bool tscKillQueryInDnode(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
if (pRes == NULL || pRes->qhandle == 0) {
return true;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
return true;
}
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
tscRemoveFromSqlList(pSql);
int32_t cmd = pCmd->command;
if (pRes->code == TSDB_CODE_SUCCESS && pRes->completed == false && pSql->pStream == NULL && (pTableMetaInfo->pTableMeta != NULL) &&
(cmd == TSDB_SQL_SELECT ||
cmd == TSDB_SQL_SHOW ||
cmd == TSDB_SQL_RETRIEVE ||
cmd == TSDB_SQL_FETCH)) {
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
tscDebug("%p send msg to dnode to free qhandle ASAP before free sqlObj, command:%s, ", pSql, sqlCmd[pCmd->command]);
tscProcessSql(pSql);
return false;
}
return true;
}
void taos_free_result(TAOS_RES *res) {
if (res == NULL) {
SSqlObj* pSql = (SSqlObj*) res;
if (pSql == NULL || pSql->signature != pSql) {
tscError("%p already released sqlObj", res);
return;
}
SSqlObj* pSql = (SSqlObj*) res;
taosCacheRelease(tscObjCache, (void**) &pSql->self, true);
assert(pSql->self != 0 && *pSql->self == pSql);
bool freeNow = tscKillQueryInDnode(pSql);
if (freeNow) {
tscDebug("%p free sqlObj in cache", pSql);
taosCacheRelease(tscObjCache, (void**) &pSql->self, true);
}
}
//static void doFreeResult(TAOS_RES *res) {
......@@ -596,7 +611,7 @@ void taos_free_result(TAOS_RES *res) {
// }
//
// pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
// if (!tscKillQueryInVnode(pSql)) {
// if (!tscKillQueryInDnode(pSql)) {
// tscFreeSqlObj(pSql);
// tscDebug("%p sqlObj is freed by app", pSql);
// }
......
......@@ -363,6 +363,7 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
taosTFree(pSql->pSubs);
pSql->numOfSubs = 0;
pSql->self = 0;
tscResetSqlCmdObj(pCmd, false);
}
......@@ -390,12 +391,36 @@ void tscFreeSqlObjInCache(void *pSql) {
tscFreeSqlObj(*p);
}
static UNUSED_FUNC bool tscKillQueryInDnode(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
return false;
}
int32_t cmd = pCmd->command;
if (pRes->code == TSDB_CODE_SUCCESS && pRes->completed == false && pSql->pStream == NULL && (pTableMetaInfo->pTableMeta != NULL) &&
(cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_SHOW || cmd == TSDB_SQL_RETRIEVE || cmd == TSDB_SQL_FETCH)) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
tscDebug("%p send msg to dnode to free qhandle ASAP, command:%s, ", pSql, sqlCmd[pCmd->command]);
tscProcessSql(pSql);
return true;
}
return false;
}
void tscFreeSqlObj(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) {
return;
}
tscDebug("%p start to free sql object", pSql);
tscDebug("%p start to free sqlObj", pSql);
tscFreeSubobj(pSql);
tscPartiallyFreeSqlObj(pSql);
......@@ -412,6 +437,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tsem_destroy(&pSql->rspSem);
free(pSql);
tscDebug("%p free sqlObj completed", pSql);
}
void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
......@@ -423,7 +449,10 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
taosTFree(pDataBlock->params);
// free the refcount for metermeta
taosCacheRelease(tscCacheHandle, (void**)&(pDataBlock->pTableMeta), false);
if (pDataBlock->pTableMeta != NULL) {
taosCacheRelease(tscCacheHandle, (void**)&(pDataBlock->pTableMeta), false);
}
taosTFree(pDataBlock);
}
......@@ -478,7 +507,10 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
// set the correct table meta object, the table meta has been locked in pDataBlocks, so it must be in the cache
if (pTableMetaInfo->pTableMeta != pDataBlock->pTableMeta) {
tstrncpy(pTableMetaInfo->name, pDataBlock->tableId, sizeof(pTableMetaInfo->name));
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), false);
if (pTableMetaInfo->pTableMeta != NULL) {
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), false);
}
pTableMetaInfo->pTableMeta = taosCacheTransfer(tscCacheHandle, (void**)&pDataBlock->pTableMeta);
} else {
......@@ -758,6 +790,13 @@ void tscCloseTscObj(STscObj* pObj) {
if (p == NULL) {
break;
}
tscDebug("%p waiting for sqlObj to be freed, %p", pObj, p);
taosMsleep(100);
// todo fix me!! two threads call taos_free_result will cause problem.
tscDebug("%p free :%p", pObj, p);
taos_free_result(p);
}
if (pObj->pDnodeConn != NULL) {
......@@ -1703,7 +1742,10 @@ void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache)
return;
}
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache);
if (pTableMetaInfo->pTableMeta != NULL) {
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache);
}
taosTFree(pTableMetaInfo->vgroupList);
tscColumnListDestroy(pTableMetaInfo->tagColList);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册