/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #define _DEFAULT_SOURCE #include "os.h" #include "taosmsg.h" #include "textbuffer.h" #include "tscJoinProcess.h" #include "ttime.h" #include "vnode.h" #include "vnodeRead.h" #include "vnodeUtil.h" #include "vnodeQueryImpl.h" #define ALL_CACHE_BLOCKS_CHECKED(q) \ (((q)->slot == (q)->currentSlot && QUERY_IS_ASC_QUERY(q)) || ((q)->slot == (q)->firstSlot && (!QUERY_IS_ASC_QUERY(q)))) #define FORWARD_CACHE_BLOCK_CHECK_SLOT(slot, step, maxblocks) (slot) = ((slot) + (step) + (maxblocks)) % (maxblocks); static bool isGroupbyEachTable(SSqlGroupbyExpr *pGroupbyExpr, tSidSet *pSidset) { if (pGroupbyExpr == NULL || pGroupbyExpr->numOfGroupCols == 0) { return false; } for (int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) { SColIndexEx *pColIndex = &pGroupbyExpr->columnInfo[i]; if (pColIndex->flag == TSDB_COL_TAG) { assert(pSidset->numOfSids == pSidset->numOfSubSet); return true; } } return false; } static bool doCheckWithPrevQueryRange(SQInfo *pQInfo, TSKEY nextKey, SMeterDataInfo *pMeterInfo) { SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; SQuery * pQuery = &pQInfo->query; SMeterObj * pMeterObj = pMeterInfo->pMeterObj; /* no data for current query */ if ((nextKey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (nextKey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { if (((nextKey > pSupporter->rawEKey) && QUERY_IS_ASC_QUERY(pQuery)) || ((nextKey < pSupporter->rawEKey) && (!QUERY_IS_ASC_QUERY(pQuery)))) { dTrace("QInfo:%p vid:%d sid:%d id:%s, no data qualified in block, ignore", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId); return false; } else { // in case of interval query, forward the query range setIntervalQueryRange(pMeterInfo->pMeterQInfo, pSupporter, nextKey); } } return true; } /** * The start position of the first check cache block is located before starting the loop. * And the start position for next cache blocks needs to be decided before checking each cache block. */ static void setStartPositionForCacheBlock(SQuery *pQuery, SCacheBlock *pBlock, bool *firstCheckSlot) { if (!(*firstCheckSlot)) { if (QUERY_IS_ASC_QUERY(pQuery)) { pQuery->pos = 0; } else { pQuery->pos = pBlock->numOfPoints - 1; } } else { (*firstCheckSlot) = false; } } static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) { SQuery * pQuery = &pQInfo->query; SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->pMeterQuerySupporter->runtimeEnv; SMeterSidExtInfo **pMeterSidExtInfo = pSupporter->pMeterSidExtInfo; SMeterObj *pTempMeterObj = getMeterObj(pSupporter->pMeterObj, pMeterSidExtInfo[0]->sid); assert(pTempMeterObj != NULL); __block_search_fn_t searchFn = vnodeSearchKeyFunc[pTempMeterObj->searchAlgorithm]; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); dTrace("QInfo:%p start to query data in cache", pQInfo); int64_t st = taosGetTimestampUs(); int32_t totalBlocks = 0; for (int32_t groupIdx = 0; groupIdx < pSupporter->pSidSet->numOfSubSet; ++groupIdx) { int32_t start = pSupporter->pSidSet->starterPos[groupIdx]; int32_t end = pSupporter->pSidSet->starterPos[groupIdx + 1] - 1; if (isQueryKilled(pQuery)) { return pMeterInfo; } for (int32_t k = start; k <= end; ++k) { SMeterObj *pMeterObj = getMeterObj(pSupporter->pMeterObj, pMeterSidExtInfo[k]->sid); if (pMeterObj == NULL) { dError("QInfo:%p failed to find meterId:%d, continue", pQInfo, pMeterSidExtInfo[k]->sid); continue; } pQInfo->pObj = pMeterObj; pRuntimeEnv->pMeterObj = pMeterObj; if (pMeterInfo[k].pMeterQInfo == NULL) { pMeterInfo[k].pMeterQInfo = createMeterQueryInfo(pQuery, pSupporter->rawSKey, pSupporter->rawEKey); } if (pMeterInfo[k].pMeterObj == NULL) { // no data in disk for this meter, set its pointer setMeterDataInfo(&pMeterInfo[k], pMeterObj, k, groupIdx); } assert(pMeterInfo[k].meterOrderIdx == k && pMeterObj == pMeterInfo[k].pMeterObj); SMeterQueryInfo *pMeterQueryInfo = pMeterInfo[k].pMeterQInfo; restoreIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); /* * Update the query meter column index and the corresponding filter column index * the original column index info may be inconsistent with current meter in cache. * * The stable schema has been changed, but the meter schema, along with the data in cache, * will not be updated until data with new schema arrive. */ vnodeUpdateQueryColumnIndex(pQuery, pMeterObj); vnodeUpdateFilterColumnIndex(pQuery); if (pQuery->nAggTimeInterval == 0) { if ((pQuery->lastKey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (pQuery->lastKey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { dTrace( "QInfo:%p vid:%d sid:%d id:%s, query completed, no need to scan data in cache. qrange:%lld-%lld, " "lastKey:%lld", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->skey, pQuery->ekey, pQuery->lastKey); continue; } setExecutionContext(pSupporter, pSupporter->pResult, k, pMeterInfo[k].groupIdx, pMeterQueryInfo); } else { int32_t ret = setIntervalQueryExecutionContext(pSupporter, k, pMeterQueryInfo); if (ret != TSDB_CODE_SUCCESS) { pQInfo->killed = 1; return NULL; } } qTrace("QInfo:%p vid:%d sid:%d id:%s, query in cache, qrange:%lld-%lld, lastKey:%lld", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->skey, pQuery->ekey, pQuery->lastKey); /* * find the appropriated start position in cache * NOTE: (taking ascending order query for example) * for the specific query range [pQuery->lastKey, pQuery->ekey], there may be no qualified result in cache. * Therefore, we need the first point that is greater(less) than the pQuery->lastKey, so the boundary check * should be ignored (the fourth parameter). */ TSKEY nextKey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, true); if (nextKey < 0) { qTrace("QInfo:%p vid:%d sid:%d id:%s, no data qualified in cache, cache blocks:%d, lastKey:%lld", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->numOfBlocks, pQuery->lastKey); continue; } // data in this block may be flushed to disk and this block is allocated to other meter // todo try with remain cache blocks SCacheBlock *pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot); if (pBlock == NULL) { continue; } if (!doCheckWithPrevQueryRange(pQInfo, nextKey, &pMeterInfo[k])) { continue; } bool firstCheckSlot = true; SCacheInfo *pCacheInfo = (SCacheInfo *)pMeterObj->pCache; for (int32_t i = 0; i < pCacheInfo->maxBlocks; ++i) { pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot); /* * 1. pBlock == NULL. The cache block may be flushed to disk, so it is not available, skip and try next * * 2. pBlock->numOfPoints == 0. There is a empty block, which is caused by allocate-and-write data into cache * procedure. The block has been allocated but data has not been put into yet. If the block is the last * block(newly allocated block), abort query. Otherwise, skip it and go on. */ if ((pBlock == NULL) || (pBlock->numOfPoints == 0)) { if (ALL_CACHE_BLOCKS_CHECKED(pQuery)) { break; } FORWARD_CACHE_BLOCK_CHECK_SLOT(pQuery->slot, step, pCacheInfo->maxBlocks); continue; } setStartPositionForCacheBlock(pQuery, pBlock, &firstCheckSlot); TSKEY *primaryKeys = (TSKEY *)pBlock->offset[0]; // in handling file data block, the timestamp range validation is done during fetching candidate file blocks if ((primaryKeys[pQuery->pos] > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) || (primaryKeys[pQuery->pos] < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery))) { break; } // only record the key on last block SET_CACHE_BLOCK_FLAG(pRuntimeEnv->blockStatus); SBlockInfo binfo = getBlockBasicInfo(pBlock, BLK_CACHE_BLOCK); dTrace("QInfo:%p check data block, brange:%lld-%lld, fileId:%d, slot:%d, pos:%d, bstatus:%d", GET_QINFO_ADDR(pQuery), binfo.keyFirst, binfo.keyLast, pQuery->fileId, pQuery->slot, pQuery->pos, pRuntimeEnv->blockStatus); totalBlocks++; queryOnBlock(pSupporter, primaryKeys, pRuntimeEnv->blockStatus, (char *)pBlock, &binfo, &pMeterInfo[k], NULL, searchFn); if (ALL_CACHE_BLOCKS_CHECKED(pQuery)) { break; } FORWARD_CACHE_BLOCK_CHECK_SLOT(pQuery->slot, step, pCacheInfo->maxBlocks); } } } int64_t time = taosGetTimestampUs() - st; SQueryCostSummary *pSummary = &pRuntimeEnv->summary; pSummary->blocksInCache += totalBlocks; pSummary->cacheTimeUs += time; pSummary->numOfTables = pSupporter->pSidSet->numOfSids; dTrace("QInfo:%p complete check %d cache blocks, elapsed time:%.3fms", pQInfo, totalBlocks, time / 1000.0); setQueryStatus(pQuery, QUERY_NOT_COMPLETED); return pMeterInfo; } static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo) { SQuery * pQuery = &pQInfo->query; SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv; SMeterDataBlockInfoEx *pDataBlockInfoEx = NULL; int32_t nAllocBlocksInfoSize = 0; SMeterObj * pTempMeter = getMeterObj(pSupporter->pMeterObj, pSupporter->pMeterSidExtInfo[0]->sid); __block_search_fn_t searchFn = vnodeSearchKeyFunc[pTempMeter->searchAlgorithm]; int32_t vnodeId = pTempMeter->vnode; SQueryFilesInfo* pVnodeFileInfo = &pRuntimeEnv->vnodeFileInfo; dTrace("QInfo:%p start to check data blocks in %d files", pQInfo, pVnodeFileInfo->numOfFiles); int32_t fid = QUERY_IS_ASC_QUERY(pQuery) ? -1 : INT32_MAX; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); SQueryCostSummary *pSummary = &pRuntimeEnv->summary; int64_t totalBlocks = 0; int64_t st = taosGetTimestampUs(); while (1) { if (isQueryKilled(pQuery)) { break; } int32_t fileIdx = vnodeGetVnodeHeaderFileIdx(&fid, pRuntimeEnv, pQuery->order.order); if (fileIdx < 0) { // no valid file, abort current search break; } pRuntimeEnv->startPos.fileId = fid; pQuery->fileId = fid; pSummary->numOfFiles++; char *pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, vnodeId, fileIdx); if (pHeaderFileData == NULL) { // failed to mmap header file into buffer, ignore current file, try next fid += step; continue; } int32_t numOfQualifiedMeters = 0; assert(fileIdx == pRuntimeEnv->vnodeFileInfo.current); SMeterDataInfo **pReqMeterDataInfo = vnodeFilterQualifiedMeters(pQInfo, vnodeId, fileIdx, pSupporter->pSidSet, pMeterDataInfo, &numOfQualifiedMeters); if (pReqMeterDataInfo == NULL) { dError("QInfo:%p failed to allocate memory to perform query processing, abort", pQInfo); pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; pQInfo->killed = 1; return NULL; } dTrace("QInfo:%p file:%s, %d meters qualified", pQInfo, pVnodeFileInfo->dataFilePath, numOfQualifiedMeters); // none of meters in query set have pHeaderFileData in this file, try next file if (numOfQualifiedMeters == 0) { fid += step; tfree(pReqMeterDataInfo); continue; } uint32_t numOfBlocks = getDataBlocksForMeters(pSupporter, pQuery, pHeaderFileData, numOfQualifiedMeters, pVnodeFileInfo->headerFilePath, pReqMeterDataInfo); dTrace("QInfo:%p file:%s, %d meters contains %d blocks to be checked", pQInfo, pVnodeFileInfo->dataFilePath, numOfQualifiedMeters, numOfBlocks); if (numOfBlocks == 0) { fid += step; tfree(pReqMeterDataInfo); continue; } int32_t n = createDataBlocksInfoEx(pReqMeterDataInfo, numOfQualifiedMeters, &pDataBlockInfoEx, numOfBlocks, &nAllocBlocksInfoSize, (int64_t)pQInfo); if (n < 0) { // failed to create data blocks dError("QInfo:%p failed to allocate memory to perform query processing, abort", pQInfo); tfree(pReqMeterDataInfo); pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; pQInfo->killed = 1; return NULL; } dTrace("QInfo:%p start to load %d blocks and check", pQInfo, numOfBlocks); int64_t TRACE_OUTPUT_BLOCK_CNT = 10000; int64_t stimeUnit = 0; int64_t etimeUnit = 0; totalBlocks += numOfBlocks; // sequentially scan the pHeaderFileData file int32_t j = QUERY_IS_ASC_QUERY(pQuery) ? 0 : numOfBlocks - 1; for (; j < numOfBlocks && j >= 0; j += step) { if (isQueryKilled(pQuery)) { break; } /* output elapsed time for log every TRACE_OUTPUT_BLOCK_CNT blocks */ if (j == 0) { stimeUnit = taosGetTimestampMs(); } else if ((j % TRACE_OUTPUT_BLOCK_CNT) == 0) { etimeUnit = taosGetTimestampMs(); dTrace("QInfo:%p load and check %ld blocks, and continue. elapsed:%ldms", pQInfo, TRACE_OUTPUT_BLOCK_CNT, etimeUnit - stimeUnit); stimeUnit = taosGetTimestampMs(); } SMeterDataBlockInfoEx *pInfoEx = &pDataBlockInfoEx[j]; SMeterDataInfo * pOneMeterDataInfo = pInfoEx->pMeterDataInfo; SMeterQueryInfo * pMeterQueryInfo = pOneMeterDataInfo->pMeterQInfo; SMeterObj * pMeterObj = pOneMeterDataInfo->pMeterObj; pQInfo->pObj = pMeterObj; pRuntimeEnv->pMeterObj = pMeterObj; restoreIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); if (pQuery->nAggTimeInterval == 0) { // normal query if ((pQuery->lastKey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (pQuery->lastKey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { qTrace( "QInfo:%p vid:%d sid:%d id:%s, query completed, no need to scan this data block. qrange:%lld-%lld, " "lastKey:%lld", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->skey, pQuery->ekey, pQuery->lastKey); continue; } setExecutionContext(pSupporter, pSupporter->pResult, pOneMeterDataInfo->meterOrderIdx, pOneMeterDataInfo->groupIdx, pMeterQueryInfo); } else { // interval query int32_t ret = setIntervalQueryExecutionContext(pSupporter, pOneMeterDataInfo->meterOrderIdx, pMeterQueryInfo); if (ret != TSDB_CODE_SUCCESS) { tfree(pReqMeterDataInfo); // error code has been set pQInfo->killed = 1; return NULL; } } SCompBlock *pBlock = pInfoEx->pBlock.compBlock; bool ondemandLoad = onDemandLoadDatablock(pQuery, pMeterQueryInfo->queryRangeSet); int32_t ret = LoadDatablockOnDemand(pBlock, &pInfoEx->pBlock.fields, &pRuntimeEnv->blockStatus, pRuntimeEnv, fileIdx, pInfoEx->blockIndex, searchFn, ondemandLoad); if (ret != DISK_DATA_LOADED) { pSummary->skippedFileBlocks++; continue; } SBlockInfo binfo = getBlockBasicInfo(pBlock, BLK_FILE_BLOCK); assert(pQuery->pos >= 0 && pQuery->pos < pBlock->numOfPoints); TSKEY *primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; if (IS_DATA_BLOCK_LOADED(pRuntimeEnv->blockStatus) && needPrimaryTimestampCol(pQuery, &binfo)) { TSKEY nextKey = primaryKeys[pQuery->pos]; if (!doCheckWithPrevQueryRange(pQInfo, nextKey, pOneMeterDataInfo)) { continue; } } else { // if data block is not loaded, it must be the intermediate blocks assert((pBlock->keyFirst >= pQuery->lastKey && pBlock->keyLast <= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (pBlock->keyFirst >= pQuery->ekey && pBlock->keyLast <= pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery))); } queryOnBlock(pSupporter, primaryKeys, pRuntimeEnv->blockStatus, (char *)pRuntimeEnv->colDataBuffer, &binfo, pOneMeterDataInfo, pInfoEx->pBlock.fields, searchFn); } tfree(pReqMeterDataInfo); // try next file fid += step; } int64_t time = taosGetTimestampUs() - st; dTrace("QInfo:%p complete check %d files, %d blocks, elapsed time:%.3fms", pQInfo, pVnodeFileInfo->numOfFiles, totalBlocks, time / 1000.0); pSummary->fileTimeUs += time; pSummary->readDiskBlocks += totalBlocks; pSummary->numOfTables = pSupporter->pSidSet->numOfSids; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); freeMeterBlockInfoEx(pDataBlockInfoEx, nAllocBlocksInfoSize); return pMeterDataInfo; } static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool *dataInCache, int32_t index, int32_t start) { SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; SMeterSidExtInfo **pMeterSidExtInfo = pSupporter->pMeterSidExtInfo; SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv; SQuery * pQuery = &pQInfo->query; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); SMeterObj *pMeterObj = getMeterObj(pSupporter->pMeterObj, pMeterSidExtInfo[index]->sid); if (pMeterObj == NULL) { dError("QInfo:%p do not find required meter id: %d, all meterObjs id is:", pQInfo, pMeterSidExtInfo[index]->sid); return false; } vnodeSetTagValueInParam(pSupporter->pSidSet, pRuntimeEnv, pMeterSidExtInfo[index]); dTrace("QInfo:%p query on (%d): vid:%d sid:%d meterId:%s, qrange:%lld-%lld", pQInfo, index - start, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->skey, pQuery->ekey); pQInfo->pObj = pMeterObj; pQuery->lastKey = pQuery->skey; pRuntimeEnv->pMeterObj = pMeterObj; vnodeCheckIfDataExists(pRuntimeEnv, pMeterObj, dataInDisk, dataInCache); // data in file or cache is not qualified for the query. abort if (!(dataInCache || dataInDisk)) { dTrace("QInfo:%p vid:%d sid:%d meterId:%s, qrange:%lld-%lld, nores, %p", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->skey, pQuery->ekey, pQuery); return false; } if (pRuntimeEnv->pTSBuf != NULL) { if (pRuntimeEnv->cur.vnodeIndex == -1) { int64_t tag = pRuntimeEnv->pCtx[0].tag.i64Key; STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, tag); // failed to find data with the specified tag value if (elem.vnode < 0) { return false; } } else { tsBufSetCursor(pRuntimeEnv->pTSBuf, &pRuntimeEnv->cur); } } return true; } static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start) { SQuery * pQuery = &pQInfo->query; SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv; bool dataInDisk = true; bool dataInCache = true; if (!multimeterMultioutputHelper(pQInfo, &dataInDisk, &dataInCache, index, start)) { return 0; } #if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP for (int32_t i = 0; i < pRuntimeEnv->numOfFiles; ++i) { resetMMapWindow(&pRuntimeEnv->pVnodeFiles[i]); } #endif SPointInterpoSupporter pointInterpSupporter = {0}; pointInterpSupporterInit(pQuery, &pointInterpSupporter); if (!normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &pointInterpSupporter)) { pointInterpSupporterDestroy(&pointInterpSupporter); return 0; } /* * here we set the value for before and after the specified time into the * parameter for interpolation query */ pointInterpSupporterSetData(pQInfo, &pointInterpSupporter); pointInterpSupporterDestroy(&pointInterpSupporter); vnodeScanAllData(pRuntimeEnv); // first/last_row query, do not invoke the finalize for super table query if (!isFirstLastRowQuery(pQuery)) { doFinalizeResult(pRuntimeEnv); } int64_t numOfRes = getNumOfResult(pRuntimeEnv); assert(numOfRes == 1 || numOfRes == 0); // accumulate the point interpolation result if (numOfRes > 0) { pQuery->pointsRead += numOfRes; forwardCtxOutputBuf(pRuntimeEnv, numOfRes); } return numOfRes; } static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; SMeterSidExtInfo **pMeterSidExtInfo = pSupporter->pMeterSidExtInfo; SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv; SQuery * pQuery = &pQInfo->query; tSidSet *pSids = pSupporter->pSidSet; SMeterObj *pOneMeter = getMeterObj(pSupporter->pMeterObj, pMeterSidExtInfo[0]->sid); resetCtxOutputBuf(pRuntimeEnv); if (isPointInterpoQuery(pQuery)) { assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); while (pSupporter->subgroupIdx < pSids->numOfSubSet) { int32_t start = pSids->starterPos[pSupporter->subgroupIdx]; int32_t end = pSids->starterPos[pSupporter->subgroupIdx + 1] - 1; if (isFirstLastRowQuery(pQuery)) { dTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, pOneMeter->vnode, pSids->numOfSubSet, pSupporter->subgroupIdx); TSKEY key = -1; int32_t index = -1; // choose the last key for one group pSupporter->meterIdx = start; for (int32_t k = start; k <= end; ++k, pSupporter->meterIdx++) { if (isQueryKilled(pQuery)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } // get the last key of meters that belongs to this group SMeterObj *pMeterObj = getMeterObj(pSupporter->pMeterObj, pMeterSidExtInfo[k]->sid); if (pMeterObj != NULL) { if (key < pMeterObj->lastKey) { key = pMeterObj->lastKey; index = k; } } } pQuery->skey = key; pQuery->ekey = key; pSupporter->rawSKey = key; pSupporter->rawEKey = key; int64_t num = doCheckMetersInGroup(pQInfo, index, start); assert(num >= 0); } else { dTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, pOneMeter->vnode, pSids->numOfSubSet, pSupporter->subgroupIdx); for (int32_t k = start; k <= end; ++k) { if (isQueryKilled(pQuery)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } pQuery->skey = pSupporter->rawSKey; pQuery->ekey = pSupporter->rawEKey; int64_t num = doCheckMetersInGroup(pQInfo, k, start); if (num == 1) { break; } } } pSupporter->subgroupIdx++; // output buffer is full, return to client if (pQuery->pointsRead >= pQuery->pointsToRead) { break; } } } else { // this procedure treats all tables as single group assert(pSupporter->meterIdx >= 0); /* * if the subgroup index is larger than 0, results generated by group by tbname,k is existed. * we need to return it to client in the first place. */ if (pSupporter->subgroupIdx > 0) { copyFromGroupBuf(pQInfo, pSupporter->pResult); pQInfo->pointsRead += pQuery->pointsRead; if (pQuery->pointsRead > 0) { return; } } if (pSupporter->meterIdx >= pSids->numOfSids) { return; } for (int32_t i = 0; i < pRuntimeEnv->usedIndex; ++i) { SOutputRes *pOneRes = &pRuntimeEnv->pResult[i]; clearGroupResultBuf(pOneRes, pQuery->numOfOutputCols); } pRuntimeEnv->usedIndex = 0; taosCleanUpIntHash(pRuntimeEnv->hashList); int32_t primeHashSlot = 10039; pRuntimeEnv->hashList = taosInitIntHash(primeHashSlot, POINTER_BYTES, taosHashInt); while (pSupporter->meterIdx < pSupporter->numOfMeters) { int32_t k = pSupporter->meterIdx; if (isQueryKilled(pQuery)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } bool dataInDisk = true; bool dataInCache = true; if (!multimeterMultioutputHelper(pQInfo, &dataInDisk, &dataInCache, k, 0)) { pQuery->skey = pSupporter->rawSKey; pQuery->ekey = pSupporter->rawEKey; pSupporter->meterIdx++; continue; } #if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP for (int32_t i = 0; i < pRuntimeEnv->numOfFiles; ++i) { resetMMapWindow(&pRuntimeEnv->pVnodeFiles[i]); } #endif SPointInterpoSupporter pointInterpSupporter = {0}; if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &pointInterpSupporter) == false) { pQuery->skey = pSupporter->rawSKey; pQuery->ekey = pSupporter->rawEKey; pSupporter->meterIdx++; continue; } // TODO handle the limit problem if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) { forwardQueryStartPosition(pRuntimeEnv); if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) { pQuery->skey = pSupporter->rawSKey; pQuery->ekey = pSupporter->rawEKey; pSupporter->meterIdx++; continue; } } vnodeUpdateQueryColumnIndex(pQuery, pRuntimeEnv->pMeterObj); vnodeUpdateFilterColumnIndex(pQuery); vnodeScanAllData(pRuntimeEnv); pQuery->pointsRead = getNumOfResult(pRuntimeEnv); doSkipResults(pRuntimeEnv); // the limitation of output result is reached, set the query completed if (doRevisedResultsByLimit(pQInfo)) { pSupporter->meterIdx = pSupporter->pSidSet->numOfSids; break; } if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) { /* * query range is identical in terms of all meters involved in query, * so we need to restore them at the *beginning* of query on each meter, * not the consecutive query on meter on which is aborted due to buffer limitation * to ensure that, we can reset the query range once query on a meter is completed. */ pQuery->skey = pSupporter->rawSKey; pQuery->ekey = pSupporter->rawEKey; pSupporter->meterIdx++; // if the buffer is full or group by each table, we need to jump out of the loop if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL) || isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)) { break; } } else { // forward query range pQuery->skey = pQuery->lastKey; // all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter if (pQuery->pointsRead == 0) { assert(!Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)); continue; } else { // buffer is full, wait for the next round to retrieve data from current meter assert(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)); break; } } } } if (!isGroupbyNormalCol(pQuery->pGroupbyExpr) && !isFirstLastRowQuery(pQuery)) { doFinalizeResult(pRuntimeEnv); } if (pRuntimeEnv->pTSBuf != NULL) { pRuntimeEnv->cur = pRuntimeEnv->pTSBuf->cur; } if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { for (int32_t i = 0; i < pRuntimeEnv->usedIndex; ++i) { SOutputRes *buf = &pRuntimeEnv->pResult[i]; for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { buf->numOfRows = MAX(buf->numOfRows, buf->resultInfo[j].numOfRes); } } pQInfo->pMeterQuerySupporter->subgroupIdx = 0; pQuery->pointsRead = 0; copyFromGroupBuf(pQInfo, pRuntimeEnv->pResult); } pQInfo->pointsRead += pQuery->pointsRead; pQuery->pointsOffset = pQuery->pointsToRead; moveDescOrderResultsToFront(pRuntimeEnv); dTrace( "QInfo %p vid:%d, numOfMeters:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d," "next skey:%lld, offset:%lld", pQInfo, pOneMeter->vnode, pSids->numOfSids, pSupporter->meterIdx, pSids->numOfSubSet, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned, pQuery->skey, pQuery->limit.offset); } static void doOrderedScan(SQInfo *pQInfo) { SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; SQuery * pQuery = &pQInfo->query; if (QUERY_IS_ASC_QUERY(pQuery)) { pSupporter->pMeterDataInfo = queryOnMultiDataFiles(pQInfo, pSupporter->pMeterDataInfo); if (pQInfo->code != TSDB_CODE_SUCCESS) { return; } pSupporter->pMeterDataInfo = queryOnMultiDataCache(pQInfo, pSupporter->pMeterDataInfo); } else { pSupporter->pMeterDataInfo = queryOnMultiDataCache(pQInfo, pSupporter->pMeterDataInfo); if (pQInfo->code != TSDB_CODE_SUCCESS) { return; } pSupporter->pMeterDataInfo = queryOnMultiDataFiles(pQInfo, pSupporter->pMeterDataInfo); } } static void setupMeterQueryInfoForSupplementQuery(SMeterQuerySupportObj *pSupporter) { for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) { SMeterQueryInfo *pMeterQueryInfo = pSupporter->pMeterDataInfo[i].pMeterQInfo; changeMeterQueryInfoForSuppleQuery(pMeterQueryInfo, pSupporter->rawSKey, pSupporter->rawEKey); } } static void doMultiMeterSupplementaryScan(SQInfo *pQInfo) { SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQuery * pQuery = &pQInfo->query; if (!needSupplementaryScan(pQuery)) { dTrace("QInfo:%p no need to do supplementary scan, query completed", pQInfo); return; } SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv); disableFunctForSuppleScan(pRuntimeEnv, pQuery->order.order); if (pRuntimeEnv->pTSBuf != NULL) { pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1; } SWAP(pSupporter->rawSKey, pSupporter->rawEKey, TSKEY); setupMeterQueryInfoForSupplementQuery(pSupporter); int64_t st = taosGetTimestampMs(); doOrderedScan(pQInfo); int64_t et = taosGetTimestampMs(); dTrace("QInfo:%p supplementary scan completed, elapsed time: %lldms", pQInfo, et - st); /* * restore the env * the meter query info is not reset to the original state */ SWAP(pSupporter->rawSKey, pSupporter->rawEKey, TSKEY); enableFunctForMasterScan(pRuntimeEnv, pQuery->order.order); if (pRuntimeEnv->pTSBuf != NULL) { pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1; } SET_MASTER_SCAN_FLAG(pRuntimeEnv); } static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; SQuery * pQuery = &pQInfo->query; if (pSupporter->subgroupIdx > 0) { /* * if the subgroupIdx > 0, the query process must be completed yet, we only need to * copy the data into output buffer */ if (pQuery->nAggTimeInterval > 0) { copyResToQueryResultBuf(pSupporter, pQuery); #ifdef _DEBUG_VIEW displayInterResult(pQuery->sdata, pQuery, pQuery->sdata[0]->len); #endif } else { copyFromGroupBuf(pQInfo, pSupporter->pResult); } pQInfo->pointsRead += pQuery->pointsRead; if (pQuery->pointsRead == 0) { vnodePrintQueryStatistics(pSupporter); } dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned); return; } pSupporter->pMeterDataInfo = (SMeterDataInfo *)calloc(1, sizeof(SMeterDataInfo) * pSupporter->numOfMeters); if (pSupporter->pMeterDataInfo == NULL) { dError("QInfo:%p failed to allocate memory, %s", pQInfo, strerror(errno)); pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; return; } dTrace("QInfo:%p query start, qrange:%lld-%lld, order:%d, group:%d", pQInfo, pSupporter->rawSKey, pSupporter->rawEKey, pQuery->order.order, pSupporter->pSidSet->numOfSubSet); dTrace("QInfo:%p main query scan start", pQInfo); int64_t st = taosGetTimestampMs(); doOrderedScan(pQInfo); int64_t et = taosGetTimestampMs(); dTrace("QInfo:%p main scan completed, elapsed time: %lldms, supplementary scan start, order:%d", pQInfo, et - st, pQuery->order.order ^ 1); // failed to save all intermediate results into disk, abort further query processing if (doCloseAllOpenedResults(pSupporter) != TSDB_CODE_SUCCESS) { dError("QInfo:%p failed to save intermediate results, abort further query processing", pQInfo); return; } doMultiMeterSupplementaryScan(pQInfo); if (isQueryKilled(pQuery)) { dTrace("QInfo:%p query killed, abort", pQInfo); return; } if (pQuery->nAggTimeInterval > 0) { assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0); if (mergeMetersResultToOneGroups(pSupporter) == TSDB_CODE_SUCCESS) { copyResToQueryResultBuf(pSupporter, pQuery); #ifdef _DEBUG_VIEW displayInterResult(pQuery->sdata, pQuery, pQuery->sdata[0]->len); #endif } } else { // not a interval query copyFromGroupBuf(pQInfo, pSupporter->pResult); } // handle the limitation of output buffer pQInfo->pointsRead += pQuery->pointsRead; dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned); } /* * in each query, this function will be called only once, no retry for further result. * * select count(*)/top(field,k)/avg(field name) from table_name [where ts>now-1a]; * select count(*) from table_name group by status_column; */ static void vnodeSingleMeterFixedOutputProcessor(SQInfo *pQInfo) { SQuery * pQuery = &pQInfo->query; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->pMeterQuerySupporter->runtimeEnv; assert(pQuery->slot >= 0 && pQuery->pos >= 0); vnodeScanAllData(pRuntimeEnv); doFinalizeResult(pRuntimeEnv); if (isQueryKilled(pQuery)) { return; } // since the numOfOutputElems must be identical for all sql functions that are allowed to be executed simutanelously. pQuery->pointsRead = getNumOfResult(pRuntimeEnv); assert(pQuery->pointsRead <= pQuery->pointsToRead && Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)); // must be top/bottom query if offset > 0 if (pQuery->limit.offset > 0) { assert(isTopBottomQuery(pQuery)); } if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { pQInfo->pMeterQuerySupporter->subgroupIdx = 0; pQuery->pointsRead = 0; copyFromGroupBuf(pQInfo, pRuntimeEnv->pResult); } doSkipResults(pRuntimeEnv); doRevisedResultsByLimit(pQInfo); moveDescOrderResultsToFront(pRuntimeEnv); pQInfo->pointsRead = pQuery->pointsRead; } static void vnodeSingleMeterMultiOutputProcessor(SQInfo *pQInfo) { SQuery * pQuery = &pQInfo->query; SMeterObj *pMeterObj = pQInfo->pObj; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->pMeterQuerySupporter->runtimeEnv; // for ts_comp query, re-initialized is not allowed if (!isTSCompQuery(pQuery)) { resetCtxOutputBuf(pRuntimeEnv); } while (1) { vnodeScanAllData(pRuntimeEnv); doFinalizeResult(pRuntimeEnv); if (isQueryKilled(pQuery)) { return; } pQuery->pointsRead = getNumOfResult(pRuntimeEnv); if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->pointsRead > 0) { doSkipResults(pRuntimeEnv); } /* * 1. if pQuery->pointsRead == 0, pQuery->limit.offset >= 0, still need to check data * 2. if pQuery->pointsRead > 0, pQuery->limit.offset must be 0 */ if (pQuery->pointsRead > 0 || Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { break; } TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); assert(nextTimestamp > 0 || ((nextTimestamp < 0) && Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK))); dTrace("QInfo:%p vid:%d sid:%d id:%s, skip current result, offset:%lld, next qrange:%lld-%lld", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->limit.offset, pQuery->lastKey, pQuery->ekey); resetCtxOutputBuf(pRuntimeEnv); } doRevisedResultsByLimit(pQInfo); moveDescOrderResultsToFront(pRuntimeEnv); pQInfo->pointsRead += pQuery->pointsRead; if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); assert(nextTimestamp > 0 || ((nextTimestamp < 0) && Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK))); dTrace("QInfo:%p vid:%d sid:%d id:%s, query abort due to buffer limitation, next qrange:%lld-%lld", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->lastKey, pQuery->ekey); } dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned, totalRead:%d totalReturn:%d", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned); pQuery->pointsOffset = pQuery->pointsToRead; // restore the available buffer if (!isTSCompQuery(pQuery)) { assert(pQuery->pointsRead <= pQuery->pointsToRead); } } static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter, SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; while (1) { assert((pQuery->skey <= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (pQuery->skey >= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))); initCtxOutputBuf(pRuntimeEnv); vnodeScanAllData(pRuntimeEnv); if (isQueryKilled(pQuery)) { return; } assert(!Q_STATUS_EQUAL(pQuery->over, QUERY_NOT_COMPLETED)); // clear tag, used to decide if the whole interval query is completed or not pQuery->over &= (~QUERY_COMPLETED); doFinalizeResult(pRuntimeEnv); int64_t maxOutput = getNumOfResult(pRuntimeEnv); // here we can ignore the records in case of no interpolation if ((pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) && pQuery->limit.offset > 0 && pQuery->interpoType == TSDB_INTERPO_NONE) { // maxOutput <= 0, means current query does not generate any results // todo handle offset, in case of top/bottom interval query if (maxOutput > 0) { pQuery->limit.offset--; } } else { pQuery->pointsRead += maxOutput; forwardCtxOutputBuf(pRuntimeEnv, maxOutput); } if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) { break; } forwardIntervalQueryRange(pSupporter, pRuntimeEnv); if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED)) { break; } /* * the scan limitation mechanism is upon here, * 1. since there is only one(k) record is generated in one scan operation * 2. remain space is not sufficient for next query output, abort */ if ((pQuery->pointsRead % pQuery->pointsToRead == 0 && pQuery->pointsRead != 0) || ((pQuery->pointsRead + maxOutput) > pQuery->pointsToRead)) { setQueryStatus(pQuery, QUERY_RESBUF_FULL); break; } } } /* handle time interval query on single table */ static void vnodeSingleMeterIntervalProcessor(SQInfo *pQInfo) { SQuery * pQuery = &(pQInfo->query); SMeterObj *pMeterObj = pQInfo->pObj; SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv; int32_t numOfInterpo = 0; while (1) { resetCtxOutputBuf(pRuntimeEnv); vnodeSingleMeterIntervalMainLooper(pSupporter, pRuntimeEnv); // the offset is handled at prepare stage if no interpolation involved if (pQuery->interpoType == TSDB_INTERPO_NONE) { doRevisedResultsByLimit(pQInfo); break; } else { taosInterpoSetStartInfo(&pRuntimeEnv->interpoInfo, pQuery->pointsRead, pQuery->interpoType); SData **pInterpoBuf = pRuntimeEnv->pInterpoBuf; if (QUERY_IS_ASC_QUERY(pQuery)) { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data, pQuery->pointsRead * pQuery->pSelectExpr[i].resBytes); } } else { int32_t size = pMeterObj->pointsPerFileBlock; for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data + (size - pQuery->pointsRead) * pQuery->pSelectExpr[i].resBytes, pQuery->pointsRead * pQuery->pSelectExpr[i].resBytes); } } numOfInterpo = 0; pQuery->pointsRead = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, pQuery->pointsRead, &numOfInterpo); dTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->pointsRead); if (pQuery->pointsRead > 0 || Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { doRevisedResultsByLimit(pQInfo); break; } // no result generated yet, continue retrieve data pQuery->pointsRead = 0; } } pQInfo->pointsRead += pQuery->pointsRead; pQInfo->pointsInterpo += numOfInterpo; moveDescOrderResultsToFront(pRuntimeEnv); dTrace("%p vid:%d sid:%d id:%s, %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo, pQInfo->pointsRead - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned); } void vnodeSingleMeterQuery(SSchedMsg *pMsg) { SQInfo *pQInfo = (SQInfo *)pMsg->ahandle; if (pQInfo == NULL || pQInfo->pMeterQuerySupporter == NULL) { dTrace("%p freed abort query", pQInfo); return; } if (pQInfo->killed) { dTrace("QInfo:%p it is already killed, abort", pQInfo); vnodeDecRefCount(pQInfo); return; } assert(pQInfo->refCount >= 1); SQuery * pQuery = &pQInfo->query; SMeterObj *pMeterObj = pQInfo->pObj; dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pMeterObj->numOfQueries, pQInfo); SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->pMeterQuerySupporter->runtimeEnv; assert(pRuntimeEnv->pMeterObj == pMeterObj); if (vnodeHasRemainResults(pQInfo)) { /* * There are remain results that are not returned due to result interpolation * So, we do keep in this procedure instead of launching retrieve procedure for next results. */ int32_t numOfInterpo = 0; int32_t remain = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo); pQuery->pointsRead = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pRuntimeEnv->pInterpoBuf, remain, &numOfInterpo); doRevisedResultsByLimit(pQInfo); moveDescOrderResultsToFront(pRuntimeEnv); pQInfo->pointsInterpo += numOfInterpo; pQInfo->pointsRead += pQuery->pointsRead; dTrace( "QInfo:%p vid:%d sid:%d id:%s, %d points returned %d points interpo, totalRead:%d totalInterpo:%d " "totalReturn:%d", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo, pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned); sem_post(&pQInfo->dataReady); vnodeDecRefCount(pQInfo); return; } // here we have scan all qualified data in both data file and cache if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) { // continue to get push data from the group result if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { pQuery->pointsRead = 0; if (pQInfo->pMeterQuerySupporter->subgroupIdx > 0) { copyFromGroupBuf(pQInfo, pQInfo->pMeterQuerySupporter->pResult); pQInfo->pointsRead += pQuery->pointsRead; if (pQuery->pointsRead > 0) { dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned %d from group results, totalRead:%d totalReturn:%d", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned); sem_post(&pQInfo->dataReady); vnodeDecRefCount(pQInfo); return; } } } pQInfo->over = 1; dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQInfo->pointsRead); vnodePrintQueryStatistics(pQInfo->pMeterQuerySupporter); sem_post(&pQInfo->dataReady); vnodeDecRefCount(pQInfo); return; } /* number of points returned during this query */ pQuery->pointsRead = 0; assert(pQuery->pos >= 0 && pQuery->slot >= 0); int64_t st = taosGetTimestampUs(); if (pQuery->nAggTimeInterval != 0) { // interval (down sampling operation) assert(pQuery->checkBufferInLoop == 0 && pQuery->pointsOffset == pQuery->pointsToRead); vnodeSingleMeterIntervalProcessor(pQInfo); } else { if (isFixedOutputQuery(pQuery)) { assert(pQuery->checkBufferInLoop == 0); vnodeSingleMeterFixedOutputProcessor(pQInfo); } else { // diff/add/multiply/subtract/division assert(pQuery->checkBufferInLoop == 1); vnodeSingleMeterMultiOutputProcessor(pQInfo); } } // record the total elapsed time pQInfo->useconds += (taosGetTimestampUs() - st); /* check if query is killed or not */ if (isQueryKilled(pQuery)) { dTrace("QInfo:%p query is killed", pQInfo); pQInfo->over = 1; } else { dTrace("QInfo:%p vid:%d sid:%d id:%s, meter query thread completed, %d points are returned", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead); } sem_post(&pQInfo->dataReady); vnodeDecRefCount(pQInfo); } void vnodeMultiMeterQuery(SSchedMsg *pMsg) { SQInfo *pQInfo = (SQInfo *)pMsg->ahandle; if (pQInfo == NULL || pQInfo->pMeterQuerySupporter == NULL) { return; } if (pQInfo->killed) { vnodeDecRefCount(pQInfo); dTrace("QInfo:%p it is already killed, abort", pQInfo); return; } assert(pQInfo->refCount >= 1); SQuery *pQuery = &pQInfo->query; pQuery->pointsRead = 0; int64_t st = taosGetTimestampUs(); if (pQuery->nAggTimeInterval > 0 || (isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && !isGroupbyNormalCol(pQuery->pGroupbyExpr))) { assert(pQuery->checkBufferInLoop == 0); vnodeMultiMeterQueryProcessor(pQInfo); } else { assert((pQuery->checkBufferInLoop == 1 && pQuery->nAggTimeInterval == 0) || isPointInterpoQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)); vnodeMultiMeterMultiOutputProcessor(pQInfo); } /* record the total elapsed time */ pQInfo->useconds += (taosGetTimestampUs() - st); pQInfo->over = isQueryKilled(pQuery) ? 1 : 0; taosInterpoSetStartInfo(&pQInfo->pMeterQuerySupporter->runtimeEnv.interpoInfo, pQuery->pointsRead, pQInfo->query.interpoType); SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; if (pQuery->pointsRead == 0) { pQInfo->over = 1; dTrace("QInfo:%p over, %d meters queried, %d points are returned", pQInfo, pSupporter->numOfMeters, pQInfo->pointsRead); vnodePrintQueryStatistics(pSupporter); } sem_post(&pQInfo->dataReady); vnodeDecRefCount(pQInfo); }