From f38f691796a92204da5ea41e519bfb1d2dabe612 Mon Sep 17 00:00:00 2001 From: yifan hao Date: Wed, 6 May 2020 22:06:17 -0600 Subject: [PATCH] [vnode] Fix a few error handling as well as memory leak 1. Fix error handling in vnodeProcessQueryMsg() where qCreateQueryInfo() fails. 2. Inside qCreateQueryInfo(), cleanup pQInfo upon failure, also add a missing goto statement. --- src/query/src/queryExecutor.c | 282 +++++++++++++++++----------------- src/vnode/src/vnodeRead.c | 27 ++-- 2 files changed, 159 insertions(+), 150 deletions(-) diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index e8abe9d819..ef0211485c 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -232,7 +232,7 @@ bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter * bool doFilterData(SQuery *pQuery, int32_t elemPos) { for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k]; - + char *pElem = pFilterInfo->pData + pFilterInfo->info.bytes * elemPos; if (isNull(pElem, pFilterInfo->info.type)) { return false; @@ -241,7 +241,7 @@ bool doFilterData(SQuery *pQuery, int32_t elemPos) { bool qualified = false; for (int32_t j = 0; j < pFilterInfo->numOfFilters; ++j) { SColumnFilterElem *pFilterElem = &pFilterInfo->pFilters[j]; - + if (pFilterElem->fp(pFilterElem, pElem, pElem)) { qualified = true; break; @@ -824,7 +824,7 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3 SArray *pDataBlock) { char *dataBlock = NULL; SQuery *pQuery = pRuntimeEnv->pQuery; - + SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; int32_t functionId = pQuery->pSelectExpr[col].base.functionId; @@ -837,18 +837,18 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3 } else { pCtx->startOffset = pQuery->pos - (size - 1); } - + sas->offset = 0; sas->colList = pQuery->colList; sas->numOfCols = pQuery->numOfCols; sas->data = calloc(pQuery->numOfCols, POINTER_BYTES); - + // here the pQuery->colList and sas->colList are identical for (int32_t i = 0; i < pQuery->numOfCols; ++i) { SColumnInfo *pColMsg = &pQuery->colList[i]; - + int32_t numOfCols = taosArrayGetSize(pDataBlock); - + dataBlock = NULL; for (int32_t k = 0; k < numOfCols; ++k) { //todo refactor SColumnInfoData *p = taosArrayGet(pDataBlock, k); @@ -857,7 +857,7 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3 break; } } - + assert(dataBlock != NULL); sas->data[i] = dataBlock + pCtx->startOffset * pQuery->colList[i].bytes; // start from the offset } @@ -873,7 +873,7 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3 * column in cache with the corresponding meter schema is reinforced. */ int32_t numOfCols = taosArrayGetSize(pDataBlock); - + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData *p = taosArrayGet(pDataBlock, i); if (pCol->colId == p->info.colId) { @@ -912,7 +912,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * } SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); - + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { int32_t functionId = pQuery->pSelectExpr[k].base.functionId; @@ -982,10 +982,10 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) { continue; } - + tfree(sasArray[i].data); } - + tfree(sasArray); } @@ -1045,7 +1045,7 @@ static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes, * column in cache with the corresponding meter schema is reinforced. */ int32_t numOfCols = taosArrayGetSize(pDataBlock); - + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData *p = taosArrayGet(pDataBlock, i); if (pColIndex->colId == p->info.colId) { @@ -1053,7 +1053,7 @@ static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes, } } } - + return NULL; } @@ -1113,7 +1113,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, SWindowResInfo *pWindowResInfo, SArray *pDataBlock) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; - + SQuery *pQuery = pRuntimeEnv->pQuery; TSKEY *primaryKeyCol = (TSKEY*) ((SColumnInfoData *)taosArrayGet(pDataBlock, 0))->pData; @@ -1253,18 +1253,18 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } } } - + pQuery->lastKey = lastKey + step; - + // todo refactor: extract method for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) { continue; } - + tfree(sasArray[i].data); } - + free(sasArray); } @@ -1423,7 +1423,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; SColIndex* pIndex = &pSqlFuncMsg->colInfo; - + int32_t index = pSqlFuncMsg->colInfo.colIndex; if (TSDB_COL_IS_TAG(pIndex->flag)) { if (pIndex->colId == TSDB_TBNAME_COLUMN_INDEX) { @@ -1437,7 +1437,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order pCtx->inputBytes = pQuery->colList[index].bytes; pCtx->inputType = pQuery->colList[index].type; } - + pCtx->ptsOutputBuf = NULL; pCtx->outputBytes = pQuery->pSelectExpr[i].bytes; @@ -1502,7 +1502,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv); - + qTrace("QInfo:%p teardown runtime env", pQInfo); cleanupTimeWindowInfo(&pRuntimeEnv->windowResInfo, pQuery->numOfOutput); @@ -1552,7 +1552,7 @@ static bool isQueryKilled(SQInfo *pQInfo) { pQInfo->killed = 1; return true; } - + return (pQInfo->killed == 1); #endif } @@ -1657,7 +1657,7 @@ static bool onlyQueryTags(SQuery* pQuery) { return false; } } - + return true; } @@ -1703,7 +1703,7 @@ static UNUSED_FUNC bool doGetQueryPos(TSKEY key, SQInfo *pQInfo, SPointInterpoSu SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; SMeterObj * pMeterObj = pRuntimeEnv->pTabObj; - + /* key in query range. If not, no qualified in disk file */ if (key != -1 && key <= pQuery->window.ekey) { if (isPointInterpoQuery(pQuery)) { /* no qualified data in this query range */ @@ -2164,10 +2164,10 @@ static UNUSED_FUNC void allocMemForInterpo(SQInfo *pQInfo, SQuery *pQuery, void #if 0 if (pQuery->interpoType != TSDB_INTERPO_NONE) { assert(isIntervalQuery(pQuery) || (pQuery->intervalTime == 0 && isPointInterpoQuery(pQuery))); - + if (isIntervalQuery(pQuery)) { pQInfo->runtimeEnv.pInterpoBuf = malloc(POINTER_BYTES * pQuery->numOfOutput); - + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { pQInfo->runtimeEnv.pInterpoBuf[i] = calloc(1, sizeof(tFilePage) + pQuery->pSelectExpr[i].bytes * pMeterObj->pointsPerFileBlock); @@ -2243,14 +2243,14 @@ UNUSED_FUNC void vnodeDecMeterRefcnt(SQInfo *pQInfo) { for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) { SMeterObj *pMeter = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[i]->sid); atomic_fetch_sub_32(&(pMeter->numOfQueries), 1); - + if (pMeter->numOfQueries > 0) { qTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pMeter->vnode, pMeter->sid, pMeter->meterId, pMeter->numOfQueries); num++; } } - + /* * in order to reduce log output, for all meters of which numOfQueries count are 0, * we do not output corresponding information @@ -2272,26 +2272,26 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k]; int32_t colIndex = pFilterInfo->info.colIndex; - + // this column not valid in current data block if (colIndex < 0 || pDataStatis[colIndex].colId != pFilterInfo->info.data.colId) { continue; } - + // not support pre-filter operation on binary/nchar data type if (!vnodeSupportPrefilter(pFilterInfo->info.data.type)) { continue; } - + // all points in current column are NULL, no need to check its boundary value if (pDataStatis[colIndex].numOfNull == numOfTotalPoints) { continue; } - + if (pFilterInfo->info.info.type == TSDB_DATA_TYPE_FLOAT) { float minval = *(double *)(&pDataStatis[colIndex].min); float maxval = *(double *)(&pDataStatis[colIndex].max); - + for (int32_t i = 0; i < pFilterInfo->numOfFilters; ++i) { if (pFilterInfo->pFilters[i].fp(&pFilterInfo->pFilters[i], (char *)&minval, (char *)&maxval)) { return true; @@ -2306,7 +2306,7 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun } } } - + // todo disable this opt code block temporarily // for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { // int32_t functId = pQuery->pSelectExpr[i].base.functionId; @@ -2561,9 +2561,9 @@ static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColI } else { tsdbGetTableTagVal(tsdb, pTableId, tagColId, &type, &bytes, &val); } - + tVariantCreateFromBinary(param, val, bytes, type); - + if (tagColId == TSDB_TBNAME_COLUMN_INDEX) { tfree(val); } @@ -2711,17 +2711,17 @@ void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOf #if 0 int32_t numOfCols = pQuery->numOfOutput; printf("super table query intermediate result, total:%d\n", numOfRows); - + SQInfo * pQInfo = (SQInfo *)(GET_QINFO_ADDR(pQuery)); SMeterObj *pMeterObj = pQInfo->pObj; - + for (int32_t j = 0; j < numOfRows; ++j) { for (int32_t i = 0; i < numOfCols; ++i) { switch (pQuery->pSelectExpr[i].type) { case TSDB_DATA_TYPE_BINARY: { int32_t colIndex = pQuery->pSelectExpr[i].base.colInfo.colIndex; int32_t type = 0; - + if (TSDB_COL_IS_TAG(pQuery->pSelectExpr[i].base.colInfo.flag)) { type = pQuery->pSelectExpr[i].type; } else { @@ -3261,7 +3261,7 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pQuery->pSelectExpr[i].base.functionId; int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; - + memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->rec.rows * bytes); pRuntimeEnv->pCtx[i].aOutputBuf += bytes * numOfSkip; @@ -3847,7 +3847,7 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo SQuery * pQuery = pRuntimeEnv->pQuery; SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo; pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1; - + if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) { rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock); } else { @@ -3903,31 +3903,31 @@ static UNUSED_FUNC int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, t #if 0 SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = &pRuntimeEnv->pQuery; - + assert(pRuntimeEnv->pCtx[0].outputBytes == TSDB_KEYSIZE); - + // build support structure for performing interpolation SSchema *pSchema = calloc(1, sizeof(SSchema) * pQuery->numOfOutput); for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { pSchema[i].bytes = pRuntimeEnv->pCtx[i].outputBytes; pSchema[i].type = pQuery->pSelectExpr[i].type; } - + // SColumnModel *pModel = createColumnModel(pSchema, pQuery->numOfOutput, pQuery->pointsToRead); - + char * srcData[TSDB_MAX_COLUMNS] = {0}; int32_t functions[TSDB_MAX_COLUMNS] = {0}; - + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { srcData[i] = pDataSrc[i]->data; functions[i] = pQuery->pSelectExpr[i].base.functionId; } - + assert(0); // int32_t numOfRes = taosDoInterpoResult(&pRuntimeEnv->interpoInfo, pQuery->interpoType, data, numOfRows, outputRows, // pQuery->intervalTime, (int64_t *)pDataSrc[0]->data, pModel, srcData, // pQuery->defaultVal, functions, pRuntimeEnv->pTabObj->pointsPerFileBlock); - + destroyColumnModel(pModel); free(pSchema); #endif @@ -3956,20 +3956,20 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage #if 0 while (1) { numOfRows = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo); - + TSKEY ekey = taosGetRevisedEndKey(pQuery->window.skey, pQuery->order.order, pQuery->intervalTime, pQuery->slidingTimeUnit, pQuery->precision); int32_t numOfFinalRows = taosGetNumOfResultWithInterpo(&pRuntimeEnv->interpoInfo, (TSKEY *)pDataSrc[0]->data, numOfRows, pQuery->intervalTime, ekey, pQuery->pointsToRead); - + int32_t ret = resultInterpolate(pQInfo, pDst, pDataSrc, numOfRows, numOfFinalRows); assert(ret == numOfFinalRows); - + /* reached the start position of according to offset value, return immediately */ if (pQuery->limit.offset == 0) { return ret; } - + if (pQuery->limit.offset < ret) { ret -= pQuery->limit.offset; // todo !!!!there exactly number of interpo is not valid. @@ -3984,7 +3984,7 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage pQuery->limit.offset -= ret; ret = 0; } - + if (!vnodeHasRemainResults(pQInfo)) { return ret; } @@ -4006,31 +4006,31 @@ void vnodePrintQueryStatistics(SQInfo *pQInfo) { } else { pSummary->tmpBufferInDisk = getResBufSize(pRuntimeEnv->pResultBuf); } - + qTrace("QInfo:%p statis: comp blocks:%d, size:%d Bytes, elapsed time:%.2f ms", pQInfo, pSummary->readCompInfo, pSummary->totalCompInfoSize, pSummary->loadCompInfoUs / 1000.0); - + qTrace("QInfo:%p statis: field info: %d, size:%d Bytes, avg size:%.2f Bytes, elapsed time:%.2f ms", pQInfo, pSummary->readField, pSummary->totalFieldSize, (double)pSummary->totalFieldSize / pSummary->readField, pSummary->loadFieldUs / 1000.0); - + qTrace( "QInfo:%p statis: file blocks:%d, size:%d Bytes, elapsed time:%.2f ms, skipped:%d, in-memory gen null:%d Bytes", pQInfo, pSummary->readDiskBlocks, pSummary->totalBlockSize, pSummary->loadBlocksUs / 1000.0, pSummary->skippedFileBlocks, pSummary->totalGenData); - + qTrace("QInfo:%p statis: cache blocks:%d", pQInfo, pSummary->blocksInCache, 0); qTrace("QInfo:%p statis: temp file:%d Bytes", pQInfo, pSummary->tmpBufferInDisk); - + qTrace("QInfo:%p statis: file:%d, table:%d", pQInfo, pSummary->numOfFiles, pSummary->numOfTables); qTrace("QInfo:%p statis: seek ops:%d", pQInfo, pSummary->numOfSeek); - + double total = pSummary->fileTimeUs + pSummary->cacheTimeUs; double io = pSummary->loadCompInfoUs + pSummary->loadBlocksUs + pSummary->loadFieldUs; - + // todo add the intermediate result save cost!! double computing = total - io; - + qTrace( "QInfo:%p statis: total elapsed time:%.2f ms, file:%.2f ms(%.2f%), cache:%.2f ms(%.2f%). io:%.2f ms(%.2f%)," "comput:%.2fms(%.2f%)", @@ -4230,15 +4230,15 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool .colList = pQuery->colList, .numOfCols = pQuery->numOfCols, }; - - + + // normal query setup the queryhandle here if (isFirstLastRowQuery(pQuery) && !isSTableQuery) { // in case of last_row query, invoke a different API. pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableIdGroupInfo); } else if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) { pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo); } - + pQInfo->tsdb = tsdb; pQInfo->vgId = vgId; @@ -4388,11 +4388,11 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) { if (pInfo->id.tid == blockInfo.tid) { assert(pInfo->id.uid == blockInfo.uid); pTableQueryInfo = item->info; - + break; } } - + if (pTableQueryInfo != NULL) { break; } @@ -4456,11 +4456,11 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); pRuntimeEnv->pQueryHandle = NULL; } - + pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp); taosArrayDestroy(tx); taosArrayDestroy(g1); - + if (pRuntimeEnv->pTSBuf != NULL) { if (pRuntimeEnv->cur.vgroupIndex == -1) { int64_t tag = pRuntimeEnv->pCtx[0].tag.i64Key; @@ -4538,41 +4538,41 @@ static void sequentialTableProcess(SQInfo *pQInfo) { if (isFirstLastRowQuery(pQuery)) { qTrace("QInfo:%p last_row query on group:%d, total group:%d, current group:%d", pQInfo, pQInfo->groupIndex, numOfGroups); - + STsdbQueryCond cond = { .twindow = pQuery->window, .colList = pQuery->colList, .order = pQuery->order.order, .numOfCols = pQuery->numOfCols, }; - + SArray *g1 = taosArrayInit(1, POINTER_BYTES); SArray *tx = taosArrayClone(group); taosArrayPush(g1, &tx); - + STableGroupInfo gp = {.numOfTables = taosArrayGetSize(tx), .pGroupList = g1}; - + // include only current table if (pRuntimeEnv->pQueryHandle != NULL) { tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); pRuntimeEnv->pQueryHandle = NULL; } - + pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(pQInfo->tsdb, &cond, &gp); - + initCtxOutputBuf(pRuntimeEnv); setTagVal(pRuntimeEnv, (STableId*) taosArrayGet(tx, 0), pQInfo->tsdb); scanAllDataBlocks(pRuntimeEnv); - + int64_t numOfRes = getNumOfResult(pRuntimeEnv); if (numOfRes > 0) { pQuery->rec.rows += numOfRes; forwardCtxOutputBuf(pRuntimeEnv, numOfRes); } - + skipResults(pRuntimeEnv); pQInfo->groupIndex += 1; - + // enable execution for next table, when handling the projection query enableExecutionForNextTable(pRuntimeEnv); } @@ -4612,7 +4612,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } SGroupItem *item = taosArrayGet(group, pQInfo->tableIndex); - + STableQueryInfo *pInfo = item->info; if (pInfo->lastKey > 0) { pQuery->window.skey = pInfo->lastKey; @@ -4750,7 +4750,7 @@ static void createTableQueryInfo(SQInfo *pQInfo) { STableQueryInfo* pInfo = createTableQueryInfoImpl(&pQInfo->runtimeEnv, item->id, pQuery->window); pInfo->groupIdx = i; pInfo->tableIndex = index; - + item->info = pInfo; index += 1; } @@ -5179,22 +5179,22 @@ static void stableQueryImpl(SQInfo *pQInfo) { static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) { int32_t j = 0; - + if (TSDB_COL_IS_TAG(pExprMsg->colInfo.flag)) { while(j < pQueryMsg->numOfTags) { if (pExprMsg->colInfo.colId == pTagCols[j].colId) { return j; } - + j += 1; } - + } else { while (j < pQueryMsg->numOfCols) { if (pExprMsg->colInfo.colId == pQueryMsg->colList[j].colId) { return j; } - + j += 1; } } @@ -5243,7 +5243,7 @@ static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pEx } } } - + return true; } @@ -5395,10 +5395,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pExprMsg = (SSqlFuncMsg *)pMsg; } - + if (!validateQuerySourceCols(pQueryMsg, *pExpr)) { tfree(*pExpr); - + return TSDB_CODE_INVALID_QUERY_MSG; } @@ -5441,12 +5441,12 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, (*tagCols) = calloc(1, sizeof(SColumnInfo) * pQueryMsg->numOfTags); for (int32_t i = 0; i < pQueryMsg->numOfTags; ++i) { SColumnInfo* pTagCol = (SColumnInfo*) pMsg; - + pTagCol->colId = htons(pTagCol->colId); pTagCol->bytes = htons(pTagCol->bytes); pTagCol->type = htons(pTagCol->type); pTagCol->numOfFilters = 0; - + (*tagCols)[i] = *pTagCol; pMsg += sizeof(SColumnInfo); } @@ -5458,14 +5458,14 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen); pMsg += pQueryMsg->tagCondLen; } - + if (*pMsg != 0) { size_t len = strlen(pMsg) + 1; *tbnameCond = malloc(len); strcpy(*tbnameCond, pMsg); pMsg += len; } - + qTrace("qmsg:%p query %d tables, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, " "outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64, pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols, @@ -5490,7 +5490,7 @@ static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTable qError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pArithExprInfo->base.arg[0].argValue.pz); return TSDB_CODE_APP_ERROR; } - + pArithExprInfo->pExpr = pExprNode; return TSDB_CODE_SUCCESS; } @@ -5557,7 +5557,7 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { pExprs[i].base = *pExprMsg[i]; int16_t functId = pExprs[i].base.functionId; - + if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) { int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols); assert(j < pQueryMsg->numOfCols); @@ -5597,7 +5597,7 @@ static SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SCol for(int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) { taosArrayPush(pGroupbyExpr->columnInfo, &pColIndex[i]); } - + return pGroupbyExpr; } @@ -5620,7 +5620,7 @@ static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) { memcpy(&pFilterInfo->info, &pQuery->colList[i], sizeof(SColumnInfoData)); pFilterInfo->info = pQuery->colList[i]; - + pFilterInfo->numOfFilters = pQuery->colList[i].numOfFilters; pFilterInfo->pFilters = calloc(pFilterInfo->numOfFilters, sizeof(SColumnFilterElem)); @@ -5753,9 +5753,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou SColumnInfo *pColInfo = &pQuery->colList[i]; pColInfo->filters = tscFilterInfoClone(pQueryMsg->colList[i].filters, pColInfo->numOfFilters); } - + pQuery->tagColList = pTagCols; - + // calculate the result row size for (int16_t col = 0; col < numOfOutput; ++col) { assert(pExprs[col].bytes > 0); @@ -5802,24 +5802,24 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou // to make sure third party won't overwrite this structure pQInfo->signature = pQInfo; - + pQInfo->tableIdGroupInfo = *groupInfo; size_t numOfGroups = taosArrayGetSize(groupInfo->pGroupList); - + pQInfo->groupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); pQInfo->groupInfo.numOfTables = groupInfo->numOfTables; - + for(int32_t i = 0; i < numOfGroups; ++i) { SArray* pa = taosArrayGetP(groupInfo->pGroupList, i); size_t s = taosArrayGetSize(pa); - + SArray* p1 = taosArrayInit(s, sizeof(SGroupItem)); - + for(int32_t j = 0; j < s; ++j) { SGroupItem item = { .id = *(STableId*) taosArrayGet(pa, j), .info = NULL, }; taosArrayPush(p1, &item); } - + taosArrayPush(pQInfo->groupInfo.pGroupList, &p1); } @@ -5957,7 +5957,7 @@ static void freeQInfo(SQInfo *pQInfo) { int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList); for (int32_t i = 0; i < numOfGroups; ++i) { SArray *p = taosArrayGetP(pQInfo->groupInfo.pGroupList, i); - + size_t num = taosArrayGetSize(p); for(int32_t j = 0; j < num; ++j) { SGroupItem* item = taosArrayGet(p, j); @@ -5965,31 +5965,31 @@ static void freeQInfo(SQInfo *pQInfo) { destroyTableQueryInfo(item->info, pQuery->numOfOutput); } } - + taosArrayDestroy(p); } - + taosArrayDestroy(pQInfo->groupInfo.pGroupList); - + for(int32_t i = 0; i < numOfGroups; ++i) { SArray* p = taosArrayGetP(pQInfo->tableIdGroupInfo.pGroupList, i); taosArrayDestroy(p); } - + taosArrayDestroy(pQInfo->tableIdGroupInfo.pGroupList); - + if (pQuery->pGroupbyExpr != NULL) { taosArrayDestroy(pQuery->pGroupbyExpr->columnInfo); tfree(pQuery->pGroupbyExpr); } - + tfree(pQuery->tagColList); tfree(pQuery->pFilterInfo); tfree(pQuery->colList); tfree(pQuery->sdata); - + tfree(pQuery); - + qTrace("QInfo:%p QInfo is freed", pQInfo); // destroy signature, in order to avoid the query process pass the object safety check @@ -6042,7 +6042,7 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { qError("QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s", pQInfo, pQuery->sdata[0]->data, strerror(errno)); } - + // all data returned, set query over if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { setQueryStatus(pQuery, QUERY_OVER); @@ -6099,10 +6099,10 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi bool isSTableQuery = false; STableGroupInfo groupInfo = {0}; - + if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_TABLE_QUERY)) { isSTableQuery = TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY); - + STableId *id = taosArrayGet(pTableIdList, 0); if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) { goto _query_over; @@ -6110,13 +6110,13 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi } else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_STABLE_QUERY)) { isSTableQuery = true; STableId *id = taosArrayGet(pTableIdList, 0); - + // group by normal column, do not pass the group by condition to tsdb to group table into different group int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols; if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(pGroupColIndex->flag)) { numOfGroupByCols = 0; } - + // todo handle the error /*int32_t ret =*/tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &groupInfo, pGroupColIndex, numOfGroupByCols); @@ -6131,6 +6131,7 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &groupInfo, pTagColumnInfo); if ((*pQInfo) == NULL) { code = TSDB_CODE_SERV_OUT_OF_MEMORY; + goto _query_over; } code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery); @@ -6139,7 +6140,12 @@ _query_over: tfree(tagCond); tfree(tbnameCond); taosArrayDestroy(pTableIdList); - + + if (code != TSDB_CODE_SUCCESS) { + tfree(*pQInfo); + *pQInfo = NULL; + } + // if failed to add ref for all meters in this query, abort current query // atomic_fetch_add_32(&vnodeSelectReqNum, 1); return code; @@ -6164,7 +6170,7 @@ void qTableQuery(qinfo_t qinfo) { } qTrace("QInfo:%p query task is launched", pQInfo); - + if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) { buildTagQueryResult(pQInfo); // todo support the limit/offset } else if (pQInfo->runtimeEnv.stableQuery) { @@ -6172,7 +6178,7 @@ void qTableQuery(qinfo_t qinfo) { } else { tableQueryImpl(pQInfo); } - + sem_post(&pQInfo->dataReady); // vnodeDecRefCount(pQInfo); } @@ -6263,61 +6269,61 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co static void buildTagQueryResult(SQInfo* pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; - + size_t num = taosArrayGetSize(pQInfo->groupInfo.pGroupList); assert(num == 1); // only one group - + SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); num = taosArrayGetSize(pa); - + assert(num == pQInfo->groupInfo.numOfTables); int16_t type, bytes; - + int32_t functionId = pQuery->pSelectExpr[0].base.functionId; if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id assert(pQuery->numOfOutput == 1); SExprInfo* pExprInfo = &pQuery->pSelectExpr[0]; - + int32_t rsize = pExprInfo->bytes; char* data = NULL; - + for(int32_t i = 0; i < num; ++i) { SGroupItem* item = taosArrayGet(pa, i); - + char* output = pQuery->sdata[0]->data + i * rsize; *(int64_t*) output = item->id.uid; // memory align problem output += sizeof(item->id.uid); - + *(int32_t*) output = item->id.tid; output += sizeof(item->id.tid); - + *(int32_t*) output = pQInfo->vgId; output += sizeof(pQInfo->vgId); - + tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo->base.colInfo.colId, &type, &bytes, &data); memcpy(output, data, bytes); } - + qTrace("QInfo:%p create (tableId, tag) info completed, rows:%d", pQInfo, num); } else { // return only the tags|table name etc. for(int32_t i = 0; i < num; ++i) { SExprInfo* pExprInfo = pQuery->pSelectExpr; SGroupItem* item = taosArrayGet(pa, i); - + char* data = NULL; for(int32_t j = 0; j < pQuery->numOfOutput; ++j) { // todo check the return value, refactor codes if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { tsdbGetTableName(pQInfo->tsdb, &item->id, &data); - + char* dst = pQuery->sdata[j]->data + i * (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE); STR_WITH_MAXSIZE_TO_VARSTR(dst, data, TSDB_TABLE_NAME_LEN); tfree(data); - + } else {// todo refactor, return the true length of binary|nchar data tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, &type, &bytes, &data); assert(bytes == pExprInfo[j].bytes && type == pExprInfo[j].type); - + char* dst = pQuery->sdata[j]->data + i * bytes; if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { memcpy(dst, data, varDataTLen(data)); @@ -6325,13 +6331,13 @@ static void buildTagQueryResult(SQInfo* pQInfo) { memcpy(dst, data, bytes); } } - + } } - + qTrace("QInfo:%p create tag values results completed, rows:%d", pQInfo, num); } - + pQuery->rec.rows = num; setQueryStatus(pQuery, QUERY_COMPLETED); } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index b111f56e39..283490907c 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -39,11 +39,11 @@ void vnodeInitReadFp(void) { int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) { SVnodeObj *pVnode = (SVnodeObj *)param; - if (vnodeProcessReadMsgFp[msgType] == NULL) - return TSDB_CODE_MSG_NOT_PROCESSED; + if (vnodeProcessReadMsgFp[msgType] == NULL) + return TSDB_CODE_MSG_NOT_PROCESSED; - if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING) - return TSDB_CODE_NOT_ACTIVE_VNODE; + if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING) + return TSDB_CODE_NOT_ACTIVE_VNODE; return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret); } @@ -53,26 +53,29 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont memset(pRet, 0, sizeof(SRspRet)); int32_t code = TSDB_CODE_SUCCESS; - + qinfo_t pQInfo = NULL; if (contLen != 0) { pRet->code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo); - + SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); pRsp->qhandle = htobe64((uint64_t) (pQInfo)); pRsp->code = pRet->code; - + pRet->len = sizeof(SQueryTableRsp); pRet->rsp = pRsp; - + dTrace("pVnode:%p vgId:%d QInfo:%p, dnode query msg disposed", pVnode, pVnode->vgId, pQInfo); } else { + assert(pCont != NULL); pQInfo = pCont; code = TSDB_CODE_ACTION_IN_PROGRESS; } - qTableQuery(pQInfo); // do execute query - + if (pQInfo != NULL) { + qTableQuery(pQInfo); // do execute query + } + return code; } @@ -84,7 +87,7 @@ static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t c int32_t code = TSDB_CODE_SUCCESS; dTrace("pVnode:%p vgId:%d QInfo:%p, retrieve msg is received", pVnode, pVnode->vgId, pQInfo); - + pRet->code = qRetrieveQueryResultInfo(pQInfo); if (pRet->code != TSDB_CODE_SUCCESS) { //TODO @@ -103,7 +106,7 @@ static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t c vnodeRelease(pVnode); } } - + dTrace("pVnode:%p vgId:%d QInfo:%p, retrieve msg is disposed", pVnode, pVnode->vgId, pQInfo); return code; } -- GitLab