From c2a3c50f07bf238849c6f5f26dc6e0f2f9890e0a Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 24 Mar 2020 22:59:00 +0800 Subject: [PATCH] [td-32] fix memory leaks, and fix bugs in select * query. --- src/client/src/tscParseInsert.c | 11 ---- src/client/src/tscSQLParser.c | 22 ++++--- src/client/src/tscServer.c | 10 +-- src/client/src/tscSql.c | 7 +-- src/client/src/tscStream.c | 1 - src/client/src/tscUtil.c | 22 +------ src/query/src/queryExecutor.c | 105 ++++++++++++++++++-------------- src/vnode/tsdb/src/tsdbRead.c | 9 +-- 8 files changed, 83 insertions(+), 104 deletions(-) diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index ccd734ec7b..5651e5aa38 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1238,22 +1238,11 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { goto _clean; } - // submit to more than one vnode if (pCmd->pDataBlocks->nSize > 0) { // merge according to vgId if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { goto _error_clean; } - - STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0]; - if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) { - goto _error_clean; - } - - pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); - - // set the next sent data vnode index in data block arraylist - pTableMetaInfo->vnodeIndex = 1; } else { pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index e0850a7139..6427578e82 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1489,11 +1489,9 @@ static int32_t setExprInfoForFunctions(SQueryInfo* pQueryInfo, SSchema* pSchema, SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, resColIdx, functionID, pColIndex, type, bytes, bytes); strncpy(pExpr->aliasName, columnName, tListLen(pExpr->aliasName)); - // for point interpolation/last_row query, we need the timestamp column to be loaded + // for all querie, the timestamp column meeds to be loaded SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; - if (functionID == TSDB_FUNC_INTERP || functionID == TSDB_FUNC_LAST_ROW) { - tscColumnBaseInfoInsert(pQueryInfo, &index); - } + tscColumnBaseInfoInsert(pQueryInfo, &index); SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex); insertResultField(pQueryInfo, resColIdx, &ids, bytes, type, columnName, pExpr); @@ -1581,7 +1579,10 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt tscColumnBaseInfoInsert(pQueryInfo, &(ids.ids[i])); } } - + + SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; + tscColumnBaseInfoInsert(pQueryInfo, &tsCol); + return TSDB_CODE_SUCCESS; } case TK_SUM: @@ -1689,7 +1690,10 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt tscColumnBaseInfoInsert(pQueryInfo, &(ids.ids[i])); } } - + + SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; + tscColumnBaseInfoInsert(pQueryInfo, &tsCol); + return TSDB_CODE_SUCCESS; } case TK_FIRST: @@ -1708,7 +1712,6 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt } /* in first/last function, multiple columns can be add to resultset */ - for (int32_t i = 0; i < pItem->pNode->pParam->nExpr; ++i) { tSQLExprItem* pParamElem = &(pItem->pNode->pParam->a[i]); if (pParamElem->pNode->nSQLOptr != TK_ALL && pParamElem->pNode->nSQLOptr != TK_ID) { @@ -1753,7 +1756,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt } } } - + return TSDB_CODE_SUCCESS; } else { // select * from xxx int32_t numOfFields = 0; @@ -1773,6 +1776,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt numOfFields += tscGetNumOfColumns(pTableMetaInfo->pTableMeta); } + return TSDB_CODE_SUCCESS; } } @@ -1891,6 +1895,8 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt default: return TSDB_CODE_INVALID_SQL; } + + } // todo refactor diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 348c0709f1..9e1efcd6ff 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -341,14 +341,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { (*pSql->fp)(pSql->param, taosres, rpcMsg->code); if (shouldFree) { - // If it is failed, all objects allocated during execution taos_connect_a should be released - if (command == TSDB_SQL_CONNECT) { - taos_close(pObj); - tscTrace("%p Async sql close failed connection", pSql); - } else { - tscFreeSqlObj(pSql); - tscTrace("%p Async sql is automatically freed", pSql); - } + tscFreeSqlObj(pSql); + tscTrace("%p Async sql is automatically freed", pSql); } } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 4885cf7cc3..d62dac088b 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -594,11 +594,7 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { } if (numOfTableHasRes >= 2) { // do merge result - success = (doSetResultRowData(pSql->pSubs[0]) != NULL) && (doSetResultRowData(pSql->pSubs[1]) != NULL); - // TSKEY key1 = *(TSKEY *)pRes1->tsrow[0]; - // TSKEY key2 = *(TSKEY *)pRes2->tsrow[0]; - // printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2); } else { // only one subquery SSqlObj *pSub = pSql->pSubs[0]; if (pSub == NULL) { @@ -674,14 +670,13 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { SSqlRes *pRes = &pSql->res; if (pRes->qhandle == 0 || - pRes->completed || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_INSERT) { return NULL; } // current data are exhausted, fetch more data - if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && + if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true && (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) { taos_fetch_rows_a(res, asyncFetchCallback, pSql->pTscObj); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 46e3ac2e60..0b464c362b 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -504,7 +504,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p } tsem_init(&pSql->rspSem, 0, 0); - tsem_init(&pSql->emptyRspSem, 0, 1); SSqlInfo SQLInfo = {0}; tSQLParse(&SQLInfo, pSql->sqlstr); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 2a9673b192..d079494dde 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -423,9 +423,6 @@ void tscFreeResData(SSqlObj* pSql) { } void tscFreeSqlResult(SSqlObj* pSql) { - //TODO not free - return; - tfree(pSql->res.pRsp); pSql->res.row = 0; pSql->res.numOfRows = 0; @@ -469,8 +466,6 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) { tscFreeSqlCmdData(pCmd); tscTrace("%p free sqlObj partial completed", pSql); - - tscFreeSqlCmdData(pCmd); } void tscFreeSqlObj(SSqlObj* pSql) { @@ -489,10 +484,7 @@ void tscFreeSqlObj(SSqlObj* pSql) { pCmd->allocSize = 0; - if (pSql->fp == NULL) { - tsem_destroy(&pSql->rspSem); - tsem_destroy(&pSql->emptyRspSem); - } + tsem_destroy(&pSql->rspSem); free(pSql); } @@ -1751,16 +1743,8 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) { } int32_t command = pSql->cmd.command; - if (pTscObj->pSql == pSql) { - /* - * in case of taos_connect_a query, the object should all be released, even it is the - * master sql object. Otherwise, the master sql should not be released - */ - if (command == TSDB_SQL_CONNECT && pSql->res.code != TSDB_CODE_SUCCESS) { - return true; - } - - return false; + if (command == TSDB_SQL_CONNECT) { + return true; } if (command == TSDB_SQL_INSERT) { diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index cbce5097ec..f233ac0d99 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -53,9 +53,9 @@ /* get the qinfo struct address from the query struct address */ #define GET_COLUMN_BYTES(query, colidx) \ - ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].info.bytes) + ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdx].info.bytes) #define GET_COLUMN_TYPE(query, colidx) \ - ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].info.type) + ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdx].info.type) typedef struct SPointInterpoSupporter { int32_t numOfCols; @@ -1498,16 +1498,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, SColumnModel SColIndexEx * pColIndexEx = &pSqlFuncMsg->colInfo; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; - - if (TSDB_COL_IS_TAG(pSqlFuncMsg->colInfo.flag)) { // process tag column info - SSchema *pSchema = getColumnModelSchema(pTagsSchema, pColIndexEx->colIdx); - - pCtx->inputType = pSchema->type; - pCtx->inputBytes = pSchema->bytes; - } else { - pCtx->inputType = GET_COLUMN_TYPE(pQuery, i); - pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i); - } + pCtx->inputType = GET_COLUMN_TYPE(pQuery, i); + pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i); pCtx->ptsOutputBuf = NULL; @@ -1891,8 +1883,6 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) { pQuery->checkBufferInLoop = hasMultioutput ? 1 : 0; } - - // pQuery->pointsOffset = pQuery->pointsToRead; } /* @@ -2552,7 +2542,7 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl // return DISK_DATA_LOAD_FAILED; } - if (pStatis == NULL) { + if (*pStatis == NULL) { pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); } } else { @@ -5025,11 +5015,8 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) { } static void tableMultiOutputProcessor(SQInfo *pQInfo) { -#if 0 - SQuery * pQuery = &pQInfo->query; - SMeterObj *pMeterObj = pQInfo->pObj; - - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->pTableQuerySupporter->runtimeEnv; + SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; + SQuery * pQuery = pRuntimeEnv->pQuery; // for ts_comp query, re-initialized is not allowed if (!isTSCompQuery(pQuery)) { @@ -5044,8 +5031,8 @@ static void tableMultiOutputProcessor(SQInfo *pQInfo) { return; } - pQuery->pointsRead = getNumOfResult(pRuntimeEnv); - if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->pointsRead > 0) { + pQuery->rec.pointsRead = getNumOfResult(pRuntimeEnv); + if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->rec.pointsRead > 0) { doSkipResults(pRuntimeEnv); } @@ -5053,40 +5040,31 @@ static void tableMultiOutputProcessor(SQInfo *pQInfo) { * 1. if pQuery->pointsRead == 0, pQuery->limit.offset >= 0, still need to check data * 2. if pQuery->pointsRead > 0, pQuery->limit.offset must be 0 */ - if (pQuery->pointsRead > 0 || Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { + if (pQuery->rec.pointsRead > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { break; } - TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); - assert(nextTimestamp > 0 || ((nextTimestamp < 0) && Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK))); - dTrace("QInfo:%p vid:%d sid:%d id:%s, skip current result, offset:%" PRId64 ", next qrange:%" PRId64 "-%" PRId64, - pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->limit.offset, pQuery->lastKey, - pQuery->ekey); + pQInfo, pQuery->limit.offset, pQuery->lastKey); resetCtxOutputBuf(pRuntimeEnv); } doRevisedResultsByLimit(pQInfo); - pQInfo->pointsRead += pQuery->pointsRead; - - if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { - TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); - assert(nextTimestamp > 0 || ((nextTimestamp < 0) && Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK))); + pQInfo->rec.pointsRead += pQuery->rec.pointsRead; - dTrace("QInfo:%p vid:%d sid:%d id:%s, query abort due to buffer limitation, next qrange:%" PRId64 "-%" PRId64, - pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->lastKey, pQuery->ekey); + if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { +// dTrace("QInfo:%p vid:%d sid:%d id:%s, query abort due to buffer limitation, next qrange:%" PRId64 "-%" PRId64, +// pQInfo, pQuery->lastKey, pQuery->ekey); } - dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned, totalRead:%d totalReturn:%d", pQInfo, pMeterObj->vnode, - pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned); - - pQuery->pointsOffset = pQuery->pointsToRead; // restore the available buffer - if (!isTSCompQuery(pQuery)) { - assert(pQuery->pointsRead <= pQuery->pointsToRead); - } +// dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned, totalRead:%d totalReturn:%d", pQInfo, pMeterObj->vnode, +// pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned); -#endif +// pQuery->pointsOffset = pQuery->pointsToRead; //restore the available buffer +// if (!isTSCompQuery(pQuery)) { +// assert(pQuery->pointsRead <= pQuery->pointsToRead); +// } } static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) { @@ -5127,10 +5105,8 @@ static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) { } } -/* handle time interval query on single table */ +// handle time interval query on table static void tableIntervalProcessor(SQInfo *pQInfo) { - // STable *pMeterObj = pQInfo->pObj; - SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->runtimeEnv); SQuery * pQuery = pRuntimeEnv->pQuery; @@ -5839,6 +5815,39 @@ static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) { return TSDB_CODE_SUCCESS; } +static void doUpdateExprColumnIndex(SQuery* pQuery) { + assert(pQuery->pSelectExpr != NULL && pQuery != NULL); +// int32_t i = 0, j = 0; +// while (i < pQuery->numOfCols && j < pMeterObj->numOfColumns) { +// if (pQuery->colList[i].data.colId == pMeterObj->schema[j].colId) { +// pQuery->colList[i++].colIdx = (int16_t)j++; +// } else if (pQuery->colList[i].data.colId < pMeterObj->schema[j].colId) { +// pQuery->colList[i++].colIdx = -1; +// } else if (pQuery->colList[i].data.colId > pMeterObj->schema[j].colId) { +// j++; +// } +// } + +// while (i < pQuery->numOfCols) { +// pQuery->colList[i++].colIdx = -1; // not such column in current meter +// } + + for(int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { + SSqlFuncExprMsg* pSqlExprMsg = &pQuery->pSelectExpr[k].pBase; + if (pSqlExprMsg->functionId == TSDB_FUNC_ARITHM || pSqlExprMsg->colInfo.flag == TSDB_COL_TAG) { + continue; + } + + SColIndexEx* pColIndexEx = &pSqlExprMsg->colInfo; + for(int32_t f = 0; f < pQuery->numOfCols; ++f) { + if (pColIndexEx->colId == pQuery->colList[f].info.colId) { + pColIndexEx->colIdx = f; + break; + } + } + } +} + static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs, SArray *pTableIdList) { SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); @@ -5897,6 +5906,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou assert(pExprs[col].resBytes > 0); pQuery->rowSize += pExprs[col].resBytes; } + + doUpdateExprColumnIndex(pQuery); int32_t ret = vnodeCreateFilterInfo(pQInfo, pQuery); if (ret != TSDB_CODE_SUCCESS) { @@ -5933,7 +5944,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou } // to make sure third party won't overwrite this structure - pQInfo->signature = (uint64_t)pQInfo; + pQInfo->signature = pQInfo; pQInfo->pTableIdList = pTableIdList; pQuery->pos = -1; diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index 31cdd70f36..36472857fe 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -335,7 +335,6 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max SDataRow row = SL_GET_NODE_DATA(node); if (dataRowKey(row) > maxKey) break; - // Convert row data to column data if (*skey == INT64_MIN) { *skey = dataRowKey(row); @@ -345,13 +344,13 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max int32_t offset = 0; for(int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoEx* pColInfo = taosArrayGet(pHandle->pColumns, 0); + SColumnInfoEx* pColInfo = taosArrayGet(pHandle->pColumns, i); memcpy(pColInfo->pData + numOfRows*pColInfo->info.bytes, dataRowTuple(row) + offset, pColInfo->info.bytes); offset += pColInfo->info.bytes; } numOfRows++; - if (numOfRows > maxRowsToRead) break; + if (numOfRows >= maxRowsToRead) break; }; return numOfRows; @@ -392,7 +391,9 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SData } SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList) { - + // in case of data in cache, all data has been kept in column info object. + STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; + return pHandle->pColumns; } int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow *window, tsdbpos_t position, int16_t order) {} -- GitLab