From 77704719624f187511a8966e916e79a0d0d06134 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 31 May 2020 00:05:41 +0800 Subject: [PATCH] [td-225] add query cost statistics. --- src/query/src/qExecutor.c | 220 +++++++++++++++++++++----------------- 1 file changed, 119 insertions(+), 101 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index cc7eb6e8e6..b487c790a9 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -53,7 +53,7 @@ ((query)->colList[(query)->pSelectExpr[colidx].base.colInfo.colIndex].bytes) #define GET_COLUMN_TYPE(query, colidx) ((query)->colList[(query)->pSelectExpr[colidx].base.colInfo.colIndex].type) -typedef enum { +enum { // when query starts to execute, this status will set QUERY_NOT_COMPLETED = 0x1u, @@ -72,11 +72,11 @@ typedef enum { * usually used in case of interval query with interpolation option */ QUERY_OVER = 0x8u, -} vnodeQueryStatus; +}; enum { - TS_JOIN_TS_EQUAL = 0, - TS_JOIN_TS_NOT_EQUALS = 1, + TS_JOIN_TS_EQUAL = 0, + TS_JOIN_TS_NOT_EQUALS = 1, TS_JOIN_TAG_NOT_EQUALS = 2, }; @@ -1986,9 +1986,12 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, if (pQuery->numOfFilterCols > 0) { r = BLK_DATA_ALL_NEEDED; } else { + // check if this data block is required to load for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t functionId = pQuery->pSelectExpr[i].base.functionId; - int32_t colId = pQuery->pSelectExpr[i].base.colInfo.colId; + SSqlFuncMsg* pSqlFunc = &pQuery->pSelectExpr[i].base; + + int32_t functionId = pSqlFunc->functionId; + int32_t colId = pSqlFunc->colInfo.colId; r |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pQuery->window.skey, pQuery->window.ekey, colId); } @@ -1998,34 +2001,38 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, } if (r == BLK_DATA_NO_NEEDED) { - qTrace("QInfo:%p data block ignored, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_QINFO_ADDR(pRuntimeEnv), + qTrace("QInfo:%p data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_QINFO_ADDR(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - } else if (r == BLK_DATA_FILEDS_NEEDED) { + pRuntimeEnv->summary.discardBlocks += 1; + } else if (r == BLK_DATA_STATIS_NEEDED) { if (tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) { // return DISK_DATA_LOAD_FAILED; } - - if (*pStatis == NULL) { + + pRuntimeEnv->summary.loadBlockStatis += 1; + + if (*pStatis == NULL) { // data block statistics does not exist, load data block pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL); + pRuntimeEnv->summary.checkRows += pBlockInfo->rows; } } else { assert(r == BLK_DATA_ALL_NEEDED); + + // load the data block statistics to perform further filter + pRuntimeEnv->summary.loadBlockStatis +=1; if (tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) { - // return DISK_DATA_LOAD_FAILED; } - - /* - * if this block is completed included in the query range, do more filter operation - * filter the data block according to the value filter condition. - * no need to load the data block, continue for next block - */ - if (!needToLoadDataBlock(pQuery, *pStatis, pRuntimeEnv->pCtx, pBlockInfo->rows)) { + + if (!needToLoadDataBlock(pQuery,*pStatis, pRuntimeEnv->pCtx, pBlockInfo->rows)) { #if defined(_DEBUG_VIEW) qTrace("QInfo:%p block discarded by per-filter", GET_QINFO_ADDR(pRuntimeEnv)); #endif + // current block has been discard due to filter applied + pRuntimeEnv->summary.discardBlocks += 1; // return DISK_DATA_DISCARDED; } - + + pRuntimeEnv->summary.checkRows += pBlockInfo->rows; pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL); } @@ -2095,6 +2102,43 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { return midPos; } +static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) { + // in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block + SQuery* pQuery = pRuntimeEnv->pQuery; + if (!isIntervalQuery(pQuery) && !isGroupbyNormalCol(pQuery->pGroupbyExpr) && !isFixedOutputQuery(pQuery)) { + SResultRec *pRec = &pQuery->rec; + + if (pQuery->rec.capacity - pQuery->rec.rows < pBlockInfo->rows) { + int32_t remain = pRec->capacity - pRec->rows; + int32_t newSize = pRec->capacity + (pBlockInfo->rows - remain); + + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + int32_t bytes = pQuery->pSelectExpr[i].bytes; + + char *tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(tFilePage)); + if (tmp == NULL) { // todo handle the oom + assert(0); + } else { + pQuery->sdata[i] = (tFilePage *)tmp; + } + + // set the pCtx output buffer position + pRuntimeEnv->pCtx[i].aOutputBuf = pQuery->sdata[i]->data + pRec->rows * bytes; + + int32_t functionId = pQuery->pSelectExpr[i].base.functionId; + if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { + pRuntimeEnv->pCtx[i].ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf; + } + } + + qTrace("QInfo:%p realloc output buffer, new size: %d rows, old:%d, remain:%d", GET_QINFO_ADDR(pRuntimeEnv), + newSize, pRec->capacity, newSize - pRec->rows); + + pRec->capacity = newSize; + } + } +} + static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; STableQueryInfo* pTableQueryInfo = pQuery->current; @@ -2105,6 +2149,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle; while (tsdbNextDataBlock(pQueryHandle)) { + pRuntimeEnv->summary.dataBlocks += 1; if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { return 0; } @@ -2137,45 +2182,15 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { } // in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block - if (!isIntervalQuery(pQuery) && !isGroupbyNormalCol(pQuery->pGroupbyExpr) && !isFixedOutputQuery(pQuery)) { - SResultRec *pRec = &pQuery->rec; - - if (pQuery->rec.capacity - pQuery->rec.rows < blockInfo.rows) { - int32_t remain = pRec->capacity - pRec->rows; - int32_t newSize = pRec->capacity + (blockInfo.rows - remain); - - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t bytes = pQuery->pSelectExpr[i].bytes; - - char *tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(tFilePage)); - if (tmp == NULL) { // todo handle the oom - assert(0); - } else { - pQuery->sdata[i] = (tFilePage *)tmp; - } - - // set the pCtx output buffer position - pRuntimeEnv->pCtx[i].aOutputBuf = pQuery->sdata[i]->data + pRec->rows * bytes; - - int32_t functionId = pQuery->pSelectExpr[i].base.functionId; - if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { - pRuntimeEnv->pCtx[i].ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf; - } - } - - qTrace("QInfo:%p realloc output buffer, new size: %d rows, old:%d, remain:%d", GET_QINFO_ADDR(pRuntimeEnv), - newSize, pRec->capacity, newSize - pRec->rows); - - pRec->capacity = newSize; - } - } + ensureOutputBuffer(pRuntimeEnv, &blockInfo); SDataStatis *pStatis = NULL; - pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1; + pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : blockInfo.rows - 1; SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis); int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock); + pRuntimeEnv->summary.dataInRows += blockInfo.rows; qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes, pQuery->current->lastKey); @@ -3245,14 +3260,14 @@ void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *p * @param pRuntimeEnv * @param pDataBlockInfo */ -void setExecutionContext(SQInfo *pQInfo, STableId* pTableId, int32_t groupIdx, TSKEY nextKey) { +void setExecutionContext(SQInfo *pQInfo, STableId* pTableId, int32_t groupIndex, TSKEY nextKey) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current; SWindowResInfo * pWindowResInfo = &pRuntimeEnv->windowResInfo; int32_t GROUPRESULTID = 1; - SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIdx, sizeof(groupIdx)); + SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex, sizeof(groupIndex)); if (pWindowRes == NULL) { return; } @@ -3503,7 +3518,7 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableQueryIn // update the number of result for each, only update the number of rows for the corresponding window result. if (pQuery->intervalTime == 0) { - int32_t g = pTableQueryInfo->groupIdx; + int32_t g = pTableQueryInfo->groupIndex; assert(pRuntimeEnv->windowResInfo.size > 0); SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&g, sizeof(g)); @@ -3649,50 +3664,49 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int } } -void vnodePrintQueryStatistics(SQInfo *pQInfo) { -#if 0 +void queryCostStatis(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; +// SQuery *pQuery = pRuntimeEnv->pQuery; - SQuery *pQuery = pRuntimeEnv->pQuery; - - SQueryCostSummary *pSummary = &pRuntimeEnv->summary; - if (pRuntimeEnv->pResultBuf == NULL) { - pSummary->tmpBufferInDisk = 0; - } else { - pSummary->tmpBufferInDisk = getResBufSize(pRuntimeEnv->pResultBuf); - } - - qTrace("QInfo:%p statis: comp blocks:%d, size:%d Bytes, elapsed time:%.2f ms", pQInfo, pSummary->readCompInfo, - pSummary->totalCompInfoSize, pSummary->loadCompInfoUs / 1000.0); - - qTrace("QInfo:%p statis: field info: %d, size:%d Bytes, avg size:%.2f Bytes, elapsed time:%.2f ms", pQInfo, - pSummary->readField, pSummary->totalFieldSize, (double)pSummary->totalFieldSize / pSummary->readField, - pSummary->loadFieldUs / 1000.0); - - qTrace( - "QInfo:%p statis: file blocks:%d, size:%d Bytes, elapsed time:%.2f ms, skipped:%d, in-memory gen null:%d Bytes", - pQInfo, pSummary->readDiskBlocks, pSummary->totalBlockSize, pSummary->loadBlocksUs / 1000.0, - pSummary->skippedFileBlocks, pSummary->totalGenData); - - qTrace("QInfo:%p statis: cache blocks:%d", pQInfo, pSummary->blocksInCache, 0); - qTrace("QInfo:%p statis: temp file:%d Bytes", pQInfo, pSummary->tmpBufferInDisk); - - qTrace("QInfo:%p statis: file:%d, table:%d", pQInfo, pSummary->numOfFiles, pSummary->numOfTables); - qTrace("QInfo:%p statis: seek ops:%d", pQInfo, pSummary->numOfSeek); - - double total = pSummary->fileTimeUs + pSummary->cacheTimeUs; - double io = pSummary->loadCompInfoUs + pSummary->loadBlocksUs + pSummary->loadFieldUs; + SQueryCostInfo *pSummary = &pRuntimeEnv->summary; +// if (pRuntimeEnv->pResultBuf == NULL) { +//// pSummary->tmpBufferInDisk = 0; +// } else { +//// pSummary->tmpBufferInDisk = getResBufSize(pRuntimeEnv->pResultBuf); +// } +// +// qTrace("QInfo:%p cost: comp blocks:%d, size:%d Bytes, elapsed time:%.2f ms", pQInfo, pSummary->readCompInfo, +// pSummary->totalCompInfoSize, pSummary->loadCompInfoUs / 1000.0); +// +// qTrace("QInfo:%p cost: field info: %d, size:%d Bytes, avg size:%.2f Bytes, elapsed time:%.2f ms", pQInfo, +// pSummary->readField, pSummary->totalFieldSize, (double)pSummary->totalFieldSize / pSummary->readField, +// pSummary->loadFieldUs / 1000.0); +// +// qTrace( +// "QInfo:%p cost: file blocks:%d, size:%d Bytes, elapsed time:%.2f ms, skipped:%d, in-memory gen null:%d Bytes", +// pQInfo, pSummary->readDiskBlocks, pSummary->totalBlockSize, pSummary->loadBlocksUs / 1000.0, +// pSummary->skippedFileBlocks, pSummary->totalGenData); + + qTrace("QInfo:%p cost: check blocks:%d, statis:%d, rows:%"PRId64", check rows:%"PRId64, pQInfo, pSummary->dataBlocks, + pSummary->loadBlockStatis, pSummary->dataInRows, pSummary->checkRows); + +// qTrace("QInfo:%p cost: temp file:%d Bytes", pQInfo, pSummary->tmpBufferInDisk); +// +// qTrace("QInfo:%p cost: file:%d, table:%d", pQInfo, pSummary->numOfFiles, pSummary->numOfTables); +// qTrace("QInfo:%p cost: seek ops:%d", pQInfo, pSummary->numOfSeek); +// +// double total = pSummary->fileTimeUs + pSummary->cacheTimeUs; +// double io = pSummary->loadCompInfoUs + pSummary->loadBlocksUs + pSummary->loadFieldUs; // todo add the intermediate result save cost!! - double computing = total - io; - - qTrace( - "QInfo:%p statis: total elapsed time:%.2f ms, file:%.2f ms(%.2f%), cache:%.2f ms(%.2f%). io:%.2f ms(%.2f%)," - "comput:%.2fms(%.2f%)", - pQInfo, total / 1000.0, pSummary->fileTimeUs / 1000.0, pSummary->fileTimeUs * 100 / total, - pSummary->cacheTimeUs / 1000.0, pSummary->cacheTimeUs * 100 / total, io / 1000.0, io * 100 / total, - computing / 1000.0, computing * 100 / total); -#endif +// double computing = total - io; +// +// qTrace( +// "QInfo:%p cost: total elapsed time:%.2f ms, file:%.2f ms(%.2f%), cache:%.2f ms(%.2f%). io:%.2f ms(%.2f%)," +// "comput:%.2fms(%.2f%)", +// pQInfo, total / 1000.0, pSummary->fileTimeUs / 1000.0, pSummary->fileTimeUs * 100 / total, +// pSummary->cacheTimeUs / 1000.0, pSummary->cacheTimeUs * 100 / total, io / 1000.0, io * 100 / total, +// computing / 1000.0, computing * 100 / total); } static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) { @@ -4106,7 +4120,7 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) { assert(pTableQueryInfo != NULL); restoreIntervalQueryRange(pRuntimeEnv, pTableQueryInfo); - printf("table:%d, groupIndex:%d, rows:%d\n", pTableQueryInfo->id.tid, pTableQueryInfo->groupIdx, blockInfo.tid); + printf("table:%d, groupIndex:%d, rows:%d\n", pTableQueryInfo->id.tid, pTableQueryInfo->groupIndex, blockInfo.tid); SDataStatis *pStatis = NULL; @@ -4114,7 +4128,7 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) { if (!isIntervalQuery(pQuery)) { int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1; - setExecutionContext(pQInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIdx, blockInfo.window.ekey + step); + setExecutionContext(pQInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIndex, blockInfo.window.ekey + step); } else { // interval query TSKEY nextKey = blockInfo.window.skey; setIntervalQueryRange(pQInfo, nextKey); @@ -4489,7 +4503,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { } if (pQuery->rec.rows == 0) { - // vnodePrintQueryStatistics(pSupporter); + // queryCostStatis(pSupporter); } qTrace("QInfo:%p current:%lld, total:%lld", pQInfo, pQuery->rec.rows, pQuery->rec.total); @@ -4774,7 +4788,7 @@ static void tableQueryImpl(SQInfo *pQInfo) { } qTrace("QInfo:%p query over, %d rows are returned", pQInfo, pQuery->rec.total); - // vnodePrintQueryStatistics(pSupporter); + queryCostStatis(pQInfo); return; } @@ -4806,6 +4820,10 @@ static void tableQueryImpl(SQInfo *pQInfo) { } else {// todo set the table uid and tid in log qTrace("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows", pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); + + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + queryCostStatis(pQInfo); + } } } @@ -4833,7 +4851,7 @@ static void stableQueryImpl(SQInfo *pQInfo) { if (pQuery->rec.rows == 0) { qTrace("QInfo:%p over, %d tables queried, %d points are returned", pQInfo, pQInfo->groupInfo.numOfTables, pQuery->rec.total); - // vnodePrintQueryStatistics(pSupporter); + // queryCostStatis(pSupporter); } } @@ -5491,7 +5509,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, window.skey = pQueryMsg->window.skey; } item.info = createTableQueryInfo(&pQInfo->runtimeEnv, item.id, window); - item.info->groupIdx = i; + item.info->groupIndex = i; item.info->tableIndex = tableIndex++; taosArrayPush(p1, &item); } -- GitLab