diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index ec1261da0a45ad24985ebf51b9b16c2acfad7709..61456664985e3007c7be3be5c388e1a9728123af 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -305,6 +305,36 @@ typedef struct SQueryParam { SSqlGroupbyExpr *pGroupbyExpr; } SQueryParam; +typedef struct SSDataBlock { + SDataStatis *pBlockStatis; + SArray *pDataBlock; + SDataBlockInfo info; +} SSDataBlock; + +typedef struct STableScanInfo { + void *pQueryHandle; + int32_t numOfBlocks; + int32_t numOfSkipped; + int32_t numOfBlockStatis; + + int64_t numOfRows; + int32_t order; + bool completed; + + SSDataBlock block; + + int64_t elapsedTime; + SSDataBlock* (*apply)(void* param); +} STableScanInfo; + +typedef struct SAggOperatorInfo { + SResultRowInfo *pResultRowInfo; + STableQueryInfo *pTableQueryInfo; + STableScanInfo *pTableScanInfo; + SQueryRuntimeEnv *pRuntimeEnv; + SSDataBlock* (*apply)(void* param); +} SAggOperatorInfo; + void freeParam(SQueryParam *param); int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param); int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg, diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e1f690407bf7fc7892ef45439cf928b1be1b04f2..e4ee3221c9b9fdd41e1f904c8708a3ce8dd9ce94 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -3886,45 +3886,43 @@ int32_t setParamValue(SQueryRuntimeEnv* pRuntimeEnv) { */ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery * pQuery = pRuntimeEnv->pQuery; - STableQueryInfo *pTableQueryInfo = pQuery->current; + SQuery *pQuery = pRuntimeEnv->pQuery; + STableQueryInfo *pTableQueryInfo = pQuery->current; + SResultRowInfo *pWindowResInfo = &pTableQueryInfo->resInfo; - if (pTableQueryInfo->queryRangeSet) { - pTableQueryInfo->lastKey = key; - } else { - pTableQueryInfo->win.skey = key; - STimeWindow win = {.skey = key, .ekey = pQuery->window.ekey}; + if (pWindowResInfo->prevSKey != TSKEY_INITIAL_VAL) { + return; + } - // for too small query range, no data in this interval. - if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey < pQuery->window.skey)) || - (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey < pQuery->window.ekey))) { - return; - } + pTableQueryInfo->win.skey = key; + STimeWindow win = {.skey = key, .ekey = pQuery->window.ekey}; - /** - * In handling the both ascending and descending order super table query, we need to find the first qualified - * timestamp of this table, and then set the first qualified start timestamp. - * In ascending query, the key is the first qualified timestamp. However, in the descending order query, additional - * operations involve. - */ - STimeWindow w = TSWINDOW_INITIALIZER; - SResultRowInfo *pWindowResInfo = &pTableQueryInfo->resInfo; + // for too small query range, no data in this interval. + if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey < pQuery->window.skey)) || + (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey < pQuery->window.ekey))) { + return; + } - TSKEY sk = MIN(win.skey, win.ekey); - TSKEY ek = MAX(win.skey, win.ekey); - getAlignQueryTimeWindow(pQuery, win.skey, sk, ek, &w); + /** + * In handling the both ascending and descending order super table query, we need to find the first qualified + * timestamp of this table, and then set the first qualified start timestamp. + * In ascending query, the key is the first qualified timestamp. However, in the descending order query, additional + * operations involve. + */ + STimeWindow w = TSWINDOW_INITIALIZER; - if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) { - if (!QUERY_IS_ASC_QUERY(pQuery)) { - assert(win.ekey == pQuery->window.ekey); - } + TSKEY sk = MIN(win.skey, win.ekey); + TSKEY ek = MAX(win.skey, win.ekey); + getAlignQueryTimeWindow(pQuery, win.skey, sk, ek, &w); - pWindowResInfo->prevSKey = w.skey; + if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) { + if (!QUERY_IS_ASC_QUERY(pQuery)) { + assert(win.ekey == pQuery->window.ekey); } - - pTableQueryInfo->queryRangeSet = 1; - pTableQueryInfo->lastKey = pTableQueryInfo->win.skey; + pWindowResInfo->prevSKey = w.skey; } + + pTableQueryInfo->lastKey = pTableQueryInfo->win.skey; } bool requireTimestamp(SQuery *pQuery) { @@ -4276,6 +4274,7 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QINFO_ADDR(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey); } + static void freeTableBlockDist(STableBlockDist *pTableBlockDist) { if (pTableBlockDist != NULL) { taosArrayDestroy(pTableBlockDist->dataBlockInfos); @@ -4283,6 +4282,7 @@ static void freeTableBlockDist(STableBlockDist *pTableBlockDist) { free(pTableBlockDist); } } + static int32_t getPercentileFromSortedArray(const SArray* pArray, double rate) { int32_t len = (int32_t)taosArrayGetSize(pArray); if (len <= 0) { @@ -4642,7 +4642,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts pRuntimeEnv->stableQuery = isSTableQuery; pRuntimeEnv->prevGroupId = INT32_MIN; pRuntimeEnv->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr); - pRuntimeEnv->stabledev = isStabledev(pQuery); if (pTsBuf != NULL) { int16_t order = (pQuery->order.order == pRuntimeEnv->pTsBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; @@ -5586,6 +5585,103 @@ static void doSecondaryArithmeticProcess(SQuery* pQuery) { tfree(arithSup.data); } +static SSDataBlock* doTableScan(void* param) { + STableScanInfo* pTableScanInfo = (STableScanInfo*) param; + + SSDataBlock* pBlock = &pTableScanInfo->block; + while(tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { + pTableScanInfo->numOfBlocks += 1; + + // todo check for query cancel + + tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); + + SDataStatis *pStatis = pBlock->pBlockStatis; + + // this function never returns error? + tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pStatis); + pTableScanInfo->numOfBlockStatis += 1; + + if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block + pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL); + pTableScanInfo->numOfRows += pBlock->info.rows; + } + + return pBlock; +// int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status); +// if (ret != TSDB_CODE_SUCCESS) { +// break; +// } + +// if (status == BLK_DATA_DISCARD) { +// pQuery->current->lastKey = +// QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step; +// continue; +// } + } + + return NULL; +} + +static UNUSED_FUNC STableScanInfo* createTableScanInfo(void* pTsdbQueryHandle) { + STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); + pInfo->pQueryHandle = pTsdbQueryHandle; + pInfo->apply = doTableScan; + return pInfo; +} + +// this is a blocking operator +static SSDataBlock* doAggOperator(void* param) { + SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param; + + // setup the output buffer + SSDataBlock* res = calloc(1, sizeof(SSDataBlock)); + + SQuery* pQuery = pInfo->pRuntimeEnv->pQuery; + res->info.numOfCols = pQuery->numOfOutput; + + res->pDataBlock = taosArrayInit(pQuery->numOfOutput, sizeof(SColumnInfoData)); + for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { + + SColumnInfoData idata = {0}; + idata.info.type = pQuery->pExpr1[i].type; + idata.info.bytes = pQuery->pExpr1[i].bytes; + idata.info.colId = pQuery->pExpr1[i].base.resColId; + idata.pData = calloc(4096, idata.info.bytes); + taosArrayPush(res->pDataBlock, &idata); + + pInfo->pRuntimeEnv->pCtx[i].pOutput = idata.pData; + } + + while(1) { + SSDataBlock* pBlock = pInfo->pTableScanInfo->apply(pInfo->pTableScanInfo); + if (pBlock == NULL) { + break; + } + + blockwiseApplyFunctions(pInfo->pRuntimeEnv, pBlock->pBlockStatis, &pBlock->info, pInfo->pResultRowInfo, binarySearchForKey, pBlock->pDataBlock); + } + + setQueryStatus(pQuery, QUERY_COMPLETED); + finalizeQueryResult(pInfo->pRuntimeEnv); + + res->info.rows = getNumOfResult(pInfo->pRuntimeEnv); + return res; +} + +static UNUSED_FUNC SAggOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, + SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTableScanInfo) { + SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); + + pInfo->pResultRowInfo = pResultRowInfo; + pInfo->pTableQueryInfo = pTableQueryInfo; + pInfo->pTableScanInfo = pTableScanInfo; + pInfo->pRuntimeEnv = pRuntimeEnv; + pInfo->apply = doAggOperator; + + return pInfo; +} + /* * in each query, this function will be called only once, no retry for further result. * @@ -5600,11 +5696,15 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) return; } - scanOneTableDataBlocks(pRuntimeEnv, pTableInfo->lastKey); - finalizeQueryResult(pRuntimeEnv); + STableScanInfo* pi = createTableScanInfo(pRuntimeEnv->pQueryHandle); + SAggOperatorInfo* pAggInfo = createAggOperatorInfo(&pRuntimeEnv->resultRowInfo, pQuery->current, pRuntimeEnv, pi); + SSDataBlock* pResBlock = pAggInfo->apply(pAggInfo); + +// scanOneTableDataBlocks(pRuntimeEnv, pTableInfo->lastKey); // since the numOfRows must be identical for all sql functions that are allowed to be executed simutaneously. - pQuery->rec.rows = getNumOfResult(pRuntimeEnv); +// pQuery->rec.rows = getNumOfResult(pRuntimeEnv); + pQuery->rec.rows = pResBlock->info.rows;//getNumOfResult(pRuntimeEnv); doSecondaryArithmeticProcess(pQuery); if (isQueryKilled(pQInfo)) {