未验证 提交 129819db 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #12229 from taosdata/feature/3.0_liaohj

fix(query): fix invalid extraction of column id.
...@@ -101,8 +101,8 @@ int32_t create_topic() { ...@@ -101,8 +101,8 @@ int32_t create_topic() {
} }
taos_free_result(pRes); taos_free_result(pRes);
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1"); /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
......
...@@ -15,8 +15,9 @@ ...@@ -15,8 +15,9 @@
#ifndef TDENGINE_QUERYUTIL_H #ifndef TDENGINE_QUERYUTIL_H
#define TDENGINE_QUERYUTIL_H #define TDENGINE_QUERYUTIL_H
#include "tcommon.h" #include <libs/function/function.h>
#include "tbuffer.h" #include "tbuffer.h"
#include "tcommon.h"
#include "tpagedbuf.h" #include "tpagedbuf.h"
#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \ #define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \
...@@ -56,9 +57,9 @@ typedef struct SResultRow { ...@@ -56,9 +57,9 @@ typedef struct SResultRow {
bool endInterp; // the time window end timestamp has done the interpolation already. bool endInterp; // the time window end timestamp has done the interpolation already.
bool closed; // this result status: closed or opened bool closed; // this result status: closed or opened
uint32_t numOfRows; // number of rows of current time window uint32_t numOfRows; // number of rows of current time window
struct SResultRowEntryInfo* pEntryInfo; // For each result column, there is a resultInfo
STimeWindow win; STimeWindow win;
char *key; // start key of current result row struct SResultRowEntryInfo pEntryInfo[]; // For each result column, there is a resultInfo
// char *key; // start key of current result row
} SResultRow; } SResultRow;
typedef struct SResultRowPosition { typedef struct SResultRowPosition {
......
...@@ -711,7 +711,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -711,7 +711,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo); SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
#if 0 #if 0
......
...@@ -157,8 +157,6 @@ void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow) { ...@@ -157,8 +157,6 @@ void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow) {
pResultRow->pageId = -1; pResultRow->pageId = -1;
pResultRow->offset = -1; pResultRow->offset = -1;
pResultRow->closed = false; pResultRow->closed = false;
taosMemoryFreeClear(pResultRow->key);
pResultRow->win = TSWINDOW_INITIALIZER; pResultRow->win = TSWINDOW_INITIALIZER;
} }
......
...@@ -388,6 +388,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -388,6 +388,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// allocate a new buffer page // allocate a new buffer page
prepareResultListBuffer(pResultRowInfo, pTaskInfo->env); prepareResultListBuffer(pResultRowInfo, pTaskInfo->env);
if (pResult == NULL) { if (pResult == NULL) {
ASSERT(pSup->resultRowSize > 0);
pResult = getNewResultRow_rv(pResultBuf, groupId, pSup->resultRowSize); pResult = getNewResultRow_rv(pResultBuf, groupId, pSup->resultRowSize);
initResultRow(pResult); initResultRow(pResult);
...@@ -1152,7 +1153,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, ...@@ -1152,7 +1153,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
pCtx->resDataInfo.interBufSize = env.calcMemSize; pCtx->resDataInfo.interBufSize = env.calcMemSize;
} else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR || } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR ||
pExpr->pExpr->nodeType == QUERY_NODE_VALUE) { pExpr->pExpr->nodeType == QUERY_NODE_VALUE) {
// for simple column, the intermediate buffer needs to hold one element. // for simple column, the result buffer needs to hold at least one element.
pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes; pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes;
} }
...@@ -1872,7 +1873,7 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) ...@@ -1872,7 +1873,7 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo)
} }
void initResultRow(SResultRow* pResultRow) { void initResultRow(SResultRow* pResultRow) {
pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow)); // pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow));
} }
/* /*
...@@ -1884,7 +1885,7 @@ void initResultRow(SResultRow* pResultRow) { ...@@ -1884,7 +1885,7 @@ void initResultRow(SResultRow* pResultRow) {
* offset[0] offset[1] offset[2] * offset[0] offset[1] offset[2]
*/ */
// TODO refactor: some function move away // TODO refactor: some function move away
void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, SExecTaskInfo* pTaskInfo) { void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, int32_t numOfExprs, SExecTaskInfo* pTaskInfo) {
SqlFunctionCtx* pCtx = pInfo->pCtx; SqlFunctionCtx* pCtx = pInfo->pCtx;
SSDataBlock* pDataBlock = pInfo->pRes; SSDataBlock* pDataBlock = pInfo->pRes;
int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset; int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset;
...@@ -1897,6 +1898,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t ...@@ -1897,6 +1898,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, (char*)&tid, sizeof(tid), true, groupId, SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, (char*)&tid, sizeof(tid), true, groupId,
pTaskInfo, false, pSup); pTaskInfo, false, pSup);
ASSERT(pDataBlock->info.numOfCols == numOfExprs);
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
struct SResultRowEntryInfo* pEntry = getResultCell(pRow, i, rowCellInfoOffset); struct SResultRowEntryInfo* pEntry = getResultCell(pRow, i, rowCellInfoOffset);
cleanupResultRowEntry(pEntry); cleanupResultRowEntry(pEntry);
...@@ -3624,7 +3626,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t ...@@ -3624,7 +3626,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
goto _error; goto _error;
} }
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, num, pTaskInfo);
code = initGroupCol(pExprInfo, num, pGroupInfo, pInfo); code = initGroupCol(pExprInfo, num, pGroupInfo, pInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
...@@ -4217,12 +4219,22 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n ...@@ -4217,12 +4219,22 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
if (pAggSup->keyBuf == NULL /*|| pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL*/ || if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
pAggSup->pResultRowHashTable == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, 4096, 4096 * 256, pKey, "/tmp/"); uint32_t defaultPgsz = 4096;
while(defaultPgsz < pAggSup->resultRowSize*4) {
defaultPgsz <<= 1u;
}
// at least four pages need to be in buffer
int32_t defaultBufsz = 4096 * 256;
if (defaultBufsz <= defaultPgsz) {
defaultBufsz = defaultPgsz * 4;
}
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, "/tmp/");
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -4362,6 +4374,10 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -4362,6 +4374,10 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
doDestroyBasicInfo(pInfo, numOfOutput); doDestroyBasicInfo(pInfo, numOfOutput);
} }
void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*) param;
}
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param; SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
...@@ -4425,7 +4441,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p ...@@ -4425,7 +4441,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
initResultSizeInfo(pOperator, numOfRows); initResultSizeInfo(pOperator, numOfRows);
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols, pTaskInfo);
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols);
pOperator->name = "ProjectOperator"; pOperator->name = "ProjectOperator";
...@@ -4938,7 +4954,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4938,7 +4954,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num);
pOptr = createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo); pOptr = createMergeJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
SFillPhysiNode* pFillNode = (SFillPhysiNode*)pPhyNode; SFillPhysiNode* pFillNode = (SFillPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
...@@ -5510,7 +5526,7 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { ...@@ -5510,7 +5526,7 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
return (pRes->info.rows > 0) ? pRes : NULL; return (pRes->info.rows > 0) ? pRes : NULL;
} }
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo,
int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo)); SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
...@@ -5536,7 +5552,7 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf ...@@ -5536,7 +5552,7 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf
setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight); setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight);
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyBasicOperatorInfo, NULL, NULL, NULL); createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL);
int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream); int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
......
...@@ -266,6 +266,20 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l ...@@ -266,6 +266,20 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t paraLen = LIST_LENGTH(pFunc->pParameterList);
if (paraLen == 0 || paraLen > 2) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
SExprNode* p1 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
if (!IS_NUMERIC_TYPE(p1->resType.type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = p1->resType;
return TSDB_CODE_SUCCESS;
}
static int32_t translateLength(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateLength(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) { if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
...@@ -617,7 +631,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -617,7 +631,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "diff", .name = "diff",
.type = FUNCTION_TYPE_DIFF, .type = FUNCTION_TYPE_DIFF,
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC, .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateInOutNum, .translateFunc = translateDiff,
.getEnvFunc = getDiffFuncEnv, .getEnvFunc = getDiffFuncEnv,
.initFunc = diffFunctionSetup, .initFunc = diffFunctionSetup,
.processFunc = diffFunction, .processFunc = diffFunction,
......
...@@ -75,7 +75,7 @@ typedef struct SPercentileInfo { ...@@ -75,7 +75,7 @@ typedef struct SPercentileInfo {
typedef struct SDiffInfo { typedef struct SDiffInfo {
bool hasPrev; bool hasPrev;
bool includeNull; bool includeNull;
bool ignoreNegative; bool ignoreNegative; // replace the ignore with case when
bool firstOutput; bool firstOutput;
union { union {
int64_t i64; int64_t i64;
...@@ -1419,248 +1419,192 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { ...@@ -1419,248 +1419,192 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
pDiffInfo->hasPrev = false; pDiffInfo->hasPrev = false;
pDiffInfo->prev.i64 = 0; pDiffInfo->prev.i64 = 0;
pDiffInfo->ignoreNegative = false; // TODO set correct param pDiffInfo->ignoreNegative = pCtx->param[1].param.i; // TODO set correct param
pDiffInfo->includeNull = false; pDiffInfo->includeNull = false;
pDiffInfo->firstOutput = false; pDiffInfo->firstOutput = false;
return true; return true;
} }
int32_t diffFunction(SqlFunctionCtx* pCtx) { static void doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); switch(type) {
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
SInputColumnInfoData* pInput = &pCtx->input; pDiffInfo->prev.i64 = *(int8_t*) pv; break;
SColumnInfoData* pInputCol = pInput->pData[0]; case TSDB_DATA_TYPE_INT:
pDiffInfo->prev.i64 = *(int32_t*) pv; break;
bool isFirstBlock = (pDiffInfo->hasPrev == false); case TSDB_DATA_TYPE_SMALLINT:
int32_t numOfElems = 0; pDiffInfo->prev.i64 = *(int16_t*) pv; break;
case TSDB_DATA_TYPE_BIGINT:
SColumnInfoData* pTsOutput = pCtx->pTsOutput; pDiffInfo->prev.i64 = *(int64_t*) pv; break;
TSKEY* tsList = (int64_t*)pInput->pPTS->pData; case TSDB_DATA_TYPE_FLOAT:
pDiffInfo->prev.d64 = *(float *) pv; break;
case TSDB_DATA_TYPE_DOUBLE:
pDiffInfo->prev.d64 = *(double*) pv; break;
default:
ASSERT(0);
}
}
int32_t startOffset = pCtx->offset; static void doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos, int32_t order) {
switch (pInputCol->info.type) { int32_t factor = (order == TSDB_ORDER_ASC)? 1:-1;
switch (type) {
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; int32_t v = *(int32_t*)pv;
if (pCtx->order == TSDB_ORDER_ASC) { int32_t delta = factor*(v - pDiffInfo->prev.i64); // direct previous may be null
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { if (delta < 0 && pDiffInfo->ignoreNegative) {
int32_t pos = startOffset + (isFirstBlock ? (numOfElems - 1) : numOfElems);
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
if (pDiffInfo->includeNull) {
colDataSetNull_f(pOutput->nullbitmap, pos); colDataSetNull_f(pOutput->nullbitmap, pos);
if (tsList != NULL) { } else {
colDataAppendInt64(pTsOutput, pos, &tsList[i]); colDataAppendInt32(pOutput, pos, &delta);
}
numOfElems += 1;
} }
continue; pDiffInfo->prev.i64 = v;
break;
} }
case TSDB_DATA_TYPE_BOOL:
int32_t v = *(int32_t*)colDataGetData(pInputCol, i); case TSDB_DATA_TYPE_TINYINT: {
if (pDiffInfo->hasPrev) { int8_t v = *(int8_t*)pv;
int32_t delta = (int32_t)(v - pDiffInfo->prev.i64); // direct previous may be null int8_t delta = factor*(v - pDiffInfo->prev.i64); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) { if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos); colDataSetNull_f(pOutput->nullbitmap, pos);
} else { } else {
colDataAppendInt32(pOutput, pos, &delta); colDataAppendInt8(pOutput, pos, &delta);
} }
pDiffInfo->prev.i64 = v;
if (pTsOutput != NULL) { break;
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
} }
case TSDB_DATA_TYPE_SMALLINT: {
int16_t v = *(int16_t*)pv;
int16_t delta = factor*(v - pDiffInfo->prev.i64); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos);
} else {
colDataAppendInt16(pOutput, pos, &delta);
} }
pDiffInfo->prev.i64 = v; pDiffInfo->prev.i64 = v;
pDiffInfo->hasPrev = true; break;
numOfElems++;
} }
} else { case TSDB_DATA_TYPE_BIGINT: {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { int64_t v = *(int64_t*)pv;
int32_t v = *(int32_t*)colDataGetData(pInputCol, i); int64_t delta = factor*(v - pDiffInfo->prev.i64); // direct previous may be null
int32_t pos = startOffset + numOfElems;
// there is a row of previous data block to be handled in the first place.
if (pDiffInfo->hasPrev) {
int32_t delta = (int32_t)(pDiffInfo->prev.i64 - v); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) { if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos); colDataSetNull_f(pOutput->nullbitmap, pos);
} else { } else {
colDataAppendInt32(pOutput, pos, &delta); colDataAppendInt64(pOutput, pos, &delta);
} }
pDiffInfo->prev.i64 = v;
if (pTsOutput != NULL) { break;
colDataAppendInt64(pTsOutput, pos, &pDiffInfo->prevTs);
} }
pDiffInfo->hasPrev = false; case TSDB_DATA_TYPE_FLOAT: {
float v = *(float*)pv;
float delta = factor*(v - pDiffInfo->prev.d64); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos);
} else {
colDataAppendFloat(pOutput, pos, &delta);
} }
pDiffInfo->prev.d64 = v;
// it is not the last row of current block break;
if (i < pInput->numOfRows + pInput->startRowIndex - 1) {
int32_t next = *(int32_t*)colDataGetData(pInputCol, i + 1);
int32_t delta = v - next; // direct previous may be null
colDataAppendInt32(pOutput, pos, &delta);
if (pTsOutput != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
} }
case TSDB_DATA_TYPE_DOUBLE: {
double v = *(double*)pv;
double delta = factor*(v - pDiffInfo->prev.d64); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos);
} else { } else {
pDiffInfo->prev.i64 = v; colDataAppendDouble(pOutput, pos, &delta);
if (pTsOutput != NULL) {
pDiffInfo->prevTs = tsList[i];
}
pDiffInfo->hasPrev = true;
} }
numOfElems++; pDiffInfo->prev.d64 = v;
break;
} }
default:
ASSERT(0);
} }
break;
} }
case TSDB_DATA_TYPE_BIGINT: { int32_t diffFunction(SqlFunctionCtx* pCtx) {
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
int32_t v = 0; SInputColumnInfoData* pInput = &pCtx->input;
if (pDiffInfo->hasPrev) {
v = *(int64_t*)colDataGetData(pInputCol, i);
int64_t delta = (int64_t)(v - pDiffInfo->prev.i64); // direct previous may be null
if (pDiffInfo->ignoreNegative) {
continue;
}
// *(pOutput++) = delta; SColumnInfoData* pInputCol = pInput->pData[0];
// *pTimestamp = (tsList != NULL)? tsList[i]:0; SColumnInfoData* pTsOutput = pCtx->pTsOutput;
//
// pOutput += 1;
// pTimestamp += 1;
}
pDiffInfo->prev.i64 = v; int32_t numOfElems = 0;
pDiffInfo->hasPrev = true; TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
numOfElems++; int32_t startOffset = pCtx->offset;
}
break;
}
#if 0
case TSDB_DATA_TYPE_DOUBLE: {
double *pData = (double *)data;
double *pOutput = (double *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) { SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
}
if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) {
continue;
}
if (pDiffInfo->hasPrev) { // initial value is not set yet if (pCtx->order == TSDB_ORDER_ASC) {
SET_DOUBLE_VAL(pOutput, pData[i] - pDiffInfo->d64Prev); // direct previous may be null for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
*pTimestamp = (tsList != NULL)? tsList[i]:0; int32_t pos = startOffset + numOfElems;
pOutput += 1;
pTimestamp += 1;
}
pDiffInfo->d64Prev = pData[i]; if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
pDiffInfo->hasPrev = true; if (pDiffInfo->includeNull) {
numOfElems++; colDataSetNull_f(pOutput->nullbitmap, pos);
} if (tsList != NULL) {
break; colDataAppendInt64(pTsOutput, pos, &tsList[i]);
} }
case TSDB_DATA_TYPE_FLOAT: {
float *pData = (float *)data;
float *pOutput = (float *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) { numOfElems += 1;
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
} }
if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) {
continue; continue;
} }
if (pDiffInfo->hasPrev) { // initial value is not set yet char* pv = colDataGetData(pInputCol, i);
*pOutput = (float)(pData[i] - pDiffInfo->d64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0; if (pDiffInfo->hasPrev) {
pOutput += 1; doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order);
pTimestamp += 1; if (pTsOutput != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
} }
pDiffInfo->d64Prev = pData[i];
pDiffInfo->hasPrev = true;
numOfElems++; numOfElems++;
} else {
doSetPrevVal(pDiffInfo, pInputCol->info.type, pv);
} }
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t *pData = (int16_t *)data;
int16_t *pOutput = (int16_t *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) { pDiffInfo->hasPrev = true;
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
}
if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) {
continue;
} }
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
int32_t pos = startOffset + numOfElems;
if (pDiffInfo->hasPrev) { // initial value is not set yet if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
*pOutput = (int16_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null if (pDiffInfo->includeNull) {
*pTimestamp = (tsList != NULL)? tsList[i]:0; colDataSetNull_f(pOutput->nullbitmap, pos);
pOutput += 1; if (tsList != NULL) {
pTimestamp += 1; colDataAppendInt64(pTsOutput, pos, &tsList[i]);
} }
pDiffInfo->i64Prev = pData[i]; numOfElems += 1;
pDiffInfo->hasPrev = true;
numOfElems++;
} }
break; continue;
} }
case TSDB_DATA_TYPE_TINYINT: { char* pv = colDataGetData(pInputCol, i);
int8_t *pData = (int8_t *)data;
int8_t *pOutput = (int8_t *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) { // there is a row of previous data block to be handled in the first place.
if (pCtx->hasNull && isNull((char *)&pData[i], pCtx->inputType)) { if (pDiffInfo->hasPrev) {
continue; doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order);
} if (pTsOutput != NULL) {
if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) { colDataAppendInt64(pTsOutput, pos, &pDiffInfo->prevTs);
continue;
} }
if (pDiffInfo->hasPrev) { // initial value is not set yet numOfElems++;
*pOutput = (int8_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null } else {
*pTimestamp = (tsList != NULL)? tsList[i]:0; doSetPrevVal(pDiffInfo, pInputCol->info.type, pv);
pOutput += 1;
pTimestamp += 1;
} }
pDiffInfo->i64Prev = pData[i];
pDiffInfo->hasPrev = true; pDiffInfo->hasPrev = true;
numOfElems++; if (pTsOutput != NULL) {
pDiffInfo->prevTs = tsList[i];
} }
break;
} }
#endif
default:
break;
// qError("error input type");
} }
// initial value is not set yet // initial value is not set yet
if (numOfElems <= 0) { return numOfElems;
return 0;
} else {
return (isFirstBlock) ? numOfElems - 1 : numOfElems;
}
} }
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
......
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
#include "thash.h" #include "thash.h"
#include "ttypes.h" #include "ttypes.h"
//#include "tfill.h"
#include "function.h" #include "function.h"
#include "taggfunction.h" #include "taggfunction.h"
#include "tbuffer.h" #include "tbuffer.h"
...@@ -27,7 +26,6 @@ ...@@ -27,7 +26,6 @@
#include "thistogram.h" #include "thistogram.h"
#include "tpercentile.h" #include "tpercentile.h"
#include "ttszip.h" #include "ttszip.h"
//#include "queryLog.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tudf.h" #include "tudf.h"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册