diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 59db66beccf6d911616fe5c5c853490bcbb86c34..f2f0bc20551336b8bfa14414697b9b5299ea6a49 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -20,6 +20,12 @@ extern "C" { #endif +typedef struct { + char* pData; + bool isNull; + int16_t type; + int32_t bytes; +} SGroupKeys, SStateKeys; #ifdef __cplusplus } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index ee8cdc6b1bbeb6d87edad5c98a69882961fe85df..a9420cf8869b02df5377b8c829de05c7d348c9a7 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -40,6 +40,7 @@ extern "C" { #include "tpagedbuf.h" #include "vnode.h" +#include "executorInt.h" typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); @@ -478,16 +479,8 @@ typedef struct SFillOperatorInfo { void** p; SSDataBlock* existNewGroupBlock; bool multigroupResult; - SInterval intervalInfo; } SFillOperatorInfo; -typedef struct { - char* pData; - bool isNull; - int16_t type; - int32_t bytes; -} SGroupKeys, SStateKeys; - typedef struct SGroupbyOperatorInfo { SOptrBasicInfo binfo; SArray* pGroupCols; // group by columns, SArray @@ -676,7 +669,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pConditions); SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, - SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal, + SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index 26d066d9a9f9fbe484b51d5802341b55527211bd..a1f45fd66527bb3c904c1eb8bbeef258162bfc1f 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -27,13 +27,12 @@ extern "C" { struct SSDataBlock; typedef struct SFillColInfo { -// STColumn col; // column info - SResSchema col; - int16_t functionId; // sql function id - int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN - int16_t tagIndex; // index of current tag in SFillTagColInfo array list - int32_t offset; - union {int64_t i; double d;} val; + SExprInfo *pExpr; +// SResSchema schema; +// int16_t functionId; // sql function id + int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN + int16_t tagIndex; // index of current tag in SFillTagColInfo array list + SVariant fillVal; } SFillColInfo; typedef struct { @@ -56,9 +55,10 @@ typedef struct SFillInfo { int32_t numOfCols; // number of columns, including the tags columns int32_t rowSize; // size of each row SInterval interval; - char * prevValues; // previous row of data, to generate the interpolation results - char * nextValues; // next row of data - char** pData; // original result data block involved in filling data + + SArray *prev; + SArray *next; + SSDataBlock *pSrcBlock; int32_t alloc; // data buffer size in rows SFillColInfo* pFillCol; // column info for fill operations @@ -72,7 +72,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey); void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp); void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput); -struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SValueNode* val); +struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SNodeListNode* val); bool taosFillHasMoreResults(struct SFillInfo* pFillInfo); SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, @@ -80,7 +80,7 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3 struct SFillColInfo* pCol, const char* id); void* taosDestroyFillInfo(struct SFillInfo *pFillInfo); -int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity); +int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity); int64_t getFillInfoStart(struct SFillInfo *pFillInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 4fabe128a8226237baa4fa6c3fe91d9b590c864f..0b3343e0743421f353ed67b5740e2f2f2b58d646 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1235,9 +1235,6 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc if (fmIsPseudoColumnFunc(pfCtx->functionId)) { // do nothing } else if (fmIsNonstandardSQLFunc(pfCtx->functionId)) { - // todo set the correct timestamp column - pfCtx->input.pPTS = taosArrayGet(pSrcBlock->pDataBlock, 1); - SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[k]); pfCtx->fpSet.init(&pCtx[k], pResInfo); @@ -2490,7 +2487,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc // if (pQueryAttr->pFilters != NULL) { // filterSetColFieldData(pQueryAttr->pFilters, pBlock->info.numOfCols, pBlock->pDataBlock); // } - + // if (pQueryAttr->pFilters != NULL || pRuntimeEnv->pTsBuf != NULL) { // filterColRowsInDataBlock(pRuntimeEnv, pBlock, ascQuery); // } @@ -3204,16 +3201,16 @@ static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); } -int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock* pOutput, int32_t capacity, void** p) { +int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t capacity) { // for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { // SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i); // p[i] = pColInfoData->pData + (pColInfoData->info.bytes * pOutput->info.rows); // } - int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, p, capacity - pOutput->info.rows); - pOutput->info.rows += numOfRows; + int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, pBlock, capacity - pBlock->info.rows); + pBlock->info.rows += numOfRows; - return pOutput->info.rows; + return pBlock->info.rows; } void publishOperatorProfEvent(SOperatorInfo* pOperator, EQueryProfEventType eventType) { @@ -5562,7 +5559,7 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResult taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pResultInfo->capacity, pInfo->p); + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pResultInfo->capacity); pInfo->existNewGroupBlock = NULL; *newgroup = true; } @@ -5571,7 +5568,7 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInf SExecTaskInfo* pTaskInfo) { if (taosFillHasMoreResults(pInfo->pFillInfo)) { *newgroup = false; - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pResultInfo->capacity, pInfo->p); + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pResultInfo->capacity); if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult)) { return; } @@ -5632,7 +5629,8 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) { } } - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pResBlock, pOperator->resultInfo.capacity, pInfo->p); + blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pResBlock, pOperator->resultInfo.capacity); // current group has no more result to return if (pResBlock->info.rows > 0) { @@ -6206,19 +6204,17 @@ _error: return NULL; } -static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, int64_t* fillVal, +static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode, STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) { - SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, NULL); + SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode); - // TODO set correct time precision STimeWindow w = TSWINDOW_INITIALIZER; - getAlignQueryTimeWindow(pInterval, TSDB_TIME_PRECISION_MILLI, win.skey, &w); + getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey, &w); int32_t order = TSDB_ORDER_ASC; pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id); pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES); - if (pInfo->pFillInfo == NULL || pInfo->p == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } else { @@ -6226,15 +6222,29 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t } } +static SArray* getFillValue(SNodeListNode* pNodeList) { + SArray* pList = taosArrayInit(4, sizeof(SVariant)); + + size_t len = LIST_LENGTH(pNodeList->pNodeList); + for(int32_t i = 0; i < len; ++i) { + SValueNode* pvalue = (SValueNode*)nodesListGetNode(pNodeList->pNodeList, i); + + SVariant v = {0}; + valueNodeToVariant(pvalue, &v); + taosArrayPush(pList, &v); + } + + return pList; +} + SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, - SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal, + SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* pValueNode, bool multigroupResult, SExecTaskInfo* pTaskInfo) { SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); pInfo->pRes = pResBlock; pInfo->multigroupResult = multigroupResult; - pInfo->intervalInfo = *pInterval; int32_t type = TSDB_FILL_NONE; switch (fillType) { @@ -6263,7 +6273,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp SResultInfo* pResultInfo = &pOperator->resultInfo; initResultSizeInfo(pOperator, 4096); - int32_t code = initFillInfo(pInfo, pExpr, numOfCols, (int64_t*)fillVal, pTaskInfo->window, pResultInfo->capacity, + int32_t code = initFillInfo(pInfo, pExpr, numOfCols, pValueNode, *pWindow, pResultInfo->capacity, pTaskInfo->id.str, pInterval, type); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -6272,7 +6282,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp pOperator->name = "FillOperator"; pOperator->blockingOptr = false; pOperator->status = OP_NOT_OPENED; - // pOperator->operatorType = OP_Fill; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; @@ -6621,13 +6631,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t primaryTsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, &as, pTableGroupInfo, pTaskInfo); - - // if (pIntervalPhyNode->pFill != NULL) { - // pOptr = createFillOperatorInfo(pOptr, pExprInfo, num, &interval, pResBlock, pIntervalPhyNode->pFill->mode, - // NULL, - // false, pTaskInfo); - // } - } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; @@ -6665,6 +6668,13 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num); pOptr = createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) { + SFillPhysiNode* pFillNode = (SFillPhysiNode*)pPhyNode; + SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); + SExprInfo* pExprInfo = createExprInfo(pFillNode->pTargets, NULL, &num); + + SInterval* pInterval = &((STableIntervalOperatorInfo*)ops[0]->info)->interval; + pOptr = createFillOperatorInfo(ops[0], pExprInfo, num, pInterval, &pFillNode->timeRange, pResBlock, pFillNode->mode, (SNodeListNode*)pFillNode->pValues, false, pTaskInfo); } else { ASSERT(0); } diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index aeed07c636cdd31cf31a50441f9c85f985b60fc4..018f0016a22db237b70bdd43135492d835147ee1 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -13,20 +13,21 @@ * along with this program. If not, see . */ -#include "function.h" #include "os.h" -#include "querynodes.h" - #include "taosdef.h" #include "tmsg.h" #include "ttypes.h" -#include "tfill.h" -#include "function.h" #include "tcommon.h" #include "thash.h" #include "ttime.h" +#include "function.h" +#include "tdatablock.h" +#include "executorInt.h" +#include "querynodes.h" +#include "tfill.h" + #define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC) #define DO_INTERPOLATION(_v1, _v2, _k1, _k2, _k) ((_v1) + ((_v2) - (_v1)) * (((double)(_k)) - ((double)(_k1))) / (((double)(_k2)) - ((double)(_k1)))) @@ -37,168 +38,208 @@ static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) { continue; } - char* val1 = elePtrAt(data[j], pCol->col.bytes, genRows); + SResSchema* pSchema = &pCol->pExpr->base.resSchema; + char* val1 = elePtrAt(data[j], pSchema->bytes, genRows); assert(pCol->tagIndex >= 0 && pCol->tagIndex < pFillInfo->numOfTags); SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; - -// assert (pTag->col.colId == pCol->col.colId); - assignVal(val1, pTag->tagVal, pCol->col.bytes, pCol->col.type); + assignVal(val1, pTag->tagVal, pSchema->bytes, pSchema->type); } } -static void setNullValueForRow(SFillInfo* pFillInfo, void** data, int32_t numOfCol, int32_t rowIndex) { +static void setNullRow(SSDataBlock* pBlock, int32_t numOfCol, int32_t rowIndex) { // the first are always the timestamp column, so start from the second column. - for (int32_t i = 1; i < numOfCol; ++i) { - SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - - char* output = elePtrAt(data[i], pCol->col.bytes, rowIndex); - setNull(output, pCol->col.type, pCol->col.bytes); + for (int32_t i = 1; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, i); + colDataAppendNULL(p, rowIndex); } } -static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData, int64_t ts, bool outOfBound) { - char* prev = pFillInfo->prevValues; - char* next = pFillInfo->nextValues; +#define GET_DEST_SLOT_ID(_p) ((_p)->pExpr->base.resSchema.slotId) +#define GET_SRC_SLOT_ID(_p) ((_p)->pExpr->base.pParam[0].pCol->slotId) + +static void doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey); +static void doFillOneRowResult(SFillInfo* pFillInfo, SSDataBlock *pBlock, SSDataBlock* pSrcBlock, int64_t ts, bool outOfBound) { SPoint point1, point2, point; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); // set the primary timestamp column value int32_t index = pFillInfo->numOfCurrent; - char* val = elePtrAt(data[0], TSDB_KEYSIZE, index); + SColumnInfoData *pCol0 = taosArrayGet(pBlock->pDataBlock, 0); + char* val = colDataGetData(pCol0, index); + *(TSKEY*) val = pFillInfo->currentKey; // set the other values if (pFillInfo->type == TSDB_FILL_PREV) { - char* p = FILL_IS_ASC_FILL(pFillInfo) ? prev : next; - - if (p != NULL) { - for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) { - SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - if (TSDB_COL_IS_TAG(pCol->flag)) { - continue; - } + SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next; - char* output = elePtrAt(data[i], pCol->col.bytes, index); -// assignVal(output, p + pCol->offset, pCol->col.bytes, pCol->col.type); + for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) { + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; + if (TSDB_COL_IS_TAG(pCol->flag)) { + continue; } - } else { // no prev value yet, set the value for NULL - setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index); + + SGroupKeys* pKey = taosArrayGet(p, i); + SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol)); + doSetVal(pDstColInfoData, index, pKey); } } else if (pFillInfo->type == TSDB_FILL_NEXT) { - char* p = FILL_IS_ASC_FILL(pFillInfo)? next : prev; - - if (p != NULL) { - for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) { - SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - if (TSDB_COL_IS_TAG(pCol->flag)) { - continue; - } + SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next : pFillInfo->prev; - char* output = elePtrAt(data[i], pCol->col.bytes, index); -// assignVal(output, p + pCol->offset, pCol->col.bytes, pCol->col.type); + for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) { + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; + if (TSDB_COL_IS_TAG(pCol->flag)) { + continue; } - } else { // no prev value yet, set the value for NULL - setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index); + + SGroupKeys* pKey = taosArrayGet(p, i); + SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol)); + doSetVal(pDstColInfoData, index, pKey); } } else if (pFillInfo->type == TSDB_FILL_LINEAR) { // TODO : linear interpolation supports NULL value - if (prev != NULL && !outOfBound) { + if (outOfBound) { + setNullRow(pBlock, pFillInfo->numOfCols, index); + } else { for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; if (TSDB_COL_IS_TAG(pCol->flag)) { continue; } - int16_t type = pCol->col.type; - int16_t bytes = pCol->col.bytes; + int32_t srcSlotId = GET_SRC_SLOT_ID(pCol); - char *val1 = elePtrAt(data[i], pCol->col.bytes, index); - if (type == TSDB_DATA_TYPE_BINARY|| type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BOOL) { - setNull(val1, pCol->col.type, bytes); + int32_t dstSlotId = GET_DEST_SLOT_ID(pCol); + SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); + + int16_t type = pCol->pExpr->base.resSchema.type; + SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i); + if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pKey->isNull) { + colDataAppendNULL(pDstCol, index); continue; } - point1 = (SPoint){.key = *(TSKEY*)(prev), .val = prev + pCol->offset}; - point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->index * bytes}; - point = (SPoint){.key = pFillInfo->currentKey, .val = val1}; + SGroupKeys* pKey1 = taosArrayGet(pFillInfo->prev, 0); + int64_t prevTs = *(int64_t*)pKey1->pData; + + SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId); + char* data = colDataGetData(pSrcCol, pFillInfo->index); + + point1 = (SPoint){.key = prevTs, .val = pKey->pData}; + point2 = (SPoint){.key = ts, .val = data}; + + int64_t out = 0; + point = (SPoint){.key = pFillInfo->currentKey, .val = &out}; taosGetLinearInterpolationVal(&point, type, &point1, &point2, type); + + colDataAppend(pDstCol, index, (const char*)&out, false); } - } else { - setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index); } - } else { // fill the default value */ + } else if (pFillInfo->type == TSDB_FILL_NULL) { // fill with NULL + setNullRow(pBlock, pFillInfo->numOfCols, index); + } else { // fill with user specified value for each column for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) { + if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->schema.type)*/) { continue; } - char* val1 = elePtrAt(data[i], pCol->col.bytes, index); - assignVal(val1, (char*)&pCol->val, pCol->col.bytes, pCol->col.type); + SVariant* pVar = &pFillInfo->pFillCol[i].fillVal; + + SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); + if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) { + float v = 0; + GET_TYPED_DATA(v, float, pVar->nType, &pVar->i); + colDataAppend(pDst, index, (char*)&v, false); + } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) { + double v = 0; + GET_TYPED_DATA(v, double, pVar->nType, &pVar->i); + colDataAppend(pDst, index, (char*)&v, false); + } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) { + int64_t v = 0; + GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); + colDataAppend(pDst, index, (char*)&v, false); + } } } - setTagsValue(pFillInfo, data, index); - pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step, pFillInfo->interval.slidingUnit, - pFillInfo->interval.precision); +// setTagsValue(pFillInfo, data, index); + SInterval* pInterval = &pFillInfo->interval; + pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision); pFillInfo->numOfCurrent++; } -static void initBeforeAfterDataBuf(SFillInfo* pFillInfo, char** next) { - if (*next != NULL) { +void doSetVal(SColumnInfoData* pDstCol, int32_t rowIndex, const SGroupKeys* pKey) { + if (pKey->isNull) { + colDataAppendNULL(pDstCol, rowIndex); + } else { + colDataAppend(pDstCol, rowIndex, pKey->pData, false); + } +} + +static void initBeforeAfterDataBuf(SFillInfo* pFillInfo) { + if (taosArrayGetSize(pFillInfo->next) > 0) { return; } - *next = taosMemoryCalloc(1, pFillInfo->rowSize); - for (int i = 1; i < pFillInfo->numOfCols; i++) { + for (int i = 0; i < pFillInfo->numOfCols; i++) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - setNull(*next + pCol->offset, pCol->col.type, pCol->col.bytes); + + SGroupKeys key = {0}; + SResSchema* pSchema = &pCol->pExpr->base.resSchema; + key.pData = taosMemoryMalloc(pSchema->bytes); + key.isNull = true; + key.bytes = pSchema->bytes; + key.type = pSchema->type; + + taosArrayPush(pFillInfo->next, &key); + + key.pData = taosMemoryMalloc(pSchema->bytes); + taosArrayPush(pFillInfo->prev, &key); } } -static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, char** srcData, char* buf) { - int32_t rowIndex = pFillInfo->index; +static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bool isNull); + +static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray* pRow) { for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { - SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - memcpy(buf + pCol->offset, srcData[i] + rowIndex * pCol->col.bytes, pCol->col.bytes); + int32_t srcSlotId = GET_SRC_SLOT_ID(&pFillInfo->pFillCol[i]); + + SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId); + + bool isNull = colDataIsNull_s(pSrcCol, rowIndex); + char* p = colDataGetData(pSrcCol, rowIndex); + saveColData(pRow, i, p, isNull); } } -static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputRows) { +static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t outputRows) { pFillInfo->numOfCurrent = 0; - char** srcData = pFillInfo->pData; - char** prev = &pFillInfo->prevValues; - char** next = &pFillInfo->nextValues; + // todo make sure the first column is always the primary timestamp column? + SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, 0); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); + bool ascFill = FILL_IS_ASC_FILL(pFillInfo); - if (FILL_IS_ASC_FILL(pFillInfo)) { - assert(pFillInfo->currentKey >= pFillInfo->start); - } else { - assert(pFillInfo->currentKey <= pFillInfo->start); - } +#if 0 + ASSERT(ascFill && (pFillInfo->currentKey >= pFillInfo->start) || (!ascFill && (pFillInfo->currentKey <= pFillInfo->start))); +#endif while (pFillInfo->numOfCurrent < outputRows) { - int64_t ts = ((int64_t*)pFillInfo->pData[0])[pFillInfo->index]; + int64_t ts = ((int64_t*)pTsCol->pData)[pFillInfo->index]; // set the next value for interpolation - if ((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) || - (pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) { - initBeforeAfterDataBuf(pFillInfo, next); - copyCurrentRowIntoBuf(pFillInfo, srcData, *next); + if ((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) { + copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pFillInfo->next); } - if (((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) || (pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) && - pFillInfo->numOfCurrent < outputRows) { - - // fill the gap between two actual input rows - while (((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) || - (pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) && - pFillInfo->numOfCurrent < outputRows) { - doFillOneRowResult(pFillInfo, data, srcData, ts, false); + if (((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) && pFillInfo->numOfCurrent < outputRows) { + // fill the gap between two input rows + while (((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) && pFillInfo->numOfCurrent < outputRows) { + doFillOneRowResult(pFillInfo, pBlock, pFillInfo->pSrcBlock, ts, false); } // output buffer is full, abort @@ -208,61 +249,66 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR } } else { assert(pFillInfo->currentKey == ts); - initBeforeAfterDataBuf(pFillInfo, prev); + if (pFillInfo->type == TSDB_FILL_NEXT && (pFillInfo->index + 1) < pFillInfo->numOfRows) { - initBeforeAfterDataBuf(pFillInfo, next); ++pFillInfo->index; - copyCurrentRowIntoBuf(pFillInfo, srcData, *next); + copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pFillInfo->next); --pFillInfo->index; } // assign rows to dst buffer for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) { + if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->schema.type)*/) { continue; } - char* output = elePtrAt(data[i], pCol->col.bytes, pFillInfo->numOfCurrent); - char* src = elePtrAt(srcData[i], pCol->col.bytes, pFillInfo->index); + int32_t srcSlotId = GET_SRC_SLOT_ID(pCol); + int32_t dstSlotId = GET_DEST_SLOT_ID(pCol); - if (i == 0 || (pCol->functionId != FUNCTION_COUNT && !isNull(src, pCol->col.type)) || - (pCol->functionId == FUNCTION_COUNT && GET_INT64_VAL(src) != 0)) { - assignVal(output, src, pCol->col.bytes, pCol->col.type); - memcpy(*prev + pCol->offset, src, pCol->col.bytes); + SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlotId); + SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId); + + char* src = colDataGetData(pSrc, pFillInfo->index); + if (i == 0 || (/*pCol->functionId != FUNCTION_COUNT &&*/ !colDataIsNull_s(pSrc, pFillInfo->index)) /*|| + (pCol->functionId == FUNCTION_COUNT && GET_INT64_VAL(src) != 0)*/) { + bool isNull = colDataIsNull_s(pSrc, pFillInfo->index); + colDataAppend(pDst, pFillInfo->numOfCurrent, src, isNull); + saveColData(pFillInfo->prev, i, src, isNull); } else { // i > 0 and data is null , do interpolation if (pFillInfo->type == TSDB_FILL_PREV) { - assignVal(output, *prev + pCol->offset, pCol->col.bytes, pCol->col.type); + SGroupKeys *pKey = taosArrayGet(pFillInfo->prev, i); + doSetVal(pDst, pFillInfo->numOfCurrent, pKey); } else if (pFillInfo->type == TSDB_FILL_LINEAR) { - assignVal(output, src, pCol->col.bytes, pCol->col.type); - memcpy(*prev + pCol->offset, src, pCol->col.bytes); + bool isNull = colDataIsNull_s(pSrc, pFillInfo->index); + colDataAppend(pDst, pFillInfo->numOfCurrent, src, isNull); + saveColData(pFillInfo->prev, i, src, isNull); + } else if (pFillInfo->type == TSDB_FILL_NULL) { + colDataAppendNULL(pDst, pFillInfo->numOfCurrent); } else if (pFillInfo->type == TSDB_FILL_NEXT) { - if (*next) { - assignVal(output, *next + pCol->offset, pCol->col.bytes, pCol->col.type); - } else { - setNull(output, pCol->col.type, pCol->col.bytes); - } + SGroupKeys *pKey = taosArrayGet(pFillInfo->next, i); + doSetVal(pDst, pFillInfo->numOfCurrent, pKey); } else { - assignVal(output, (char*)&pCol->val, pCol->col.bytes, pCol->col.type); + SVariant* pVar = &pFillInfo->pFillCol[i].fillVal; + colDataAppend(pDst, pFillInfo->numOfCurrent, (char*)&pVar->i, false); } } } // set the tag value for final result - setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent); +// setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent); + SInterval *pInterval = &pFillInfo->interval; + pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision); - pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step, - pFillInfo->interval.slidingUnit, pFillInfo->interval.precision); pFillInfo->index += 1; pFillInfo->numOfCurrent += 1; } if (pFillInfo->index >= pFillInfo->numOfRows || pFillInfo->numOfCurrent >= outputRows) { /* the raw data block is exhausted, next value does not exists */ - if (pFillInfo->index >= pFillInfo->numOfRows) { - taosMemoryFreeClear(*next); - } - +// if (pFillInfo->index >= pFillInfo->numOfRows) { +// taosMemoryFreeClear(*next); +// } pFillInfo->numOfTotal += pFillInfo->numOfCurrent; return pFillInfo->numOfCurrent; } @@ -271,14 +317,24 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR return pFillInfo->numOfCurrent; } -static int64_t appendFilledResult(SFillInfo* pFillInfo, void** output, int64_t resultCapacity) { +static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bool isNull) { + SGroupKeys *pKey = taosArrayGet(rowBuf, columnIndex); + if (isNull) { + pKey->isNull = true; + } else { + memcpy(pKey->pData, src, pKey->bytes); + pKey->isNull = false; + } +} + +static int64_t appendFilledResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, int64_t resultCapacity) { /* * These data are generated according to fill strategy, since the current timestamp is out of the time window of * real result set. Note that we need to keep the direct previous result rows, to generated the filled data. */ pFillInfo->numOfCurrent = 0; while (pFillInfo->numOfCurrent < resultCapacity) { - doFillOneRowResult(pFillInfo, output, pFillInfo->pData, pFillInfo->start, true); + doFillOneRowResult(pFillInfo, pBlock, pFillInfo->pSrcBlock, pFillInfo->start, true); } pFillInfo->numOfTotal += pFillInfo->numOfCurrent; @@ -295,15 +351,15 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t int32_t k = 0; for (int32_t i = 0; i < numOfCols; ++i) { SFillColInfo* pColInfo = &pFillInfo->pFillCol[i]; - pFillInfo->pData[i] = NULL; + SResSchema* pSchema = &pColInfo->pExpr->base.resSchema; - if (TSDB_COL_IS_TAG(pColInfo->flag) || pColInfo->col.type == TSDB_DATA_TYPE_BINARY) { + if (TSDB_COL_IS_TAG(pColInfo->flag) || pSchema->type == TSDB_DATA_TYPE_BINARY) { numOfTags += 1; bool exists = false; int32_t index = -1; for (int32_t j = 0; j < k; ++j) { - if (pFillInfo->pTags[j].col.colId == pColInfo->col.slotId) { + if (pFillInfo->pTags[j].col.colId == pSchema->slotId) { exists = true; index = j; break; @@ -311,12 +367,12 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t } if (!exists) { - SSchema* pSchema = &pFillInfo->pTags[k].col; - pSchema->colId = pColInfo->col.slotId; - pSchema->type = pColInfo->col.type; - pSchema->bytes = pColInfo->col.bytes; + SSchema* pSchema1 = &pFillInfo->pTags[k].col; + pSchema1->colId = pSchema->slotId; + pSchema1->type = pSchema->type; + pSchema1->bytes = pSchema->bytes; - pFillInfo->pTags[k].tagVal = taosMemoryCalloc(1, pColInfo->col.bytes); + pFillInfo->pTags[k].tagVal = taosMemoryCalloc(1, pSchema->bytes); pColInfo->tagIndex = k; k += 1; @@ -325,7 +381,7 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t } } - rowsize += pColInfo->col.bytes; + rowsize += pSchema->bytes; } pFillInfo->numOfTags = numOfTags; @@ -355,7 +411,6 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag } taosResetFillInfo(pFillInfo, skey); - pFillInfo->order = order; switch(fillType) { @@ -364,6 +419,7 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag case FILL_MODE_NULL: pFillInfo->type = TSDB_FILL_NULL; break; case FILL_MODE_LINEAR: pFillInfo->type = TSDB_FILL_LINEAR;break; case FILL_MODE_NEXT: pFillInfo->type = TSDB_FILL_NEXT; break; + case FILL_MODE_VALUE: pFillInfo->type = TSDB_FILL_SET_VALUE; break; default: terrno = TSDB_CODE_INVALID_PARA; return NULL; @@ -376,7 +432,6 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag pFillInfo->alloc = capacity; pFillInfo->id = id; pFillInfo->interval = *pInterval; - pFillInfo->pData = taosMemoryMalloc(POINTER_BYTES * numOfCols); // if (numOfTags > 0) { pFillInfo->pTags = taosMemoryCalloc(numOfCols, sizeof(SFillTagColInfo)); @@ -385,6 +440,11 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag } // } + pFillInfo->next = taosArrayInit(numOfCols, sizeof(SGroupKeys)); + pFillInfo->prev = taosArrayInit(numOfCols, sizeof(SGroupKeys)); + + initBeforeAfterDataBuf(pFillInfo); + pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc); assert(pFillInfo->rowSize > 0); return pFillInfo; @@ -405,18 +465,15 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) { return NULL; } - taosMemoryFreeClear(pFillInfo->prevValues); - taosMemoryFreeClear(pFillInfo->nextValues); + taosArrayDestroy(pFillInfo->prev); + taosArrayDestroy(pFillInfo->next); for(int32_t i = 0; i < pFillInfo->numOfTags; ++i) { taosMemoryFreeClear(pFillInfo->pTags[i].tagVal); } taosMemoryFreeClear(pFillInfo->pTags); - - taosMemoryFreeClear(pFillInfo->pData); taosMemoryFreeClear(pFillInfo->pFillCol); - taosMemoryFreeClear(pFillInfo); return NULL; } @@ -436,18 +493,7 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) } void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) { - for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { - SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - - SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i); - pFillInfo->pData[i] = pColData->pData; - - if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer - SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; - assert (pTag->col.colId == pCol->col.slotId); - memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy?? - } - } + pFillInfo->pSrcBlock = (SSDataBlock*) pInput; } bool taosFillHasMoreResults(SFillInfo* pFillInfo) { @@ -465,8 +511,9 @@ bool taosFillHasMoreResults(SFillInfo* pFillInfo) { } int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) { - int64_t* tsList = (int64_t*) pFillInfo->pData[0]; + SColumnInfoData* pCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, 0); + int64_t* tsList = (int64_t*) pCol->pData; int32_t numOfRows = taosNumOfRemainRows(pFillInfo); TSKEY ekey1 = ekey; @@ -513,7 +560,7 @@ int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* return TSDB_CODE_SUCCESS; } -int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, void** output, int32_t capacity) { +int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity) { int32_t remain = taosNumOfRemainRows(pFillInfo); int64_t numOfRes = getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, capacity); @@ -521,9 +568,9 @@ int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, void** output, int32_t cap // no data existed for fill operation now, append result according to the fill strategy if (remain == 0) { - appendFilledResult(pFillInfo, output, numOfRes); + appendFilledResult(pFillInfo, p, numOfRes); } else { - fillResultImpl(pFillInfo, output, (int32_t) numOfRes); + fillResultImpl(pFillInfo, p, (int32_t) numOfRes); assert(numOfRes == pFillInfo->numOfCurrent); } @@ -538,28 +585,30 @@ int64_t getFillInfoStart(struct SFillInfo *pFillInfo) { return pFillInfo->start; } -struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SValueNode* val) { - int32_t offset = 0; - - struct SFillColInfo* pFillCol = taosMemoryCalloc(numOfOutput, sizeof(SFillColInfo)); +SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SNodeListNode* pValNode) { + SFillColInfo* pFillCol = taosMemoryCalloc(numOfOutput, sizeof(SFillColInfo)); if (pFillCol == NULL) { return NULL; } + size_t len = (pValNode != NULL)? LIST_LENGTH(pValNode->pNodeList):0; for(int32_t i = 0; i < numOfOutput; ++i) { - SExprInfo* pExprInfo = &pExpr[i]; + SExprInfo* pExprInfo = &pExpr[i]; + pFillCol[i].pExpr = pExprInfo; + pFillCol[i].tagIndex = -2; - pFillCol[i].col = pExprInfo->base.resSchema; - pFillCol[i].offset = offset; - pFillCol[i].tagIndex = -2; + // todo refactor + if (len > 0) { + // if the user specified value is less than the column, alway use the last one as the fill value + int32_t index = (i >= len)? (len - 1):i; + + SValueNode* pv = (SValueNode*)nodesListGetNode(pValNode->pNodeList, index); + valueNodeToVariant(pv, &pFillCol[i].fillVal); + } if (pExprInfo->base.numOfParams > 0) { pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query } -// pFillCol[i].functionId = pExprInfo->pExpr->_function.functionId; -// pFillCol[i].val.d = *val; - - offset += pExprInfo->base.resSchema.bytes; } return pFillCol; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index bb01af420da855be5c4d89cf48b1d31a58f800cd..0eba442e66f9f279055bf4ed1607038997c29632 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -78,6 +78,8 @@ typedef struct SDiffInfo { int64_t i64; double d64; } prev; + + int64_t prevTs; } SDiffInfo; typedef struct SSpreadInfo { @@ -1196,9 +1198,6 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { bool isFirstBlock = (pDiffInfo->hasPrev == false); int32_t numOfElems = 0; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); - // int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1; - SColumnInfoData* pTsOutput = pCtx->pTsOutput; TSKEY* tsList = (int64_t*)pInput->pPTS->pData; @@ -1206,44 +1205,86 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { switch (pInputCol->info.type) { case TSDB_DATA_TYPE_INT: { SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) { - int32_t pos = startOffset + (isFirstBlock ? (numOfElems - 1) : numOfElems); - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { - if (pDiffInfo->includeNull) { - colDataSetNull_f(pOutput->nullbitmap, pos); - if (tsList != NULL) { - colDataAppendInt64(pTsOutput, pos, &tsList[i]); + if (pCtx->order == TSDB_ORDER_ASC) { + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { + int32_t pos = startOffset + (isFirstBlock ? (numOfElems - 1) : numOfElems); + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + if (pDiffInfo->includeNull) { + colDataSetNull_f(pOutput->nullbitmap, pos); + if (tsList != NULL) { + colDataAppendInt64(pTsOutput, pos, &tsList[i]); + } + + numOfElems += 1; } + continue; + } - numOfElems += 1; + int32_t v = *(int32_t*)colDataGetData(pInputCol, i); + if (pDiffInfo->hasPrev) { + int32_t delta = (int32_t)(v - pDiffInfo->prev.i64); // direct previous may be null + if (delta < 0 && pDiffInfo->ignoreNegative) { + colDataSetNull_f(pOutput->nullbitmap, pos); + } else { + colDataAppendInt32(pOutput, pos, &delta); + } + + if (pTsOutput != NULL) { + colDataAppendInt64(pTsOutput, pos, &tsList[i]); + } } - continue; + + pDiffInfo->prev.i64 = v; + pDiffInfo->hasPrev = true; + numOfElems++; } + } else { + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { + int32_t v = *(int32_t*)colDataGetData(pInputCol, i); + int32_t pos = startOffset + numOfElems; + + // there is a row of previous data block to be handled in the first place. + if (pDiffInfo->hasPrev) { + int32_t delta = (int32_t)(pDiffInfo->prev.i64 - v); // direct previous may be null + if (delta < 0 && pDiffInfo->ignoreNegative) { + colDataSetNull_f(pOutput->nullbitmap, pos); + } else { + colDataAppendInt32(pOutput, pos, &delta); + } - int32_t v = *(int32_t*)colDataGetData(pInputCol, i); - if (pDiffInfo->hasPrev) { - int32_t delta = (int32_t)(v - pDiffInfo->prev.i64); // direct previous may be null - if (delta < 0 && pDiffInfo->ignoreNegative) { - colDataSetNull_f(pOutput->nullbitmap, pos); - } else { - colDataAppendInt32(pOutput, pos, &delta); + if (pTsOutput != NULL) { + colDataAppendInt64(pTsOutput, pos, &pDiffInfo->prevTs); + } + pDiffInfo->hasPrev = false; } - if (pTsOutput != NULL) { - colDataAppendInt64(pTsOutput, pos, &tsList[i]); + // it is not the last row of current block + if (i < pInput->numOfRows + pInput->startRowIndex - 1) { + int32_t next = *(int32_t*)colDataGetData(pInputCol, i + 1); + + int32_t delta = v - next; // direct previous may be null + colDataAppendInt32(pOutput, pos, &delta); + + if (pTsOutput != NULL) { + colDataAppendInt64(pTsOutput, pos, &tsList[i]); + } + } else { + pDiffInfo->prev.i64 = v; + if (pTsOutput != NULL) { + pDiffInfo->prevTs = tsList[i]; + } + pDiffInfo->hasPrev = true; } + numOfElems++; } - pDiffInfo->prev.i64 = v; - pDiffInfo->hasPrev = true; - numOfElems++; } break; } case TSDB_DATA_TYPE_BIGINT: { SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } @@ -1378,7 +1419,7 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { } // initial value is not set yet - if (!pDiffInfo->hasPrev || numOfElems <= 0) { + if (numOfElems <= 0) { /* * 1. current block and blocks before are full of null * 2. current block may be null value @@ -1386,15 +1427,7 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { assert(pCtx->hasNull); return 0; } else { - // for (int t = 0; t < pCtx->tagInfo.numOfTagCols; ++t) { - // SqlFunctionCtx* tagCtx = pCtx->tagInfo.pTagCtxList[t]; - // if (tagCtx->functionId == TSDB_FUNC_TAG_DUMMY) { - // aAggs[TSDB_FUNC_TAGPRJ].xFunction(tagCtx); - // } - // } - - int32_t forwardStep = (isFirstBlock) ? numOfElems - 1 : numOfElems; - return forwardStep; + return (isFirstBlock) ? numOfElems - 1 : numOfElems; } } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index e1f9a9f6a4544a055acf960c37e9921099990b39..6c85d9dff58f9b2f220cd9eac58afe5dc33dd928 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1521,7 +1521,7 @@ static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) { static int32_t jsonToPhysiFillNode(const SJson* pJson, void* pObj) { SFillPhysiNode* pNode = (SFillPhysiNode*)pObj; - int32_t code = jsonToPhysiWindowNode(pJson, pObj); + int32_t code = jsonToPhysicPlanNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { code = tjsonGetNumberValue(pJson, jkFillPhysiPlanMode, pNode->mode); } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index e84f387dbea4cb24ae79248a731bdd1f54ba6b15..bd7ee6321aa067357d36d6739ea252ce9e00fdc1 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -945,7 +945,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex code = qStringToSubplan(qwMsg->msg, &plan); if (TSDB_CODE_SUCCESS != code) { - QW_TASK_ELOG("task string to subplan failed, code:%x - %s", code, tstrerror(code)); + code = TSDB_CODE_INVALID_MSG; + QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code)); QW_ERR_JRET(code); }