From 7429f37c0cf523562ff0aaaec234d4ce2c4ff1f2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 18 Apr 2022 10:46:07 +0800 Subject: [PATCH] fix(query): fix bug in calculating the aggregate function with constant numeric value as input parameter. --- source/libs/executor/inc/executorimpl.h | 10 +- source/libs/executor/src/executorimpl.c | 439 +++++++----------------- 2 files changed, 134 insertions(+), 315 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 280d3da614..fe230f04b4 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -220,14 +220,9 @@ typedef struct SExecTaskInfo { } SExecTaskInfo; typedef struct STaskRuntimeEnv { - - jmp_buf env; STaskAttr* pQueryAttr; uint32_t status; // query status - void* qinfo; uint8_t scanFlag; // denotes reversed scan of data or not - void* pTsdbReadHandle; - bool enableGroupData; SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file SHashObj* pResultRowHashTable; // quick locate the window object for each result SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not @@ -235,12 +230,10 @@ typedef struct STaskRuntimeEnv { char* keyBuf; // window key buffer // The window result objects pool, all the resultRow Objects are allocated and managed by this object. char** prevRow; - SArray* prevResult; // intermediate result, SArray STSBuf* pTsBuf; // timestamp filter list STSCursor cur; char* tagVal; // tag value of current data block - struct SScalarFunctionSupport* scalarSup; STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure struct SOperatorInfo* proot; SGroupResInfo groupResInfo; @@ -266,7 +259,6 @@ typedef struct SOperatorInfo { char* name; // name, used to show the query execution plan void* info; // extension attribution SExprInfo* pExpr; - STaskRuntimeEnv* pRuntimeEnv; // todo remove it SExecTaskInfo* pTaskInfo; SOperatorCostInfo cost; SResultInfo resultInfo; @@ -291,7 +283,7 @@ typedef struct { typedef enum { EX_SOURCE_DATA_NOT_READY = 0x1, - EX_SOURCE_DATA_READY = 0x2, + EX_SOURCE_DATA_READY = 0x2, EX_SOURCE_DATA_EXHAUSTED = 0x3, } EX_SOURCE_STATUS; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 11c6a08c00..85516e14ad 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -186,10 +186,9 @@ static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t o static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes); static bool functionNeedToExecute(SqlFunctionCtx* pCtx); -static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock, SColumn* pColumn); +static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock); static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo); -static bool hasMainOutput(STaskAttr* pQueryAttr); static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols); @@ -456,7 +455,6 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR if (p1 != NULL) { if (pResultRowInfo->size == 0) { existInCurrentResusltRowInfo = false; // this time window created by other timestamp that does not belongs to current table. -// assert(pResultRowInfo->curPos == -1); } else if (pResultRowInfo->size == 1) { SResultRowPosition* p = &pResultRowInfo->pPosition[0]; existInCurrentResusltRowInfo = (p->pageId == p1->pageId && p->offset == p1->offset); @@ -465,7 +463,6 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR int64_t* index = taosHashGet(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes)); if (index != NULL) { // TODO check the scan order for current opened time window -// pResultRowInfo->curPos = (int32_t)*index; existInCurrentResusltRowInfo = true; } else { existInCurrentResusltRowInfo = false; @@ -505,7 +502,6 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR } // 2. set the new time window to be the new active time window -// pResultRowInfo->curPos = pResultRowInfo->size; pResultRowInfo->pPosition[pResultRowInfo->size++] = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, uid, pResultRowInfo); @@ -1035,13 +1031,13 @@ static TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols, int32_t rows, return ts; } -static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); +static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { pCtx[i].order = order; pCtx[i].size = pBlock->info.rows; - setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock, NULL /*&pOperator->pExpr[i].base.colInfo*/); + setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock); } } @@ -1053,22 +1049,64 @@ void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlo } } -static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { +static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type, int32_t paramIndex, int32_t numOfRows) { + SColumnInfoData* pColInfo = NULL; + if (pInput->pData[paramIndex] == NULL) { + pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData)); + if (pColInfo == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + // Set the correct column info (data type and bytes) + pColInfo->info.type = type; + pColInfo->info.bytes = tDataTypes[type].bytes; + + pInput->pData[paramIndex] = pColInfo; + } + + ASSERT(!IS_VAR_DATA_TYPE(type)); + colInfoDataEnsureCapacity(pColInfo, numOfRows); + + if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) { + int64_t v = pFuncParam->param.i; + for(int32_t i = 0; i < numOfRows; ++i) { + colDataAppendInt64(pColInfo, i, &v); + } + } else if (type == TSDB_DATA_TYPE_DOUBLE) { + double v = pFuncParam->param.d; + for(int32_t i = 0; i < numOfRows; ++i) { + colDataAppendDouble(pColInfo, i, &v); + } + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { + int32_t code = TSDB_CODE_SUCCESS; + for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { pCtx[i].order = order; pCtx[i].size = pBlock->info.rows; pCtx[i].currentStage = MAIN_SCAN; + SInputColumnInfoData* pInput = &pCtx[i].input; + SExprInfo* pOneExpr = &pOperator->pExpr[i]; for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) { SFunctParam *pFuncParam = &pOneExpr->base.pParam[j]; if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { int32_t slotId = pFuncParam->pCol->slotId; - pCtx[i].input.pData[j] = taosArrayGet(pBlock->pDataBlock, slotId); - pCtx[i].input.totalRows = pBlock->info.rows; - pCtx[i].input.numOfRows = pBlock->info.rows; - pCtx[i].input.startRowIndex = 0; - ASSERT(pCtx[i].input.pData[j] != NULL); + pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId); + pInput->totalRows = pBlock->info.rows; + pInput->numOfRows = pBlock->info.rows; + pInput->startRowIndex = 0; + ASSERT(pInput->pData[j] != NULL); + } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { + code = doCreateConstantValColumnInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } @@ -1111,6 +1149,8 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, // } // } } + + return code; } static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) { @@ -1200,7 +1240,6 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type) { - STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SExprInfo* pExpr = pOperator->pExpr; SqlFunctionCtx* pCtx = pInfo->pCtx; @@ -1220,7 +1259,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, double v1 = 0, v2 = 0, v = 0; if (prevRowIndex == -1) { - GET_TYPED_DATA(v1, double, pColInfo->info.type, (char*)pRuntimeEnv->prevRow[index]); +// GET_TYPED_DATA(v1, double, pColInfo->info.type, (char*)pRuntimeEnv->prevRow[index]); } else { GET_TYPED_DATA(v1, double, pColInfo->info.type, (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes); } @@ -1237,7 +1276,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { if (prevRowIndex == -1) { - pCtx[k].start.ptr = (char*)pRuntimeEnv->prevRow[index]; +// pCtx[k].start.ptr = (char*)pRuntimeEnv->prevRow[index]; } else { pCtx[k].start.ptr = (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes; } @@ -1507,86 +1546,6 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe // updateResultRowInfoActiveIndex(pResultRowInfo, &pInfo->win, pRuntimeEnv->current->lastKey, true, false); } -static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, - int32_t tableGroupId) { - STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*)pOperatorInfo->info; - - STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; - int32_t numOfOutput = pOperatorInfo->numOfOutput; - - int32_t step = 1;//GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); - bool ascQuery = true; - - TSKEY* tsCols = NULL; - if (pSDataBlock->pDataBlock != NULL) { - SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0); - tsCols = (int64_t*)pColDataInfo->pData; - assert(tsCols[0] == pSDataBlock->info.window.skey && - tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey); - } - - int32_t startPos = ascQuery ? 0 : (pSDataBlock->info.rows - 1); - TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascQuery); - - STimeWindow win = {0};//getCurrentActiveTimeWindow(pResultRowInfo, ts, pQueryAttr); - bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); - - SResultRow* pResult = NULL; - int32_t forwardStep = 0; - int32_t ret = 0; - STimeWindow preWin = win; - - while (1) { - // null data, failed to allocate more memory buffer -// ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, -// tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset); - if (ret != TSDB_CODE_SUCCESS) { - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - TSKEY ekey = 0;//reviseWindowEkey(pQueryAttr, &win); - // forwardStep = getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, - // binarySearchForKey, true); - - // window start(end) key interpolation - // doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, - // forwardStep); doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, ascQuery ? &win : &preWin, startPos, - // forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); - preWin = win; - - int32_t prevEndPos = (forwardStep - 1) * step + startPos; - // startPos = getNextQualifiedWindow(pQueryAttr, &win, &pSDataBlock->info, tsCols, binarySearchForKey, - // prevEndPos); - if (startPos < 0) { -// if ((ascQuery && win.skey <= pQueryAttr->window.ekey) || ((!ascQuery) && win.ekey >= pQueryAttr->window.ekey)) { -// int32_t code = -// setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, -// tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset); -// if (code != TSDB_CODE_SUCCESS || pResult == NULL) { -// longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); -// } -// -// startPos = pSDataBlock->info.rows - 1; - - // window start(end) key interpolation - // doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, - // forwardStep); doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, ascQuery ? &win : &preWin, startPos, - // forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); -// } - - break; - } - setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); - } - -// if (pQueryAttr->timeWindowInterpo) { -// int32_t rowIndex = ascQuery ? (pSDataBlock->info.rows - 1) : 0; - // saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex); -// } - - // updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey); -} - static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts) { pRowSup->win.ekey = ts; pRowSup->prevTs = ts; @@ -1730,30 +1689,85 @@ static bool functionNeedToExecute(SqlFunctionCtx* pCtx) { return true; } -void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock, SColumn* pColumn) { - if (pBlock->pBlockAgg != NULL /*&& TSDB_COL_IS_NORMAL_COL(pColumn->flag)*/) { +static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type, int32_t paramIndex, int32_t numOfRows) { + if (pInput->pData[paramIndex] == NULL) { + pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData)); + if (pInput->pData[paramIndex] == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + // Set the correct column info (data type and bytes) + pInput->pData[paramIndex]->info.type = type; + pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes; + } + + SColumnDataAgg* da = NULL; + if (pInput->pColumnDataAgg[paramIndex] == NULL) { + da = taosMemoryCalloc(1, sizeof(SColumnDataAgg)); + pInput->pColumnDataAgg[paramIndex] = da; + if (da == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } else { + da = pInput->pColumnDataAgg[paramIndex]; + } + + ASSERT(!IS_VAR_DATA_TYPE(type)); + + if (type == TSDB_DATA_TYPE_BIGINT) { + int64_t v = pFuncParam->param.i; + *da = (SColumnDataAgg) {.numOfNull = 0, .min = v, .max = v, .maxIndex = 0, .minIndex = 0, .sum = v * numOfRows}; + } else if (type == TSDB_DATA_TYPE_DOUBLE) { + double v = pFuncParam->param.d; + *da = (SColumnDataAgg) {.numOfNull = 0, .maxIndex = 0, .minIndex = 0}; + + *(double*) &da->min = v; + *(double*) &da->max = v; + *(double*) &da->sum = v * numOfRows; + } else if (type == TSDB_DATA_TYPE_BOOL) { // todo validate this data type + bool v = pFuncParam->param.i; + + *da = (SColumnDataAgg) {.numOfNull = 0, .maxIndex = 0, .minIndex = 0}; + *(bool*) &da->min = 0; + *(bool*) &da->max = v; + *(bool*) &da->sum = v * numOfRows; + } else if (type == TSDB_DATA_TYPE_TIMESTAMP) { + // do nothing + } else { + ASSERT(0); + } + + return TSDB_CODE_SUCCESS; +} + +void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) { + int32_t numOfRows = pBlock->info.rows; + + SInputColumnInfoData* pInput = &pCtx->input; + pInput->numOfRows = numOfRows; + pInput->totalRows = numOfRows; + + if (pBlock->pBlockAgg != NULL) { + pInput->colDataAggIsSet = true; + for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) { SFunctParam* pFuncParam = &pExprInfo->base.pParam[j]; + if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { int32_t slotId = pFuncParam->pCol->slotId; - SInputColumnInfoData* pInput = &pCtx->input; - pInput->pColumnDataAgg[j] = &pBlock->pBlockAgg[slotId]; - pInput->colDataAggIsSet = true; - pInput->numOfRows = pBlock->info.rows; - pInput->totalRows = pBlock->info.rows; // Here we set the column info data since the data type for each column data is required, but // the data in the corresponding SColumnInfoData will not be used. pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId); + } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { + doCreateConstantValColumnAggInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows); } } } else { - pCtx->input.colDataAggIsSet = false; + pInput->colDataAggIsSet = false; } -// pCtx->hasNull = hasNull(pColumn, pAgg); - // set the statistics data for primary time stamp column // if (pCtx->functionId == FUNCTION_SPREAD && pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { // pCtx->isAggSet = true; @@ -2224,33 +2238,6 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI return false; } -static int32_t doTSJoinFilter(STaskRuntimeEnv* pRuntimeEnv, TSKEY key, bool ascQuery) { - STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf); - -#if defined(_DEBUG_VIEW) - printf("elem in comp ts file:%" PRId64 ", key:%" PRId64 ", tag:%" PRIu64 - ", query order:%d, ts order:%d, traverse:%d, index:%d\n", - elem.ts, key, elem.tag.i, pQueryAttr->order.order, pRuntimeEnv->pTsBuf->tsOrder, - pRuntimeEnv->pTsBuf->cur.order, pRuntimeEnv->pTsBuf->cur.tsIndex); -#endif - - if (ascQuery) { - if (key < elem.ts) { - return TS_JOIN_TS_NOT_EQUALS; - } else if (key > elem.ts) { - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_INCONSISTAN); - } - } else { - if (key > elem.ts) { - return TS_JOIN_TS_NOT_EQUALS; - } else if (key < elem.ts) { - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_INCONSISTAN); - } - } - - return TS_JOIN_TS_EQUAL; -} - void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) { int32_t len = 0; int32_t start = 0; @@ -2300,54 +2287,6 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) { } } -void filterColRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock, bool ascQuery) { - int32_t numOfRows = pBlock->info.rows; - - int8_t* p = NULL; - bool all = true; - - if (pRuntimeEnv->pTsBuf != NULL) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0); - p = taosMemoryCalloc(numOfRows, sizeof(int8_t)); - - TSKEY* k = (TSKEY*)pColInfoData->pData; - for (int32_t i = 0; i < numOfRows; ++i) { - int32_t offset = ascQuery ? i : (numOfRows - i - 1); - int32_t ret = doTSJoinFilter(pRuntimeEnv, k[offset], ascQuery); - if (ret == TS_JOIN_TAG_NOT_EQUALS) { - break; - } else if (ret == TS_JOIN_TS_NOT_EQUALS) { - all = false; - continue; - } else { - assert(ret == TS_JOIN_TS_EQUAL); - p[offset] = true; - } - - if (!tsBufNextPos(pRuntimeEnv->pTsBuf)) { - break; - } - } - - // save the cursor status - // pRuntimeEnv->current->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf); - } else { - // all = filterExecute(pRuntimeEnv->pQueryAttr->pFilters, numOfRows, &p, pBlock->pBlockAgg, - // pRuntimeEnv->pQueryAttr->numOfCols); - } - - if (!all) { - if (p) { - doCompactSDataBlock(pBlock, numOfRows, p); - } else { - pBlock->info.rows = 0; - pBlock->pBlockAgg = NULL; // clean the block statistics info - } - } - - taosMemoryFreeClear(p); -} - static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId); static void doSetTagValueInParam(void* pTable, int32_t tagColId, SVariant* tag, int16_t type, int16_t bytes); @@ -2936,18 +2875,6 @@ void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbased } } -static bool hasMainOutput(STaskAttr* pQueryAttr) { - for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { - int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]); - - if (functionId != FUNCTION_TS && functionId != FUNCTION_TAG && functionId != FUNCTION_TAGPRJ) { - return true; - } - } - - return false; -} - STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win) { STableQueryInfo* pTableQueryInfo = buf; pTableQueryInfo->lastKey = win.skey; @@ -3111,48 +3038,6 @@ void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprI // } } -int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, STableQueryInfo* pTableQueryInfo) { - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - - assert(pRuntimeEnv->pTsBuf != NULL); -#if 0 - // both the master and supplement scan needs to set the correct ts comp start position - if (pTableQueryInfo->cur.vgroupIndex == -1) { - taosVariantAssign(&pTableQueryInfo->tag, pTag); - - STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQueryAttr->vgId, &pTableQueryInfo->tag); - - // failed to find data with the specified tag value and vnodeId - if (!tsBufIsValidElem(&elem)) { - if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { - //qError("QInfo:0x%"PRIx64" failed to find tag:%s in ts_comp", GET_TASKID(pRuntimeEnv), pTag->pz); - } else { - //qError("QInfo:0x%"PRIx64" failed to find tag:%" PRId64 " in ts_comp", GET_TASKID(pRuntimeEnv), pTag->i); - } - - return -1; - } - - // Keep the cursor info of current table - pTableQueryInfo->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf); - if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { - //qDebug("QInfo:0x%"PRIx64" find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_TASKID(pRuntimeEnv), pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); - } else { - //qDebug("QInfo:0x%"PRIx64" find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_TASKID(pRuntimeEnv), pTag->i, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); - } - - } else { - tsBufSetCursor(pRuntimeEnv->pTsBuf, &pTableQueryInfo->cur); - if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { - //qDebug("QInfo:0x%"PRIx64" find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_TASKID(pRuntimeEnv), pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); - } else { - //qDebug("QInfo:0x%"PRIx64" find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_TASKID(pRuntimeEnv), pTag->i, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); - } - } -#endif - return 0; -} - /* * There are two cases to handle: * @@ -3323,19 +3208,15 @@ int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock return pOutput->info.rows; } -void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType) { +void publishOperatorProfEvent(SOperatorInfo* pOperator, EQueryProfEventType eventType) { SQueryProfEvent event = {0}; event.eventType = eventType; event.eventTime = taosGetTimestampUs(); - event.operatorType = operatorInfo->operatorType; - - if (operatorInfo->pRuntimeEnv) { - // SQInfo* pQInfo = operatorInfo->pRuntimeEnv->qinfo; - // if (pQInfo->summary.queryProfEvents) { - // taosArrayPush(pQInfo->summary.queryProfEvents, &event); - // } - } + event.operatorType = pOperator->operatorType; +// if (pQInfo->summary.queryProfEvents) { +// taosArrayPush(pQInfo->summary.queryProfEvents, &event); +// } } void publishQueryAbortEvent(SExecTaskInfo* pTaskInfo, int32_t code) { @@ -5313,7 +5194,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { // setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order); - hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0); +// hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0); } // restore the value @@ -5387,57 +5268,6 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; } -static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - STableIntervalOperatorInfo* pIntervalInfo = pOperator->info; - STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - - if (pOperator->status == OP_RES_TO_RETURN) { - // copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); - if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { - pOperator->status = OP_EXEC_DONE; - } - - return pIntervalInfo->binfo.pRes; - } - - SOperatorInfo* downstream = pOperator->pDownstream[0]; - - while (1) { - publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); - publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); - - if (pBlock == NULL) { - break; - } - - // the pDataBlock are always the same one, no need to call this again - STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; - - // setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); -// setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, pQueryAttr->order.order); -// setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey); - -// hashAllIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex); - } - - pOperator->status = OP_RES_TO_RETURN; -// pQueryAttr->order.order = order; // TODO : restore the order - setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - - int64_t st = taosGetTimestampUs(); - // copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); - if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { - pOperator->status = OP_EXEC_DONE; - } - - return pIntervalInfo->binfo.pRes; -} - static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SOptrBasicInfo* pBInfo = &pInfo->binfo; @@ -6480,8 +6310,9 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* pExp->pExpr->_function.num = 1; pExp->pExpr->_function.functionId = -1; + int32_t type = nodeType(pTargetNode->pExpr); // it is a project query, or group by column - if (nodeType(pTargetNode->pExpr) == QUERY_NODE_COLUMN) { + if (type == QUERY_NODE_COLUMN) { pExp->pExpr->nodeType = QUERY_NODE_COLUMN; SColumnNode* pColNode = (SColumnNode*)pTargetNode->pExpr; @@ -6492,7 +6323,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName); pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pType); pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; - } else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_VALUE) { + } else if (type == QUERY_NODE_VALUE) { pExp->pExpr->nodeType = QUERY_NODE_VALUE; SValueNode* pValNode = (SValueNode*)pTargetNode->pExpr; @@ -6503,7 +6334,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pValNode->node.aliasName); pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE; valueNodeToVariant(pValNode, &pExp->base.pParam[0].param); - } else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_FUNCTION) { + } else if (type == QUERY_NODE_FUNCTION) { pExp->pExpr->nodeType = QUERY_NODE_FUNCTION; SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr; @@ -6514,14 +6345,13 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* pExp->pExpr->_function.pFunctNode = pFuncNode; strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, tListLen(pExp->pExpr->_function.functionName)); - // TODO: value parameter needs to be handled int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList); pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam)); pExp->base.numOfParams = numOfParam; for (int32_t j = 0; j < numOfParam; ++j) { - SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j); + SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j); if (p1->type == QUERY_NODE_COLUMN) { SColumnNode* pcn = (SColumnNode*) p1; @@ -6530,9 +6360,10 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* } else if (p1->type == QUERY_NODE_VALUE) { SValueNode* pvn = (SValueNode*)p1; pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE; + valueNodeToVariant(pvn, &pExp->base.pParam[j].param); } } - } else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_OPERATOR) { + } else if (type == QUERY_NODE_OPERATOR) { pExp->pExpr->nodeType = QUERY_NODE_OPERATOR; SOperatorNode* pNode = (SOperatorNode*)pTargetNode->pExpr; @@ -6541,11 +6372,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* SDataType* pType = &pNode->node.resType; pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pNode->node.aliasName); - pExp->pExpr->_optrRoot.pRootNode = pTargetNode->pExpr; - -// pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; -// pExp->base.pParam[0].pCol = createColumn(pTargetNode->dataBlockId, pTargetNode->slotId, pType); } else { ASSERT(0); } -- GitLab