From 2b1519c167beaa5b2a098a3b01dac46060891d71 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 12 Sep 2020 15:50:04 +0800 Subject: [PATCH] [td-1319] --- src/client/inc/tsclient.h | 20 ++-- src/client/src/tscAsync.c | 10 +- src/client/src/tscLocalMerge.c | 2 - src/client/src/tscServer.c | 45 +++++--- src/client/src/tscSql.c | 133 +++++++++++++----------- src/client/src/tscSubquery.c | 17 +-- src/client/src/tscSystem.c | 7 +- src/client/src/tscUtil.c | 58 +++++++++-- src/plugins/http/src/httpSql.c | 13 --- src/tsdb/src/tsdbRead.c | 1 + src/util/src/tcache.c | 9 +- tests/script/general/parser/groupby.sim | 2 + 12 files changed, 188 insertions(+), 129 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 5f4a46ddad..11b7815586 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -29,6 +29,7 @@ extern "C" { #include "tglobal.h" #include "tsqlfunction.h" #include "tutil.h" +#include "tcache.h" #include "qExecutor.h" #include "qSqlparser.h" @@ -359,6 +360,8 @@ typedef struct SSqlObj { uint16_t numOfSubs; struct SSqlObj **pSubs; struct SSqlObj * prev, *next; + + struct SSqlObj **self; } SSqlObj; typedef struct SSqlStream { @@ -413,7 +416,6 @@ int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo *pQueryInfo); void tscRestoreSQLFuncForSTableQuery(SQueryInfo *pQueryInfo); int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); -void tscDestroyResPointerInfo(SSqlRes *pRes); void tscResetSqlCmdObj(SSqlCmd *pCmd, bool removeFromCache); @@ -425,17 +427,19 @@ void tscFreeSqlResult(SSqlObj *pSql); /** * only free part of resources allocated during query. + * TODO remove it later * Note: this function is multi-thread safe. * @param pObj */ -void tscPartiallyFreeSqlObj(SSqlObj *pObj); +void tscPartiallyFreeSqlObj(SSqlObj *pSql); /** * free sql object, release allocated resource - * @param pObj Free metric/meta information, dynamically allocated payload, and - * response buffer, object itself + * @param pObj */ -void tscFreeSqlObj(SSqlObj *pObj); +void tscFreeSqlObj(SSqlObj *pSql); + +void tscFreeSqlObjInCache(void *pSql); void tscCloseTscObj(STscObj *pObj); @@ -451,9 +455,6 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen) bool tscIsUpdateQuery(SSqlObj* pSql); bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes); -// todo remove this function. -bool tscResultsetFetchCompleted(TAOS_RES *result); - char *tscGetErrorMsgPayload(SSqlCmd *pCmd); int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql); @@ -502,7 +503,8 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField } } -extern void * tscCacheHandle; +extern SCacheObj* tscCacheHandle; +extern SCacheObj* tscObjCache; extern void * tscTmr; extern void * tscQhandle; extern int tscKeepConn[]; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index d07089539a..5e9aa1b1f8 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -18,6 +18,7 @@ #include "tnote.h" #include "trpc.h" +#include "tcache.h" #include "tscLog.h" #include "tscSubquery.h" #include "tscLocalMerge.h" @@ -40,6 +41,8 @@ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows); void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) { + SSqlCmd* pCmd = &pSql->cmd; + pSql->signature = pSql; pSql->param = param; pSql->pTscObj = pObj; @@ -59,7 +62,10 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const strntolower(pSql->sqlstr, sqlstr, (int32_t)sqlLen); tscDebugL("%p SQL: %s", pSql, pSql->sqlstr); - pSql->cmd.curSql = pSql->sqlstr; + pCmd->curSql = pSql->sqlstr; + + uint64_t handle = (uint64_t) pSql; + pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000); int32_t code = tsParseSql(pSql, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; @@ -69,7 +75,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const tscQueueAsyncRes(pSql); return; } - + tscDoQuery(pSql); } diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 39a757795e..af6a546ff4 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -472,10 +472,8 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { return; } - tscDebug("%p start to free local reducer", pSql); SSqlRes *pRes = &(pSql->res); if (pRes->pLocalReducer == NULL) { - tscDebug("%p local reducer has been freed, abort", pSql); return; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index fbc02cc40e..0733690e3f 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -27,10 +27,7 @@ #include "tutil.h" #include "tlockfree.h" -#define TSC_MGMT_VNODE 999 - SRpcCorEpSet tscMgmtEpSet; -SRpcEpSet tscDnodeEpSet; int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; @@ -236,12 +233,17 @@ int tscSendMsgToServer(SSqlObj *pSql) { } void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { - SSqlObj *pSql = (SSqlObj *)rpcMsg->ahandle; - if (pSql == NULL || pSql->signature != pSql) { - tscError("%p sql is already released", pSql); + uint64_t handle = (uint64_t) rpcMsg->ahandle; + + void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(uint64_t)); + if (p == NULL) { + rpcFreeCont(rpcMsg->pCont); return; } + SSqlObj* pSql = *p; + assert(pSql != NULL); + STscObj *pObj = pSql->pTscObj; SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; @@ -249,7 +251,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { if (pObj->signature != pObj) { tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature); - tscFreeSqlObj(pSql); + taosCacheRelease(tscObjCache, (void**) &p, true); rpcFreeCont(rpcMsg->pCont); return; } @@ -261,18 +263,18 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature); - tscFreeSqlObj(pSql); + taosCacheRelease(tscObjCache, (void**) &p, true); rpcFreeCont(rpcMsg->pCont); return; } - if (pEpSet) { + if (pEpSet) { if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) { - if(pCmd->command < TSDB_SQL_MGMT) { - tscUpdateVgroupInfo(pSql, pEpSet); + if (pCmd->command < TSDB_SQL_MGMT) { + tscUpdateVgroupInfo(pSql, pEpSet); } else { tscUpdateMgmtEpSet(pEpSet); - } + } } } @@ -294,7 +296,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { if (pSql->retry > pSql->maxRetry) { tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry); } else { - // wait for a little bit moment and then retry + // wait for a little bit moment and then retry, todo do not sleep in rpc callback thread if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { int32_t duration = getWaitingTimeInterval(pSql->retry); taosMsleep(duration); @@ -304,6 +306,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { // if there is an error occurring, proceed to the following error handling procedure. if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + taosCacheRelease(tscObjCache, (void**) &p, false); rpcFreeCont(rpcMsg->pCont); return; } @@ -365,18 +368,21 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); } + bool shouldFree = false; if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) { rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code; - bool shouldFree = tscShouldBeFreed(pSql); + shouldFree = tscShouldBeFreed(pSql); (*pSql->fp)(pSql->param, pSql, rpcMsg->code); if (shouldFree) { + void** p1 = p; + taosCacheRelease(tscObjCache, (void **)&p1, true); tscDebug("%p sqlObj is automatically freed", pSql); - tscFreeSqlObj(pSql); } } + taosCacheRelease(tscObjCache, (void**) &p, false); rpcFreeCont(rpcMsg->pCont); } @@ -2000,7 +2006,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { createHBObj(pObj); - taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); +// taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); return 0; } @@ -2164,6 +2170,10 @@ static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInf pNew->fp = tscTableMetaCallBack; pNew->param = pSql; + // TODO add test case on x86 platform + uint64_t adr = (uint64_t) pNew; + pNew->self = taosCachePut(tscObjCache, &adr, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2*60*1000); + int32_t code = tscProcessSql(pNew); if (code == TSDB_CODE_SUCCESS) { code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; // notify upper application that current process need to be terminated @@ -2265,6 +2275,9 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { } pNewQueryInfo->numOfTables = pQueryInfo->numOfTables; + + uint64_t p = (uint64_t) pNew; + pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000); tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables); pNew->fp = tscTableMetaCallBack; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 9fa4db999f..33996307ad 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -100,6 +100,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con } pObj->signature = pObj; + pObj->pDnodeConn = pDnodeConn; tstrncpy(pObj->user, user, sizeof(pObj->user)); secretEncryptLen = MIN(secretEncryptLen, sizeof(pObj->pass)); @@ -132,20 +133,15 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con return NULL; } - pSql->pTscObj = pObj; + pSql->pTscObj = pObj; pSql->signature = pSql; - pSql->maxRetry = TSDB_MAX_REPLICA; + pSql->maxRetry = TSDB_MAX_REPLICA; + pSql->fp = fp; + pSql->param = param; + pSql->cmd.command = TSDB_SQL_CONNECT; + tsem_init(&pSql->rspSem, 0, 0); - - pObj->pDnodeConn = pDnodeConn; - - pSql->fp = fp; - pSql->param = param; - if (taos != NULL) { - *taos = pObj; - } - pSql->cmd.command = TSDB_SQL_CONNECT; if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; rpcClose(pDnodeConn); @@ -154,7 +150,14 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con return NULL; } + if (taos != NULL) { + *taos = pObj; + } + + uint64_t key = (uint64_t) pSql; + pSql->self = taosCachePut(tscObjCache, &key, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000); tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg); + return pSql; } @@ -533,59 +536,71 @@ int taos_select_db(TAOS *taos, const char *db) { } // send free message to vnode to free qhandle and corresponding resources in vnode -static bool tscKillQueryInVnode(SSqlObj* pSql) { - SSqlCmd* pCmd = &pSql->cmd; - SSqlRes* pRes = &pSql->res; - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - if (pRes->code == TSDB_CODE_SUCCESS && pRes->completed == false && !tscIsTwoStageSTableQuery(pQueryInfo, 0) && - (pCmd->command == TSDB_SQL_SELECT || - pCmd->command == TSDB_SQL_SHOW || - pCmd->command == TSDB_SQL_RETRIEVE || - pCmd->command == TSDB_SQL_FETCH) && - (pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL)) { - - pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; - tscDebug("%p send msg to dnode to free qhandle ASAP, command:%s, ", pSql, sqlCmd[pCmd->command]); - tscProcessSql(pSql); - return true; - } - - return false; -} +//static bool tscKillQueryInVnode(SSqlObj* pSql) { +// SSqlCmd* pCmd = &pSql->cmd; +// SSqlRes* pRes = &pSql->res; +// +// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); +// STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); +// +// if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { +// return false; +// } +// +// if (pRes->code == TSDB_CODE_SUCCESS && pRes->completed == false && (pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL) && +// (pCmd->command == TSDB_SQL_SELECT || +// pCmd->command == TSDB_SQL_SHOW || +// pCmd->command == TSDB_SQL_RETRIEVE || +// pCmd->command == TSDB_SQL_FETCH)) { +// +// pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; +// tscDebug("%p send msg to dnode to free qhandle ASAP, command:%s, ", pSql, sqlCmd[pCmd->command]); +// tscProcessSql(pSql); +// return true; +// } +// +// return false; +//} void taos_free_result(TAOS_RES *res) { - SSqlObj *pSql = (SSqlObj *)res; - - if (pSql == NULL || pSql->signature != pSql) { - tscDebug("%p sqlObj has been freed", pSql); - return; - } - - // The semaphore can not be changed while freeing async sub query objects. - SSqlRes *pRes = &pSql->res; - if (pRes == NULL || pRes->qhandle == 0) { - tscFreeSqlObj(pSql); - tscDebug("%p SqlObj is freed by app, qhandle is null", pSql); - return; - } - - // set freeFlag to 1 in retrieve message if there are un-retrieved results data in node - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - if (pQueryInfo == NULL) { - tscFreeSqlObj(pSql); - tscDebug("%p SqlObj is freed by app", pSql); + if (res == NULL) { return; } - pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; - if (!tscKillQueryInVnode(pSql)) { - tscFreeSqlObj(pSql); - tscDebug("%p sqlObj is freed by app", pSql); - } -} + SSqlObj* pSql = (SSqlObj*) res; + taosCacheRelease(tscObjCache, (void**) &pSql->self, true); +} + +//static void doFreeResult(TAOS_RES *res) { +// SSqlObj *pSql = (SSqlObj *)res; +// +// if (pSql == NULL || pSql->signature != pSql) { +// tscDebug("%p sqlObj has been freed", pSql); +// return; +// } +// +// // The semaphore can not be changed while freeing async sub query objects. +// SSqlRes *pRes = &pSql->res; +// if (pRes == NULL || pRes->qhandle == 0) { +// tscFreeSqlObj(pSql); +// tscDebug("%p SqlObj is freed by app, qhandle is null", pSql); +// return; +// } +// +// // set freeFlag to 1 in retrieve message if there are un-retrieved results data in node +// SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); +// if (pQueryInfo == NULL) { +// tscFreeSqlObj(pSql); +// tscDebug("%p SqlObj is freed by app", pSql); +// return; +// } +// +// pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; +// if (!tscKillQueryInVnode(pSql)) { +// tscFreeSqlObj(pSql); +// tscDebug("%p sqlObj is freed by app", pSql); +// } +//} int taos_errno(TAOS_RES *tres) { SSqlObj *pSql = (SSqlObj *) tres; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index c4b07f7813..e9ec272ea4 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1512,9 +1512,9 @@ static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) { SSqlObj *pParentSql = trsupport->pParentSql; assert(pSql == pParentSql->pSubs[index]); - pParentSql->pSubs[index] = NULL; - - taos_free_result(pSql); +// pParentSql->pSubs[index] = NULL; +// +// taos_free_result(pSql); taosTFree(trsupport->localBuffer); taosTFree(trsupport); } @@ -1728,10 +1728,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR assert(tres != NULL); SSqlObj *pSql = (SSqlObj *)tres; -// if (pSql == NULL) { // sql object has been released in error process, return immediately -// tscDebug("%p subquery has been released, idx:%d, abort", pParentSql, idx); -// return; -// } SSubqueryState* pState = trsupport->pState; assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal); @@ -1907,9 +1903,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) pParentObj->res.code = pSql->res.code; } - taos_free_result(tres); taosTFree(pSupporter); - if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) { return; } @@ -2029,11 +2023,6 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; _error: - for(int32_t j = 0; j < numOfSub; ++j) { - taosTFree(pSql->pSubs[j]->param); - taos_free_result(pSql->pSubs[j]); - } - taosTFree(pState); return TSDB_CODE_TSC_OUT_OF_MEMORY; } diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 72f23881d2..2c7fcf05c9 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -30,7 +30,8 @@ #include "tlocale.h" // global, not configurable -void * tscCacheHandle; +SCacheObj* tscCacheHandle; +SCacheObj* tscObjCache; void * tscTmr; void * tscQhandle; void * tscCheckDiskUsageTmr; @@ -146,6 +147,7 @@ void taos_init_imp(void) { if (tscCacheHandle == NULL) { tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta"); + tscObjCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, refreshTime, false, tscFreeSqlObjInCache, "sqlObjHandle"); } tscDebug("client is initialized successfully"); @@ -157,6 +159,9 @@ void taos_cleanup() { if (tscCacheHandle != NULL) { taosCacheCleanup(tscCacheHandle); tscCacheHandle = NULL; + + taosCacheCleanup(tscObjCache); + tscObjCache = NULL; } if (tscQhandle != NULL) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 5d50d7791a..47abe60ddd 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -252,11 +252,11 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) { if (pRes->tsrow == NULL) { int32_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput; pRes->numOfCols = numOfOutput; - + pRes->tsrow = calloc(numOfOutput, POINTER_BYTES); pRes->length = calloc(numOfOutput, sizeof(int32_t)); pRes->buffer = calloc(numOfOutput, POINTER_BYTES); - + // not enough memory if (pRes->tsrow == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) { taosTFree(pRes->tsrow); @@ -268,7 +268,7 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) { return TSDB_CODE_SUCCESS; } -void tscDestroyResPointerInfo(SSqlRes* pRes) { +static void tscDestroyResPointerInfo(SSqlRes* pRes) { if (pRes->buffer != NULL) { // free all buffers containing the multibyte string for (int i = 0; i < pRes->numOfCols; i++) { taosTFree(pRes->buffer[i]); @@ -367,12 +367,36 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) { tscResetSqlCmdObj(pCmd, false); } +static void tscFreeSubobj(SSqlObj* pSql) { + if (pSql->numOfSubs == 0) { + return; + } + + tscDebug("%p start to free sub SqlObj, numOfSub:%d", pSql, pSql->numOfSubs); + + for(int32_t i = 0; i < pSql->numOfSubs; ++i) { + tscDebug("%p free sub SqlObj:%p, index:%d", pSql, pSql->pSubs[i], i); + taos_free_result(pSql->pSubs[i]); + pSql->pSubs[i] = NULL; + } + + pSql->numOfSubs = 0; +} + +void tscFreeSqlObjInCache(void *pSql) { + assert(pSql != NULL); + SSqlObj** p = (SSqlObj**) pSql; + + tscFreeSqlObj(*p); +} + void tscFreeSqlObj(SSqlObj* pSql) { if (pSql == NULL || pSql->signature != pSql) { return; } tscDebug("%p start to free sql object", pSql); + tscFreeSubobj(pSql); tscPartiallyFreeSqlObj(pSql); pSql->signature = NULL; @@ -724,13 +748,25 @@ void tscCloseTscObj(STscObj* pObj) { pObj->signature = NULL; taosTmrStopA(&(pObj->pTimer)); - pthread_mutex_destroy(&pObj->mutex); - + + // wait for all sqlObjs created according to this connect closed + while(1) { + pthread_mutex_lock(&pObj->mutex); + void* p = pObj->sqlList; + pthread_mutex_unlock(&pObj->mutex); + + if (p == NULL) { + break; + } + } + if (pObj->pDnodeConn != NULL) { rpcClose(pObj->pDnodeConn); pObj->pDnodeConn = NULL; } - + + pthread_mutex_destroy(&pObj->mutex); + tscDebug("%p DB connection is closed, dnodeConn:%p", pObj, pObj->pDnodeConn); taosTFree(pObj); } @@ -1721,6 +1757,9 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0); tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL); + + uint64_t p = (uint64_t) pNew; + pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000); return pNew; } @@ -1960,6 +1999,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void tscDebug("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vgroupIndex); } + uint64_t p = (uint64_t) pNew; + pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 10); return pNew; _error: @@ -2101,11 +2142,6 @@ bool tscHasReachLimitation(SQueryInfo* pQueryInfo, SSqlRes* pRes) { return (pQueryInfo->clauseLimit > 0 && pRes->numOfClauseTotal >= pQueryInfo->clauseLimit); } -bool tscResultsetFetchCompleted(TAOS_RES *result) { - SSqlRes* pRes = result; - return pRes->completed; -} - char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; } /** diff --git a/src/plugins/http/src/httpSql.c b/src/plugins/http/src/httpSql.c index 07cdea1380..e86db2021a 100644 --- a/src/plugins/http/src/httpSql.c +++ b/src/plugins/http/src/httpSql.c @@ -50,10 +50,6 @@ void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int n } } - // if (tscResultsetFetchCompleted(result)) { - // isContinue = false; - // } - if (isContinue) { // retrieve next batch of rows httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, continue retrieve, numOfRows:%d, sql:%s", @@ -223,15 +219,6 @@ void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int } } -#if 0 - // todo refactor - if (tscResultsetFetchCompleted(result)) { - httpDebug("context:%p, fd:%d, ip:%s, user:%s, resultset fetch completed", pContext, pContext->fd, pContext->ipstr, - pContext->user); - isContinue = false; - } -#endif - if (isContinue) { // retrieve next batch of rows httpDebug("context:%p, fd:%d, ip:%s, user:%s, continue retrieve, numOfRows:%d", pContext, pContext->fd, diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 5fece58ef7..24a19e83d0 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -248,6 +248,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab STsdbMeta* pMeta = tsdbGetMeta(tsdb); assert(pMeta != NULL && sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0); + // todo apply the lastkey of table check to avoid to load header file for (int32_t i = 0; i < sizeOfGroup; ++i) { SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i); diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index ab489e2e46..7fda057483 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -165,7 +165,7 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext return NULL; } - // set free cache node callback function for hash table + // set free cache node callback function pCacheObj->freeFp = fn; pCacheObj->refreshTime = refreshTimeInSeconds * 1000; pCacheObj->extendLifespan = extendLifespan; @@ -322,7 +322,12 @@ void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) { } void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { - if (pCacheObj == NULL || (*data) == NULL || (taosHashGetSize(pCacheObj->pHashTable) + pCacheObj->numOfElemsInTrash == 0)) { + if (taosHashGetSize(pCacheObj->pHashTable) + pCacheObj->numOfElemsInTrash == 0) { + return; + } + + if (pCacheObj == NULL || (*data) == NULL) { + uError("cache:%s, NULL data to release", pCacheObj->name); return; } diff --git a/tests/script/general/parser/groupby.sim b/tests/script/general/parser/groupby.sim index 255e00ca41..bd0d3c1a12 100644 --- a/tests/script/general/parser/groupby.sim +++ b/tests/script/general/parser/groupby.sim @@ -423,6 +423,8 @@ if $data97 != @group_tb0@ then return -1 endi +print ---------------------------------> group by binary|nchar data add cases + #=========================== group by multi tags ====================== sql create table st (ts timestamp, c int) tags (t1 int, t2 int, t3 int, t4 int); -- GitLab