From d496869ee72141e2511d90df47613b10c0f7a375 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 13 Jul 2020 18:04:52 +0800 Subject: [PATCH] [td-225] opt query perf --- src/client/src/tscAsync.c | 42 +++++++++++----------- src/client/src/tscFunctionImpl.c | 36 +++++++++++++------ src/client/src/tscUtil.c | 9 ++--- src/query/inc/tsqlfunction.h | 3 +- src/query/src/qExecutor.c | 61 +++++++++++++++++--------------- src/tsdb/src/tsdbRead.c | 10 +++++- 6 files changed, 96 insertions(+), 65 deletions(-) diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 17998e1981..7371a8a578 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -45,6 +45,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const pSql->pTscObj = pObj; pSql->maxRetry = TSDB_MAX_REPLICA_NUM; pSql->fp = fp; + pSql->fetchFp = fp; pSql->sqlstr = calloc(1, sqlLen + 1); if (pSql->sqlstr == NULL) { @@ -159,7 +160,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo pRes->code = numOfRows; } - tscQueueAsyncError(pSql->fetchFp, param, pRes->code); + tscQueueAsyncRes(pSql); return; } @@ -346,31 +347,32 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { void tscProcessAsyncRes(SSchedMsg *pMsg) { SSqlObj *pSql = (SSqlObj *)pMsg->ahandle; - SSqlCmd *pCmd = &pSql->cmd; +// SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - void *taosres = pSql; +// void *taosres = pSql; // pCmd may be released, so cache pCmd->command - int cmd = pCmd->command; - int code = pRes->code; +// int cmd = pCmd->command; +// int code = pRes->code; // in case of async insert, restore the user specified callback function - bool shouldFree = tscShouldBeFreed(pSql); - - if (cmd == TSDB_SQL_INSERT) { - assert(pSql->fp != NULL); - pSql->fp = pSql->fetchFp; - } - - if (pSql->fp) { - (*pSql->fp)(pSql->param, taosres, code); - } - - if (shouldFree) { - tscDebug("%p sqlObj is automatically freed in async res", pSql); - tscFreeSqlObj(pSql); - } +// bool shouldFree = tscShouldBeFreed(pSql); + +// if (pCmd->command == TSDB_SQL_INSERT) { +// assert(pSql->fp != NULL); + assert(pSql->fp != NULL && pSql->fetchFp != NULL); +// } + +// if (pSql->fp) { + pSql->fp = pSql->fetchFp; + (*pSql->fp)(pSql->param, pSql, pRes->code); +// } + +// if (shouldFree) { +// tscDebug("%p sqlObj is automatically freed in async res", pSql); +// tscFreeSqlObj(pSql); +// } } static void tscProcessAsyncError(SSchedMsg *pMsg) { diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 8e6878f449..1ec84f023a 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -708,6 +708,11 @@ static int32_t firstDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY en if (pCtx->order == TSDB_ORDER_DESC) { return BLK_DATA_NO_NEEDED; } + + // not initialized yet, it is the first block, load it. + if (pCtx->aOutputBuf == NULL) { + return BLK_DATA_ALL_NEEDED; + } SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes); if (pInfo->hasResult != DATA_SET_FLAG) { @@ -721,7 +726,12 @@ static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end if (pCtx->order != pCtx->param[0].i64Key) { return BLK_DATA_NO_NEEDED; } - + + // not initialized yet, it is the first block, load it. + if (pCtx->aOutputBuf == NULL) { + return BLK_DATA_ALL_NEEDED; + } + SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes); if (pInfo->hasResult != DATA_SET_FLAG) { return BLK_DATA_ALL_NEEDED; @@ -1540,6 +1550,8 @@ static void first_data_assign_impl(SQLFunctionCtx *pCtx, char *pData, int32_t in * to decide if the value is earlier than current intermediate result */ static void first_dist_function(SQLFunctionCtx *pCtx) { + assert(pCtx->size > 0); + if (pCtx->size == 0) { return; } @@ -1554,7 +1566,12 @@ static void first_dist_function(SQLFunctionCtx *pCtx) { } int32_t notNullElems = 0; - + + // data block is discard, not loaded, do not need to check it + if (!pCtx->preAggVals.dataBlockLoaded) { + return; + } + // find the first not null value for (int32_t i = 0; i < pCtx->size; ++i) { char *data = GET_INPUT_CHAR_INDEX(pCtx, i); @@ -1575,10 +1592,6 @@ static void first_dist_function(SQLFunctionCtx *pCtx) { } static void first_dist_function_f(SQLFunctionCtx *pCtx, int32_t index) { - if (pCtx->size == 0) { - return; - } - char *pData = GET_INPUT_CHAR_INDEX(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; @@ -1706,10 +1719,6 @@ static void last_data_assign_impl(SQLFunctionCtx *pCtx, char *pData, int32_t ind } static void last_dist_function(SQLFunctionCtx *pCtx) { - if (pCtx->size == 0) { - return; - } - /* * 1. for scan data in asc order, no need to check data * 2. for data blocks that are not loaded, no need to check data @@ -1717,7 +1726,12 @@ static void last_dist_function(SQLFunctionCtx *pCtx) { if (pCtx->order != pCtx->param[0].i64Key) { return; } - + + // data block is discard, not loaded, do not need to check it + if (!pCtx->preAggVals.dataBlockLoaded) { + return; + } + int32_t notNullElems = 0; for (int32_t i = pCtx->size - 1; i >= 0; --i) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index c4641afbf3..24c78f2534 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1648,6 +1648,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm } pNew->fp = fp; + pNew->fetchFp = fp; pNew->param = param; pNew->maxRetry = TSDB_MAX_REPLICA_NUM; @@ -1803,6 +1804,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void } pNew->fp = fp; + pNew->fetchFp = fp; + pNew->param = param; pNew->maxRetry = TSDB_MAX_REPLICA_NUM; @@ -2041,10 +2044,8 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { // set the callback function pSql->fp = fp; - int32_t ret = tscProcessSql(pSql); - if (ret == TSDB_CODE_SUCCESS) { - return; - } else {// todo check for failure + if (tscProcessSql(pSql) != 0) { + break; } } } diff --git a/src/query/inc/tsqlfunction.h b/src/query/inc/tsqlfunction.h index 6a4b9874d7..e57cb26456 100644 --- a/src/query/inc/tsqlfunction.h +++ b/src/query/inc/tsqlfunction.h @@ -125,7 +125,8 @@ typedef struct SArithmeticSupport { } SArithmeticSupport; typedef struct SQLPreAggVal { - bool isSet; + bool isSet; // statistics info set or not + bool dataBlockLoaded; // data block is loaded or not SDataStatis statis; } SQLPreAggVal; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 841e75249f..26b8454594 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1358,6 +1358,8 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY pCtx->preAggVals.isSet = false; } + pCtx->preAggVals.dataBlockLoaded = (inputData != NULL); + // limit/offset query will affect this value pCtx->startOffset = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->pos:0; pCtx->size = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->rows - pQuery->pos : pQuery->pos + 1; @@ -1928,7 +1930,7 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi pQuery->pSelectExpr[columnIndex].bytes * realRowId; } -#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_DOUBLE && (_t) != TSDB_DATA_TYPE_FLOAT) +#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR) static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx, int32_t numOfRows) { @@ -1948,13 +1950,14 @@ static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDat } } + // no statistics data if (index == -1) { - continue; + return true; } // not support pre-filter operation on binary/nchar data type if (!IS_PREFILTER_TYPE(pFilterInfo->info.type)) { - continue; + return true; } // all points in current column are NULL, no need to check its boundary value @@ -2203,7 +2206,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { summary->totalBlocks += 1; if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) { - finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -3357,12 +3359,10 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols) cleanupTimeWindowInfo(&pTableQueryInfo->windowResInfo, numOfCols); } -#define SET_CURRENT_QUERY_TABLE_INFO(_runtime, _tableInfo) \ - do { \ - SQuery *_query = (_runtime)->pQuery; \ - _query->current = _tableInfo; \ - assert((((_tableInfo)->lastKey >= (_tableInfo)->win.skey) && QUERY_IS_ASC_QUERY(_query)) || \ - (((_tableInfo)->lastKey <= (_tableInfo)->win.skey) && !QUERY_IS_ASC_QUERY(_query))); \ +#define CHECK_QUERY_TIME_RANGE(_q, _tableInfo) \ + do { \ + assert((((_tableInfo)->lastKey >= (_tableInfo)->win.skey) && QUERY_IS_ASC_QUERY(_q)) || \ + (((_tableInfo)->lastKey <= (_tableInfo)->win.skey) && !QUERY_IS_ASC_QUERY(_q))); \ } while (0) /** @@ -4212,6 +4212,23 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) { } } +static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTableQueryInfo, SDataBlockInfo* pBlockInfo) { + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + SQuery* pQuery = pQInfo->runtimeEnv.pQuery; + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + + if (!QUERY_IS_INTERVAL_QUERY(pQuery)) { + setExecutionContext(pQInfo, pTableQueryInfo->groupIndex, pBlockInfo->window.ekey + step); + } else { // interval query + TSKEY nextKey = pBlockInfo->window.skey; + setIntervalQueryRange(pQInfo, nextKey); + + if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTSBuf != NULL) { + setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo); + } + } +} + static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery* pQuery = pRuntimeEnv->pQuery; @@ -4226,6 +4243,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { while (tsdbNextDataBlock(pQueryHandle)) { summary->totalBlocks += 1; + if (IS_QUERY_KILLED(pQInfo)) { longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -4236,24 +4254,16 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { break; } - assert(*pTableQueryInfo != NULL); - SET_CURRENT_QUERY_TABLE_INFO(pRuntimeEnv, *pTableQueryInfo); + pQuery->current = *pTableQueryInfo; + CHECK_QUERY_TIME_RANGE(pQuery, *pTableQueryInfo); if (!pRuntimeEnv->groupbyNormalCol) { - if (!QUERY_IS_INTERVAL_QUERY(pQuery)) { - setExecutionContext(pQInfo, (*pTableQueryInfo)->groupIndex, blockInfo.window.ekey + step); - } else { // interval query - TSKEY nextKey = blockInfo.window.skey; - setIntervalQueryRange(pQInfo, nextKey); - - if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTSBuf != NULL) { - setAdditionalInfo(pQInfo, (*pTableQueryInfo)->pTable, *pTableQueryInfo); - } - } + setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo); } SDataStatis *pStatis = NULL; SArray *pDataBlock = NULL; + if (loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock) == BLK_DATA_DISCARD) { pQuery->current->lastKey = QUERY_IS_ASC_QUERY(pQuery)? blockInfo.window.ekey + step:blockInfo.window.skey + step; continue; @@ -4516,7 +4526,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) { while (pQInfo->tableIndex < pQInfo->tableqinfoGroupInfo.numOfTables) { if (IS_QUERY_KILLED(pQInfo)) { - finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -5014,6 +5023,7 @@ static void stableQueryImpl(SQInfo *pQInfo) { isFirstLastRowQuery(pQuery) || pRuntimeEnv->groupbyNormalCol); sequentialTableProcess(pQInfo); + } // record the total elapsed time @@ -6112,11 +6122,6 @@ _over: //pQInfo already freed in initQInfo, but *pQInfo may not pointer to null; if (code != TSDB_CODE_SUCCESS) { *pQInfo = NULL; - } else { -// SQInfo* pq = (SQInfo*) (*pQInfo); - -// T_REF_INC(pq); -// T_REF_INC(pq); } // if failed to add ref for all meters in this query, abort current query diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index c055a27c39..3311a0f13c 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -1801,7 +1801,8 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta } tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL); - + + // todo opt perf size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle); for(int32_t i = 0; i < numOfCols; ++i) { SDataStatis* st = &pHandle->statis[i]; @@ -1820,6 +1821,13 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows; } + + // todo opt perf + SColumnInfo* pColInfo = taosArrayGet(pHandle->pColumns, i); + if (pColInfo->type == TSDB_DATA_TYPE_TIMESTAMP) { + pHandle->statis[i].min = pBlockInfo->compBlock->keyFirst; + pHandle->statis[i].max = pBlockInfo->compBlock->keyLast; + } } return TSDB_CODE_SUCCESS; -- GitLab