From bdc6409e7740e6a711688fb25d7530fd43509e08 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 17 Aug 2021 00:47:42 +0800 Subject: [PATCH] [TD-6046] fix ts derivative error --- src/client/src/tscSQLParser.c | 6 +--- src/query/inc/qAggMain.h | 1 - src/query/inc/qExecutor.h | 1 + src/query/src/qAggMain.c | 14 ---------- src/query/src/qExecutor.c | 52 +++++++++++++++++++++-------------- 5 files changed, 33 insertions(+), 41 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 62f1843ee5..781b1be76f 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2872,7 +2872,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX; SColumnList ids = createColumnList(1, index.tableIndex, TS_COLUMN_INDEX); - insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, + insertResultField(pQueryInfo, TS_COLUMN_INDEX, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS].name, pExpr); colIndex += 1; // the first column is ts @@ -7052,10 +7052,6 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) { continue; } - if(functionId == TSDB_FUNC_DERIVATIVE || functionId == TSDB_FUNC_DIFF){ // to avoid ts function id was modufied below - tagTsColExists = false; - } - if (functionId < 0) { SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1); if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) { diff --git a/src/query/inc/qAggMain.h b/src/query/inc/qAggMain.h index 4b2de758ac..d4116fbfb2 100644 --- a/src/query/inc/qAggMain.h +++ b/src/query/inc/qAggMain.h @@ -186,7 +186,6 @@ typedef struct SQLFunctionCtx { tVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param int64_t *ptsList; // corresponding timestamp array list void *ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ - void *ptsOriOutputBuf; SQLPreAggVal preAggVals; tVariant tag; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 56fab57e26..5b810e217e 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -595,6 +595,7 @@ int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int3 void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows); void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity); +void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput); void freeParam(SQueryParam *param); int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param); diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 89e91cb856..c19628eb37 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -2789,7 +2789,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) { int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1; TSKEY *pTimestamp = pCtx->ptsOutputBuf; - TSKEY *pTimestampOri = pCtx->ptsOriOutputBuf; TSKEY *tsList = GET_TS_LIST(pCtx); double *pOutput = (double *)pCtx->pOutput; @@ -2809,7 +2808,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) { if (pDerivInfo->ignoreNegative && *pOutput < 0) { } else { *pTimestamp = tsList[i]; - if (pTimestampOri) {*pTimestampOri = tsList[i]; pTimestampOri += 1;} pOutput += 1; pTimestamp += 1; notNullElems++; @@ -2837,7 +2835,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) { if (pDerivInfo->ignoreNegative && *pOutput < 0) { } else { *pTimestamp = tsList[i]; - if (pTimestampOri) {*pTimestampOri = tsList[i]; pTimestampOri += 1;} pOutput += 1; pTimestamp += 1; notNullElems++; @@ -2864,7 +2861,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) { if (pDerivInfo->ignoreNegative && *pOutput < 0) { } else { *pTimestamp = tsList[i]; - if (pTimestampOri) {*pTimestampOri = tsList[i]; pTimestampOri += 1;} pOutput += 1; pTimestamp += 1; notNullElems++; @@ -2892,7 +2888,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) { if (pDerivInfo->ignoreNegative && *pOutput < 0) { } else { *pTimestamp = tsList[i]; - if (pTimestampOri) {*pTimestampOri = tsList[i]; pTimestampOri += 1;} pOutput += 1; pTimestamp += 1; notNullElems++; @@ -2919,7 +2914,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) { if (pDerivInfo->ignoreNegative && *pOutput < 0) { } else { *pTimestamp = tsList[i]; - if (pTimestampOri) {*pTimestampOri = tsList[i]; pTimestampOri += 1;} pOutput += 1; pTimestamp += 1; notNullElems++; @@ -2946,7 +2940,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) { if (pDerivInfo->ignoreNegative && *pOutput < 0) { } else { *pTimestamp = tsList[i]; - if (pTimestampOri) {*pTimestampOri = tsList[i]; pTimestampOri += 1;} pOutput += 1; pTimestamp += 1; @@ -2989,7 +2982,6 @@ static void diff_function(SQLFunctionCtx *pCtx) { int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1; TSKEY* pTimestamp = pCtx->ptsOutputBuf; - TSKEY* pTimestampOri = pCtx->ptsOriOutputBuf; TSKEY* tsList = GET_TS_LIST(pCtx); switch (pCtx->inputType) { @@ -3005,7 +2997,6 @@ static void diff_function(SQLFunctionCtx *pCtx) { if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet *pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; - if (pTimestampOri) {*pTimestampOri = *pTimestamp; pTimestampOri += 1;} pOutput += 1; pTimestamp += 1; } @@ -3028,7 +3019,6 @@ static void diff_function(SQLFunctionCtx *pCtx) { if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet *pOutput = pData[i] - pCtx->param[1].i64; // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; - if (pTimestampOri) {*pTimestampOri = *pTimestamp; pTimestampOri += 1;} pOutput += 1; pTimestamp += 1; } @@ -3051,7 +3041,6 @@ static void diff_function(SQLFunctionCtx *pCtx) { if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet SET_DOUBLE_VAL(pOutput, pData[i] - pCtx->param[1].dKey); // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; - if (pTimestampOri) {*pTimestampOri = *pTimestamp; pTimestampOri += 1;} pOutput += 1; pTimestamp += 1; } @@ -3074,7 +3063,6 @@ static void diff_function(SQLFunctionCtx *pCtx) { if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet *pOutput = (float)(pData[i] - pCtx->param[1].dKey); // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; - if (pTimestampOri) {*pTimestampOri = *pTimestamp; pTimestampOri += 1;} pOutput += 1; pTimestamp += 1; } @@ -3097,7 +3085,6 @@ static void diff_function(SQLFunctionCtx *pCtx) { if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet *pOutput = (int16_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; - if (pTimestampOri) {*pTimestampOri = *pTimestamp; pTimestampOri += 1;} pOutput += 1; pTimestamp += 1; } @@ -3121,7 +3108,6 @@ static void diff_function(SQLFunctionCtx *pCtx) { if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet *pOutput = (int8_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; - if (pTimestampOri) {*pTimestampOri = *pTimestamp; pTimestampOri += 1;} pOutput += 1; pTimestamp += 1; } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index dfaf60d1d7..3e1fac8fd3 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2024,7 +2024,7 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { int32_t f = pExpr[0].base.functionId; - assert(f == TSDB_FUNC_TS || f == TSDB_FUNC_TS_DUMMY || f == TSDB_FUNC_PRJ); + assert(f == TSDB_FUNC_TS || f == TSDB_FUNC_TS_DUMMY); pCtx->param[2].i64 = pQueryAttr->order.order; pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; @@ -3615,19 +3615,6 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf } } - char *tsbuf = NULL; - int16_t tsFuncIndex = -1; - for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); - - // find the ts output data pointer - int32_t functionId = pBInfo->pCtx[i].functionId; - if (functionId == TSDB_FUNC_PRJ && pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) { - tsbuf = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows; - tsFuncIndex = i; - break; - } - } for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); @@ -3635,13 +3622,8 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf // re-estabilish output buffer pointer. int32_t functionId = pBInfo->pCtx[i].functionId; - if ((i > 0) && - (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) { - pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput; - pBInfo->pCtx[i].ptsOriOutputBuf = tsbuf; - if(tsFuncIndex != -1) { - pBInfo->pCtx[tsFuncIndex].functionId = TSDB_FUNC_TS_DUMMY; // to avoid query data - } + if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE){ + if (i > 0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput; } } } @@ -3659,7 +3641,34 @@ void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity) { } } +void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput) { + bool needCopyTs = false; + int32_t tsNum = 0; + for (int32_t i = 0; i < numOfOutput; i++) { + int32_t functionId = pCtx[i].functionId; + if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) { + needCopyTs = true; + }else if(functionId == TSDB_FUNC_TS_COMP) { + tsNum++; + } + } + char *src = NULL; + for (int32_t col = 0; col < numOfOutput; ++col) { + SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); + if (strlen(pColRes->pData) != 0) { + src = pColRes->pData; // find ts data + } + } + if (!needCopyTs) return; + if (tsNum < 2) return; + if (src == NULL) return; + + for (int32_t col = 0; col < numOfOutput; ++col) { + SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); + memcpy(pColRes->pData, src, pColRes->info.bytes * pRes->info.rows); + } +} void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) { for (int32_t j = 0; j < size; ++j) { @@ -5635,6 +5644,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { if (pRes->info.rows >= 1000/*pRuntimeEnv->resultInfo.threshold*/) { break; } + copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); } clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput); -- GitLab