diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index bc37c16187c5c6a790779ea9f25f739daa545f27..06e5ff73bf76d7a2d794fa23544a4967d90d5028 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -3945,7 +3945,7 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) { tsBufFlush(pTSbuf); strcpy(pCtx->aOutputBuf, pTSbuf->path); - tsBufDestory(pTSbuf); + tsBufDestroy(pTSbuf); doFinalizer(pCtx); } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 65e2c976e05a15c01c534ab2a8756896ec380c9c..44d10ec2c41fb1840d6ee494598596cb3d568c27 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -175,7 +175,6 @@ static int32_t handlePassword(SSqlCmd* pCmd, SSQLToken* pPwd) { return TSDB_CODE_SUCCESS; } -// todo handle memory leak in error handle function int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { if (pInfo == NULL || pSql == NULL || pSql->signature != pSql) { return TSDB_CODE_TSC_APP_ERROR; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 044a2aba9d7782ecab43964e8e52ba8adbf7fb80..6c3580bad4d1805372c752e5bd0f95b5b64cc122 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -152,8 +152,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ tsBufFlush(output1); tsBufFlush(output2); - tsBufDestory(pSupporter1->pTSBuf); - tsBufDestory(pSupporter2->pTSBuf); + tsBufDestroy(pSupporter1->pTSBuf); + tsBufDestroy(pSupporter2->pTSBuf); tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " for secondary query after ts blocks " "intersecting, skey:%" PRId64 ", ekey:%" PRId64, pSql, numOfInput1, numOfInput2, output1->numOfTotal, @@ -762,7 +762,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex); - tsBufDestory(pBuf); + tsBufDestroy(pBuf); } // continue to retrieve ts-comp data from vnode diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index dea41d29321f288ee2f4ef7399d75eea0c149b61..0ea95a99796097bc83e99e232ba6ed07a50ef40e 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1544,7 +1544,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) { pQueryInfo->groupbyExpr.columnInfo = NULL; } - pQueryInfo->tsBuf = tsBufDestory(pQueryInfo->tsBuf); + pQueryInfo->tsBuf = tsBufDestroy(pQueryInfo->tsBuf); tfree(pQueryInfo->fillVal); } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index f55678582460af4618204cc255c8e0a121d416b6..75fe4e14e18b93cab87922ef3c0ed913871b7db1 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -120,6 +120,7 @@ static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) { #define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) static void setQueryStatus(SQuery *pQuery, int8_t status); +static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv); #define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0) @@ -838,6 +839,7 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas sas->data = calloc(pQuery->numOfCols, POINTER_BYTES); if (sas->data == NULL) { + finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -900,6 +902,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); if (sasArray == NULL) { + finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -1146,6 +1149,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); if (sasArray == NULL) { + finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -1597,7 +1601,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); - pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); + pRuntimeEnv->pTSBuf = tsBufDestroy(pRuntimeEnv->pTSBuf); } #define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED) @@ -2225,8 +2229,10 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; while (tsdbNextDataBlock(pQueryHandle)) { summary->totalBlocks += 1; + if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) { - return 0; + finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query + longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo); @@ -3281,7 +3287,8 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { // check if query is killed or not if (IS_QUERY_KILLED(pQInfo)) { - return; + finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query + longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } } @@ -3926,7 +3933,8 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; while (tsdbNextDataBlock(pQueryHandle)) { if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) { - return; + finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query + longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo); @@ -4133,7 +4141,10 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + pQuery->precision = tsdbGetCfg(tsdb)->precision; + pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery); + pRuntimeEnv->hasTagResults = hasTagValOutput(pQuery); setScanLimitationByResultBuffer(pQuery); changeExecuteScanOrder(pQInfo, false); @@ -4211,10 +4222,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo pQuery->fillType, pColInfo); } - // todo refactor - pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery); - pRuntimeEnv->hasTagResults = hasTagValOutput(pQuery); - setQueryStatus(pQuery, QUERY_NOT_COMPLETED); return TSDB_CODE_SUCCESS; } @@ -4243,7 +4250,8 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { while (tsdbNextDataBlock(pQueryHandle)) { summary->totalBlocks += 1; if (IS_QUERY_KILLED(pQInfo)) { - break; + finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query + longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo); @@ -4527,7 +4535,8 @@ static void sequentialTableProcess(SQInfo *pQInfo) { while (pQInfo->tableIndex < pQInfo->tableqinfoGroupInfo.numOfTables) { if (IS_QUERY_KILLED(pQInfo)) { - return; + finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query + longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } pQuery->current = taosArrayGetP(group, pQInfo->tableIndex); @@ -4723,7 +4732,8 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { // query error occurred or query is killed, abort current execution if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) { qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code)); - return; + finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query + longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } // close all time window results @@ -4745,7 +4755,8 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) { qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code)); - return; + finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query + longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } if (QUERY_IS_INTERVAL_QUERY(pQuery) || isSumAvgRateQuery(pQuery)) { @@ -4784,7 +4795,8 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) finalizeQueryResult(pRuntimeEnv); if (IS_QUERY_KILLED(pQInfo)) { - return; + finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query + longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } // since the numOfRows must be identical for all sql functions that are allowed to be executed simutaneously. @@ -4816,10 +4828,6 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey); finalizeQueryResult(pRuntimeEnv); - if (IS_QUERY_KILLED(pQInfo)) { - return; - } - pQuery->rec.rows = getNumOfResult(pRuntimeEnv); if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->rec.rows > 0) { skipResults(pRuntimeEnv); @@ -4864,10 +4872,6 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) while (1) { scanOneTableDataBlocks(pRuntimeEnv, start); - if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) { - return; - } - assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_NOT_COMPLETED)); finalizeQueryResult(pRuntimeEnv); @@ -5788,7 +5792,7 @@ static bool isValidQInfo(void *param) { return (sig == (uint64_t)pQInfo); } -static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, bool isSTable, void* param, _qinfo_free_fn_t fn) { +static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, bool isSTable, void* param) { int32_t code = TSDB_CODE_SUCCESS; SQuery *pQuery = pQInfo->runtimeEnv.pQuery; @@ -5813,7 +5817,6 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ } pQInfo->param = param; - pQInfo->freeFn = fn; if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) { qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo); @@ -5995,8 +5998,7 @@ typedef struct SQueryMgmt { pthread_mutex_t lock; } SQueryMgmt; -int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, void* param, _qinfo_free_fn_t fn, - qinfo_t* pQInfo) { +int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, void* param, qinfo_t* pQInfo) { assert(pQueryMsg != NULL && tsdb != NULL); int32_t code = TSDB_CODE_SUCCESS; @@ -6086,7 +6088,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo goto _over; } - code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery, param, fn); + code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery, param); _over: free(tagCond); @@ -6105,43 +6107,25 @@ _over: if (code != TSDB_CODE_SUCCESS) { *pQInfo = NULL; } else { - SQInfo* pq = (SQInfo*) (*pQInfo); +// SQInfo* pq = (SQInfo*) (*pQInfo); - T_REF_INC(pq); - T_REF_INC(pq); +// T_REF_INC(pq); +// T_REF_INC(pq); } // if failed to add ref for all meters in this query, abort current query return code; } -static void doDestoryQueryInfo(SQInfo* pQInfo) { - assert(pQInfo != NULL); - qDebug("QInfo:%p query completed", pQInfo); - queryCostStatis(pQInfo); // print the query cost summary - freeQInfo(pQInfo); -} - void qDestroyQueryInfo(qinfo_t qHandle) { SQInfo* pQInfo = (SQInfo*) qHandle; if (!isValidQInfo(pQInfo)) { return; } - int32_t ref = T_REF_DEC(pQInfo); - qDebug("QInfo:%p dec refCount, value:%d", pQInfo, ref); - - if (ref == 0) { - _qinfo_free_fn_t freeFp = pQInfo->freeFn; - void* param = pQInfo->param; - - doDestoryQueryInfo(pQInfo); - if (freeFp != NULL) { - assert(param != NULL); - freeFp(param); - } - - } + qDebug("QInfo:%p query completed", pQInfo); + queryCostStatis(pQInfo); // print the query cost summary + freeQInfo(pQInfo); } void qTableQuery(qinfo_t qinfo) { @@ -6154,29 +6138,22 @@ void qTableQuery(qinfo_t qinfo) { if (IS_QUERY_KILLED(pQInfo)) { qDebug("QInfo:%p it is already killed, abort", pQInfo); - sem_post(&pQInfo->dataReady); - qDestroyQueryInfo(pQInfo); return; } if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) { qDebug("QInfo:%p no table exists for query, abort", pQInfo); - sem_post(&pQInfo->dataReady); - qDestroyQueryInfo(pQInfo); return; } - int32_t ret = setjmp(pQInfo->runtimeEnv.env); - // error occurs, record the error code and return to client + int32_t ret = setjmp(pQInfo->runtimeEnv.env); if (ret != TSDB_CODE_SUCCESS) { pQInfo->code = ret; - qDebug("QInfo:%p query abort due to error occurs, code:%s", pQInfo, tstrerror(pQInfo->code)); + qDebug("QInfo:%p query abort due to error/cancel occurs, code:%s", pQInfo, tstrerror(pQInfo->code)); sem_post(&pQInfo->dataReady); - qDestroyQueryInfo(pQInfo); - return; } @@ -6203,7 +6180,6 @@ void qTableQuery(qinfo_t qinfo) { } sem_post(&pQInfo->dataReady); - qDestroyQueryInfo(pQInfo); } int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { @@ -6247,7 +6223,7 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo) { } if (ret) { - T_REF_INC(pQInfo); +// T_REF_INC(pQInfo); qDebug("QInfo:%p has more results waits for client retrieve", pQInfo); } @@ -6304,7 +6280,6 @@ int32_t qKillQuery(qinfo_t qinfo) { } setQueryKilled(pQInfo); - qDestroyQueryInfo(pQInfo); return TSDB_CODE_SUCCESS; } @@ -6449,6 +6424,7 @@ void freeqinfoFn(void *qhandle) { } qKillQuery(*handle); + qDestroyQueryInfo(*handle); } void* qOpenQueryMgmt(int32_t vgId) { @@ -6467,7 +6443,11 @@ void* qOpenQueryMgmt(int32_t vgId) { return pQueryHandle; } -void qSetQueryMgmtClosed(void* pQMgmt) { +static void queryMgmtKillQueryFn(void* handle) { + qKillQuery(handle); +} + +void qQueryMgmtNotifyClosed(void* pQMgmt) { if (pQMgmt == NULL) { return; } @@ -6479,7 +6459,7 @@ void qSetQueryMgmtClosed(void* pQMgmt) { pQueryMgmt->closed = true; pthread_mutex_unlock(&pQueryMgmt->lock); - taosCacheRefresh(pQueryMgmt->qinfoPool, freeqinfoFn); + taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn); } void qCleanupQueryMgmt(void* pQMgmt) { diff --git a/src/query/src/qtokenizer.c b/src/query/src/qtokenizer.c index 80d59a384eb54309229aa6e6f05cb6d1301f62aa..784dd9af678f3115f98d9f18811bb18b7eb8d549 100644 --- a/src/query/src/qtokenizer.c +++ b/src/query/src/qtokenizer.c @@ -509,10 +509,11 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) { for (i = 1; isdigit(z[i]); i++) { } - /* here is the 1a/2s/3m/9y */ - if ((z[i] == 'a' || z[i] == 's' || z[i] == 'm' || z[i] == 'h' || z[i] == 'd' || z[i] == 'n' || z[i] == 'y' || - z[i] == 'w' || z[i] == 'A' || z[i] == 'S' || z[i] == 'M' || z[i] == 'H' || z[i] == 'D' || z[i] == 'N' || - z[i] == 'Y' || z[i] == 'W') && + /* here is the 1u/1a/2s/3m/9y */ + if ((z[i] == 'u' || z[i] == 'a' || z[i] == 's' || z[i] == 'm' || z[i] == 'h' || z[i] == 'd' || z[i] == 'n' || + z[i] == 'y' || z[i] == 'w' || + z[i] == 'U' || z[i] == 'A' || z[i] == 'S' || z[i] == 'M' || z[i] == 'H' || z[i] == 'D' || z[i] == 'N' || + z[i] == 'Y' || z[i] == 'W') && (isIdChar[(uint8_t)z[i + 1]] == 0)) { *tokenType = TK_VARIABLE; i += 1; diff --git a/src/query/src/qtsbuf.c b/src/query/src/qtsbuf.c index fedaa315d336db33a3532a60312f9e954434185c..b84fbded38d8ada4d4f426201ea4f747a062e37e 100644 --- a/src/query/src/qtsbuf.c +++ b/src/query/src/qtsbuf.c @@ -79,7 +79,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { pTSBuf->numOfAlloc = header.numOfVnode; STSVnodeBlockInfoEx* tmp = realloc(pTSBuf->pData, sizeof(STSVnodeBlockInfoEx) * pTSBuf->numOfAlloc); if (tmp == NULL) { - tsBufDestory(pTSBuf); + tsBufDestroy(pTSBuf); return NULL; } @@ -92,7 +92,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { pTSBuf->tsOrder = header.tsOrder; if (pTSBuf->tsOrder != TSDB_ORDER_ASC && pTSBuf->tsOrder != TSDB_ORDER_DESC) { // tscError("invalid order info in buf:%d", pTSBuf->tsOrder); - tsBufDestory(pTSBuf); + tsBufDestroy(pTSBuf); return NULL; } @@ -100,7 +100,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { STSVnodeBlockInfo* buf = (STSVnodeBlockInfo*)calloc(1, infoSize); if (buf == NULL) { - tsBufDestory(pTSBuf); + tsBufDestroy(pTSBuf); return NULL; } @@ -120,7 +120,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { struct stat fileStat; if (fstat(fileno(pTSBuf->f), &fileStat) != 0) { - tsBufDestory(pTSBuf); + tsBufDestroy(pTSBuf); return NULL; } @@ -137,7 +137,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { return pTSBuf; } -void* tsBufDestory(STSBuf* pTSBuf) { +void* tsBufDestroy(STSBuf* pTSBuf) { if (pTSBuf == NULL) { return NULL; } @@ -920,13 +920,13 @@ static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) { pTSBuf->numOfAlloc = INITIAL_VNODEINFO_SIZE; pTSBuf->pData = calloc(pTSBuf->numOfAlloc, sizeof(STSVnodeBlockInfoEx)); if (pTSBuf->pData == NULL) { - tsBufDestory(pTSBuf); + tsBufDestroy(pTSBuf); return NULL; } pTSBuf->tsData.rawBuf = malloc(MEM_BUF_SIZE); if (pTSBuf->tsData.rawBuf == NULL) { - tsBufDestory(pTSBuf); + tsBufDestroy(pTSBuf); return NULL; } @@ -936,13 +936,13 @@ static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) { pTSBuf->assistBuf = malloc(MEM_BUF_SIZE); if (pTSBuf->assistBuf == NULL) { - tsBufDestory(pTSBuf); + tsBufDestroy(pTSBuf); return NULL; } pTSBuf->block.payload = malloc(MEM_BUF_SIZE); if (pTSBuf->block.payload == NULL) { - tsBufDestory(pTSBuf); + tsBufDestroy(pTSBuf); return NULL; }