提交 bdc6409e 编写于 作者: wmmhello's avatar wmmhello

[TD-6046]<fix> fix ts derivative error

上级 da702897
...@@ -2872,7 +2872,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2872,7 +2872,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX; const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX;
SColumnList ids = createColumnList(1, index.tableIndex, TS_COLUMN_INDEX); SColumnList ids = createColumnList(1, index.tableIndex, TS_COLUMN_INDEX);
insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, insertResultField(pQueryInfo, TS_COLUMN_INDEX, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP,
aAggs[TSDB_FUNC_TS].name, pExpr); aAggs[TSDB_FUNC_TS].name, pExpr);
colIndex += 1; // the first column is ts colIndex += 1; // the first column is ts
...@@ -7052,10 +7052,6 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) { ...@@ -7052,10 +7052,6 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
continue; continue;
} }
if(functionId == TSDB_FUNC_DERIVATIVE || functionId == TSDB_FUNC_DIFF){ // to avoid ts function id was modufied below
tagTsColExists = false;
}
if (functionId < 0) { if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1); SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1);
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) { if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
......
...@@ -186,7 +186,6 @@ typedef struct SQLFunctionCtx { ...@@ -186,7 +186,6 @@ typedef struct SQLFunctionCtx {
tVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param tVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
int64_t *ptsList; // corresponding timestamp array list int64_t *ptsList; // corresponding timestamp array list
void *ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ void *ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
void *ptsOriOutputBuf;
SQLPreAggVal preAggVals; SQLPreAggVal preAggVals;
tVariant tag; tVariant tag;
......
...@@ -595,6 +595,7 @@ int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int3 ...@@ -595,6 +595,7 @@ int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int3
void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows); void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows);
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity); void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity);
void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput);
void freeParam(SQueryParam *param); void freeParam(SQueryParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param); int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
......
...@@ -2789,7 +2789,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) { ...@@ -2789,7 +2789,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1; int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
TSKEY *pTimestamp = pCtx->ptsOutputBuf; TSKEY *pTimestamp = pCtx->ptsOutputBuf;
TSKEY *pTimestampOri = pCtx->ptsOriOutputBuf;
TSKEY *tsList = GET_TS_LIST(pCtx); TSKEY *tsList = GET_TS_LIST(pCtx);
double *pOutput = (double *)pCtx->pOutput; double *pOutput = (double *)pCtx->pOutput;
...@@ -2809,7 +2808,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) { ...@@ -2809,7 +2808,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
if (pDerivInfo->ignoreNegative && *pOutput < 0) { if (pDerivInfo->ignoreNegative && *pOutput < 0) {
} else { } else {
*pTimestamp = tsList[i]; *pTimestamp = tsList[i];
if (pTimestampOri) {*pTimestampOri = tsList[i]; pTimestampOri += 1;}
pOutput += 1; pOutput += 1;
pTimestamp += 1; pTimestamp += 1;
notNullElems++; notNullElems++;
...@@ -2837,7 +2835,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) { ...@@ -2837,7 +2835,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
if (pDerivInfo->ignoreNegative && *pOutput < 0) { if (pDerivInfo->ignoreNegative && *pOutput < 0) {
} else { } else {
*pTimestamp = tsList[i]; *pTimestamp = tsList[i];
if (pTimestampOri) {*pTimestampOri = tsList[i]; pTimestampOri += 1;}
pOutput += 1; pOutput += 1;
pTimestamp += 1; pTimestamp += 1;
notNullElems++; notNullElems++;
...@@ -2864,7 +2861,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) { ...@@ -2864,7 +2861,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
if (pDerivInfo->ignoreNegative && *pOutput < 0) { if (pDerivInfo->ignoreNegative && *pOutput < 0) {
} else { } else {
*pTimestamp = tsList[i]; *pTimestamp = tsList[i];
if (pTimestampOri) {*pTimestampOri = tsList[i]; pTimestampOri += 1;}
pOutput += 1; pOutput += 1;
pTimestamp += 1; pTimestamp += 1;
notNullElems++; notNullElems++;
...@@ -2892,7 +2888,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) { ...@@ -2892,7 +2888,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
if (pDerivInfo->ignoreNegative && *pOutput < 0) { if (pDerivInfo->ignoreNegative && *pOutput < 0) {
} else { } else {
*pTimestamp = tsList[i]; *pTimestamp = tsList[i];
if (pTimestampOri) {*pTimestampOri = tsList[i]; pTimestampOri += 1;}
pOutput += 1; pOutput += 1;
pTimestamp += 1; pTimestamp += 1;
notNullElems++; notNullElems++;
...@@ -2919,7 +2914,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) { ...@@ -2919,7 +2914,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
if (pDerivInfo->ignoreNegative && *pOutput < 0) { if (pDerivInfo->ignoreNegative && *pOutput < 0) {
} else { } else {
*pTimestamp = tsList[i]; *pTimestamp = tsList[i];
if (pTimestampOri) {*pTimestampOri = tsList[i]; pTimestampOri += 1;}
pOutput += 1; pOutput += 1;
pTimestamp += 1; pTimestamp += 1;
notNullElems++; notNullElems++;
...@@ -2946,7 +2940,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) { ...@@ -2946,7 +2940,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
if (pDerivInfo->ignoreNegative && *pOutput < 0) { if (pDerivInfo->ignoreNegative && *pOutput < 0) {
} else { } else {
*pTimestamp = tsList[i]; *pTimestamp = tsList[i];
if (pTimestampOri) {*pTimestampOri = tsList[i]; pTimestampOri += 1;}
pOutput += 1; pOutput += 1;
pTimestamp += 1; pTimestamp += 1;
...@@ -2989,7 +2982,6 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -2989,7 +2982,6 @@ static void diff_function(SQLFunctionCtx *pCtx) {
int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1; int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
TSKEY* pTimestamp = pCtx->ptsOutputBuf; TSKEY* pTimestamp = pCtx->ptsOutputBuf;
TSKEY* pTimestampOri = pCtx->ptsOriOutputBuf;
TSKEY* tsList = GET_TS_LIST(pCtx); TSKEY* tsList = GET_TS_LIST(pCtx);
switch (pCtx->inputType) { switch (pCtx->inputType) {
...@@ -3005,7 +2997,6 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -3005,7 +2997,6 @@ static void diff_function(SQLFunctionCtx *pCtx) {
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
*pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null *pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0; *pTimestamp = (tsList != NULL)? tsList[i]:0;
if (pTimestampOri) {*pTimestampOri = *pTimestamp; pTimestampOri += 1;}
pOutput += 1; pOutput += 1;
pTimestamp += 1; pTimestamp += 1;
} }
...@@ -3028,7 +3019,6 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -3028,7 +3019,6 @@ static void diff_function(SQLFunctionCtx *pCtx) {
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
*pOutput = pData[i] - pCtx->param[1].i64; // direct previous may be null *pOutput = pData[i] - pCtx->param[1].i64; // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0; *pTimestamp = (tsList != NULL)? tsList[i]:0;
if (pTimestampOri) {*pTimestampOri = *pTimestamp; pTimestampOri += 1;}
pOutput += 1; pOutput += 1;
pTimestamp += 1; pTimestamp += 1;
} }
...@@ -3051,7 +3041,6 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -3051,7 +3041,6 @@ static void diff_function(SQLFunctionCtx *pCtx) {
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
SET_DOUBLE_VAL(pOutput, pData[i] - pCtx->param[1].dKey); // direct previous may be null SET_DOUBLE_VAL(pOutput, pData[i] - pCtx->param[1].dKey); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0; *pTimestamp = (tsList != NULL)? tsList[i]:0;
if (pTimestampOri) {*pTimestampOri = *pTimestamp; pTimestampOri += 1;}
pOutput += 1; pOutput += 1;
pTimestamp += 1; pTimestamp += 1;
} }
...@@ -3074,7 +3063,6 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -3074,7 +3063,6 @@ static void diff_function(SQLFunctionCtx *pCtx) {
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
*pOutput = (float)(pData[i] - pCtx->param[1].dKey); // direct previous may be null *pOutput = (float)(pData[i] - pCtx->param[1].dKey); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0; *pTimestamp = (tsList != NULL)? tsList[i]:0;
if (pTimestampOri) {*pTimestampOri = *pTimestamp; pTimestampOri += 1;}
pOutput += 1; pOutput += 1;
pTimestamp += 1; pTimestamp += 1;
} }
...@@ -3097,7 +3085,6 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -3097,7 +3085,6 @@ static void diff_function(SQLFunctionCtx *pCtx) {
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
*pOutput = (int16_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null *pOutput = (int16_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0; *pTimestamp = (tsList != NULL)? tsList[i]:0;
if (pTimestampOri) {*pTimestampOri = *pTimestamp; pTimestampOri += 1;}
pOutput += 1; pOutput += 1;
pTimestamp += 1; pTimestamp += 1;
} }
...@@ -3121,7 +3108,6 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -3121,7 +3108,6 @@ static void diff_function(SQLFunctionCtx *pCtx) {
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
*pOutput = (int8_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null *pOutput = (int8_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0; *pTimestamp = (tsList != NULL)? tsList[i]:0;
if (pTimestampOri) {*pTimestampOri = *pTimestamp; pTimestampOri += 1;}
pOutput += 1; pOutput += 1;
pTimestamp += 1; pTimestamp += 1;
} }
......
...@@ -2024,7 +2024,7 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr ...@@ -2024,7 +2024,7 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
int32_t f = pExpr[0].base.functionId; int32_t f = pExpr[0].base.functionId;
assert(f == TSDB_FUNC_TS || f == TSDB_FUNC_TS_DUMMY || f == TSDB_FUNC_PRJ); assert(f == TSDB_FUNC_TS || f == TSDB_FUNC_TS_DUMMY);
pCtx->param[2].i64 = pQueryAttr->order.order; pCtx->param[2].i64 = pQueryAttr->order.order;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
...@@ -3615,19 +3615,6 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf ...@@ -3615,19 +3615,6 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
} }
} }
char *tsbuf = NULL;
int16_t tsFuncIndex = -1;
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
// find the ts output data pointer
int32_t functionId = pBInfo->pCtx[i].functionId;
if (functionId == TSDB_FUNC_PRJ && pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
tsbuf = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows;
tsFuncIndex = i;
break;
}
}
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
...@@ -3635,13 +3622,8 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf ...@@ -3635,13 +3622,8 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
// re-estabilish output buffer pointer. // re-estabilish output buffer pointer.
int32_t functionId = pBInfo->pCtx[i].functionId; int32_t functionId = pBInfo->pCtx[i].functionId;
if ((i > 0) && if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE){
(functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) { if (i > 0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput;
pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput;
pBInfo->pCtx[i].ptsOriOutputBuf = tsbuf;
if(tsFuncIndex != -1) {
pBInfo->pCtx[tsFuncIndex].functionId = TSDB_FUNC_TS_DUMMY; // to avoid query data
}
} }
} }
} }
...@@ -3659,7 +3641,34 @@ void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity) { ...@@ -3659,7 +3641,34 @@ void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity) {
} }
} }
void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput) {
bool needCopyTs = false;
int32_t tsNum = 0;
for (int32_t i = 0; i < numOfOutput; i++) {
int32_t functionId = pCtx[i].functionId;
if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
needCopyTs = true;
}else if(functionId == TSDB_FUNC_TS_COMP) {
tsNum++;
}
}
char *src = NULL;
for (int32_t col = 0; col < numOfOutput; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col);
if (strlen(pColRes->pData) != 0) {
src = pColRes->pData; // find ts data
}
}
if (!needCopyTs) return;
if (tsNum < 2) return;
if (src == NULL) return;
for (int32_t col = 0; col < numOfOutput; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col);
memcpy(pColRes->pData, src, pColRes->info.bytes * pRes->info.rows);
}
}
void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) { void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) {
for (int32_t j = 0; j < size; ++j) { for (int32_t j = 0; j < size; ++j) {
...@@ -5635,6 +5644,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { ...@@ -5635,6 +5644,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
if (pRes->info.rows >= 1000/*pRuntimeEnv->resultInfo.threshold*/) { if (pRes->info.rows >= 1000/*pRuntimeEnv->resultInfo.threshold*/) {
break; break;
} }
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
} }
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput); clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册