From faea042fcf3279d409fd1d3fec21077643b98370 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 23 Feb 2021 14:01:44 +0800 Subject: [PATCH] [td-2895] refactor --- src/query/inc/qExecutor.h | 27 +-- src/query/inc/qResultbuf.h | 11 +- src/query/inc/qUtil.h | 17 +- src/query/src/qExecutor.c | 292 +++++++++++++-------------- src/query/src/qPercentile.c | 2 +- src/query/src/qResultbuf.c | 6 +- src/query/src/qUtil.c | 26 ++- src/query/tests/resultBufferTest.cpp | 6 +- 8 files changed, 184 insertions(+), 203 deletions(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 51cdd9e527..0e21041704 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -86,7 +86,7 @@ typedef struct SSqlGroupbyExpr { typedef struct SResultRow { int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer - int32_t rowId:29; // row index in buffer page + int32_t offset:29; // row index in buffer page bool startInterp; // the time window start timestamp has done the interpolation already. bool endInterp; // the time window end timestamp has done the interpolation already. bool closed; // this result status: closed or opened @@ -247,6 +247,8 @@ typedef struct SOperatorInfo { bool completed; void *optInfo; SExprInfo *pExpr; + + int32_t* rowCellInfoOffset; int32_t numOfOutput; __operator_fn_t exec; @@ -258,10 +260,8 @@ typedef struct SQueryRuntimeEnv { jmp_buf env; SQuery* pQuery; void* qinfo; - -// SQLFunctionCtx* pCtx; - int32_t numOfRowsPerPage; - uint16_t* offset; +// int32_t numOfRowsPerPage; +// uint16_t* offset; uint16_t scanFlag; // denotes reversed scan of data or not SFillInfo* pFillInfo; void* pQueryHandle; @@ -272,7 +272,7 @@ typedef struct SQueryRuntimeEnv { char* keyBuf; // window key buffer SResultRowPool* pool; // window result object pool - int32_t* rowCellInfoOffset;// offset value for each row result cell info +// int32_t* rowCellInfoOffset;// offset value for each row result cell info char** prevRow; SArray* prevResult; // intermediate result, SArray @@ -355,11 +355,11 @@ typedef struct STableScanInfo { SSDataBlock block; - SQLFunctionCtx* pCtx; // next operator query context - SResultRowInfo* pResultRowInfo; + SQLFunctionCtx *pCtx; // next operator query context + SResultRowInfo *pResultRowInfo; int32_t numOfOutput; - - int64_t elapsedTime; + int32_t *rowCellInfoOffset; + int64_t elapsedTime; } STableScanInfo; typedef struct STagScanInfo { @@ -373,14 +373,15 @@ typedef struct SAggOperatorInfo { STableQueryInfo *pTableQueryInfo; SQueryRuntimeEnv *pRuntimeEnv; SQLFunctionCtx *pCtx; - SSDataBlock* pRes; - + int32_t *rowCellInfoOffset; + SSDataBlock *pRes; } SAggOperatorInfo; typedef struct SArithOperatorInfo { STableQueryInfo *pTableQueryInfo; SQueryRuntimeEnv *pRuntimeEnv; SQLFunctionCtx *pCtx; + int32_t *rowCellInfoOffset; SResultRowInfo resultRowInfo; SSDataBlock *pOutput; int32_t bufCapacity; @@ -402,6 +403,7 @@ typedef struct SHashIntervalOperatorInfo { STableQueryInfo *pTableQueryInfo; SQueryRuntimeEnv *pRuntimeEnv; SQLFunctionCtx *pCtx; + int32_t *rowCellInfoOffset; SResultRowInfo resultRowInfo; SSDataBlock *pRes; } SHashIntervalOperatorInfo; @@ -415,6 +417,7 @@ typedef struct SHashGroupbyOperatorInfo { STableQueryInfo *pTableQueryInfo; SQueryRuntimeEnv *pRuntimeEnv; SQLFunctionCtx *pCtx; + int32_t *rowCellInfoOffset; SResultRowInfo resultRowInfo; SSDataBlock *pRes; int32_t colIndex; diff --git a/src/query/inc/qResultbuf.h b/src/query/inc/qResultbuf.h index 704df9f3f2..080e3f09bb 100644 --- a/src/query/inc/qResultbuf.h +++ b/src/query/inc/qResultbuf.h @@ -55,7 +55,6 @@ typedef struct SResultBufStatis { } SResultBufStatis; typedef struct SDiskbasedResultBuf { - int32_t numOfRowsPerPage; int32_t numOfPages; int64_t totalBufSize; int64_t fileSize; // disk file size @@ -89,8 +88,7 @@ typedef struct SDiskbasedResultBuf { * @param handle * @return */ -int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t rowSize, int32_t pagesize, - int32_t inMemBufSize, const void* handle); +int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, const void* handle); /** * @@ -101,13 +99,6 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t ro */ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId); -/** - * - * @param pResultBuf - * @return - */ -size_t getNumOfRowsPerPage(const SDiskbasedResultBuf* pResultBuf); - /** * * @param pResultBuf diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index ae0464062a..9ab2b492e3 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -44,22 +44,19 @@ void closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot); bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot); void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type); -SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index); +SResultRowCellInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset); static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) { assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size); return pResultRowInfo->pResult[slot]; } -static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SResultRow *pResult, - tFilePage* page) { - assert(pResult != NULL && pRuntimeEnv != NULL); +static FORCE_INLINE char *getPosInResultPage(SQuery *pQuery, tFilePage* page, int32_t rowOffset, int16_t offset) { + assert(rowOffset >= 0 && pQuery != NULL); - SQuery *pQuery = pRuntimeEnv->pQuery; - - int32_t realRowId = (int32_t)(pResult->rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery)); - return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage + - pQuery->pExpr1[columnIndex].bytes * realRowId; +// int32_t realRowId = (int32_t)(rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery)); +// return ((char *)page->data) + offset * numOfRowsPerPage + bytes * realRowId; + return ((char *)page->data) + rowOffset + offset; } bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type); @@ -91,6 +88,6 @@ bool hasRemainData(SGroupResInfo* pGroupResInfo); bool incNextGroup(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); -int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv); +int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t* offset); #endif // TDENGINE_QUERYUTIL_H diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e8f87ac65c..612416a69d 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -155,9 +155,10 @@ static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes); //static void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult); -static void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfCols); +static void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, + int32_t numOfCols, int32_t* rowCellInfoOffset); -void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput); +void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo, @@ -201,7 +202,7 @@ static int32_t getGroupbyColumnData_rv(SSqlGroupbyExpr *pGroupbyExpr, SSDataBloc //static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); static int32_t setGroupResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, - SQLFunctionCtx * pCtx, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); + SQLFunctionCtx * pCtx, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex, int32_t* offset); static void destroyOperatorInfo(SOperatorInfo* pOperator); void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size); void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); @@ -625,8 +626,8 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo *pWindowResInfo, int64_t t return w; } -static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf *pResultBuf, int32_t tid, - int32_t numOfRowsPerPage) { +// a new buffer page for each table. Needs to opt this design +static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf *pResultBuf, int32_t tid, uint32_t size) { if (pWindowRes->pageId != -1) { return 0; } @@ -644,7 +645,7 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf pData = getResBufPage(pResultBuf, pi->pageId); pageId = pi->pageId; - if (pData->num >= numOfRowsPerPage) { + if (pData->num + size > pResultBuf->pageSize) { // release current page first, and prepare the next one releaseResBufPageInfo(pResultBuf, pi); pData = getNewDataBuf(pResultBuf, tid, &pageId); @@ -661,8 +662,9 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf // set the number of rows in current disk page if (pWindowRes->pageId == -1) { // not allocated yet, allocate new buffer pWindowRes->pageId = pageId; - pWindowRes->rowId = (int32_t)(pData->num++); + pWindowRes->offset = pData->num; + pData->num += size; assert(pWindowRes->pageId >= 0); } @@ -671,7 +673,7 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, STimeWindow *win, bool masterscan, SResultRow **pResult, int64_t groupId, SQLFunctionCtx* pCtx, - int32_t numOfOutput) { + int32_t numOfOutput, int32_t* rowCellInfoOffset) { assert(win->skey <= win->ekey); SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; @@ -683,7 +685,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRow // not assign result buffer yet, add new result buffer if (pResultRow->pageId == -1) { - int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, (int32_t) groupId, pRuntimeEnv->numOfRowsPerPage); + int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, (int32_t) groupId, pRuntimeEnv->pQuery->resultRowSize); if (ret != TSDB_CODE_SUCCESS) { return -1; } @@ -692,12 +694,12 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRow // set time window for current result pResultRow->win = (*win); *pResult = pResultRow; - setResultRowOutputBufInitCtx(pRuntimeEnv, pResultRow, pCtx, numOfOutput); + setResultRowOutputBufInitCtx(pRuntimeEnv, pResultRow, pCtx, numOfOutput, rowCellInfoOffset); return TSDB_CODE_SUCCESS; } -static bool getResultRowStatus(SResultRowInfo *pWindowResInfo, int32_t slot) { +static UNUSED_FUNC bool getResultRowStatus(SResultRowInfo *pWindowResInfo, int32_t slot) { assert(slot >= 0 && slot < pWindowResInfo->size); return pWindowResInfo->pResult[slot]->closed; } @@ -1166,8 +1168,8 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC } } -static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx* pCtx, int32_t numOfOutput, - SSDataBlock* pSDataBlock) { +static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, SHashIntervalOperatorInfo* pInfo, + int32_t numOfOutput, SSDataBlock* pSDataBlock) { SQuery *pQuery = pRuntimeEnv->pQuery; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); @@ -1189,7 +1191,8 @@ static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResu bool masterScan = (pRuntimeEnv->scanFlag == MASTER_SCAN)? true:false; SResultRow *pResult = NULL; - int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, 0, pCtx, numOfOutput); + int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, 0, + pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { // goto _end; } @@ -1209,27 +1212,29 @@ static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResu } STimeWindow w = pRes->win; - ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &w, masterScan, &pResult, 0, pCtx, numOfOutput); + ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &w, masterScan, &pResult, 0, pInfo->pCtx, numOfOutput, + pInfo->rowCellInfoOffset); assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); // int32_t p = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pSDataBlock->info.rows - 1; // doRowwiseTimeWindowInterpolation(pRuntimeEnv, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0], -1, tsCols[0], p, // w.ekey, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); - setNotInterpoWindowKey(pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); + setNotInterpoWindowKey(pInfo->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); - doBlockwiseApplyFunctions_rv(pRuntimeEnv, pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, + doBlockwiseApplyFunctions_rv(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput); } // restore current time window - ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, 0, pCtx, numOfOutput); + ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, 0, + pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); assert(ret == TSDB_CODE_SUCCESS); } // window start key interpolation //doWindowBorderInterpolation(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, pResult, &win, pQuery->pos, forwardStep); - doBlockwiseApplyFunctions_rv(pRuntimeEnv, pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, + doBlockwiseApplyFunctions_rv(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); STimeWindow nextWin = win; @@ -1241,7 +1246,8 @@ static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResu } // null data, failed to allocate more memory buffer - int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &nextWin, masterScan, &pResult, 0, pCtx, numOfOutput); + int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &nextWin, masterScan, &pResult, 0, + pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { break; } @@ -1251,18 +1257,18 @@ static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResu // window start(end) key interpolation // doWindowBorderInterpolation(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, pResult, &nextWin, startPos, forwardStep); - doBlockwiseApplyFunctions_rv(pRuntimeEnv, pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, + doBlockwiseApplyFunctions_rv(pRuntimeEnv, pInfo->pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); } } -static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SResultRowInfo *pResultRowInfo, SQLFunctionCtx *pCtx, - SSDataBlock *pSDataBlock, int32_t colIndex) { +static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SHashGroupbyOperatorInfo *pInfo, + SSDataBlock *pSDataBlock) { SQuery *pQuery = pRuntimeEnv->pQuery; STableQueryInfo* item = pQuery->current; - SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, colIndex); + SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex); int16_t bytes = pColInfoData->info.bytes; int16_t type = pColInfoData->info.type; @@ -1275,16 +1281,16 @@ static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperat } // TODO compare with the previous value to speedup the query processing - int32_t ret = setGroupResultOutputBuf_rv(pRuntimeEnv, pResultRowInfo, pCtx, pOperator->numOfOutput, val, type, bytes, item->groupIndex); + int32_t ret = setGroupResultOutputBuf_rv(pRuntimeEnv, &pInfo->resultRowInfo, pInfo->pCtx, pOperator->numOfOutput, val, type, bytes, item->groupIndex, pInfo->rowCellInfoOffset); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); } for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { - pCtx[k].size = 1; // TODO refactor: extract from here - int32_t functionId = pCtx[k].functionId; - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - aAggs[functionId].xFunctionF(&pCtx[k], offset); + pInfo->pCtx[k].size = 1; // TODO refactor: extract from here + int32_t functionId = pInfo->pCtx[k].functionId; + if (functionNeedToExecute(pRuntimeEnv, &pInfo->pCtx[k], functionId)) { + aAggs[functionId].xFunctionF(&pInfo->pCtx[k], offset); } } } @@ -1417,7 +1423,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * #endif static int32_t setGroupResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, - SQLFunctionCtx * pCtx, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) { + SQLFunctionCtx * pCtx, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex, int32_t* rowCellInfoOffset) { SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; // not assign result buffer yet, add new result buffer, TODO remove it @@ -1450,13 +1456,13 @@ static int32_t setGroupResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResult } if (pResultRow->pageId == -1) { - int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, groupIndex, pRuntimeEnv->numOfRowsPerPage); + int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, groupIndex, pRuntimeEnv->pQuery->resultRowSize); if (ret != 0) { return -1; } } - setResultOutputBuf_rv(pRuntimeEnv, pResultRow, pCtx, numOfCols); + setResultOutputBuf_rv(pRuntimeEnv, pResultRow, pCtx, numOfCols, rowCellInfoOffset); initCtxOutputBuf_rv(pCtx, numOfCols); return TSDB_CODE_SUCCESS; } @@ -1973,7 +1979,8 @@ void UNUSED_FUNC setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inpu // return TSDB_CODE_SUCCESS; //} -static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) { +static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, + int32_t** rowCellInfoOffset) { SQuery* pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx * pFuncCtx = (SQLFunctionCtx *)calloc(numOfOutput, sizeof(SQLFunctionCtx)); @@ -1981,6 +1988,8 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr return NULL; } + *rowCellInfoOffset = calloc(numOfOutput, sizeof(int32_t)); + for (int32_t i = 0; i < numOfOutput; ++i) { SSqlFuncMsg *pSqlFuncMsg = &pExpr[i].base; SQLFunctionCtx* pCtx = &pFuncCtx[i]; @@ -2014,12 +2023,7 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr pCtx->inputBytes = pSqlFuncMsg->arg[0].argBytes; pCtx->inputType = pSqlFuncMsg->arg[0].argType; } -// } else { -// pCtx->inputBytes = pQuery->colList[index].bytes; -// pCtx->inputType = pQuery->colList[index].type; -// } -// -// assert(isValidDataType(pCtx->inputType)); + pCtx->ptsOutputBuf = NULL; pCtx->outputBytes = pExpr[i].bytes; @@ -2089,9 +2093,8 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr } if (i > 0) { - pRuntimeEnv->offset[i] = pRuntimeEnv->offset[i - 1] + pFuncCtx[i - 1].outputBytes; - pRuntimeEnv->rowCellInfoOffset[i] = - pRuntimeEnv->rowCellInfoOffset[i - 1] + sizeof(SResultRowCellInfo) + pExpr[i - 1].interBytes; +// (*offset)[i] = (*offset)[i - 1] + pFuncCtx[i - 1].outputBytes; + (*rowCellInfoOffset)[i] = (*rowCellInfoOffset)[i - 1] + sizeof(SResultRowCellInfo) + pExpr[i - 1].interBytes; } } @@ -2134,11 +2137,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv)); pRuntimeEnv->prevRow = malloc(POINTER_BYTES * pQuery->numOfCols + pQuery->srcRowSize); pRuntimeEnv->tagVal = malloc(pQuery->tagLen); - pRuntimeEnv->offset = calloc(pQuery->numOfOutput, sizeof(int16_t)); - pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t)); +// pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t)); pRuntimeEnv->sasArray = calloc(pQuery->numOfOutput, sizeof(SArithmeticSupport)); - if (pRuntimeEnv->offset == NULL || pRuntimeEnv->rowCellInfoOffset == NULL || pRuntimeEnv->sasArray == NULL || + if (/*pRuntimeEnv->rowCellInfoOffset == NULL || */pRuntimeEnv->sasArray == NULL || pRuntimeEnv->pResultRowHashTable == NULL || pRuntimeEnv->keyBuf == NULL || pRuntimeEnv->prevRow == NULL || pRuntimeEnv->tagVal == NULL) { goto _clean; @@ -2150,7 +2152,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf pRuntimeEnv->prevRow[i] = pRuntimeEnv->prevRow[i - 1] + pQuery->colList[i-1].bytes; } - pRuntimeEnv->offset[0] = 0; *(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN; // if it is group by normal column, do not set output buffer, the output buffer is pResult @@ -2226,8 +2227,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf return TSDB_CODE_SUCCESS; _clean: - tfree(pRuntimeEnv->offset); - tfree(pRuntimeEnv->rowCellInfoOffset); +// tfree(pRuntimeEnv->rowCellInfoOffset); tfree(pRuntimeEnv->sasArray); tfree(pRuntimeEnv->pResultRowHashTable); tfree(pRuntimeEnv->keyBuf); @@ -2300,9 +2300,8 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { pRuntimeEnv->pTsBuf = tsBufDestroy(pRuntimeEnv->pTsBuf); - tfree(pRuntimeEnv->offset); tfree(pRuntimeEnv->keyBuf); - tfree(pRuntimeEnv->rowCellInfoOffset); +// tfree(pRuntimeEnv->rowCellInfoOffset); tfree(pRuntimeEnv->prevRow); tfree(pRuntimeEnv->tagVal); @@ -2660,8 +2659,8 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i *ps = (*ps << 1u); } - pRuntimeEnv->numOfRowsPerPage = ((*ps) - sizeof(tFilePage)) / (*rowsize); - assert(pRuntimeEnv->numOfRowsPerPage <= MAX_ROWS_PER_RESBUF_PAGE); +// pRuntimeEnv->numOfRowsPerPage = ((*ps) - sizeof(tFilePage)) / (*rowsize); +// assert(pRuntimeEnv->numOfRowsPerPage <= MAX_ROWS_PER_RESBUF_PAGE); } #define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR) @@ -2882,8 +2881,7 @@ void filterDataBlock_rv(SSingleColumnFilterInfo *pFilterInfo, int32_t numOfFilte tfree(p); } -int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, SResultRowInfo *pWindowResInfo, - void *pQueryHandle, SSDataBlock *pBlock, uint32_t *status, int32_t numOfOutput) { +int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock *pBlock, uint32_t *status) { *status = BLK_DATA_NO_NEEDED; pBlock->pDataBlock = NULL; pBlock->pBlockStatis = NULL; @@ -2911,9 +2909,9 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx * bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlock->info.window.skey:pBlock->info.window.ekey; - STimeWindow win = getActiveTimeWindow(pWindowResInfo, k, pQuery); - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, - groupId, pCtx, numOfOutput) != TSDB_CODE_SUCCESS) { + STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQuery); + if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, + groupId, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { // todo handle error in set result for timewindow } } @@ -2926,7 +2924,7 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx * // group by + first/last should not apply the first/last block filter if (!pQuery->groupbyColumn && (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_LAST_DST)) { - (*status) |= aAggs[functionId].dataReqFunc(&pCtx[i], &pBlock->info.window, colId); + (*status) |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId); if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { break; } @@ -2948,10 +2946,10 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx * // this function never returns error? pCost->loadBlockStatis += 1; - tsdbRetrieveDataBlockStatisInfo(pQueryHandle, &pBlock->pBlockStatis); + tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pBlock->pBlockStatis); if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block - pBlock->pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL); + pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL); pCost->totalCheckedRows += pBlock->info.rows; } } else { @@ -2959,15 +2957,15 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx * // load the data block statistics to perform further filter pCost->loadBlockStatis += 1; - tsdbRetrieveDataBlockStatisInfo(pQueryHandle, &pBlock->pBlockStatis); + tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pBlock->pBlockStatis); if (pQuery->topBotQuery && pBlock->pBlockStatis != NULL) { bool load = false; for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t functionId = pCtx[i].functionId; + int32_t functionId = pTableScanInfo->pCtx[i].functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { - load = topbot_datablock_filter(&pCtx[i], (char *)&(pBlock->pBlockStatis[i].min), (char *)&(pBlock->pBlockStatis[i].max)); + load = topbot_datablock_filter(&pTableScanInfo->pCtx[i], (char *)&(pBlock->pBlockStatis[i].min), (char *)&(pBlock->pBlockStatis[i].max)); if (!load) { // current block has been discard due to filter applied pCost->discardBlocks += 1; @@ -2981,7 +2979,7 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx * } // current block has been discard due to filter applied - if (!doDataBlockStaticFilter(pRuntimeEnv, pBlock->pBlockStatis, pCtx, pBlockInfo->rows)) { + if (!doDataBlockStaticFilter(pRuntimeEnv, pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { pCost->discardBlocks += 1; qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", pQInfo, pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); (*status) = BLK_DATA_DISCARD; @@ -2990,7 +2988,7 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx * pCost->totalCheckedRows += pBlockInfo->rows; pCost->loadBlocks += 1; - pBlock->pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL); + pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL); if (pBlock->pDataBlock == NULL) { return terrno; } @@ -3393,7 +3391,7 @@ void UNUSED_FUNC displayInterResult(tFilePage **pdata, SQueryRuntimeEnv* pRuntim } } -void copyResToQueryResultBuf_rv(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBlock* pBlock) { +void copyResToQueryResultBuf_rv(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBlock* pBlock, int32_t* offset) { SGroupResInfo* pGroupResInfo = &pRuntimeEnv->groupResInfo; int32_t code = TSDB_CODE_SUCCESS; @@ -3401,7 +3399,7 @@ void copyResToQueryResultBuf_rv(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold // all results in current group have been returned to client, try next group if ((pGroupResInfo->pRows == NULL) || taosArrayGetSize(pGroupResInfo->pRows) == 0) { assert(pGroupResInfo->index == 0); - if ((code = mergeIntoGroupResult(&pRuntimeEnv->groupResInfo, pRuntimeEnv)) != TSDB_CODE_SUCCESS) { + if ((code = mergeIntoGroupResult(&pRuntimeEnv->groupResInfo, pRuntimeEnv, offset)) != TSDB_CODE_SUCCESS) { return; } } @@ -3438,7 +3436,8 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo * pTableQueryInfo->resInfo.curIndex = pTableQueryInfo->resInfo.size - 1; } -static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pWindowResInfo, int32_t order) { +#if 0 +static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pWindowResInfo, int32_t order, int32_t numOfOutput) { SQuery* pQuery = pRuntimeEnv->pQuery; for (int32_t i = 0; i < pWindowResInfo->size; ++i) { @@ -3452,7 +3451,7 @@ static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SResultR // open/close the specified query for each group result for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int32_t functId = pQuery->pExpr1[j].base.functionId; - SResultRowCellInfo* pInfo = getResultCell(pRuntimeEnv, pRow, j); + SResultRowCellInfo* pInfo = getResultCell(pRow, numOfOutput, j); if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) || ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) { @@ -3464,7 +3463,7 @@ static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SResultR } } - +#endif static void setupQueryRangeForReverseScan(SQueryRuntimeEnv* pRuntimeEnv) { SQuery* pQuery = pRuntimeEnv->pQuery; int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv)); @@ -3531,13 +3530,12 @@ void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { int32_t initResultRow(SResultRow *pResultRow) { pResultRow->pCellInfo = (SResultRowCellInfo*)((char*)pResultRow + sizeof(SResultRow)); pResultRow->pageId = -1; - pResultRow->rowId = -1; + pResultRow->offset = -1; return TSDB_CODE_SUCCESS; } - - -void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, SSDataBlock* pDataBlock) { +void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, + SSDataBlock* pDataBlock, int32_t* rowCellInfoOffset) { int32_t tid = 0; int64_t uid = 0; SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&tid, sizeof(tid), true, uid); @@ -3549,7 +3547,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SR * set the output buffer information and intermediate buffer * not all queries require the interResultBuf, such as COUNT/TAGPRJ/PRJ/TAG etc. */ - SResultRowCellInfo* pCellInfo = getResultCell(pRuntimeEnv, pRow, i); + SResultRowCellInfo* pCellInfo = getResultCell(pRow, i, rowCellInfoOffset); RESET_RESULT_INFO(pCellInfo); pCtx[i].resultInfo = pCellInfo; @@ -3835,18 +3833,18 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI } } -#endif -void disableFuncInReverseScan(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pWindowResInfo, SQLFunctionCtx* pCtx, int32_t numOfOutput) { + +void disableFuncInReverseScan(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pWindowResInfo, SQLFunctionCtx* pCtx, + int32_t numOfOutput) { SQuery *pQuery = pRuntimeEnv->pQuery; int32_t order = pQuery->order.order; // group by normal columns and interval query on normal table -// SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo; if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) { - disableFuncInReverseScanImpl(pRuntimeEnv, pWindowResInfo, order); +// disableFuncInReverseScanImpl(pRuntimeEnv, pWindowResInfo, order, numOfOutput); } else { // for simple result of table query, - for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { // todo refactor + for (int32_t j = 0; j < numOfOutput; ++j) { // todo refactor int32_t functId = pCtx[j].functionId; if (pCtx[j].resultInfo == NULL) { continue; // resultInfo is NULL, means no data checked in previous scan @@ -3861,6 +3859,7 @@ void disableFuncInReverseScan(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pWi } } } +#endif static void setEnvBeforeReverseScan_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, SQLFunctionCtx* pCtx, int32_t numOfOutput) { SQuery *pQuery = pRuntimeEnv->pQuery; @@ -3878,7 +3877,8 @@ static void setEnvBeforeReverseScan_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow SET_REVERSE_SCAN_FLAG(pRuntimeEnv); setQueryStatus(pQuery, QUERY_NOT_COMPLETED); switchCtxOrder(pCtx, numOfOutput); - disableFuncInReverseScan(pRuntimeEnv, pResultRowInfo, pCtx, numOfOutput); + +// disableFuncInReverseScan(pRuntimeEnv, pResultRowInfo, pCtx, numOfOutput); setupQueryRangeForReverseScan(pRuntimeEnv); } @@ -4050,7 +4050,8 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { #endif -void finalizeQueryResult_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo) { +void finalizeQueryResult_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, + SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { SQuery *pQuery = pRuntimeEnv->pQuery; if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) { // for each group result, call the finalize function for each column @@ -4064,7 +4065,7 @@ void finalizeQueryResult_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, continue; } - setResultOutputBuf_rv(pRuntimeEnv, buf, pCtx, numOfOutput); + setResultOutputBuf_rv(pRuntimeEnv, buf, pCtx, numOfOutput, rowCellInfoOffset); for (int32_t j = 0; j < numOfOutput; ++j) { aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]); @@ -4187,22 +4188,24 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult) { pCtx->resultInfo = getResultCell(pRuntimeEnv, pResult, i); } } - #endif - -void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput) { +void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, + int32_t numOfOutput, int32_t* rowCellInfoOffset) { // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group tFilePage* bufPage = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId); + int16_t offset = 0; for (int32_t i = 0; i < numOfOutput; ++i) { - pCtx[i].resultInfo = getResultCell(pRuntimeEnv, pResult, i); + pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); if (pCtx->resultInfo->initialized && pCtx->resultInfo->complete) { + offset += pCtx[i].outputBytes; continue; } - pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv, i, pResult, bufPage); + pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQuery, bufPage, pResult->offset, offset); pCtx[i].currentStage = 0; + offset += pCtx[i].outputBytes; int32_t functionId = pCtx[i].functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { @@ -4216,8 +4219,8 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe } -void setExecutionContext_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, SQLFunctionCtx* pCtx, - int32_t numOfOutput, int32_t groupIndex, TSKEY nextKey) { +void setExecutionContext_rv(SQueryRuntimeEnv *pRuntimeEnv, SAggOperatorInfo *pInfo, int32_t numOfOutput, + int32_t groupIndex, TSKEY nextKey) { STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current; // lastKey needs to be updated @@ -4227,7 +4230,7 @@ void setExecutionContext_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResu } int64_t uid = 0; - SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&groupIndex, + SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, &pInfo->resultRowInfo, (char *)&groupIndex, sizeof(groupIndex), true, uid); assert (pResultRow != NULL); @@ -4236,7 +4239,7 @@ void setExecutionContext_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResu * all group belong to one result set, and each group result has different group id so set the id to be one */ if (pResultRow->pageId == -1) { - if (addNewWindowResultBuf(pResultRow, pRuntimeEnv->pResultBuf, groupIndex, pRuntimeEnv->numOfRowsPerPage) != + if (addNewWindowResultBuf(pResultRow, pRuntimeEnv->pResultBuf, groupIndex, pRuntimeEnv->pQuery->resultRowSize) != TSDB_CODE_SUCCESS) { return; } @@ -4244,16 +4247,19 @@ void setExecutionContext_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResu // record the current active group id pRuntimeEnv->prevGroupId = groupIndex; - setResultOutputBuf_rv(pRuntimeEnv, pResultRow, pCtx, numOfOutput); - initCtxOutputBuf_rv(pCtx, numOfOutput); + setResultOutputBuf_rv(pRuntimeEnv, pResultRow, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); + initCtxOutputBuf_rv(pInfo->pCtx, numOfOutput); } -void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfCols) { +void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, + int32_t numOfCols, int32_t* rowCellInfoOffset) { // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId); + int16_t offset = 0; for (int32_t i = 0; i < numOfCols; ++i) { - pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv, i, pResult, page); + pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQuery, page, pResult->offset, offset); + offset += pCtx[i].outputBytes; int32_t functionId = pCtx[i].functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { @@ -4264,7 +4270,7 @@ void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, S * set the output buffer information and intermediate buffer, * not all queries require the interResultBuf, such as COUNT */ - pCtx[i].resultInfo = getResultCell(pRuntimeEnv, pResult, i); + pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); } } @@ -4526,13 +4532,17 @@ static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pG pGroupResInfo->index += 1; tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pRow->pageId); + + int16_t offset = 0; for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, j); int32_t bytes = pColInfoData->info.bytes; char *out = pColInfoData->pData + numOfResult * bytes; - char *in = getPosInResultPage(pRuntimeEnv, j, pRow, page); + char *in = getPosInResultPage(pQuery, page, pRow->offset, offset); memcpy(out, in, bytes * numOfRowsToCopy); + + offset += bytes; } numOfResult += numOfRowsToCopy; @@ -4565,7 +4575,7 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti } static void updateWindowResNumOfRes_rv(SQueryRuntimeEnv *pRuntimeEnv, - SQLFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo) { + SQLFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { SQuery *pQuery = pRuntimeEnv->pQuery; // update the number of result for each, only update the number of rows for the corresponding window result. @@ -4582,7 +4592,7 @@ static void updateWindowResNumOfRes_rv(SQueryRuntimeEnv *pRuntimeEnv, continue; } - SResultRowCellInfo* pCell = getResultCell(pRuntimeEnv, pResult, j); + SResultRowCellInfo* pCell = getResultCell(pResult, j, rowCellInfoOffset); pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pCell->numOfRes)); } } @@ -5218,43 +5228,16 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts int32_t TENMB = 1024*1024*10; if (isSTableQuery && !onlyQueryTags(pQuery)) { - code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TENMB, pQInfo); + code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, ps, TENMB, pQInfo); if (code != TSDB_CODE_SUCCESS) { return code; } - - if (!QUERY_IS_INTERVAL_QUERY(pQuery)) { -// int16_t type = TSDB_DATA_TYPE_NULL; -// if (pQuery->groupbyColumn) { // group by columns not tags; -// type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); -// } else { -// type = TSDB_DATA_TYPE_INT; // group id -// } - -// code = initResultRowInfo(&pRuntimeEnv->resultRowInfo, 8, type); -// if (code != TSDB_CODE_SUCCESS) { -// return code; -// } - } } else if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery) || (!isSTableQuery)) { -// int32_t numOfResultRows = getInitialPageNum(pQInfo); getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize); - code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TENMB, pQInfo); + code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, ps, TENMB, pQInfo); if (code != TSDB_CODE_SUCCESS) { return code; } - -// int16_t type = TSDB_DATA_TYPE_NULL; -// if (pQuery->groupbyColumn) { -// type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); -// } else { -// type = TSDB_DATA_TYPE_TIMESTAMP; -// } - -// code = initResultRowInfo(&pRuntimeEnv->resultRowInfo, numOfResultRows, type); -// if (code != TSDB_CODE_SUCCESS) { -// return code; -// } } // create runtime environment @@ -6105,9 +6088,7 @@ static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) { // this function never returns error? uint32_t status; - int32_t code = - loadDataBlockOnDemand_rv(pTableScanInfo->pRuntimeEnv, pTableScanInfo->pCtx, pTableScanInfo->pResultRowInfo, - pTableScanInfo->pQueryHandle, pBlock, &status, pTableScanInfo->numOfOutput); + int32_t code = loadDataBlockOnDemand_rv(pTableScanInfo->pRuntimeEnv, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { longjmp(pTableScanInfo->pRuntimeEnv->env, code); } @@ -6215,26 +6196,35 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf pTableScanInfo->pCtx = pAggInfo->pCtx; pTableScanInfo->pResultRowInfo = &pAggInfo->resultRowInfo; + pTableScanInfo->rowCellInfoOffset = pAggInfo->rowCellInfoOffset; } else if (strcasecmp(name, "HashIntervalAggOp") == 0) { SHashIntervalOperatorInfo *pIntervalInfo = pDownstream->optInfo; pTableScanInfo->pCtx = pIntervalInfo->pCtx; pTableScanInfo->pResultRowInfo = &pIntervalInfo->resultRowInfo; + pTableScanInfo->rowCellInfoOffset = pIntervalInfo->rowCellInfoOffset; + } else if (strcasecmp(name, "HashGroupbyAggOp") == 0) { SHashGroupbyOperatorInfo *pGroupbyInfo = pDownstream->optInfo; pTableScanInfo->pCtx = pGroupbyInfo->pCtx; pTableScanInfo->pResultRowInfo = &pGroupbyInfo->resultRowInfo; + pTableScanInfo->rowCellInfoOffset = pGroupbyInfo->rowCellInfoOffset; + } else if (strcasecmp(name, "STableIntervalAggOp") == 0) { SHashIntervalOperatorInfo *pInfo = pDownstream->optInfo; pTableScanInfo->pCtx = pInfo->pCtx; pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo; + pTableScanInfo->rowCellInfoOffset = pInfo->rowCellInfoOffset; + } else if (strcasecmp(name, "ArithmeticOp") == 0) { SArithOperatorInfo *pInfo = pDownstream->optInfo; pTableScanInfo->pCtx = pInfo->pCtx; pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo; + pTableScanInfo->rowCellInfoOffset = pInfo->rowCellInfoOffset; + } else { assert(0); } @@ -6309,7 +6299,7 @@ static SSDataBlock* doAggregation(void* param) { setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); if (!pQuery->stableQuery) { - finalizeQueryResult_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput, &pAggInfo->resultRowInfo); + finalizeQueryResult_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput, &pAggInfo->resultRowInfo, pAggInfo->rowCellInfoOffset); } pAggInfo->pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput); @@ -6365,13 +6355,12 @@ static SSDataBlock* doSTableAggregation(void* param) { setInputSDataBlock(pOperator, pAggInfo->pCtx, pBlock, order); TSKEY k = (pQuery->order.order == TSDB_ORDER_ASC)? pBlock->info.window.ekey + 1:pBlock->info.window.skey-1; - setExecutionContext_rv(pRuntimeEnv, &pAggInfo->resultRowInfo, pAggInfo->pCtx, pOperator->numOfOutput, - pQuery->current->groupIndex, k); + setExecutionContext_rv(pRuntimeEnv, pAggInfo, pOperator->numOfOutput, pQuery->current->groupIndex, k); aggApplyFunctions(pRuntimeEnv, pOperator, pAggInfo->pCtx, pBlock); } closeAllResultRows(&pAggInfo->resultRowInfo); - updateWindowResNumOfRes_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput, &pAggInfo->resultRowInfo); + updateWindowResNumOfRes_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput, &pAggInfo->resultRowInfo, pAggInfo->rowCellInfoOffset); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pAggInfo->resultRowInfo, 0); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pAggInfo->pRes); @@ -6389,7 +6378,7 @@ static SSDataBlock* doArithmeticOperation(void* param) { SArithOperatorInfo* pArithInfo = pOperator->optInfo; SQueryRuntimeEnv* pRuntimeEnv = pArithInfo->pRuntimeEnv; - setDefaultOutputBuf(pRuntimeEnv, pArithInfo->pCtx, &pArithInfo->resultRowInfo, pArithInfo->pOutput); + setDefaultOutputBuf(pRuntimeEnv, pArithInfo->pCtx, &pArithInfo->resultRowInfo, pArithInfo->pOutput, pArithInfo->rowCellInfoOffset); pRuntimeEnv->pQuery->pos = 0; pArithInfo->pOutput->info.rows = 0; @@ -6528,7 +6517,7 @@ static SSDataBlock* doHashIntervalAgg(void* param) { // the pDataBlock are always the same one, no need to call this again setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order); - hashIntervalAgg(pRuntimeEnv, &pIntervalInfo->resultRowInfo, pIntervalInfo->pCtx, pOperator->numOfOutput, pBlock); + hashIntervalAgg(pRuntimeEnv, &pIntervalInfo->resultRowInfo, pIntervalInfo, pOperator->numOfOutput, pBlock); } // restore the value @@ -6537,7 +6526,7 @@ static SSDataBlock* doHashIntervalAgg(void* param) { closeAllResultRows(&pIntervalInfo->resultRowInfo); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); - finalizeQueryResult_rv(pRuntimeEnv, pIntervalInfo->pCtx, pOperator->numOfOutput, &pIntervalInfo->resultRowInfo); + finalizeQueryResult_rv(pRuntimeEnv, pIntervalInfo->pCtx, pOperator->numOfOutput, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo, 0); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); @@ -6585,14 +6574,15 @@ static SSDataBlock* doSTableIntervalAgg(void* param) { setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order); setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey); - hashIntervalAgg(pRuntimeEnv, &pRuntimeEnv->pQuery->current->resInfo, pIntervalInfo->pCtx, pOperator->numOfOutput, pBlock); + hashIntervalAgg(pRuntimeEnv, &pRuntimeEnv->pQuery->current->resInfo, pIntervalInfo, pOperator->numOfOutput, + pBlock); } pQuery->order.order = order; // TODO : restore the order doCloseAllTimeWindow(pRuntimeEnv); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); - copyResToQueryResultBuf_rv(pRuntimeEnv, 3000, pIntervalInfo->pRes); + copyResToQueryResultBuf_rv(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { pOperator->completed = true; } @@ -6632,17 +6622,17 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { pInfo->colIndex = getGroupbyColumnData_rv(pRuntimeEnv->pQuery->pGroupbyExpr, pBlock); } - hashGroupbyAgg(pRuntimeEnv, pOperator, &pInfo->resultRowInfo, pInfo->pCtx, pBlock, pInfo->colIndex); + hashGroupbyAgg(pRuntimeEnv, pOperator, pInfo, pBlock); } closeAllResultRows(&pInfo->resultRowInfo); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); if (!pRuntimeEnv->pQuery->stableQuery) { - finalizeQueryResult_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo); + finalizeQueryResult_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); } - updateWindowResNumOfRes_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo); + updateWindowResNumOfRes_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->resultRowInfo, 0); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes); @@ -6728,11 +6718,11 @@ static SOperatorInfo* createAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQ pOperator->pExpr = pQuery->pExpr1; pOperator->numOfOutput = pQuery->numOfOutput; - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput); + pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); - setDefaultOutputBuf(pRuntimeEnv, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->pRes); + setDefaultOutputBuf(pRuntimeEnv, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->pRes, pInfo->rowCellInfoOffset); return pOperator; } @@ -6754,7 +6744,7 @@ static SOperatorInfo* createStableAggOperatorInfo(STableQueryInfo* pTableQueryIn pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput; - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput); + pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); return pOperator; @@ -6777,7 +6767,7 @@ static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo, pOperator->numOfOutput = (pRuntimeEnv->pQuery->pExpr2 == NULL)? pRuntimeEnv->pQuery->numOfOutput:pRuntimeEnv->pQuery->numOfExpr2; pInfo->pOutput = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput); + pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); return pOperator; @@ -6842,7 +6832,7 @@ static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQ pOperator->numOfOutput = pQuery->numOfOutput; pOperator->optInfo = pInfo; - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput); + pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); @@ -6867,7 +6857,7 @@ static SOperatorInfo* createStableIntervalOperatorInfo(STableQueryInfo* pTableQu pOperator->numOfOutput = pQuery->numOfOutput; pOperator->optInfo = pInfo; - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput); + pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); @@ -6893,7 +6883,7 @@ SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo pOperator->numOfOutput = pQuery->numOfOutput; pOperator->optInfo = pInfo; - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput); + pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); diff --git a/src/query/src/qPercentile.c b/src/query/src/qPercentile.c index 523fa42547..d366646172 100644 --- a/src/query/src/qPercentile.c +++ b/src/query/src/qPercentile.c @@ -254,7 +254,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, resetSlotInfo(pBucket); - int32_t ret = createDiskbasedResultBuffer(&pBucket->pBuffer, pBucket->bytes, pBucket->bufPageSize, pBucket->bufPageSize * 512, NULL); + int32_t ret = createDiskbasedResultBuffer(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, NULL); if (ret != TSDB_CODE_SUCCESS) { tMemBucketDestroy(pBucket); return NULL; diff --git a/src/query/src/qResultbuf.c b/src/query/src/qResultbuf.c index d45e76c2fd..ed7f8c6719 100644 --- a/src/query/src/qResultbuf.c +++ b/src/query/src/qResultbuf.c @@ -9,8 +9,7 @@ #define GET_DATA_PAYLOAD(_p) ((char *)(_p)->pData + POINTER_BYTES) #define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages) -int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t rowSize, int32_t pagesize, - int32_t inMemBufSize, const void* handle) { +int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, const void* handle) { *pResultBuf = calloc(1, sizeof(SDiskbasedResultBuf)); SDiskbasedResultBuf* pResBuf = *pResultBuf; @@ -31,7 +30,6 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t ro // at least more than 2 pages must be in memory assert(inMemBufSize >= pagesize * 2); - pResBuf->numOfRowsPerPage = (pagesize - sizeof(tFilePage)) / rowSize; pResBuf->lruList = tdListNew(POINTER_BYTES); // init id hash table @@ -387,8 +385,6 @@ void releaseResBufPageInfo(SDiskbasedResultBuf* pResultBuf, SPageInfo* pi) { pResultBuf->statis.releasePages += 1; } -size_t getNumOfRowsPerPage(const SDiskbasedResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; } - size_t getNumOfResultBufGroupId(const SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->groupSet); } size_t getResBufSize(const SDiskbasedResultBuf* pResultBuf) { return (size_t)pResultBuf->totalBufSize; } diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 534c62f00e..335d4d5750 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -135,20 +135,22 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16 if (pResultRow->pageId >= 0) { tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId); + int16_t offset = 0; for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) { SResultRowCellInfo *pResultInfo = &pResultRow->pCellInfo[i]; - char * s = getPosInResultPage(pRuntimeEnv, i, pResultRow, page); size_t size = pRuntimeEnv->pQuery->pExpr1[i].bytes; + char * s = getPosInResultPage(pRuntimeEnv->pQuery, page, pResultRow->offset, offset); memset(s, 0, size); + offset += size; RESET_RESULT_INFO(pResultInfo); } } pResultRow->numOfRows = 0; pResultRow->pageId = -1; - pResultRow->rowId = -1; + pResultRow->offset = -1; pResultRow->closed = false; if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { @@ -158,9 +160,10 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16 } } -SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index) { - assert(index >= 0 && index < pRuntimeEnv->pQuery->numOfOutput); - return (SResultRowCellInfo*)((char*) pRow->pCellInfo + pRuntimeEnv->rowCellInfoOffset[index]); +// TODO refactor: use macro +SResultRowCellInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset) { + assert(index >= 0 && offset != NULL); + return (SResultRowCellInfo*)((char*) pRow->pCellInfo + offset[index]); } size_t getResultRowSize(SQueryRuntimeEnv* pRuntimeEnv) { @@ -373,7 +376,7 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) { return taosArrayGetSize(pGroupResInfo->pRows); } -static int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pResultRow) { +static int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pResultRow, int32_t* rowCellInfoOffset) { SQuery* pQuery = pRuntimeEnv->pQuery; for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { @@ -387,7 +390,7 @@ static int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow continue; } - SResultRowCellInfo *pResultInfo = getResultCell(pRuntimeEnv, pResultRow, j); + SResultRowCellInfo *pResultInfo = getResultCell(pResultRow, j, rowCellInfoOffset); assert(pResultInfo != NULL); if (pResultInfo->numOfRes > 0) { @@ -438,7 +441,8 @@ static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void * } } -static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList) { +static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList, + int32_t* rowCellInfoOffset) { bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQuery); int32_t code = TSDB_CODE_SUCCESS; @@ -492,7 +496,7 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes SResultRowInfo *pWindowResInfo = &pTableQueryInfoList[tableIndex]->resInfo; SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.rowIndex[tableIndex]); - int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes); + int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes, rowCellInfoOffset); if (num <= 0) { cs.rowIndex[tableIndex] += 1; @@ -539,13 +543,13 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes return code; } -int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRuntimeEnv) { +int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t* offset) { int64_t st = taosGetTimestampUs(); while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) { SArray *group = GET_TABLEGROUP(pRuntimeEnv, pGroupResInfo->currentGroup); - int32_t ret = mergeIntoGroupResultImpl(pRuntimeEnv, pGroupResInfo, group); + int32_t ret = mergeIntoGroupResultImpl(pRuntimeEnv, pGroupResInfo, group, offset); if (ret != TSDB_CODE_SUCCESS) { return ret; } diff --git a/src/query/tests/resultBufferTest.cpp b/src/query/tests/resultBufferTest.cpp index 7b946d8589..19e0117742 100644 --- a/src/query/tests/resultBufferTest.cpp +++ b/src/query/tests/resultBufferTest.cpp @@ -10,7 +10,7 @@ namespace { // simple test void simpleTest() { SDiskbasedResultBuf* pResultBuf = NULL; - int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 64, 1024, 4096, NULL); + int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4096, NULL); int32_t pageId = 0; int32_t groupId = 0; @@ -52,7 +52,7 @@ void simpleTest() { void writeDownTest() { SDiskbasedResultBuf* pResultBuf = NULL; - int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 64, 1024, 4*1024, NULL); + int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, NULL); int32_t pageId = 0; int32_t writePageId = 0; @@ -99,7 +99,7 @@ void writeDownTest() { void recyclePageTest() { SDiskbasedResultBuf* pResultBuf = NULL; - int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 64, 1024, 4*1024, NULL); + int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, NULL); int32_t pageId = 0; int32_t writePageId = 0; -- GitLab