From 9b76c49ddf7365dac0d4da0c8f059d74c2d46e33 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 Feb 2021 15:45:28 +0800 Subject: [PATCH] [td-225] refactor --- src/query/inc/qExecutor.h | 29 +- src/query/src/qAggMain.c | 2 +- src/query/src/qExecutor.c | 630 +++++++++++++++++++++++++++----------- 3 files changed, 480 insertions(+), 181 deletions(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 5a1e317c49..3807681786 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -235,15 +235,19 @@ typedef struct SQuery { void* tsdb; SMemRef memRef; STableGroupInfo tableGroupInfo; // table list SArray + int32_t vgId; } SQuery; typedef SSDataBlock* (*__operator_fn_t)(void* param); typedef struct SOperatorInfo { - char *name; - bool blockingOptr; - void *optInfo; - + char *name; + bool blockingOptr; + bool completed; + void *optInfo; + SExprInfo *pExpr; + int32_t numOfOutput; + __operator_fn_t exec; struct SOperatorInfo *upstream; } SOperatorInfo; @@ -284,6 +288,7 @@ typedef struct SQueryRuntimeEnv { int32_t groupIndex; int32_t tableIndex; STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure + SOperatorInfo* proot; } SQueryRuntimeEnv; typedef struct { @@ -301,8 +306,6 @@ typedef struct SQInfo { int32_t code; // error code to returned to client int64_t owner; // if it is in execution - int32_t vgId; - SQueryRuntimeEnv runtimeEnv; SQuery query; @@ -365,13 +368,27 @@ typedef struct SAggOperatorInfo { SResultRowInfo *pResultRowInfo; STableQueryInfo *pTableQueryInfo; SQueryRuntimeEnv *pRuntimeEnv; + SQLFunctionCtx *pCtx; } SAggOperatorInfo; typedef struct SArithOperatorInfo { STableQueryInfo *pTableQueryInfo; SQueryRuntimeEnv *pRuntimeEnv; + SQLFunctionCtx* pCtx; } SArithOperatorInfo; +typedef struct SLimitOperatorInfo { + int64_t limit; + int64_t total; + SQueryRuntimeEnv* pRuntimeEnv; +} SLimitOperatorInfo; + +typedef struct SOffsetOperatorInfo { + int64_t offset; + int64_t currentOffset; + SQueryRuntimeEnv* pRuntimeEnv; +} SOffsetOperatorInfo; + void freeParam(SQueryParam *param); int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param); int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg, diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 660a152824..8d4ce5ff86 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -3564,7 +3564,7 @@ char *getArithColumnData(void *param, const char* name, int32_t colId) { } } - assert(index >= 0 && colId >= 0); + assert(index >= 0 /*&& colId >= 0*/); return pSupport->data[index] + pSupport->offset * pSupport->colList[index].bytes; } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 2915987bff..5eefcdfe41 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -159,14 +159,14 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo, SDataStatis *pStatis, SExprInfo* pExprInfo); -static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColInfo); +static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex); - static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); + static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pSQLCtx); static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo); static void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static bool hasMainOutput(SQuery *pQuery); -static int32_t setTimestampListJoinInfo(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo); +static int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, STableQueryInfo *pTableQueryInfo); static void releaseQueryBuf(size_t numOfTables); static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order); static void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type); @@ -176,19 +176,28 @@ static STableIdInfo createTableIdInfo(SQuery* pQuery); static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); static SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime); static int32_t getNumOfScanTimes(SQuery* pQuery); - -static SSDataBlock* createOutputBuf(SQuery* pQuery) { - // setup the output buffer +static char *getArithemicInputSrc(void *param, const char *name, int32_t colId); +static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv); + +static SOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, + SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr); +static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo, + SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr); +static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr); +static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr); + +// setup the output buffer +static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput) { SSDataBlock *res = calloc(1, sizeof(SSDataBlock)); - res->info.numOfCols = pQuery->numOfOutput; + res->info.numOfCols = numOfOutput; - res->pDataBlock = taosArrayInit(pQuery->numOfOutput, sizeof(SColumnInfoData)); - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + res->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); + for (int32_t i = 0; i < numOfOutput; ++i) { SColumnInfoData idata = {0}; - idata.info.type = pQuery->pExpr1[i].type; - idata.info.bytes = pQuery->pExpr1[i].bytes; - idata.info.colId = pQuery->pExpr1[i].base.resColId; - idata.pData = calloc(4096, idata.info.bytes); + idata.info.type = pExpr[i].type; + idata.info.bytes = pExpr[i].bytes; + idata.info.colId = pExpr[i].base.resColId; + idata.pData = calloc(4096, idata.info.bytes * 4096); taosArrayPush(res->pDataBlock, &idata); } @@ -1179,19 +1188,19 @@ static void doWindowBorderInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc } -static void setInputSDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pSDataBlock) { - SQuery* pQuery = pRuntimeEnv->pQuery; +static void setInputSDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) { + if (pCtx[0].pInput == NULL && pSDataBlock->pDataBlock != NULL) { + for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { + pCtx[i].size = pSDataBlock->info.rows; - if (pRuntimeEnv->pCtx[0].pInput == NULL && pSDataBlock->pDataBlock != NULL) { - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SColIndex *pCol = &pQuery->pExpr1[i].base.colInfo; + SColIndex *pCol = &pOperator->pExpr[i].base.colInfo; if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) { - SColIndex* pColIndex = &pQuery->pExpr1[i].base.colInfo; + SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo; SColumnInfoData *p = taosArrayGet(pSDataBlock->pDataBlock, pColIndex->colIndex); assert(p->info.colId == pColIndex->colId); - SQLFunctionCtx* pCtx = &pRuntimeEnv->pCtx[i]; - pCtx->pInput = p->pData; + SQLFunctionCtx* pCtx1 = &pCtx[i]; + pCtx1->pInput = p->pData; uint32_t status = aAggs[pCtx->functionId].status; if ((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) { @@ -1200,17 +1209,18 @@ static void setInputSDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pSDat } } } + } else { + for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { + pCtx[i].size = pSDataBlock->info.rows; + } } - - } -static void aggApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pSDataBlock) { - SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; - SQuery * pQuery = pRuntimeEnv->pQuery; +static void aggApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) { + SQuery *pQuery = pRuntimeEnv->pQuery; - for (int32_t k = 0; k < pRuntimeEnv->outputBuf->info.numOfCols; ++k) { - setBlockStatisInfo(&pCtx[k], pSDataBlock, &pQuery->pExpr1[k].base.colInfo); + for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { + setBlockStatisInfo(&pCtx[k], pSDataBlock, &pOperator->pExpr[k].base.colInfo); int32_t functionId = pCtx[k].functionId; if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { @@ -1220,60 +1230,112 @@ static void aggApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pSData } } -static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pSDataBlock) { - SArithmeticSupport arithSup = {0}; - - SQuery *pQuery = pRuntimeEnv->pQuery; +static void setArithParams(SArithmeticSupport* sas, SExprInfo *pExprInfo, SSDataBlock* pSDataBlock) { + sas->numOfCols = (int32_t) pSDataBlock->info.numOfCols; + sas->pArithExpr = pExprInfo; - // create the output result buffer - tFilePage **data = calloc(pQuery->numOfExpr2, POINTER_BYTES); - for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) { - int32_t bytes = pQuery->pExpr2[i].bytes; - data[i] = (tFilePage *)malloc((size_t)(bytes * pQuery->rec.rows) + sizeof(tFilePage)); + sas->colList = calloc(1, pSDataBlock->info.numOfCols*sizeof(SColumnInfo)); + for(int32_t i = 0; i < sas->numOfCols; ++i) { + SColumnInfoData* pColData = taosArrayGet(pSDataBlock->pDataBlock, i); + sas->colList[i] = pColData->info; } - arithSup.numOfCols = (int32_t)pSDataBlock->info.numOfCols; - arithSup.exprList = pQuery->pExpr1; - arithSup.data = calloc(arithSup.numOfCols, POINTER_BYTES); + sas->data = calloc(sas->numOfCols, POINTER_BYTES); // set the input column data - for (int32_t f = 0; f < arithSup.numOfCols; ++f) { + for (int32_t f = 0; f < pSDataBlock->info.numOfCols; ++f) { SColumnInfoData *pColumnInfoData = taosArrayGet(pSDataBlock->pDataBlock, f); - arithSup.data[f] = pColumnInfoData->pData; + sas->data[f] = pColumnInfoData->pData; } +} - // output result number of columns - for (int32_t k = 0; k < pRuntimeEnv->outputBuf->info.numOfCols; ++k) { - for (int i = 0; i < pQuery->numOfExpr2; ++i) { - SExprInfo *pExpr = &pQuery->pExpr2[i]; +static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, SExprInfo *pExprInfo, int32_t numOfOutput) { + SQuery *pQuery = pRuntimeEnv->pQuery; - // calculate the result from several other columns - SSqlFuncMsg *pSqlFunc = &pExpr->base; - if (pSqlFunc->functionId != TSDB_FUNC_ARITHM) { - for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - if (pSqlFunc->functionId == pQuery->pExpr1[j].base.functionId && - pSqlFunc->colInfo.colId == pQuery->pExpr1[j].base.colInfo.colId) { - memcpy(data[i]->data, pQuery->sdata[j]->data, (size_t)(pQuery->pExpr1[j].bytes * pSDataBlock->info.rows)); - break; - } - } - } else { - arithSup.pArithExpr = pExpr; - arithmeticTreeTraverse(arithSup.pArithExpr->pExpr, (int32_t)pQuery->rec.rows, data[i]->data, &arithSup, - TSDB_ORDER_ASC, getArithemicInputSrc); + for (int32_t k = 0; k < numOfOutput; ++k) { + int32_t functionId = pExprInfo[k].base.functionId; + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { + pCtx[k].startTs = pQuery->window.skey; + aAggs[functionId].xFunction(&pCtx[k]); + } + } +} + +static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) { + SQuery *pQuery = pRuntimeEnv->pQuery; + + SResultRowInfo* pWindowResInfo = &pRuntimeEnv->resultRowInfo; + + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + int32_t prevIndex = curTimeWindowIndex(pWindowResInfo); + + TSKEY ts = getStartTsKey(pQuery, pDataBlockInfo, tsCols, step); + STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); + + SResultRow *pResult = NULL; + int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId); + if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { + // goto _end; + } + + int32_t forwardStep = 0; + int32_t startPos = pQuery->pos; + + TSKEY ekey = reviseWindowEkey(pQuery, &win); + forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true); + + // prev time window not interpolation yet. + int32_t curIndex = curTimeWindowIndex(pWindowResInfo); + if (prevIndex != -1 && prevIndex < curIndex && pQuery->timeWindowInterpo) { + for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already. + SResultRow *pRes = pWindowResInfo->pResult[j]; + if (pRes->closed) { + assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP)); + continue; } + + STimeWindow w = pRes->win; + ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &w, masterScan, &pResult, groupId); + assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); + + int32_t p = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pDataBlockInfo->rows - 1; + doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0], -1, tsCols[0], p, + w.ekey, RESULT_ROW_END_INTERP); + setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); + + doBlockwiseApplyFunctions(pRuntimeEnv, &w, startPos, 0, tsCols, pDataBlockInfo->rows, pDataBlock); } - for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) { - memcpy(pQuery->sdata[i]->data, data[i]->data, (size_t)(pQuery->pExpr2[i].bytes * pQuery->rec.rows)); + // restore current time window + ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId); + assert(ret == TSDB_CODE_SUCCESS); + } + + // window start key interpolation + doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &win, pQuery->pos, forwardStep); + doBlockwiseApplyFunctions(pRuntimeEnv, &win, startPos, forwardStep, tsCols, pDataBlockInfo->rows, pDataBlock); + + STimeWindow nextWin = win; + while (1) { + int32_t prevEndPos = (forwardStep - 1) * step + startPos; + startPos = getNextQualifiedWindow(pQuery, &nextWin, pDataBlockInfo, tsCols, searchFn, prevEndPos); + if (startPos < 0) { + break; } - for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) { - tfree(data[i]); + // null data, failed to allocate more memory buffer + int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &nextWin, masterScan, &pResult, groupId); + if (code != TSDB_CODE_SUCCESS || pResult == NULL) { + break; } - tfree(data); - tfree(arithSup.data); + ekey = reviseWindowEkey(pQuery, &nextWin); + forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true); + + // window start(end) key interpolation + doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &nextWin, startPos, forwardStep); + doBlockwiseApplyFunctions(pRuntimeEnv, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows, pDataBlock); } } @@ -1440,7 +1502,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat } setResultOutputBuf(pRuntimeEnv, pResultRow); - initCtxOutputBuf(pRuntimeEnv); + initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx); return TSDB_CODE_SUCCESS; } @@ -2015,42 +2077,19 @@ static int32_t setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx return TSDB_CODE_SUCCESS; } -static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables, int16_t order, int32_t vgId) { - qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv)); - SQuery *pQuery = pRuntimeEnv->pQuery; - - pRuntimeEnv->prevGroupId = INT32_MIN; - - pQuery->interBufSize = getOutputInterResultBufSize(pQuery); - - pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - pRuntimeEnv->keyBuf = malloc(pQuery->maxSrcColumnSize + sizeof(int64_t)); - pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv)); - pRuntimeEnv->prevRow = malloc(POINTER_BYTES * pQuery->numOfCols + pQuery->srcRowSize); - pRuntimeEnv->tagVal = malloc(pQuery->tagLen); - pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutput, sizeof(SQLFunctionCtx)); - pRuntimeEnv->offset = calloc(pQuery->numOfOutput, sizeof(int16_t)); - pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t)); - pRuntimeEnv->sasArray = calloc(pQuery->numOfOutput, sizeof(SArithmeticSupport)); - - if (pRuntimeEnv->offset == NULL || pRuntimeEnv->pCtx == NULL || pRuntimeEnv->rowCellInfoOffset == NULL || - pRuntimeEnv->sasArray == NULL || pRuntimeEnv->pResultRowHashTable == NULL || pRuntimeEnv->keyBuf == NULL || - pRuntimeEnv->prevRow == NULL || pRuntimeEnv->tagVal == NULL) { - goto _clean; - } +static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, int32_t numOfOutput, SExprInfo* pExpr, int32_t order, int32_t vgId) { + SQuery* pQuery = pRuntimeEnv->pQuery; - char* start = POINTER_BYTES * pQuery->numOfCols + (char*) pRuntimeEnv->prevRow; - pRuntimeEnv->prevRow[0] = start; - for(int32_t i = 1; i < pQuery->numOfCols; ++i) { - pRuntimeEnv->prevRow[i] = pRuntimeEnv->prevRow[i - 1] + pQuery->colList[i-1].bytes; + SQLFunctionCtx *pQCtx = (SQLFunctionCtx *)calloc(numOfOutput, sizeof(SQLFunctionCtx)); + if (pQCtx == NULL) { + return NULL; } - pRuntimeEnv->offset[0] = 0; - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SSqlFuncMsg *pSqlFuncMsg = &pQuery->pExpr1[i].base; + for (int32_t i = 0; i < numOfOutput; ++i) { + SSqlFuncMsg *pSqlFuncMsg = &pExpr[i].base; + SQLFunctionCtx* pCtx = &pQCtx[i]; - SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; - SColIndex * pIndex = &pSqlFuncMsg->colInfo; + SColIndex *pIndex = &pSqlFuncMsg->colInfo; if (TSDB_COL_REQ_NULL(pIndex->flag)) { pCtx->requireNull = true; @@ -2085,13 +2124,13 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf assert(isValidDataType(pCtx->inputType)); pCtx->ptsOutputBuf = NULL; - pCtx->outputBytes = pQuery->pExpr1[i].bytes; - pCtx->outputType = pQuery->pExpr1[i].type; + pCtx->outputBytes = pExpr[i].bytes; + pCtx->outputType = pExpr[i].type; - pCtx->order = pQuery->order.order; + pCtx->order = order; pCtx->functionId = pSqlFuncMsg->functionId; pCtx->stableQuery = pQuery->stableQuery; - pCtx->interBufBytes = pQuery->pExpr1[i].interBytes; + pCtx->interBufBytes = pExpr[i].interBytes; pCtx->start.key = INT64_MIN; pCtx->end.key = INT64_MIN; @@ -2114,10 +2153,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf int32_t functionId = pCtx->functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { - int32_t f = pQuery->pExpr1[0].base.functionId; + int32_t f = pExpr[0].base.functionId; assert(f == TSDB_FUNC_TS || f == TSDB_FUNC_TS_DUMMY); - pCtx->param[2].i64 = order; + pCtx->param[2].i64 = pQuery->order.order; pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; pCtx->param[3].i64 = functionId; pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT; @@ -2152,12 +2191,55 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } if (i > 0) { - pRuntimeEnv->offset[i] = pRuntimeEnv->offset[i - 1] + pRuntimeEnv->pCtx[i - 1].outputBytes; + pRuntimeEnv->offset[i] = pRuntimeEnv->offset[i - 1] + pQCtx[i - 1].outputBytes; pRuntimeEnv->rowCellInfoOffset[i] = - pRuntimeEnv->rowCellInfoOffset[i - 1] + sizeof(SResultRowCellInfo) + pQuery->pExpr1[i - 1].interBytes; + pRuntimeEnv->rowCellInfoOffset[i - 1] + sizeof(SResultRowCellInfo) + pExpr[i - 1].interBytes; } } + return pQCtx; + + _clean: + tfree(pQCtx); + return NULL; +} + +static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables, int16_t order, int32_t vgId) { + qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv)); + SQuery *pQuery = pRuntimeEnv->pQuery; + + pRuntimeEnv->prevGroupId = INT32_MIN; + pRuntimeEnv->pQuery = pQuery; + + pQuery->interBufSize = getOutputInterResultBufSize(pQuery); + + pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + pRuntimeEnv->keyBuf = malloc(pQuery->maxSrcColumnSize + sizeof(int64_t)); + pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv)); + pRuntimeEnv->prevRow = malloc(POINTER_BYTES * pQuery->numOfCols + pQuery->srcRowSize); + pRuntimeEnv->tagVal = malloc(pQuery->tagLen); + pRuntimeEnv->offset = calloc(pQuery->numOfOutput, sizeof(int16_t)); + pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t)); + pRuntimeEnv->sasArray = calloc(pQuery->numOfOutput, sizeof(SArithmeticSupport)); + + if (pRuntimeEnv->offset == NULL || pRuntimeEnv->rowCellInfoOffset == NULL || pRuntimeEnv->sasArray == NULL || + pRuntimeEnv->pResultRowHashTable == NULL || pRuntimeEnv->keyBuf == NULL || pRuntimeEnv->prevRow == NULL || + pRuntimeEnv->tagVal == NULL) { + goto _clean; + } + + char* start = POINTER_BYTES * pQuery->numOfCols + (char*) pRuntimeEnv->prevRow; + pRuntimeEnv->prevRow[0] = start; + for(int32_t i = 1; i < pQuery->numOfCols; ++i) { + pRuntimeEnv->prevRow[i] = pRuntimeEnv->prevRow[i - 1] + pQuery->colList[i-1].bytes; + } + + pRuntimeEnv->offset[0] = 0; + pRuntimeEnv->pCtx = createSQLFunctionCtx(pRuntimeEnv, pQuery->numOfOutput, pQuery->pExpr1, order, vgId); + if (pRuntimeEnv->pCtx == NULL) { + goto _clean; + } + *(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN; // if it is group by normal column, do not set output buffer, the output buffer is pResult @@ -2171,6 +2253,28 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } qDebug("QInfo:%p init runtime completed", GET_QINFO_ADDR(pRuntimeEnv)); + + // group by normal column, sliding window query, interval query are handled by interval query processor + if (!pQuery->stableQuery) { // interval (down sampling operation) + if (isFixedOutputQuery(pRuntimeEnv)) { + pRuntimeEnv->proot = createAggOperatorInfo(&pRuntimeEnv->resultRowInfo, pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); + if (pQuery->pExpr2 != NULL) { + pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot); + } + } else { // diff/add/multiply/subtract/division + assert(pQuery->checkResultBuf == 1); + pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); + } + + if (pQuery->limit.offset > 0) { + pRuntimeEnv->proot = createOffsetOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot); + } + + if (pQuery->limit.limit > 0) { + pRuntimeEnv->proot = createLimitOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot); + } + } + return TSDB_CODE_SUCCESS; _clean: @@ -3429,19 +3533,17 @@ void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { memset(pQuery->sdata[i]->data, 0, (size_t)(pQuery->pExpr1[i].bytes * pQuery->rec.capacity)); } - initCtxOutputBuf(pRuntimeEnv); + initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx); } -void resetDefaultResInfoOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv) { - SQuery *pQuery = pRuntimeEnv->pQuery; - +void resetDefaultResInfoOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pSQLCtx, SSDataBlock* pDataBlock) { int32_t tid = 0; int64_t uid = 0; SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, (char *)&tid, sizeof(tid), true, uid); - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; - SColumnInfoData* pData = taosArrayGet(pRuntimeEnv->outputBuf->pDataBlock, i); + for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { + SQLFunctionCtx *pCtx = &pSQLCtx[i]; + SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i); /* * set the output buffer information and intermediate buffer @@ -3456,11 +3558,11 @@ void resetDefaultResInfoOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv) { // set the timestamp output buffer for top/bottom/diff query int32_t functionId = pCtx->functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { - pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].pOutput; + pCtx->ptsOutputBuf = pSQLCtx[0].pOutput; } } - initCtxOutputBuf(pRuntimeEnv); + initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx); } void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) { @@ -3491,19 +3593,19 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) { } } -void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { +void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pSQLCtx) { SQuery *pQuery = pRuntimeEnv->pQuery; for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - int32_t functionId = pRuntimeEnv->pCtx[j].functionId; - pRuntimeEnv->pCtx[j].currentStage = 0; + int32_t functionId = pSQLCtx[j].functionId; + pSQLCtx[j].currentStage = 0; - SResultRowCellInfo* pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); + SResultRowCellInfo* pResInfo = GET_RES_INFO(&pSQLCtx[j]); if (pResInfo->initialized) { continue; } - aAggs[functionId].init(&pRuntimeEnv->pCtx[j]); + aAggs[functionId].init(&pSQLCtx[j]); } } @@ -3985,7 +4087,7 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) { // record the current active group id pRuntimeEnv->prevGroupId = groupIndex; setResultOutputBuf(pRuntimeEnv, pResultRow); - initCtxOutputBuf(pRuntimeEnv); + initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx); } void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult) { @@ -4039,8 +4141,9 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe } } -int32_t setTimestampListJoinInfo(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo) { - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; +int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, STableQueryInfo *pTableQueryInfo) { + SQuery* pQuery = pRuntimeEnv->pQuery; + assert(pRuntimeEnv->pTsBuf != NULL); // both the master and supplement scan needs to set the correct ts comp start position @@ -4049,14 +4152,14 @@ int32_t setTimestampListJoinInfo(SQInfo *pQInfo, STableQueryInfo *pTableQueryInf if (pTableQueryInfo->cur.vgroupIndex == -1) { tVariantAssign(&pTableQueryInfo->tag, pTag); - STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQInfo->vgId, &pTableQueryInfo->tag); + STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQuery->vgId, &pTableQueryInfo->tag); // failed to find data with the specified tag value and vnodeId if (!tsBufIsValidElem(&elem)) { if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { - qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz); + qError("QInfo:%p failed to find tag:%s in ts_comp", pRuntimeEnv->qinfo, pTag->pz); } else { - qError("QInfo:%p failed to find tag:%" PRId64 " in ts_comp", pQInfo, pTag->i64); + qError("QInfo:%p failed to find tag:%" PRId64 " in ts_comp", pRuntimeEnv->qinfo, pTag->i64); } return false; @@ -4065,18 +4168,18 @@ int32_t setTimestampListJoinInfo(SQInfo *pQInfo, STableQueryInfo *pTableQueryInf // keep the cursor info of current meter pTableQueryInfo->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf); if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { - qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); + qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); } else { - qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); + qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); } } else { tsBufSetCursor(pRuntimeEnv->pTsBuf, &pTableQueryInfo->cur); if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { - qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); + qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); } else { - qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); + qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); } } @@ -4362,9 +4465,9 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pRuntimeEnv->pQuery; - if (pQuery->pExpr2 == NULL) { - SSDataBlock* pRes = pRuntimeEnv->outputBuf; + SSDataBlock* pRes = pRuntimeEnv->outputBuf; + if (pQuery->pExpr2 == NULL) { for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows); @@ -4372,10 +4475,9 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data } } else { for (int32_t col = 0; col < pQuery->numOfExpr2; ++col) { - int32_t bytes = pQuery->pExpr2[col].bytes; - - memmove(data, pQuery->sdata[col]->data, bytes * numOfRows); - data += bytes * numOfRows; + SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); + memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows); + data += pColRes->info.bytes * numOfRows; } } @@ -4882,7 +4984,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts } pQuery->tsdb = tsdb; - pQInfo->vgId = vgId; + pQuery->vgId = vgId; pQInfo->groupResInfo.totalGroup = isSTableQuery? GET_NUM_OF_TABLEGROUP(pRuntimeEnv):0; pRuntimeEnv->pQuery = pQuery; @@ -4952,7 +5054,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts // create runtime environment int32_t numOfTables = pQuery->tableGroupInfo.numOfTables; pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo)); - code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQuery->tableGroupInfo.numOfTables, pQuery->order.order, pQInfo->vgId); + code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQuery->tableGroupInfo.numOfTables, pQuery->order.order, pQuery->vgId); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4996,7 +5098,7 @@ static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTa } if (pRuntimeEnv->pTsBuf != NULL) { - setTimestampListJoinInfo(pQInfo, pTableQueryInfo); + setTimestampListJoinInfo(pRuntimeEnv, pTableQueryInfo); } if (QUERY_IS_INTERVAL_QUERY(pQuery)) { @@ -5145,7 +5247,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { tVariant* pTag = &pRuntimeEnv->pCtx[0].tag; if (pRuntimeEnv->cur.vgroupIndex == -1) { - STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQInfo->vgId, pTag); + STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQuery->vgId, pTag); // failed to find data with the specified tag value and vnodeId if (!tsBufIsValidElem(&elem)) { if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { @@ -5170,7 +5272,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf); if (tVariantCompare(elem.tag, &pRuntimeEnv->pCtx[0].tag) != 0) { - STSElem elem1 = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQInfo->vgId, pTag); + STSElem elem1 = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQuery->vgId, pTag); // failed to find data with the specified tag value and vnodeId if (!tsBufIsValidElem(&elem1)) { if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { @@ -5201,7 +5303,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { } } - initCtxOutputBuf(pRuntimeEnv); + initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx); return true; } @@ -5286,7 +5388,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { longjmp(pRuntimeEnv->env, terrno); } - initCtxOutputBuf(pRuntimeEnv); + initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx); SArray *s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle); assert(taosArrayGetSize(s) >= 1); @@ -5873,7 +5975,9 @@ static SSDataBlock* doScanTableImpl(STableScanInfo *pTableScanInfo) { } static SSDataBlock* doTableScan(void* param) { - STableScanInfo * pTableScanInfo = (STableScanInfo *)param; + SOperatorInfo* pOperator = (SOperatorInfo*) param; + + STableScanInfo *pTableScanInfo = pOperator->optInfo; SQueryRuntimeEnv *pRuntimeEnv = pTableScanInfo->pRuntimeEnv; SQuery* pQuery = pRuntimeEnv->pQuery; @@ -5946,7 +6050,9 @@ static UNUSED_FUNC SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); pOptr->name = "SeqScanTableOp"; pOptr->blockingOptr = false; - pOptr->optInfo = pInfo; + pOptr->optInfo = pInfo; + pOptr->completed = false; + pOptr->numOfOutput = pRuntimeEnv->pQuery->numOfCols; pOptr->exec = doTableScan; return pOptr; @@ -5985,26 +6091,33 @@ static UNUSED_FUNC int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { // this is a blocking operator static SSDataBlock* doAggregation(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; + if (pOperator->completed) { + return NULL; + } SAggOperatorInfo* pAggInfo = pOperator->optInfo; SQueryRuntimeEnv* pRuntimeEnv = pAggInfo->pRuntimeEnv; - SOperatorInfo* upstream = pOperator->upstream; + SQuery* pQuery = pRuntimeEnv->pQuery; - resetDefaultResInfoOutputBuf_rv(pRuntimeEnv); - pRuntimeEnv->pQuery->pos = 0; + SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId); + SOperatorInfo* upstream = pOperator->upstream; + + resetDefaultResInfoOutputBuf_rv(pRuntimeEnv, pCtx, pRuntimeEnv->outputBuf); + pQuery->pos = 0; while(1) { - SSDataBlock* pBlock = upstream->exec(upstream->optInfo); + SSDataBlock* pBlock = upstream->exec(upstream); if (pBlock == NULL) { break; } // the pDataBlock are always the same one, no need to call this again - setInputSDataBlock(pRuntimeEnv, pBlock); - aggApplyFunctions(pRuntimeEnv, pBlock); + setInputSDataBlock(pOperator, pCtx, pBlock); + aggApplyFunctions(pRuntimeEnv, pOperator, pCtx, pBlock); } + pOperator->completed = true; setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); finalizeQueryResult(pRuntimeEnv); @@ -6018,22 +6131,144 @@ static SSDataBlock* doArithmeticOperation(void* param) { SArithOperatorInfo* pArithInfo = pOperator->optInfo; SQueryRuntimeEnv* pRuntimeEnv = pArithInfo->pRuntimeEnv; - SOperatorInfo* upstream = pOperator->upstream; + SQuery* pQuery = pRuntimeEnv->pQuery; + SSDataBlock* pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); - resetDefaultResInfoOutputBuf_rv(pRuntimeEnv); + if (pArithInfo->pCtx == NULL) { + pArithInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId); + } + + resetDefaultResInfoOutputBuf_rv(pRuntimeEnv, pArithInfo->pCtx, pRes); + + SOperatorInfo* upstream = pOperator->upstream; pRuntimeEnv->pQuery->pos = 0; while(1) { - SSDataBlock* pBlock = upstream->exec(upstream->optInfo); + SSDataBlock* pBlock = upstream->exec(upstream); if (pBlock == NULL) { + setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); break; } // the pDataBlock are always the same one, no need to call this again - setInputSDataBlock(pRuntimeEnv, pBlock); - aggApplyFunctions(pRuntimeEnv, pBlock); + for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { + pArithInfo->pCtx[i].size = pBlock->info.rows; + if (pArithInfo->pCtx[i].functionId == TSDB_FUNC_ARITHM) { + setArithParams((SArithmeticSupport*) pArithInfo->pCtx[i].param[1].pz, pOperator->pExpr, pBlock); + } else { + SColIndex *pCol = &pOperator->pExpr[i].base.colInfo; + if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) { + for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) { + SColumnInfoData *p = taosArrayGet(pBlock->pDataBlock, j); + if (p->info.colId == pCol->colId) { + pArithInfo->pCtx[i].pInput = p->pData; + break; + } + } + } + } + } + + arithmeticApplyFunctions(pRuntimeEnv, pArithInfo->pCtx, pOperator->pExpr, pOperator->numOfOutput); + pRes->info.rows += pBlock->info.rows; + if (pRes->info.rows > 4096) { + break; + } + } + + pRuntimeEnv->outputBuf = pRes; + return pRes; +} + +static SSDataBlock* doLimit(void* param) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + if (pOperator->completed) { + return NULL; } + SLimitOperatorInfo* pInfo = pOperator->optInfo; + + SOperatorInfo* upstream = pOperator->upstream; + SSDataBlock* pBlock = upstream->exec(upstream); + if (pBlock == NULL) { + setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED); + pOperator->completed = true; + return NULL; + } + + if (pInfo->total + pBlock->info.rows >= pInfo->limit) { + pBlock->info.rows = (pInfo->limit - pInfo->total); + + setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED); + pOperator->completed = true; + } + + return pBlock; +} + +static SSDataBlock* doOffset(void* param) { + SOperatorInfo *pOperator = (SOperatorInfo *)param; + + SOffsetOperatorInfo *pInfo = pOperator->optInfo; + SOperatorInfo* upstream = pOperator->upstream; + + while (1) { + SSDataBlock *pBlock = upstream->exec(upstream); + if (pBlock == NULL) { + setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED); + return NULL; + } + + if (pInfo->currentOffset == 0) { + return pBlock; + } else if (pInfo->currentOffset > pBlock->info.rows) { + pInfo->currentOffset -= pBlock->info.rows; + } else { + int32_t remain = pBlock->info.rows - pInfo->currentOffset; + pBlock->info.rows = remain; + + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + + int16_t bytes = pColInfoData->info.bytes; + memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes); + } + + pInfo->currentOffset = 0; + return pBlock; + } + } +} + +static SSDataBlock* doHashIntervalAgg(void* param) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + if (pOperator->completed) { + return NULL; + } + + SAggOperatorInfo* pAggInfo = pOperator->optInfo; + SQueryRuntimeEnv* pRuntimeEnv = pAggInfo->pRuntimeEnv; + + SQuery* pQuery = pRuntimeEnv->pQuery; + + SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId); + SOperatorInfo* upstream = pOperator->upstream; + + resetDefaultResInfoOutputBuf_rv(pRuntimeEnv, pCtx, pRuntimeEnv->outputBuf); + pQuery->pos = 0; + + while(1) { + SSDataBlock* pBlock = upstream->exec(upstream); + if (pBlock == NULL) { + break; + } + + // the pDataBlock are always the same one, no need to call this again + setInputSDataBlock(pOperator, pCtx, pBlock); + hashIntervalAgg(pRuntimeEnv, pOperator, pCtx, pBlock); + } + + pOperator->completed = true; setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); finalizeQueryResult(pRuntimeEnv); @@ -6053,7 +6288,7 @@ static int32_t getNumOfScanTimes(SQuery* pQuery) { return 1; } -static UNUSED_FUNC SOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, +static SOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); @@ -6064,15 +6299,17 @@ static UNUSED_FUNC SOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultR SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "AggregationOp"; pOperator->blockingOptr = true; + pOperator->completed = false; pOperator->optInfo = pInfo; pOperator->upstream = inputOptr; pOperator->exec = doAggregation; + pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1; + pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput; return pOperator; } -static UNUSED_FUNC SOperatorInfo* createArithOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, - SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) { +static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) { SArithOperatorInfo* pInfo = calloc(1, sizeof(SArithOperatorInfo)); pInfo->pTableQueryInfo = pTableQueryInfo; @@ -6081,14 +6318,57 @@ static UNUSED_FUNC SOperatorInfo* createArithOperatorInfo(SResultRowInfo* pResul SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "ArithmeticOp"; pOperator->blockingOptr = false; + pOperator->completed = false; pOperator->optInfo = pInfo; pOperator->upstream = inputOptr; pOperator->exec = doArithmeticOperation; + pOperator->pExpr = (pRuntimeEnv->pQuery->pExpr2 == NULL)? pRuntimeEnv->pQuery->pExpr1:pRuntimeEnv->pQuery->pExpr2; + pOperator->numOfOutput = (pRuntimeEnv->pQuery->pExpr2 == NULL)? pRuntimeEnv->pQuery->numOfOutput:pRuntimeEnv->pQuery->numOfExpr2; + + return pOperator; +} + +static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) { + SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo)); + + pInfo->limit = pRuntimeEnv->pQuery->limit.limit; + pInfo->pRuntimeEnv = pRuntimeEnv; + + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + + pOperator->name = "LimitOp"; + pOperator->blockingOptr = false; + pOperator->completed = false; + pOperator->upstream = inputOptr; + pOperator->exec = doLimit; + pOperator->pExpr = NULL; + pOperator->numOfOutput = 0; + pOperator->optInfo = pInfo; + + return pOperator; +} + +static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) { + SOffsetOperatorInfo* pInfo = calloc(1, sizeof(SOffsetOperatorInfo)); + + pInfo->offset = pRuntimeEnv->pQuery->limit.offset; + pInfo->currentOffset = pInfo->offset; + pInfo->pRuntimeEnv = pRuntimeEnv; + + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + + pOperator->name = "OffsetOp"; + pOperator->blockingOptr = false; + pOperator->completed = false; + pOperator->upstream = inputOptr; + pOperator->exec = doOffset; + pOperator->pExpr = NULL; + pOperator->numOfOutput = 0; + pOperator->optInfo = pInfo; return pOperator; } -static /* * in each query, this function will be called only once, no retry for further result. @@ -6096,7 +6376,7 @@ static * select count(*)/top(field,k)/avg(field name) from table_name [where ts>now-1a]; * select count(*) from table_name group by status_column; */ -static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { +void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pRuntimeEnv->pQuery; @@ -6104,11 +6384,8 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) return; } - SOperatorInfo* pAggInfo = createAggOperatorInfo(&pRuntimeEnv->resultRowInfo, pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); - SSDataBlock* pResBlock = pAggInfo->exec(pAggInfo->optInfo); - + SSDataBlock* pResBlock = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); pQuery->rec.rows = pResBlock->info.rows; -// doSecondaryArithmeticProcess(pQuery); // TODO limit/offset refactor to be one operator // skipResults(pRuntimeEnv); @@ -6125,12 +6402,16 @@ static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) } // skip blocks without load the actual data block from file if no filter condition present - skipBlocks(&pQInfo->runtimeEnv); - if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0) { - setQueryStatus(pQuery, QUERY_COMPLETED); - return; - } +// skipBlocks(&pQInfo->runtimeEnv); +// if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0) { +// setQueryStatus(pQuery, QUERY_COMPLETED); +// return; +// } + SSDataBlock* pResBlock = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); + pQuery->rec.rows = (pResBlock != NULL)? pResBlock->info.rows : 0; + +#if 0 while (1) { scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey); finalizeQueryResult(pRuntimeEnv); @@ -6166,6 +6447,7 @@ static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) if (!isTsCompQuery(pQuery)) { assert(pQuery->rec.rows <= pQuery->rec.capacity); } +#endif } static void copyAndFillResult(SQInfo* pQInfo) { @@ -7125,7 +7407,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr } doUpdateExprColumnIndex(pQuery); - pQInfo->runtimeEnv.outputBuf = createOutputBuf(pQuery); + pQInfo->runtimeEnv.outputBuf = createOutputBuf(pQuery->pExpr1, pQuery->numOfOutput); int32_t ret = createFilterInfo(pQInfo, pQuery); if (ret != TSDB_CODE_SUCCESS) { @@ -7622,8 +7904,8 @@ void buildTagQueryResult(SQInfo* pQInfo) { *(int32_t *)output = id->tid; output += sizeof(id->tid); - *(int32_t *)output = pQInfo->vgId; - output += sizeof(pQInfo->vgId); + *(int32_t *)output = pQuery->vgId; + output += sizeof(pQuery->vgId); if (pExprInfo->base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { char* data = tsdbGetTableName(item->pTable); -- GitLab