From 1bf7fbbc0b877287201fa251daae6f792150e770 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 20 Jun 2022 23:22:28 +0800 Subject: [PATCH] feature(query): support interp function, and do some internal refactor. --- source/client/src/clientMain.c | 1 + source/libs/executor/inc/executil.h | 2 + source/libs/executor/inc/executorimpl.h | 11 +- source/libs/executor/src/executil.c | 30 +- source/libs/executor/src/executorimpl.c | 84 ++--- source/libs/executor/src/groupoperator.c | 19 +- source/libs/executor/src/scanoperator.c | 10 +- source/libs/executor/src/timewindowoperator.c | 311 ++++++++++-------- 8 files changed, 275 insertions(+), 193 deletions(-) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index c5a237f5ed..ba7de65931 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -887,6 +887,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { SRequestObj *pRequest = res; pRequest->body.fetchFp = fp; + pRequest->body.param = param; SReqResultInfo *pResultInfo = &pRequest->body.resInfo; if (taos_num_fields(pRequest) == 0) { diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 07686893db..6072faa373 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -123,4 +123,6 @@ SColumn extractColumnFromColumnNode(SColumnNode* pColNode); int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode); void cleanupQueryTableDataCond(SQueryTableDataCond* pCond); +int32_t convertFillType(int32_t mode); + #endif // TDENGINE_QUERYUTIL_H diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index c33b6622e3..655b5841be 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -565,13 +565,14 @@ typedef struct SStreamSessionAggOperatorInfo { } SStreamSessionAggOperatorInfo; typedef struct STimeSliceOperatorInfo { - SOptrBasicInfo binfo; + SSDataBlock* pRes; STimeWindow win; SInterval interval; int64_t current; SArray* pPrevRow; // SArray - SArray* pCols; // SArray int32_t fillType; // fill type + SColumn tsCol; // primary timestamp column + SExprSupp scalarSup; // scalar calculation struct SFillColInfo* pFillColInfo; // fill column info } STimeSliceOperatorInfo; @@ -669,7 +670,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock); void cleanupBasicInfo(SOptrBasicInfo* pInfo); -void initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr); +int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr); void cleanupExprSup(SExprSupp* pSup); int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, const char* pkey); @@ -756,8 +757,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, /*SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResultBlock, const SNodeListNode* pValNode, */SExecTaskInfo* pTaskInfo); SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 99139be409..adf09cddc5 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -686,4 +686,32 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) { taosMemoryFree(pCond->twindows); taosMemoryFree(pCond->colList); -} \ No newline at end of file +} + +int32_t convertFillType(int32_t mode) { + int32_t type = TSDB_FILL_NONE; + switch (mode) { + case FILL_MODE_PREV: + type = TSDB_FILL_PREV; + break; + case FILL_MODE_NONE: + type = TSDB_FILL_NONE; + break; + case FILL_MODE_NULL: + type = TSDB_FILL_NULL; + break; + case FILL_MODE_NEXT: + type = TSDB_FILL_NEXT; + break; + case FILL_MODE_VALUE: + type = TSDB_FILL_SET_VALUE; + break; + case FILL_MODE_LINEAR: + type = TSDB_FILL_LINEAR; + break; + default: + type = TSDB_FILL_NONE; + } + + return type; +} diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 54b5961585..f29857c98c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2751,7 +2751,11 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t goto _error; } - pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, num, &pOperator->exprSupp.rowEntryInfoOffset); + int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, num); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + initResultRowInfo(&pInfo->binfo.resultRowInfo); if (pOperator->exprSupp.pCtx == NULL || pInfo->binfo.pRes == NULL) { @@ -2759,7 +2763,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t } size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - int32_t code = doInitAggInfoSup(&pInfo->aggSup, pOperator->exprSupp.pCtx, num, keyBufSize, pTaskInfo->id.str); + code = doInitAggInfoSup(&pInfo->aggSup, pOperator->exprSupp.pCtx, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -2780,12 +2784,9 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t pOperator->name = "SortedMerge"; // pOperator->operatorType = OP_SortedMerge; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->exprSupp.numOfExprs = num; - pOperator->exprSupp.pExprInfo = pExprInfo; - + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSortedMerge, NULL, NULL, destroySortedMergeOperatorInfo, @@ -3405,7 +3406,11 @@ void cleanupAggSup(SAggSupporter* pAggSup) { int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, const char* pkey) { - initExprSupp(pSup, pExprInfo, numOfCols); + int32_t code = initExprSupp(pSup, pExprInfo, numOfCols); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey); for (int32_t i = 0; i < numOfCols; ++i) { pSup->pCtx[i].pBuf = pAggSup->pResultBuf; @@ -3428,12 +3433,17 @@ void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) { initResultRowInfo(&pInfo->resultRowInfo); } -void initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) { +int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) { pSup->pExprInfo = pExprInfo; pSup->numOfExprs = numOfExpr; if (pSup->pExprInfo != NULL) { pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset); + if (pSup->pCtx == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } + + return TSDB_CODE_SUCCESS; } SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, @@ -3455,7 +3465,10 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* } initBasicInfo(&pInfo->binfo, pResultBlock); - initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr); + code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } pInfo->groupId = INT32_MIN; pOperator->name = "TableAggregate"; @@ -3719,13 +3732,15 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy SExprInfo* pExprInfo = createExprInfo(pPhyNode->pFuncs, NULL, &numOfExpr); if (pPhyNode->pExprs != NULL) { - SExprSupp* pSup1 = &pInfo->scalarSup; - pSup1->pExprInfo = createExprInfo(pPhyNode->pExprs, NULL, &pSup1->numOfExprs); - pSup1->pCtx = createSqlFunctionCtx(pSup1->pExprInfo, pSup1->numOfExprs, &pSup1->rowEntryInfoOffset); + int32_t num = 0; + SExprInfo* pSExpr = createExprInfo(pPhyNode->pExprs, NULL, &num); + int32_t code = initExprSupp(&pInfo->scalarSup, pSExpr, num); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } } SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc); - ; int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; @@ -3742,15 +3757,14 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr); - pInfo->binfo.pRes = pResBlock; - pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); + pInfo->binfo.pRes = pResBlock; + pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); pOperator->name = "IndefinitOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.numOfExprs = numOfExpr; pOperator->pTaskInfo = pTaskInfo; @@ -3791,34 +3805,6 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t } } -static int32_t convertFillType(int32_t mode) { - int32_t type = TSDB_FILL_NONE; - switch (mode) { - case FILL_MODE_PREV: - type = TSDB_FILL_PREV; - break; - case FILL_MODE_NONE: - type = TSDB_FILL_NONE; - break; - case FILL_MODE_NULL: - type = TSDB_FILL_NULL; - break; - case FILL_MODE_NEXT: - type = TSDB_FILL_NEXT; - break; - case FILL_MODE_VALUE: - type = TSDB_FILL_SET_VALUE; - break; - case FILL_MODE_LINEAR: - type = TSDB_FILL_LINEAR; - break; - default: - type = TSDB_FILL_NONE; - } - - return type; -} - SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult, SExecTaskInfo* pTaskInfo) { SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo)); @@ -3848,8 +3834,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = num; + pOperator->exprSupp.pExprInfo = pExprInfo; + pOperator->exprSupp.numOfExprs = num; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; @@ -4269,6 +4255,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, false, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) { pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) { + pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo); } else { ASSERT(0); } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 527f4520bf..03da97cdbb 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -387,11 +387,12 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pInfo->pGroupCols = pGroupColList; pInfo->pCondition = pCondition; - pInfo->scalarSup.pExprInfo = pScalarExprInfo; - pInfo->scalarSup.numOfExprs = numOfScalarExpr; - pInfo->scalarSup.pCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->scalarSup.rowEntryInfoOffset); + int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } - int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); + code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -697,10 +698,12 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys); if (pPartNode->pExprs != NULL) { - pInfo->scalarSup.numOfExprs = 0; - pInfo->scalarSup.pExprInfo = createExprInfo(pPartNode->pExprs, NULL, &pInfo->scalarSup.numOfExprs); - pInfo->scalarSup.pCtx = createSqlFunctionCtx( - pInfo->scalarSup.pExprInfo, pInfo->scalarSup.numOfExprs, &pInfo->scalarSup.rowEntryInfoOffset); + int32_t num = 0; + SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num); + int32_t code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } } _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 426d45976f..540b06603f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -686,7 +686,10 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols); - initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols); + int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } pOperator->name = "DataBlockDistScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN; @@ -1869,7 +1872,10 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs); SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID); - initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs); + int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } pInfo->pTableList = pTableListInfo; pInfo->pColMatchInfo = colList; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index c556e94c74..39a5861aa3 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1705,7 +1705,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { return (rows == 0) ? NULL : pBInfo->pRes; } -static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock) { +static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) { int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for(int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); @@ -1715,21 +1715,140 @@ static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, i); pkey->isNull = false; - char* val = colDataGetData(pColInfoData, i); + char* val = colDataGetData(pColInfoData, rowIndex); memcpy(pkey->pData, val, pkey->bytes); } } } +static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pBlock, + int32_t rowIndex, SSDataBlock* pResBlock) { + int32_t rows = pResBlock->info.rows; + + // todo set the correct primary timestamp column + + // output the result + for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { + SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; + int32_t dstSlot = pExprInfo->base.resSchema.slotId; + int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; + + SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot); + SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); + + switch (pSliceInfo->fillType) { + case TSDB_FILL_NULL: + colDataAppendNULL(pDst, rows); + break; + + case TSDB_FILL_SET_VALUE: { + SVariant* pVar = &pSliceInfo->pFillColInfo[j].fillVal; + + if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) { + float v = 0; + GET_TYPED_DATA(v, float, pVar->nType, &pVar->i); + colDataAppend(pDst, rows, (char*)&v, false); + } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) { + double v = 0; + GET_TYPED_DATA(v, double, pVar->nType, &pVar->i); + colDataAppend(pDst, rows, (char*)&v, false); + } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) { + int64_t v = 0; + GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); + colDataAppend(pDst, rows, (char*)&v, false); + } + } break; + + case TSDB_FILL_LINEAR: +#if 0 + if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs + || pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) { +// goto interp_exit; + } + + double v1 = -1, v2 = -1; + GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val); + GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val); + + SPoint point1 = {.key = ts, .val = &v1}; + SPoint point2 = {.key = nextTs, .val = &v2}; + SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput}; + + int32_t srcType = pCtx->inputType; + if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) { + setNull(pCtx->pOutput, srcType, pCtx->inputBytes); + } else { + bool exceedMax = false, exceedMin = false; + taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin); + if (exceedMax || exceedMin) { + __compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0); + if (func(&pCtx->start.val, &pCtx->end.val) <= 0) { + COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val); + } else { + COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->end.val : &pCtx->start.val); + } + } + } +#endif + break; + + case TSDB_FILL_PREV: { + SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot); + colDataAppend(pDst, rows, pkey->pData, false); + } break; + + case TSDB_FILL_NEXT: { + char* p = colDataGetData(pSrc, rowIndex); + colDataAppend(pDst, rows, p, colDataIsNull_s(pSrc, rowIndex)); + } break; + + case TSDB_FILL_NONE: + default: + break; + } + } + + pResBlock->info.rows += 1; +} + +static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { + if (pInfo->pPrevRow != NULL) { + return TSDB_CODE_SUCCESS; + } + + pInfo->pPrevRow = taosArrayInit(4, sizeof(SGroupKeys)); + if (pInfo->pPrevRow == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t numOfCols = pBlock->info.numOfCols; + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + + SGroupKeys key = {0}; + key.bytes = pColInfo->info.bytes; + key.type = pColInfo->info.type; + key.isNull = false; + key.pData = taosMemoryCalloc(1, pColInfo->info.bytes); + taosArrayPush(pInfo->pPrevRow, &key); + } + + return TSDB_CODE_SUCCESS; +} + static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + STimeSliceOperatorInfo* pSliceInfo = pOperator->info; - SSDataBlock* pResBlock = pSliceInfo->binfo.pRes; + SSDataBlock* pResBlock = pSliceInfo->pRes; SExprSupp* pSup = &pOperator->exprSupp; + blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); + // if (pOperator->status == OP_RES_TO_RETURN) { // // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); // if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) { @@ -1750,10 +1869,15 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { break; } + int32_t code = initPrevRowsKeeper(pSliceInfo, pBlock); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } + // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, MAIN_SCAN, true); - SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, 0); + SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); for(int32_t i = 0; i < pBlock->info.rows; ++i) { int64_t ts = *(int64_t*) colDataGetData(pTsCol, i); @@ -1771,107 +1895,45 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } numOfRows += 1; + doKeepPrevRows(pSliceInfo, pBlock, i); - pSliceInfo->current += taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); if (pSliceInfo->current > pSliceInfo->win.ekey) { doSetOperatorCompleted(pOperator); break; } } else if (ts < pSliceInfo->current) { - if (i != pBlock->info.window.ekey) { + if (i < pBlock->info.rows - 1) { int64_t nextTs = *(int64_t*) colDataGetData(pTsCol, i + 1); if (nextTs > pSliceInfo->current) { - // output the result - for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { - SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[j]; - int32_t dstSlot = pExprInfo->base.resSchema.slotId; - int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; - - SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot); - SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlot); - - switch (pSliceInfo->fillType) { - case TSDB_FILL_NULL: - colDataAppendNULL(pDst, numOfRows); - break; - - case TSDB_FILL_SET_VALUE: { - SVariant* pVar = &pSliceInfo->pFillColInfo[i].fillVal; - - if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) { - float v = 0; - GET_TYPED_DATA(v, float, pVar->nType, &pVar->i); - colDataAppend(pDst, numOfRows, (char*)&v, false); - } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) { - double v = 0; - GET_TYPED_DATA(v, double, pVar->nType, &pVar->i); - colDataAppend(pDst, numOfRows, (char*)&v, false); - } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) { - int64_t v = 0; - GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); - colDataAppend(pDst, numOfRows, (char*)&v, false); - } - } - break; - - case TSDB_FILL_LINEAR: -#if 0 - if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs - || pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) { -// goto interp_exit; - } - - double v1 = -1, v2 = -1; - GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val); - GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val); - - SPoint point1 = {.key = ts, .val = &v1}; - SPoint point2 = {.key = nextTs, .val = &v2}; - SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput}; - - int32_t srcType = pCtx->inputType; - if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) { - setNull(pCtx->pOutput, srcType, pCtx->inputBytes); - } else { - bool exceedMax = false, exceedMin = false; - taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin); - if (exceedMax || exceedMin) { - __compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0); - if (func(&pCtx->start.val, &pCtx->end.val) <= 0) { - COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val); - } else { - COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->end.val : &pCtx->start.val); - } - } - } -#endif - break; - - case TSDB_FILL_PREV: { - SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot); - colDataAppend(pDst, numOfRows, pkey->pData, false); - } break; - - case TSDB_FILL_NEXT: { - } break; - - case TSDB_FILL_NONE: - default: - break; - } - - pSliceInfo->current += + while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { + genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, i, pResBlock); + numOfRows += 1; + pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - if (pSliceInfo->current > pSliceInfo->win.ekey) { - doSetOperatorCompleted(pOperator); - break; - } + } + + if (pSliceInfo->current > pSliceInfo->win.ekey) { + doSetOperatorCompleted(pOperator); + break; } } else { // ignore current row, and do nothing } } else { // it is the last row of current block - doKeepPrevRows(pSliceInfo, pBlock); + doKeepPrevRows(pSliceInfo, pBlock, i); + } + } else { // ts > pSliceInfo->current + while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) { + genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, i, pResBlock); + numOfRows += 1; + pSliceInfo->current = + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + } + + if (pSliceInfo->current > pSliceInfo->win.ekey) { + doSetOperatorCompleted(pOperator); + break; } } } @@ -1886,59 +1948,46 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { return pResBlock->info.rows == 0 ? NULL : pResBlock; } -static int32_t initTimesliceInfo(STimeSliceOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfCols) { - pInfo->pPrevRow = taosArrayInit(4, sizeof(SGroupKeys)); - pInfo->pCols = taosArrayInit(4, sizeof(SColumn)); - - if (pInfo->pPrevRow == NULL || pInfo->pCols == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - for (int32_t i = 0; i < numOfCols; ++i) { - SExprInfo* pExpr = pCtx[i].pExpr; - - SFunctParam* pParam = &pExpr->base.pParam[0]; - - SColumn c = *pParam->pCol; - taosArrayPush(pInfo->pCols, &c); - - SGroupKeys key = {0}; - key.bytes = c.bytes; - key.type = c.type; - key.isNull = false; - key.pData = taosMemoryCalloc(1, c.bytes); - taosArrayPush(pInfo->pPrevRow, &key); - } - - return TSDB_CODE_SUCCESS; -} - -SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pPhyNode, SExecTaskInfo* pTaskInfo) { STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pOperator == NULL || pInfo == NULL) { goto _error; } + SInterpFuncPhysiNode* pInterpPhyNode = (SInterpFuncPhysiNode*)pPhyNode; SExprSupp* pSup = &pOperator->exprSupp; - int32_t code = initTimesliceInfo(pInfo, pSup->pCtx, numOfCols); + int32_t numOfExprs = 0; + SExprInfo* pExprInfo = createExprInfo(pInterpPhyNode->pFuncs, NULL, &numOfExprs); + int32_t code = initExprSupp(pSup, pExprInfo, numOfExprs); if (code != TSDB_CODE_SUCCESS) { goto _error; } - initResultRowInfo(&pInfo->binfo.resultRowInfo); - pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfCols, pValNode); + if (pInterpPhyNode->pExprs != NULL) { + int32_t num = 0; + SExprInfo* pScalarExprInfo = createExprInfo(pInterpPhyNode->pExprs, NULL, &num); + code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } + + pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries); + pInfo->fillType = convertFillType(pInterpPhyNode->fillMode); + initResultSizeInfo(pOperator, 4096); - pInfo->binfo.pRes = pResultBlock; + pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, (SNodeListNode*)pInterpPhyNode->pFillValues); + pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); + pInfo->win = pInterpPhyNode->timeRange; + pInfo->interval.interval = pInterpPhyNode->interval; + pInfo->current = pInfo->win.skey; pOperator->name = "TimeSliceOperator"; - // pOperator->operatorType = OP_AllTimeWindow; - pOperator->blocking = true; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC; + pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; @@ -2432,7 +2481,11 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) { } int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock) { - initExprSupp(pSup, pExprInfo, numOfCols); + int32_t code = initExprSupp(pSup, pExprInfo, numOfCols); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + initBasicInfo(pBasicInfo, pResultBlock); for (int32_t i = 0; i < numOfCols; ++i) { -- GitLab