diff --git a/example/src/tmq.c b/example/src/tmq.c index 2b73d8351f8bace368ebdb62b756f2371ef923ee..976d658fa6fc6557c11fde9e5c692fba801ab1f4 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -101,8 +101,8 @@ int32_t create_topic() { } taos_free_result(pRes); - /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ - pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1"); + pRes = taos_query(pConn, "create topic topic_ctb_column as abc1"); + /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/ if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 9f834fd659000e66eaa97208e9a0f30fc861ba9d..e62729a0517a60e42b04f88159f0da55bff58051 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -15,8 +15,9 @@ #ifndef TDENGINE_QUERYUTIL_H #define TDENGINE_QUERYUTIL_H -#include "tcommon.h" +#include #include "tbuffer.h" +#include "tcommon.h" #include "tpagedbuf.h" #define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \ @@ -56,9 +57,9 @@ typedef struct SResultRow { bool endInterp; // the time window end timestamp has done the interpolation already. bool closed; // this result status: closed or opened uint32_t numOfRows; // number of rows of current time window - struct SResultRowEntryInfo* pEntryInfo; // For each result column, there is a resultInfo STimeWindow win; - char *key; // start key of current result row + struct SResultRowEntryInfo pEntryInfo[]; // For each result column, there is a resultInfo +// char *key; // start key of current result row } SResultRow; typedef struct SResultRowPosition { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e35da0c45a8773cfe3f801575895cf26bd042dca..5cfe796a2992435179da65656b18abae9ebf7f92 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -711,7 +711,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); #if 0 diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 3283ae2b55326972a15bf6d34fcb036223117749..763dcef790014606cf9b661c3a8527e0c2a431ca 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -157,8 +157,6 @@ void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow) { pResultRow->pageId = -1; pResultRow->offset = -1; pResultRow->closed = false; - - taosMemoryFreeClear(pResultRow->key); pResultRow->win = TSWINDOW_INITIALIZER; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 04e9cf8cb209d061bcd7c063579e1229027d29a3..1327ddb48ece4816e817aae39c42a7e4c35ef2d7 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -388,6 +388,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // allocate a new buffer page prepareResultListBuffer(pResultRowInfo, pTaskInfo->env); if (pResult == NULL) { + ASSERT(pSup->resultRowSize > 0); pResult = getNewResultRow_rv(pResultBuf, groupId, pSup->resultRowSize); initResultRow(pResult); @@ -1152,7 +1153,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, pCtx->resDataInfo.interBufSize = env.calcMemSize; } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR || pExpr->pExpr->nodeType == QUERY_NODE_VALUE) { - // for simple column, the intermediate buffer needs to hold one element. + // for simple column, the result buffer needs to hold at least one element. pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes; } @@ -1872,7 +1873,7 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) } void initResultRow(SResultRow* pResultRow) { - pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow)); +// pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow)); } /* @@ -1884,7 +1885,7 @@ void initResultRow(SResultRow* pResultRow) { * offset[0] offset[1] offset[2] */ // TODO refactor: some function move away -void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, SExecTaskInfo* pTaskInfo) { +void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, int32_t numOfExprs, SExecTaskInfo* pTaskInfo) { SqlFunctionCtx* pCtx = pInfo->pCtx; SSDataBlock* pDataBlock = pInfo->pRes; int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset; @@ -1897,6 +1898,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, (char*)&tid, sizeof(tid), true, groupId, pTaskInfo, false, pSup); + ASSERT(pDataBlock->info.numOfCols == numOfExprs); for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { struct SResultRowEntryInfo* pEntry = getResultCell(pRow, i, rowCellInfoOffset); cleanupResultRowEntry(pEntry); @@ -3624,7 +3626,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t goto _error; } - setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); + setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, num, pTaskInfo); code = initGroupCol(pExprInfo, num, pGroupInfo, pInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -4217,12 +4219,22 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); - if (pAggSup->keyBuf == NULL /*|| pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL*/ || - pAggSup->pResultRowHashTable == NULL) { + if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, 4096, 4096 * 256, pKey, "/tmp/"); + uint32_t defaultPgsz = 4096; + while(defaultPgsz < pAggSup->resultRowSize*4) { + defaultPgsz <<= 1u; + } + + // at least four pages need to be in buffer + int32_t defaultBufsz = 4096 * 256; + if (defaultBufsz <= defaultPgsz) { + defaultBufsz = defaultPgsz * 4; + } + + int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, "/tmp/"); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4362,6 +4374,10 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { doDestroyBasicInfo(pInfo, numOfOutput); } +void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { + SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*) param; +} + void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); @@ -4425,7 +4441,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p initResultSizeInfo(pOperator, numOfRows); initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); - setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); + setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols, pTaskInfo); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols); pOperator->name = "ProjectOperator"; @@ -4938,7 +4954,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num); - pOptr = createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo); + pOptr = createMergeJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) { SFillPhysiNode* pFillNode = (SFillPhysiNode*)pPhyNode; SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); @@ -5510,7 +5526,7 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { return (pRes->info.rows > 0) ? pRes : NULL; } -SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, +SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo) { SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo)); @@ -5536,7 +5552,7 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight); pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyBasicOperatorInfo, NULL, NULL, NULL); + createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL); int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0296a312d00378824a07f52213aa6bccc2441b7d..c1aee5ba351544fbdb32d415cd9fa8078303ebe6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -656,7 +656,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* SArray* pColIds = taosArrayInit(4, sizeof(int16_t)); for (int32_t i = 0; i < numOfOutput; ++i) { SColMatchInfo* id = taosArrayGet(pColList, i); - int16_t colId = id->colId; + int16_t colId = id->colId; taosArrayPush(pColIds, &colId); } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 9e2bdea7d50ffee40d0d4efb04a51ba626427fd5..5aa1b63c7921b0dc0b9383fffba80140491a0439 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -266,6 +266,20 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l return TSDB_CODE_SUCCESS; } +static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + int32_t paraLen = LIST_LENGTH(pFunc->pParameterList); + if (paraLen == 0 || paraLen > 2) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + SExprNode* p1 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0); + if (!IS_NUMERIC_TYPE(p1->resType.type)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + pFunc->node.resType = p1->resType; + return TSDB_CODE_SUCCESS; +} + static int32_t translateLength(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { if (1 != LIST_LENGTH(pFunc->pParameterList)) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); @@ -617,7 +631,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "diff", .type = FUNCTION_TYPE_DIFF, .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC, - .translateFunc = translateInOutNum, + .translateFunc = translateDiff, .getEnvFunc = getDiffFuncEnv, .initFunc = diffFunctionSetup, .processFunc = diffFunction, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 1cb17ca493fae8be7e9e4490659df72fad695c70..9c1601b61a00c9e0c5956e1a49d6f880b808b3ef 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -75,7 +75,7 @@ typedef struct SPercentileInfo { typedef struct SDiffInfo { bool hasPrev; bool includeNull; - bool ignoreNegative; + bool ignoreNegative; // replace the ignore with case when bool firstOutput; union { int64_t i64; @@ -1419,248 +1419,192 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); pDiffInfo->hasPrev = false; pDiffInfo->prev.i64 = 0; - pDiffInfo->ignoreNegative = false; // TODO set correct param + pDiffInfo->ignoreNegative = pCtx->param[1].param.i; // TODO set correct param pDiffInfo->includeNull = false; pDiffInfo->firstOutput = false; return true; } +static void doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv) { + switch(type) { + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: + pDiffInfo->prev.i64 = *(int8_t*) pv; break; + case TSDB_DATA_TYPE_INT: + pDiffInfo->prev.i64 = *(int32_t*) pv; break; + case TSDB_DATA_TYPE_SMALLINT: + pDiffInfo->prev.i64 = *(int16_t*) pv; break; + case TSDB_DATA_TYPE_BIGINT: + pDiffInfo->prev.i64 = *(int64_t*) pv; break; + case TSDB_DATA_TYPE_FLOAT: + pDiffInfo->prev.d64 = *(float *) pv; break; + case TSDB_DATA_TYPE_DOUBLE: + pDiffInfo->prev.d64 = *(double*) pv; break; + default: + ASSERT(0); + } +} + +static void doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos, int32_t order) { + int32_t factor = (order == TSDB_ORDER_ASC)? 1:-1; + switch (type) { + case TSDB_DATA_TYPE_INT: { + int32_t v = *(int32_t*)pv; + int32_t delta = factor*(v - pDiffInfo->prev.i64); // direct previous may be null + if (delta < 0 && pDiffInfo->ignoreNegative) { + colDataSetNull_f(pOutput->nullbitmap, pos); + } else { + colDataAppendInt32(pOutput, pos, &delta); + } + pDiffInfo->prev.i64 = v; + break; + } + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: { + int8_t v = *(int8_t*)pv; + int8_t delta = factor*(v - pDiffInfo->prev.i64); // direct previous may be null + if (delta < 0 && pDiffInfo->ignoreNegative) { + colDataSetNull_f(pOutput->nullbitmap, pos); + } else { + colDataAppendInt8(pOutput, pos, &delta); + } + pDiffInfo->prev.i64 = v; + break; + } + case TSDB_DATA_TYPE_SMALLINT: { + int16_t v = *(int16_t*)pv; + int16_t delta = factor*(v - pDiffInfo->prev.i64); // direct previous may be null + if (delta < 0 && pDiffInfo->ignoreNegative) { + colDataSetNull_f(pOutput->nullbitmap, pos); + } else { + colDataAppendInt16(pOutput, pos, &delta); + } + pDiffInfo->prev.i64 = v; + break; + } + case TSDB_DATA_TYPE_BIGINT: { + int64_t v = *(int64_t*)pv; + int64_t delta = factor*(v - pDiffInfo->prev.i64); // direct previous may be null + if (delta < 0 && pDiffInfo->ignoreNegative) { + colDataSetNull_f(pOutput->nullbitmap, pos); + } else { + colDataAppendInt64(pOutput, pos, &delta); + } + pDiffInfo->prev.i64 = v; + break; + } + case TSDB_DATA_TYPE_FLOAT: { + float v = *(float*)pv; + float delta = factor*(v - pDiffInfo->prev.d64); // direct previous may be null + if (delta < 0 && pDiffInfo->ignoreNegative) { + colDataSetNull_f(pOutput->nullbitmap, pos); + } else { + colDataAppendFloat(pOutput, pos, &delta); + } + pDiffInfo->prev.d64 = v; + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + double v = *(double*)pv; + double delta = factor*(v - pDiffInfo->prev.d64); // direct previous may be null + if (delta < 0 && pDiffInfo->ignoreNegative) { + colDataSetNull_f(pOutput->nullbitmap, pos); + } else { + colDataAppendDouble(pOutput, pos, &delta); + } + pDiffInfo->prev.d64 = v; + break; + } + default: + ASSERT(0); + } + } + int32_t diffFunction(SqlFunctionCtx* pCtx) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); SInputColumnInfoData* pInput = &pCtx->input; - SColumnInfoData* pInputCol = pInput->pData[0]; - - bool isFirstBlock = (pDiffInfo->hasPrev == false); - int32_t numOfElems = 0; + SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pTsOutput = pCtx->pTsOutput; - TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + int32_t numOfElems = 0; + TSKEY* tsList = (int64_t*)pInput->pPTS->pData; int32_t startOffset = pCtx->offset; - switch (pInputCol->info.type) { - case TSDB_DATA_TYPE_INT: { - SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; - if (pCtx->order == TSDB_ORDER_ASC) { - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - int32_t pos = startOffset + (isFirstBlock ? (numOfElems - 1) : numOfElems); - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { - if (pDiffInfo->includeNull) { - colDataSetNull_f(pOutput->nullbitmap, pos); - if (tsList != NULL) { - colDataAppendInt64(pTsOutput, pos, &tsList[i]); - } - - numOfElems += 1; - } - continue; - } - int32_t v = *(int32_t*)colDataGetData(pInputCol, i); - if (pDiffInfo->hasPrev) { - int32_t delta = (int32_t)(v - pDiffInfo->prev.i64); // direct previous may be null - if (delta < 0 && pDiffInfo->ignoreNegative) { - colDataSetNull_f(pOutput->nullbitmap, pos); - } else { - colDataAppendInt32(pOutput, pos, &delta); - } - - if (pTsOutput != NULL) { - colDataAppendInt64(pTsOutput, pos, &tsList[i]); - } - } + SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; - pDiffInfo->prev.i64 = v; - pDiffInfo->hasPrev = true; - numOfElems++; - } - } else { - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - int32_t v = *(int32_t*)colDataGetData(pInputCol, i); - int32_t pos = startOffset + numOfElems; - - // there is a row of previous data block to be handled in the first place. - if (pDiffInfo->hasPrev) { - int32_t delta = (int32_t)(pDiffInfo->prev.i64 - v); // direct previous may be null - if (delta < 0 && pDiffInfo->ignoreNegative) { - colDataSetNull_f(pOutput->nullbitmap, pos); - } else { - colDataAppendInt32(pOutput, pos, &delta); - } + if (pCtx->order == TSDB_ORDER_ASC) { + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { + int32_t pos = startOffset + numOfElems; - if (pTsOutput != NULL) { - colDataAppendInt64(pTsOutput, pos, &pDiffInfo->prevTs); - } - pDiffInfo->hasPrev = false; + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + if (pDiffInfo->includeNull) { + colDataSetNull_f(pOutput->nullbitmap, pos); + if (tsList != NULL) { + colDataAppendInt64(pTsOutput, pos, &tsList[i]); } - // it is not the last row of current block - if (i < pInput->numOfRows + pInput->startRowIndex - 1) { - int32_t next = *(int32_t*)colDataGetData(pInputCol, i + 1); - - int32_t delta = v - next; // direct previous may be null - colDataAppendInt32(pOutput, pos, &delta); - - if (pTsOutput != NULL) { - colDataAppendInt64(pTsOutput, pos, &tsList[i]); - } - } else { - pDiffInfo->prev.i64 = v; - if (pTsOutput != NULL) { - pDiffInfo->prevTs = tsList[i]; - } - pDiffInfo->hasPrev = true; - } - numOfElems++; + numOfElems += 1; } - + continue; } - break; - } - case TSDB_DATA_TYPE_BIGINT: { - SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { - continue; - } - - int32_t v = 0; - if (pDiffInfo->hasPrev) { - v = *(int64_t*)colDataGetData(pInputCol, i); - int64_t delta = (int64_t)(v - pDiffInfo->prev.i64); // direct previous may be null - if (pDiffInfo->ignoreNegative) { - continue; - } + char* pv = colDataGetData(pInputCol, i); - // *(pOutput++) = delta; - // *pTimestamp = (tsList != NULL)? tsList[i]:0; - // - // pOutput += 1; - // pTimestamp += 1; + if (pDiffInfo->hasPrev) { + doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order); + if (pTsOutput != NULL) { + colDataAppendInt64(pTsOutput, pos, &tsList[i]); } - pDiffInfo->prev.i64 = v; - pDiffInfo->hasPrev = true; numOfElems++; + } else { + doSetPrevVal(pDiffInfo, pInputCol->info.type, pv); } - break; - } -#if 0 - case TSDB_DATA_TYPE_DOUBLE: { - double *pData = (double *)data; - double *pOutput = (double *)pCtx->pOutput; - for (; i < pCtx->size && i >= 0; i += step) { - if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { - continue; - } - if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) { - continue; - } - - if (pDiffInfo->hasPrev) { // initial value is not set yet - SET_DOUBLE_VAL(pOutput, pData[i] - pDiffInfo->d64Prev); // direct previous may be null - *pTimestamp = (tsList != NULL)? tsList[i]:0; - pOutput += 1; - pTimestamp += 1; - } - - pDiffInfo->d64Prev = pData[i]; - pDiffInfo->hasPrev = true; - numOfElems++; - } - break; + pDiffInfo->hasPrev = true; } - case TSDB_DATA_TYPE_FLOAT: { - float *pData = (float *)data; - float *pOutput = (float *)pCtx->pOutput; - - for (; i < pCtx->size && i >= 0; i += step) { - if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { - continue; - } - if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) { - continue; - } + } else { + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { + int32_t pos = startOffset + numOfElems; + + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + if (pDiffInfo->includeNull) { + colDataSetNull_f(pOutput->nullbitmap, pos); + if (tsList != NULL) { + colDataAppendInt64(pTsOutput, pos, &tsList[i]); + } - if (pDiffInfo->hasPrev) { // initial value is not set yet - *pOutput = (float)(pData[i] - pDiffInfo->d64Prev); // direct previous may be null - *pTimestamp = (tsList != NULL)? tsList[i]:0; - pOutput += 1; - pTimestamp += 1; + numOfElems += 1; } - - pDiffInfo->d64Prev = pData[i]; - pDiffInfo->hasPrev = true; - numOfElems++; + continue; } - break; - } - case TSDB_DATA_TYPE_SMALLINT: { - int16_t *pData = (int16_t *)data; - int16_t *pOutput = (int16_t *)pCtx->pOutput; - for (; i < pCtx->size && i >= 0; i += step) { - if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { - continue; - } - if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) { - continue; - } + char* pv = colDataGetData(pInputCol, i); - if (pDiffInfo->hasPrev) { // initial value is not set yet - *pOutput = (int16_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null - *pTimestamp = (tsList != NULL)? tsList[i]:0; - pOutput += 1; - pTimestamp += 1; + // there is a row of previous data block to be handled in the first place. + if (pDiffInfo->hasPrev) { + doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order); + if (pTsOutput != NULL) { + colDataAppendInt64(pTsOutput, pos, &pDiffInfo->prevTs); } - pDiffInfo->i64Prev = pData[i]; - pDiffInfo->hasPrev = true; numOfElems++; + } else { + doSetPrevVal(pDiffInfo, pInputCol->info.type, pv); } - break; - } - - case TSDB_DATA_TYPE_TINYINT: { - int8_t *pData = (int8_t *)data; - int8_t *pOutput = (int8_t *)pCtx->pOutput; - for (; i < pCtx->size && i >= 0; i += step) { - if (pCtx->hasNull && isNull((char *)&pData[i], pCtx->inputType)) { - continue; - } - if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) { - continue; - } - - if (pDiffInfo->hasPrev) { // initial value is not set yet - *pOutput = (int8_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null - *pTimestamp = (tsList != NULL)? tsList[i]:0; - pOutput += 1; - pTimestamp += 1; - } - - pDiffInfo->i64Prev = pData[i]; - pDiffInfo->hasPrev = true; - numOfElems++; + pDiffInfo->hasPrev = true; + if (pTsOutput != NULL) { + pDiffInfo->prevTs = tsList[i]; } - break; } -#endif - default: - break; - // qError("error input type"); } // initial value is not set yet - if (numOfElems <= 0) { - return 0; - } else { - return (isFirstBlock) ? numOfElems - 1 : numOfElems; - } + return numOfElems; } bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index c26342bfa878fb7fbb092c5e913964a4c96b8615..4d80b88a3a3ee72f09efdead1d9ee3a0fa6cb68b 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -19,7 +19,6 @@ #include "thash.h" #include "ttypes.h" -//#include "tfill.h" #include "function.h" #include "taggfunction.h" #include "tbuffer.h" @@ -27,7 +26,6 @@ #include "thistogram.h" #include "tpercentile.h" #include "ttszip.h" -//#include "queryLog.h" #include "tdatablock.h" #include "tudf.h"