From 966ca1600a382a0c1647153911186f281f87fa88 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 16 Feb 2021 23:20:55 +0800 Subject: [PATCH] [td-225] refactor --- src/query/inc/qExecutor.h | 9 +- src/query/src/qExecutor.c | 406 +++++++++++++++++++++++++++++++------- 2 files changed, 341 insertions(+), 74 deletions(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 3807681786..682686e737 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -289,6 +289,7 @@ typedef struct SQueryRuntimeEnv { int32_t tableIndex; STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure SOperatorInfo* proot; + SGroupResInfo groupResInfo; } SQueryRuntimeEnv; typedef struct { @@ -315,7 +316,6 @@ typedef struct SQInfo { * the query is executed position on which meter of the whole list. * when the index reaches the last one of the list, it means the query is completed. */ - SGroupResInfo groupResInfo; void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables; pthread_mutex_t lock; // used to synchronize the rsp/query threads @@ -389,6 +389,13 @@ typedef struct SOffsetOperatorInfo { SQueryRuntimeEnv* pRuntimeEnv; } SOffsetOperatorInfo; +typedef struct SHashIntervalOperatorInfo { + SResultRowInfo *pResultRowInfo; + STableQueryInfo *pTableQueryInfo; + SQueryRuntimeEnv *pRuntimeEnv; + SQLFunctionCtx *pCtx; +} SHashIntervalOperatorInfo; + void freeParam(SQueryParam *param); int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param); int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg, diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 5eefcdfe41..a61616badb 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -185,6 +185,10 @@ static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr); static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr); static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr); +static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr); +static void destroyOperatorInfo(SOperatorInfo* pOperator); +void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size); +void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); // setup the output buffer static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput) { @@ -204,6 +208,23 @@ static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput) { return res; } +static void* destroyOutputBuf(SSDataBlock* pBlock) { + if (pBlock == NULL) { + return NULL; + } + + int32_t numOfOutput = pBlock->info.numOfCols; + for(int32_t i = 0; i < numOfOutput; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + tfree(pColInfoData->pData); + } + + taosArrayDestroy(pBlock->pDataBlock); + tfree(pBlock->pBlockStatis); + tfree(pBlock); + return NULL; +} + bool doFilterData(SQuery *pQuery, int32_t elemPos) { for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k]; @@ -565,7 +586,18 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo *pWindowResInfo, int64_t t STimeWindow w = {0}; if (pWindowResInfo->curIndex == -1) { // the first window, from the previous stored value + if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) { + if (QUERY_IS_ASC_QUERY(pQuery)) { + getAlignQueryTimeWindow(pQuery, ts, ts, pQuery->window.ekey, &w); + } else { // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp + getAlignQueryTimeWindow(pQuery, ts, pQuery->window.ekey, ts, &w); + } + + pWindowResInfo->prevSKey = w.skey; + } else { w.skey = pWindowResInfo->prevSKey; + } + if (pQuery->interval.intervalUnit == 'n' || pQuery->interval.intervalUnit == 'y') { w.ekey = taosTimeAdd(w.skey, pQuery->interval.interval, pQuery->interval.intervalUnit, pQuery->precision) - 1; } else { @@ -861,7 +893,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow pCtx[k].ptsList = &tsCol[pos]; } - int32_t functionId = pQuery->pExpr1[k].base.functionId; + int32_t functionId = pCtx[k].functionId; // not a whole block involved in query processing, statistics data can not be used // NOTE: the original value of isSet have been changed here @@ -1127,7 +1159,7 @@ static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* } } -static TSKEY getStartTsKey(SQuery* pQuery, SDataBlockInfo* pDataBlockInfo, TSKEY* tsCols, int32_t step) { +static TSKEY getStartTsKey(SQuery* pQuery, SDataBlockInfo* pDataBlockInfo, const TSKEY* tsCols, int32_t step) { TSKEY ts = TSKEY_INITIAL_VAL; if (tsCols == NULL) { @@ -1264,16 +1296,24 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) { SQuery *pQuery = pRuntimeEnv->pQuery; - SResultRowInfo* pWindowResInfo = &pRuntimeEnv->resultRowInfo; + SResultRowInfo*pResultRowInfo = &pRuntimeEnv->resultRowInfo; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - int32_t prevIndex = curTimeWindowIndex(pWindowResInfo); + int32_t prevIndex = curTimeWindowIndex(pResultRowInfo); - TSKEY ts = getStartTsKey(pQuery, pDataBlockInfo, tsCols, step); - STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); + TSKEY* tsCols = NULL; + if (pSDataBlock->pDataBlock != NULL) { + SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0); + tsCols = pColDataInfo->pData; + } + + TSKEY ts = getStartTsKey(pQuery, &pSDataBlock->info, tsCols, step); + + STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, pQuery); + bool masterScan = (pRuntimeEnv->scanFlag == MASTER_SCAN)? true:false; SResultRow *pResult = NULL; - int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId); + int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, 0); if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { // goto _end; } @@ -1282,60 +1322,60 @@ static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOpera int32_t startPos = pQuery->pos; TSKEY ekey = reviseWindowEkey(pQuery, &win); - forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true); + forwardStep = getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, pQuery->pos, ekey, binarySearchForKey, true); // prev time window not interpolation yet. - int32_t curIndex = curTimeWindowIndex(pWindowResInfo); + int32_t curIndex = curTimeWindowIndex(pResultRowInfo); if (prevIndex != -1 && prevIndex < curIndex && pQuery->timeWindowInterpo) { for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already. - SResultRow *pRes = pWindowResInfo->pResult[j]; + SResultRow *pRes = pResultRowInfo->pResult[j]; if (pRes->closed) { assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP)); continue; } STimeWindow w = pRes->win; - ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &w, masterScan, &pResult, groupId); + ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &w, masterScan, &pResult, 0); assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); - int32_t p = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pDataBlockInfo->rows - 1; - doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0], -1, tsCols[0], p, + int32_t p = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pSDataBlock->info.rows - 1; + doRowwiseTimeWindowInterpolation(pRuntimeEnv, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0], -1, tsCols[0], p, w.ekey, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); - doBlockwiseApplyFunctions(pRuntimeEnv, &w, startPos, 0, tsCols, pDataBlockInfo->rows, pDataBlock); + doBlockwiseApplyFunctions(pRuntimeEnv, &w, startPos, 0, tsCols, pSDataBlock->info.rows, pSDataBlock->pDataBlock); } // restore current time window - ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId); + ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, 0); assert(ret == TSDB_CODE_SUCCESS); } // window start key interpolation - doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &win, pQuery->pos, forwardStep); - doBlockwiseApplyFunctions(pRuntimeEnv, &win, startPos, forwardStep, tsCols, pDataBlockInfo->rows, pDataBlock); + doWindowBorderInterpolation(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, pResult, &win, pQuery->pos, forwardStep); + doBlockwiseApplyFunctions(pRuntimeEnv, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, pSDataBlock->pDataBlock); STimeWindow nextWin = win; while (1) { int32_t prevEndPos = (forwardStep - 1) * step + startPos; - startPos = getNextQualifiedWindow(pQuery, &nextWin, pDataBlockInfo, tsCols, searchFn, prevEndPos); + startPos = getNextQualifiedWindow(pQuery, &nextWin, &pSDataBlock->info, tsCols, binarySearchForKey, prevEndPos); if (startPos < 0) { break; } // null data, failed to allocate more memory buffer - int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &nextWin, masterScan, &pResult, groupId); + int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &nextWin, masterScan, &pResult, 0); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { break; } ekey = reviseWindowEkey(pQuery, &nextWin); - forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true); + forwardStep = getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); // window start(end) key interpolation - doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &nextWin, startPos, forwardStep); - doBlockwiseApplyFunctions(pRuntimeEnv, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows, pDataBlock); + doWindowBorderInterpolation(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, pResult, &nextWin, startPos, forwardStep); + doBlockwiseApplyFunctions(pRuntimeEnv, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, pSDataBlock->pDataBlock); } } @@ -2204,6 +2244,24 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, int32 return NULL; } +static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) { + if (pCtx == NULL) { + return NULL; + } + + for (int32_t i = 0; i < numOfOutput; ++i) { + for (int32_t j = 0; j < pCtx[i].numOfParams; ++j) { + tVariantDestroy(&pCtx[i].param[j]); + } + + tVariantDestroy(&pCtx[i].tag); + tfree(pCtx[i].tagInfo.pTagCtxList); + } + + tfree(pCtx); + return NULL; +} + static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables, int16_t order, int32_t vgId) { qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv)); SQuery *pQuery = pRuntimeEnv->pQuery; @@ -2261,6 +2319,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf if (pQuery->pExpr2 != NULL) { pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot); } + } else if (QUERY_IS_INTERVAL_QUERY(pQuery)) { + pRuntimeEnv->proot = createHashIntervalAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); } else { // diff/add/multiply/subtract/division assert(pQuery->checkResultBuf == 1); pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); @@ -2305,11 +2365,8 @@ static void doFreeQueryHandle(SQInfo* pQInfo) { } -static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { - if (pRuntimeEnv->pQuery == NULL) { - return; - } +static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv); @@ -2368,6 +2425,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool); taosArrayDestroyEx(pRuntimeEnv->prevResult, freeInterResult); pRuntimeEnv->prevResult = NULL; + + pRuntimeEnv->outputBuf = destroyOutputBuf(pRuntimeEnv->outputBuf); + destroyOperatorInfo(pRuntimeEnv->proot); } static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) { @@ -2949,6 +3009,92 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW return TSDB_CODE_SUCCESS; } +int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pWindowResInfo, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) { + *status = BLK_DATA_NO_NEEDED; + + SQuery *pQuery = pRuntimeEnv->pQuery; +// int64_t groupId = pQuery->current->groupIndex; + + SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); + SQueryCostInfo* pCost = &pQInfo->summary; + + if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf > 0) { + *status = BLK_DATA_ALL_NEEDED; + } else { // check if this data block is required to load + // Calculate all time windows that are overlapping or contain current data block. + // If current data block is contained by all possible time window, do not load current data block. +// if (QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, pBlockInfo)) { + *status = BLK_DATA_ALL_NEEDED; +// } + + if ((*status) != BLK_DATA_ALL_NEEDED) { + // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet, + // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer +// if (QUERY_IS_INTERVAL_QUERY(pQuery)) { +// SResultRow* pResult = NULL; +// +// bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); +// +// TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlockInfo->window.skey:pBlockInfo->window.ekey; +// STimeWindow win = getActiveTimeWindow(pWindowResInfo, k, pQuery); +// if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId) != TSDB_CODE_SUCCESS) { +// // todo handle error in set result for timewindow +// } +// } +// +// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { +// SSqlFuncMsg* pSqlFunc = &pQuery->pExpr1[i].base; +// +// int32_t functionId = pSqlFunc->functionId; +// int32_t colId = pSqlFunc->colInfo.colId; +// (*status) |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId); +// if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { +// break; +// } +// } + } + } + + if ((*status) == BLK_DATA_NO_NEEDED) { + qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", GET_QINFO_ADDR(pRuntimeEnv), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + pCost->discardBlocks += 1; + } else if ((*status) == BLK_DATA_STATIS_NEEDED) { + + // this function never returns error? + tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis); + pCost->loadBlockStatis += 1; + + if (*pStatis == NULL) { // data block statistics does not exist, load data block + *pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL); + pCost->totalCheckedRows += pBlockInfo->rows; + } + } else { + assert((*status) == BLK_DATA_ALL_NEEDED); + + // load the data block statistics to perform further filter + pCost->loadBlockStatis += 1; + tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis); + + if (!needToLoadDataBlock(pRuntimeEnv, *pStatis, pRuntimeEnv->pCtx, pBlockInfo->rows)) { + // current block has been discard due to filter applied + pCost->discardBlocks += 1; + qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", GET_QINFO_ADDR(pRuntimeEnv), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + (*status) = BLK_DATA_DISCARD; + } + + pCost->totalCheckedRows += pBlockInfo->rows; + pCost->loadBlocks += 1; + *pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL); + if (*pDataBlock == NULL) { + return terrno; + } + } + + return TSDB_CODE_SUCCESS; +} + int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { int32_t midPos = -1; int32_t numOfRows; @@ -3372,13 +3518,13 @@ static int32_t doCopyToSData(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGrou void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo; + SGroupResInfo* pGroupResInfo = &pRuntimeEnv->groupResInfo; while(pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) { // all results in current group have been returned to client, try next group if ((pGroupResInfo->pRows == NULL) || taosArrayGetSize(pGroupResInfo->pRows) == 0) { assert(pGroupResInfo->index == 0); - if ((pQInfo->code = mergeIntoGroupResult(&pQInfo->groupResInfo, pQInfo)) != TSDB_CODE_SUCCESS) { + if ((pQInfo->code = mergeIntoGroupResult(&pRuntimeEnv->groupResInfo, pQInfo)) != TSDB_CODE_SUCCESS) { return; } } @@ -3536,13 +3682,12 @@ void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx); } -void resetDefaultResInfoOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pSQLCtx, SSDataBlock* pDataBlock) { +void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SSDataBlock* pDataBlock) { int32_t tid = 0; int64_t uid = 0; SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, (char *)&tid, sizeof(tid), true, uid); for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { - SQLFunctionCtx *pCtx = &pSQLCtx[i]; SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i); /* @@ -3552,17 +3697,17 @@ void resetDefaultResInfoOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC SResultRowCellInfo* pCellInfo = getResultCell(pRuntimeEnv, pRow, i); RESET_RESULT_INFO(pCellInfo); - pCtx->resultInfo = pCellInfo; - pCtx->pOutput = pData->pData; + pCtx[i].resultInfo = pCellInfo; + pCtx[i].pOutput = pData->pData; // set the timestamp output buffer for top/bottom/diff query int32_t functionId = pCtx->functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { - pCtx->ptsOutputBuf = pSQLCtx[0].pOutput; + pCtx[i].ptsOutputBuf = pCtx[0].pOutput; } } - initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx); + initCtxOutputBuf_rv(pCtx, pDataBlock->info.numOfCols); } void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) { @@ -3593,19 +3738,30 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) { } } -void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pSQLCtx) { +void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx) { SQuery *pQuery = pRuntimeEnv->pQuery; for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - int32_t functionId = pSQLCtx[j].functionId; - pSQLCtx[j].currentStage = 0; + pCtx[j].currentStage = 0; + + SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]); + if (pResInfo->initialized) { + continue; + } - SResultRowCellInfo* pResInfo = GET_RES_INFO(&pSQLCtx[j]); + aAggs[pCtx[j].functionId].init(&pCtx[j]); + } +} +void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size) { + for (int32_t j = 0; j < size; ++j) { + pCtx[j].currentStage = 0; + + SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]); if (pResInfo->initialized) { continue; } - aAggs[functionId].init(&pSQLCtx[j]); + aAggs[pCtx[j].functionId].init(&pCtx[j]); } } @@ -4363,9 +4519,9 @@ static int32_t doCopyToSData(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGrou * @param pQInfo * @param result */ -void copyToOutputBuf(SQInfo *pQInfo, SResultRowInfo *pResultInfo) { - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - SGroupResInfo *pGroupResInfo = &pQInfo->groupResInfo; +void copyToOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pResultInfo) { + SQuery *pQuery = pRuntimeEnv->pQuery; + SGroupResInfo *pGroupResInfo = &pRuntimeEnv->groupResInfo; assert(pQuery->rec.rows == 0 && pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); if (!hasRemainData(pGroupResInfo)) { @@ -4373,7 +4529,74 @@ void copyToOutputBuf(SQInfo *pQInfo, SResultRowInfo *pResultInfo) { } int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSDB_ORDER_ASC; - pQuery->rec.rows = doCopyToSData(&pQInfo->runtimeEnv, pGroupResInfo, orderType); + pQuery->rec.rows = doCopyToSData(pRuntimeEnv, pGroupResInfo, orderType); +} + +static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock) { + SQuery *pQuery = pRuntimeEnv->pQuery; + + int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); + int32_t numOfResult = 0;//pQuery->rec.rows; // there are already exists result rows + + int32_t start = 0; + int32_t step = -1; + + qDebug("QInfo:%p start to copy data from windowResInfo to output buf", pRuntimeEnv->qinfo); + if (orderType == TSDB_ORDER_ASC) { + start = pGroupResInfo->index; + step = 1; + } else { // desc order copy all data + start = numOfRows - pGroupResInfo->index - 1; + step = -1; + } + + for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) { + SResultRow* pRow = taosArrayGetP(pGroupResInfo->pRows, i); + if (pRow->numOfRows == 0) { + pGroupResInfo->index += 1; + continue; + } + + int32_t numOfRowsToCopy = pRow->numOfRows; + + //current output space is not enough to accommodate all data of this page, prepare more space +// if (numOfRowsToCopy > (pQuery->rec.capacity - numOfResult)) { +// int32_t newSize = pQuery->rec.capacity + (numOfRowsToCopy - numOfResult); +// expandBuffer(pRuntimeEnv, newSize, GET_QINFO_ADDR(pRuntimeEnv)); +// } + + pGroupResInfo->index += 1; + + tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pRow->pageId); + for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, j); + int32_t bytes = pColInfoData->info.bytes; + + char *out = pColInfoData->pData + numOfResult * bytes; + char *in = getPosInResultPage(pRuntimeEnv, j, pRow, page); + memcpy(out, in, bytes * numOfRowsToCopy); + } + + numOfResult += numOfRowsToCopy; + if (numOfResult == pQuery->rec.capacity) { // output buffer is full + break; + } + } + + qDebug("QInfo:%p copy data to query buf completed", pRuntimeEnv->qinfo); + pBlock->info.rows = numOfResult; + return 0; +} + +static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock) { + assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); + if (!hasRemainData(pGroupResInfo)) { + return; + } + + SQuery* pQuery = pRuntimeEnv->pQuery; + int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSDB_ORDER_ASC; + doCopyToSData_rv(pRuntimeEnv, pGroupResInfo, orderType, pBlock); } static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv) { @@ -4510,7 +4733,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data setQueryStatus(pQuery, QUERY_OVER); } } else { - if (!hasNotReturnedResults(&pQInfo->runtimeEnv, &pQInfo->groupResInfo)) { + if (!hasNotReturnedResults(&pQInfo->runtimeEnv, &pRuntimeEnv->groupResInfo)) { setQueryStatus(pQuery, QUERY_OVER); } } @@ -4985,7 +5208,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts pQuery->tsdb = tsdb; pQuery->vgId = vgId; - pQInfo->groupResInfo.totalGroup = isSTableQuery? GET_NUM_OF_TABLEGROUP(pRuntimeEnv):0; + pRuntimeEnv->groupResInfo.totalGroup = isSTableQuery? GET_NUM_OF_TABLEGROUP(pRuntimeEnv):0; pRuntimeEnv->pQuery = pQuery; pRuntimeEnv->pTsBuf = pTsBuf; @@ -5484,12 +5707,12 @@ static void sequentialTableProcess(SQInfo *pQInfo) { expandBuffer(pRuntimeEnv, pWindowResInfo->size, pQInfo); } - initGroupResInfo(&pQInfo->groupResInfo, &pRuntimeEnv->resultRowInfo, 0); - copyToOutputBuf(pQInfo, pWindowResInfo); + initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0); + copyToOutputBuf(pRuntimeEnv, pWindowResInfo); assert(pQuery->rec.rows == pWindowResInfo->size); resetResultRowInfo(pRuntimeEnv, &pRuntimeEnv->resultRowInfo); - cleanupGroupResInfo(&pQInfo->groupResInfo); + cleanupGroupResInfo(&pRuntimeEnv->groupResInfo); break; } } else if (pQuery->queryWindowIdentical && pRuntimeEnv->pTsBuf == NULL && !isTsCompQuery(pQuery)) { @@ -5635,8 +5858,8 @@ static void sequentialTableProcess(SQInfo *pQInfo) { * If the subgroup index is larger than 0, results generated by group by tbname,k is existed. * we need to return it to client in the first place. */ - if (hasRemainData(&pQInfo->groupResInfo)) { - copyToOutputBuf(pQInfo, &pRuntimeEnv->resultRowInfo); + if (hasRemainData(&pRuntimeEnv->groupResInfo)) { + copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo); pQuery->rec.total += pQuery->rec.rows; if (pQuery->rec.rows > 0) { @@ -5821,7 +6044,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { if (QUERY_IS_INTERVAL_QUERY(pQuery)) { copyResToQueryResultBuf(pQInfo, pQuery); } else { - copyToOutputBuf(pQInfo, &pRuntimeEnv->resultRowInfo); + copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo); } qDebug("QInfo:%p current:%"PRId64", total:%"PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total); @@ -5868,8 +6091,8 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { if (QUERY_IS_INTERVAL_QUERY(pQuery) || isSumAvgRateQuery(pQuery)) { copyResToQueryResultBuf(pQInfo, pQuery); } else { // not a interval query - initGroupResInfo(&pQInfo->groupResInfo, &pRuntimeEnv->resultRowInfo, 0); - copyToOutputBuf(pQInfo, &pRuntimeEnv->resultRowInfo); + initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0); + copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo); } // handle the limitation of output buffer @@ -5957,7 +6180,7 @@ static SSDataBlock* doScanTableImpl(STableScanInfo *pTableScanInfo) { // this function never returns error? uint32_t status; - int32_t code = loadDataBlockOnDemand(pTableScanInfo->pRuntimeEnv, NULL, pTableScanInfo->pQueryHandle, &pBlock->info, &pBlock->pBlockStatis, + int32_t code = loadDataBlockOnDemand_rv(pTableScanInfo->pRuntimeEnv, NULL, pTableScanInfo->pQueryHandle, &pBlock->info, &pBlock->pBlockStatis, &pBlock->pDataBlock, &status); if (code != TSDB_CODE_SUCCESS) { longjmp(pTableScanInfo->pRuntimeEnv->env, code); @@ -6103,7 +6326,7 @@ static SSDataBlock* doAggregation(void* param) { SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId); SOperatorInfo* upstream = pOperator->upstream; - resetDefaultResInfoOutputBuf_rv(pRuntimeEnv, pCtx, pRuntimeEnv->outputBuf); + setDefaultOutputBuf(pRuntimeEnv, pCtx, pRuntimeEnv->outputBuf); pQuery->pos = 0; while(1) { @@ -6122,6 +6345,8 @@ static SSDataBlock* doAggregation(void* param) { finalizeQueryResult(pRuntimeEnv); pRuntimeEnv->outputBuf->info.rows = getNumOfResult(pRuntimeEnv); + + destroySQLFunctionCtx(pCtx, pRuntimeEnv->outputBuf->info.numOfCols); return pRuntimeEnv->outputBuf; } @@ -6138,7 +6363,7 @@ static SSDataBlock* doArithmeticOperation(void* param) { pArithInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId); } - resetDefaultResInfoOutputBuf_rv(pRuntimeEnv, pArithInfo->pCtx, pRes); + setDefaultOutputBuf(pRuntimeEnv, pArithInfo->pCtx, pRes); SOperatorInfo* upstream = pOperator->upstream; pRuntimeEnv->pQuery->pos = 0; @@ -6254,7 +6479,6 @@ static SSDataBlock* doHashIntervalAgg(void* param) { SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId); SOperatorInfo* upstream = pOperator->upstream; - resetDefaultResInfoOutputBuf_rv(pRuntimeEnv, pCtx, pRuntimeEnv->outputBuf); pQuery->pos = 0; while(1) { @@ -6269,10 +6493,17 @@ static SSDataBlock* doHashIntervalAgg(void* param) { } pOperator->completed = true; + + closeAllResultRows(&pRuntimeEnv->resultRowInfo); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); finalizeQueryResult(pRuntimeEnv); - pRuntimeEnv->outputBuf->info.rows = getNumOfResult(pRuntimeEnv); + destroySQLFunctionCtx(pCtx, pOperator->numOfOutput); + + initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, pQuery->limit.offset); + toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pRuntimeEnv->outputBuf); + +// pRuntimeEnv->outputBuf->info.rows = getNumOfResult(pRuntimeEnv); return pRuntimeEnv->outputBuf; } @@ -6288,6 +6519,16 @@ static int32_t getNumOfScanTimes(SQuery* pQuery) { return 1; } +static void destroyOperatorInfo(SOperatorInfo* pOperator) { + if (pOperator == NULL) { + return; + } + + destroyOperatorInfo(pOperator->upstream); + tfree(pOperator->optInfo); + tfree(pOperator); +} + static SOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); @@ -6369,6 +6610,25 @@ static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, return pOperator; } +static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) { + SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo)); + + pInfo->pRuntimeEnv = pRuntimeEnv; + pInfo->pTableQueryInfo = pTableQueryInfo; + + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + + pOperator->name = "HashIntervalAggOp"; + pOperator->blockingOptr = true; + pOperator->completed = false; + pOperator->upstream = inputOptr; + pOperator->exec = doHashIntervalAgg; + pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1; + pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput; + pOperator->optInfo = pInfo; + + return pOperator; +} /* * in each query, this function will be called only once, no retry for further result. @@ -6386,10 +6646,6 @@ void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { SSDataBlock* pResBlock = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); pQuery->rec.rows = pResBlock->info.rows; - - // TODO limit/offset refactor to be one operator -// skipResults(pRuntimeEnv); -// limitOperator(pQuery, pQInfo); } static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { @@ -6455,11 +6711,11 @@ static void copyAndFillResult(SQInfo* pQInfo) { SQuery* pQuery = pRuntimeEnv->pQuery; while(1) { - copyToOutputBuf(pQInfo, &pRuntimeEnv->resultRowInfo); + copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo); doSecondaryArithmeticProcess(pQuery); TSKEY lastKey = 0; - if (!hasRemainData(&pQInfo->groupResInfo)) { + if (!hasRemainData(&pRuntimeEnv->groupResInfo)) { lastKey = pQuery->window.ekey; } else { lastKey = ((TSKEY*)pQuery->sdata[0]->data)[pQuery->rec.rows - 1]; @@ -6478,7 +6734,7 @@ static void copyAndFillResult(SQInfo* pQInfo) { } // here the pQuery->rec.rows == 0 - if (!hasRemainData(&pQInfo->groupResInfo) && !taosFillHasMoreResults(pRuntimeEnv->pFillInfo)) { + if (!hasRemainData(&pRuntimeEnv->groupResInfo) && !taosFillHasMoreResults(pRuntimeEnv->pFillInfo)) { break; } } @@ -6500,11 +6756,14 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { } } - scanOneTableDataBlocks(pRuntimeEnv, newStartKey); - finalizeQueryResult(pRuntimeEnv); + SSDataBlock* pResBlock = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); + pQuery->rec.rows = pResBlock->info.rows; +#if 0 +// scanOneTableDataBlocks(pRuntimeEnv, newStartKey); +// finalizeQueryResult(pRuntimeEnv); // skip offset result rows - pQuery->rec.rows = 0; +// pQuery->rec.rows = 0; // not fill or no result generated during this query if (pQuery->fillType == TSDB_FILL_NONE || pRuntimeEnv->resultRowInfo.size == 0 || isPointInterpoQuery(pQuery)) { @@ -6514,22 +6773,23 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { return; } - initGroupResInfo(&pQInfo->groupResInfo, &pRuntimeEnv->resultRowInfo, pQuery->limit.offset); + initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, pQuery->limit.offset); copyToOutputBuf(pQInfo, &pRuntimeEnv->resultRowInfo); doSecondaryArithmeticProcess(pQuery); limitOperator(pQuery, pQInfo); } else { - initGroupResInfo(&pQInfo->groupResInfo, &pRuntimeEnv->resultRowInfo, 0); + initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0); return copyAndFillResult(pQInfo); } +#endif } void tableQueryImpl(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; - if (hasNotReturnedResults(pRuntimeEnv, &pQInfo->groupResInfo)) { + if (hasNotReturnedResults(pRuntimeEnv, &pRuntimeEnv->groupResInfo)) { if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) { /* * There are remain results that are not returned due to result interpolation @@ -6546,7 +6806,7 @@ void tableQueryImpl(SQInfo *pQInfo) { } else { pQuery->rec.rows = 0; assert(pRuntimeEnv->resultRowInfo.size > 0); - copyToOutputBuf(pQInfo, &pRuntimeEnv->resultRowInfo); + copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo); doSecondaryArithmeticProcess(pQuery); if (pQuery->rec.rows > 0) { @@ -7729,7 +7989,7 @@ void freeQInfo(SQInfo *pQInfo) { tsdbDestroyTableGroup(&pQuery->tableGroupInfo); taosHashCleanup(pQInfo->arrTableIdInfo); - taosArrayDestroy(pQInfo->groupResInfo.pRows); + taosArrayDestroy(pRuntimeEnv->groupResInfo.pRows); pQInfo->signature = 0; -- GitLab