提交 77704719 编写于 作者: H Haojun Liao

[td-225] add query cost statistics.

上级 a506907c
......@@ -53,7 +53,7 @@
((query)->colList[(query)->pSelectExpr[colidx].base.colInfo.colIndex].bytes)
#define GET_COLUMN_TYPE(query, colidx) ((query)->colList[(query)->pSelectExpr[colidx].base.colInfo.colIndex].type)
typedef enum {
enum {
// when query starts to execute, this status will set
QUERY_NOT_COMPLETED = 0x1u,
......@@ -72,11 +72,11 @@ typedef enum {
* usually used in case of interval query with interpolation option
*/
QUERY_OVER = 0x8u,
} vnodeQueryStatus;
};
enum {
TS_JOIN_TS_EQUAL = 0,
TS_JOIN_TS_NOT_EQUALS = 1,
TS_JOIN_TS_EQUAL = 0,
TS_JOIN_TS_NOT_EQUALS = 1,
TS_JOIN_TAG_NOT_EQUALS = 2,
};
......@@ -1986,9 +1986,12 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
if (pQuery->numOfFilterCols > 0) {
r = BLK_DATA_ALL_NEEDED;
} else {
// check if this data block is required to load
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
int32_t colId = pQuery->pSelectExpr[i].base.colInfo.colId;
SSqlFuncMsg* pSqlFunc = &pQuery->pSelectExpr[i].base;
int32_t functionId = pSqlFunc->functionId;
int32_t colId = pSqlFunc->colInfo.colId;
r |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pQuery->window.skey, pQuery->window.ekey, colId);
}
......@@ -1998,34 +2001,38 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
}
if (r == BLK_DATA_NO_NEEDED) {
qTrace("QInfo:%p data block ignored, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_QINFO_ADDR(pRuntimeEnv),
qTrace("QInfo:%p data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_QINFO_ADDR(pRuntimeEnv),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
} else if (r == BLK_DATA_FILEDS_NEEDED) {
pRuntimeEnv->summary.discardBlocks += 1;
} else if (r == BLK_DATA_STATIS_NEEDED) {
if (tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) {
// return DISK_DATA_LOAD_FAILED;
}
if (*pStatis == NULL) {
pRuntimeEnv->summary.loadBlockStatis += 1;
if (*pStatis == NULL) { // data block statistics does not exist, load data block
pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL);
pRuntimeEnv->summary.checkRows += pBlockInfo->rows;
}
} else {
assert(r == BLK_DATA_ALL_NEEDED);
// load the data block statistics to perform further filter
pRuntimeEnv->summary.loadBlockStatis +=1;
if (tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) {
// return DISK_DATA_LOAD_FAILED;
}
/*
* if this block is completed included in the query range, do more filter operation
* filter the data block according to the value filter condition.
* no need to load the data block, continue for next block
*/
if (!needToLoadDataBlock(pQuery, *pStatis, pRuntimeEnv->pCtx, pBlockInfo->rows)) {
if (!needToLoadDataBlock(pQuery,*pStatis, pRuntimeEnv->pCtx, pBlockInfo->rows)) {
#if defined(_DEBUG_VIEW)
qTrace("QInfo:%p block discarded by per-filter", GET_QINFO_ADDR(pRuntimeEnv));
#endif
// current block has been discard due to filter applied
pRuntimeEnv->summary.discardBlocks += 1;
// return DISK_DATA_DISCARDED;
}
pRuntimeEnv->summary.checkRows += pBlockInfo->rows;
pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL);
}
......@@ -2095,6 +2102,43 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
return midPos;
}
static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) {
// in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block
SQuery* pQuery = pRuntimeEnv->pQuery;
if (!isIntervalQuery(pQuery) && !isGroupbyNormalCol(pQuery->pGroupbyExpr) && !isFixedOutputQuery(pQuery)) {
SResultRec *pRec = &pQuery->rec;
if (pQuery->rec.capacity - pQuery->rec.rows < pBlockInfo->rows) {
int32_t remain = pRec->capacity - pRec->rows;
int32_t newSize = pRec->capacity + (pBlockInfo->rows - remain);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t bytes = pQuery->pSelectExpr[i].bytes;
char *tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(tFilePage));
if (tmp == NULL) { // todo handle the oom
assert(0);
} else {
pQuery->sdata[i] = (tFilePage *)tmp;
}
// set the pCtx output buffer position
pRuntimeEnv->pCtx[i].aOutputBuf = pQuery->sdata[i]->data + pRec->rows * bytes;
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pRuntimeEnv->pCtx[i].ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
}
}
qTrace("QInfo:%p realloc output buffer, new size: %d rows, old:%d, remain:%d", GET_QINFO_ADDR(pRuntimeEnv),
newSize, pRec->capacity, newSize - pRec->rows);
pRec->capacity = newSize;
}
}
}
static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQueryInfo = pQuery->current;
......@@ -2105,6 +2149,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
while (tsdbNextDataBlock(pQueryHandle)) {
pRuntimeEnv->summary.dataBlocks += 1;
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
return 0;
}
......@@ -2137,45 +2182,15 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
}
// in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block
if (!isIntervalQuery(pQuery) && !isGroupbyNormalCol(pQuery->pGroupbyExpr) && !isFixedOutputQuery(pQuery)) {
SResultRec *pRec = &pQuery->rec;
if (pQuery->rec.capacity - pQuery->rec.rows < blockInfo.rows) {
int32_t remain = pRec->capacity - pRec->rows;
int32_t newSize = pRec->capacity + (blockInfo.rows - remain);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t bytes = pQuery->pSelectExpr[i].bytes;
char *tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(tFilePage));
if (tmp == NULL) { // todo handle the oom
assert(0);
} else {
pQuery->sdata[i] = (tFilePage *)tmp;
}
// set the pCtx output buffer position
pRuntimeEnv->pCtx[i].aOutputBuf = pQuery->sdata[i]->data + pRec->rows * bytes;
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pRuntimeEnv->pCtx[i].ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
}
}
qTrace("QInfo:%p realloc output buffer, new size: %d rows, old:%d, remain:%d", GET_QINFO_ADDR(pRuntimeEnv),
newSize, pRec->capacity, newSize - pRec->rows);
pRec->capacity = newSize;
}
}
ensureOutputBuffer(pRuntimeEnv, &blockInfo);
SDataStatis *pStatis = NULL;
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1;
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : blockInfo.rows - 1;
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock);
pRuntimeEnv->summary.dataInRows += blockInfo.rows;
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QINFO_ADDR(pRuntimeEnv),
blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes, pQuery->current->lastKey);
......@@ -3245,14 +3260,14 @@ void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *p
* @param pRuntimeEnv
* @param pDataBlockInfo
*/
void setExecutionContext(SQInfo *pQInfo, STableId* pTableId, int32_t groupIdx, TSKEY nextKey) {
void setExecutionContext(SQInfo *pQInfo, STableId* pTableId, int32_t groupIndex, TSKEY nextKey) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current;
SWindowResInfo * pWindowResInfo = &pRuntimeEnv->windowResInfo;
int32_t GROUPRESULTID = 1;
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIdx, sizeof(groupIdx));
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex, sizeof(groupIndex));
if (pWindowRes == NULL) {
return;
}
......@@ -3503,7 +3518,7 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableQueryIn
// update the number of result for each, only update the number of rows for the corresponding window result.
if (pQuery->intervalTime == 0) {
int32_t g = pTableQueryInfo->groupIdx;
int32_t g = pTableQueryInfo->groupIndex;
assert(pRuntimeEnv->windowResInfo.size > 0);
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&g, sizeof(g));
......@@ -3649,50 +3664,49 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int
}
}
void vnodePrintQueryStatistics(SQInfo *pQInfo) {
#if 0
void queryCostStatis(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
// SQuery *pQuery = pRuntimeEnv->pQuery;
SQuery *pQuery = pRuntimeEnv->pQuery;
SQueryCostSummary *pSummary = &pRuntimeEnv->summary;
if (pRuntimeEnv->pResultBuf == NULL) {
pSummary->tmpBufferInDisk = 0;
} else {
pSummary->tmpBufferInDisk = getResBufSize(pRuntimeEnv->pResultBuf);
}
qTrace("QInfo:%p statis: comp blocks:%d, size:%d Bytes, elapsed time:%.2f ms", pQInfo, pSummary->readCompInfo,
pSummary->totalCompInfoSize, pSummary->loadCompInfoUs / 1000.0);
qTrace("QInfo:%p statis: field info: %d, size:%d Bytes, avg size:%.2f Bytes, elapsed time:%.2f ms", pQInfo,
pSummary->readField, pSummary->totalFieldSize, (double)pSummary->totalFieldSize / pSummary->readField,
pSummary->loadFieldUs / 1000.0);
qTrace(
"QInfo:%p statis: file blocks:%d, size:%d Bytes, elapsed time:%.2f ms, skipped:%d, in-memory gen null:%d Bytes",
pQInfo, pSummary->readDiskBlocks, pSummary->totalBlockSize, pSummary->loadBlocksUs / 1000.0,
pSummary->skippedFileBlocks, pSummary->totalGenData);
qTrace("QInfo:%p statis: cache blocks:%d", pQInfo, pSummary->blocksInCache, 0);
qTrace("QInfo:%p statis: temp file:%d Bytes", pQInfo, pSummary->tmpBufferInDisk);
qTrace("QInfo:%p statis: file:%d, table:%d", pQInfo, pSummary->numOfFiles, pSummary->numOfTables);
qTrace("QInfo:%p statis: seek ops:%d", pQInfo, pSummary->numOfSeek);
double total = pSummary->fileTimeUs + pSummary->cacheTimeUs;
double io = pSummary->loadCompInfoUs + pSummary->loadBlocksUs + pSummary->loadFieldUs;
SQueryCostInfo *pSummary = &pRuntimeEnv->summary;
// if (pRuntimeEnv->pResultBuf == NULL) {
//// pSummary->tmpBufferInDisk = 0;
// } else {
//// pSummary->tmpBufferInDisk = getResBufSize(pRuntimeEnv->pResultBuf);
// }
//
// qTrace("QInfo:%p cost: comp blocks:%d, size:%d Bytes, elapsed time:%.2f ms", pQInfo, pSummary->readCompInfo,
// pSummary->totalCompInfoSize, pSummary->loadCompInfoUs / 1000.0);
//
// qTrace("QInfo:%p cost: field info: %d, size:%d Bytes, avg size:%.2f Bytes, elapsed time:%.2f ms", pQInfo,
// pSummary->readField, pSummary->totalFieldSize, (double)pSummary->totalFieldSize / pSummary->readField,
// pSummary->loadFieldUs / 1000.0);
//
// qTrace(
// "QInfo:%p cost: file blocks:%d, size:%d Bytes, elapsed time:%.2f ms, skipped:%d, in-memory gen null:%d Bytes",
// pQInfo, pSummary->readDiskBlocks, pSummary->totalBlockSize, pSummary->loadBlocksUs / 1000.0,
// pSummary->skippedFileBlocks, pSummary->totalGenData);
qTrace("QInfo:%p cost: check blocks:%d, statis:%d, rows:%"PRId64", check rows:%"PRId64, pQInfo, pSummary->dataBlocks,
pSummary->loadBlockStatis, pSummary->dataInRows, pSummary->checkRows);
// qTrace("QInfo:%p cost: temp file:%d Bytes", pQInfo, pSummary->tmpBufferInDisk);
//
// qTrace("QInfo:%p cost: file:%d, table:%d", pQInfo, pSummary->numOfFiles, pSummary->numOfTables);
// qTrace("QInfo:%p cost: seek ops:%d", pQInfo, pSummary->numOfSeek);
//
// double total = pSummary->fileTimeUs + pSummary->cacheTimeUs;
// double io = pSummary->loadCompInfoUs + pSummary->loadBlocksUs + pSummary->loadFieldUs;
// todo add the intermediate result save cost!!
double computing = total - io;
qTrace(
"QInfo:%p statis: total elapsed time:%.2f ms, file:%.2f ms(%.2f%), cache:%.2f ms(%.2f%). io:%.2f ms(%.2f%),"
"comput:%.2fms(%.2f%)",
pQInfo, total / 1000.0, pSummary->fileTimeUs / 1000.0, pSummary->fileTimeUs * 100 / total,
pSummary->cacheTimeUs / 1000.0, pSummary->cacheTimeUs * 100 / total, io / 1000.0, io * 100 / total,
computing / 1000.0, computing * 100 / total);
#endif
// double computing = total - io;
//
// qTrace(
// "QInfo:%p cost: total elapsed time:%.2f ms, file:%.2f ms(%.2f%), cache:%.2f ms(%.2f%). io:%.2f ms(%.2f%),"
// "comput:%.2fms(%.2f%)",
// pQInfo, total / 1000.0, pSummary->fileTimeUs / 1000.0, pSummary->fileTimeUs * 100 / total,
// pSummary->cacheTimeUs / 1000.0, pSummary->cacheTimeUs * 100 / total, io / 1000.0, io * 100 / total,
// computing / 1000.0, computing * 100 / total);
}
static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
......@@ -4106,7 +4120,7 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
assert(pTableQueryInfo != NULL);
restoreIntervalQueryRange(pRuntimeEnv, pTableQueryInfo);
printf("table:%d, groupIndex:%d, rows:%d\n", pTableQueryInfo->id.tid, pTableQueryInfo->groupIdx, blockInfo.tid);
printf("table:%d, groupIndex:%d, rows:%d\n", pTableQueryInfo->id.tid, pTableQueryInfo->groupIndex, blockInfo.tid);
SDataStatis *pStatis = NULL;
......@@ -4114,7 +4128,7 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
if (!isIntervalQuery(pQuery)) {
int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1;
setExecutionContext(pQInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIdx, blockInfo.window.ekey + step);
setExecutionContext(pQInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIndex, blockInfo.window.ekey + step);
} else { // interval query
TSKEY nextKey = blockInfo.window.skey;
setIntervalQueryRange(pQInfo, nextKey);
......@@ -4489,7 +4503,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
}
if (pQuery->rec.rows == 0) {
// vnodePrintQueryStatistics(pSupporter);
// queryCostStatis(pSupporter);
}
qTrace("QInfo:%p current:%lld, total:%lld", pQInfo, pQuery->rec.rows, pQuery->rec.total);
......@@ -4774,7 +4788,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
}
qTrace("QInfo:%p query over, %d rows are returned", pQInfo, pQuery->rec.total);
// vnodePrintQueryStatistics(pSupporter);
queryCostStatis(pQInfo);
return;
}
......@@ -4806,6 +4820,10 @@ static void tableQueryImpl(SQInfo *pQInfo) {
} else {// todo set the table uid and tid in log
qTrace("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows",
pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
queryCostStatis(pQInfo);
}
}
}
......@@ -4833,7 +4851,7 @@ static void stableQueryImpl(SQInfo *pQInfo) {
if (pQuery->rec.rows == 0) {
qTrace("QInfo:%p over, %d tables queried, %d points are returned", pQInfo, pQInfo->groupInfo.numOfTables,
pQuery->rec.total);
// vnodePrintQueryStatistics(pSupporter);
// queryCostStatis(pSupporter);
}
}
......@@ -5491,7 +5509,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
window.skey = pQueryMsg->window.skey;
}
item.info = createTableQueryInfo(&pQInfo->runtimeEnv, item.id, window);
item.info->groupIdx = i;
item.info->groupIndex = i;
item.info->tableIndex = tableIndex++;
taosArrayPush(p1, &item);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册