From fbeb0098ecbe46064d3fd14c11c74a4718616130 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Sat, 18 Sep 2021 14:53:33 +0800 Subject: [PATCH] finish the csum/mavg/sample functions implementation --- src/client/src/tscSQLParser.c | 19 +- src/query/src/qAggMain.c | 569 +++++++++------------------------- src/query/src/qExecutor.c | 14 +- 3 files changed, 167 insertions(+), 435 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index a62a8ac3ef..fbe9172329 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2499,6 +2499,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col case TSDB_FUNC_MAX: case TSDB_FUNC_DIFF: case TSDB_FUNC_DERIVATIVE: + case TSDB_FUNC_CSUM: case TSDB_FUNC_CEIL: case TSDB_FUNC_FLOOR: case TSDB_FUNC_ROUND: @@ -2551,7 +2552,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } // set the first column ts for diff query - if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) { + if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE || functionId == TSDB_FUNC_CSUM) { SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0}; SExprInfo* pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, getNewResColId(pCmd), TSDB_KEYSIZE, false); @@ -2591,7 +2592,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col tickPerSec /= TSDB_TICK_PER_SECOND(TSDB_TIME_PRECISION_MICRO); } else if (info.precision == TSDB_TIME_PRECISION_MICRO) { tickPerSec /= TSDB_TICK_PER_SECOND(TSDB_TIME_PRECISION_MILLI); - } + } if (tickPerSec <= 0 || tickPerSec < TSDB_TICK_PER_SECOND(info.precision)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg10); @@ -2747,6 +2748,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col case TSDB_FUNC_TOP: case TSDB_FUNC_BOTTOM: + case TSDB_FUNC_MAVG: + case TSDB_FUNC_SAMPLE: case TSDB_FUNC_PERCT: case TSDB_FUNC_APERCT: { // 1. valid the number of parameters @@ -2778,7 +2781,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } // 2. valid the column type - if (!IS_NUMERIC_TYPE(pSchema->type)) { + if (functionId != TSDB_FUNC_SAMPLE && !IS_NUMERIC_TYPE(pSchema->type)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); } @@ -2820,13 +2823,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } else { tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true); - int64_t nTop = GET_INT32_VAL(val); - if (nTop <= 0 || nTop > 100) { // todo use macro + int64_t numRowsSelected = GET_INT32_VAL(val); + if (numRowsSelected <= 0 || numRowsSelected > 100) { // todo use macro return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg12); } // todo REFACTOR - // set the first column ts for top/bottom query + // set the first column ts for top/bottom/mavg/sample query SColumnIndex index1 = {index.tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX}; pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, getNewResColId(pCmd), 0, false); @@ -6284,7 +6287,9 @@ int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd, SQueryInfo* pQu } int32_t f = pExpr->base.functionId; - if ((f == TSDB_FUNC_PRJ && pExpr->base.numOfParams == 0) || f == TSDB_FUNC_DIFF || f == TSDB_FUNC_ARITHM || f == TSDB_FUNC_DERIVATIVE || + if ((f == TSDB_FUNC_PRJ && pExpr->base.numOfParams == 0) || + f == TSDB_FUNC_DIFF || f == TSDB_FUNC_ARITHM || f == TSDB_FUNC_DERIVATIVE || + f == TSDB_FUNC_CSUM || f == TSDB_FUNC_MAVG || f == TSDB_FUNC_CEIL || f == TSDB_FUNC_FLOOR || f == TSDB_FUNC_ROUND) { isProjectionFunction = true; diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 834257c8e2..e4ef74a74d 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -169,6 +169,24 @@ typedef struct SDerivInfo { bool valueSet; // the value has been set already } SDerivInfo; +typedef struct { + double cumSum; +} SCumSumInfo; + +typedef struct { + int32_t pos; + double sum; + int32_t numPointsK; + double* points; +} SMovingAvgInfo; + +typedef struct { + int32_t totalPoints; + int32_t numSampled; + tVariant *values; + int64_t *timeStamps; +} SSampleFuncInfo; + int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type, int16_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) { if (!isValidDataType(dataType)) { @@ -237,6 +255,35 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI return TSDB_CODE_SUCCESS; } + if (functionId == TSDB_FUNC_CSUM) { + if (IS_SIGNED_NUMERIC_TYPE(dataType)) { + *type = TSDB_DATA_TYPE_BIGINT; + } else if (IS_UNSIGNED_NUMERIC_TYPE(dataType)) { + *type = TSDB_DATA_TYPE_UBIGINT; + } else { + *type = TSDB_DATA_TYPE_DOUBLE; + } + + *bytes = sizeof(int64_t); + *interBytes = sizeof(SCumSumInfo); + return TSDB_CODE_SUCCESS; + } + + if (functionId == TSDB_FUNC_MAVG) { + *type = TSDB_DATA_TYPE_DOUBLE; + *bytes = sizeof(double); + *interBytes = sizeof(SMovingAvgInfo) + sizeof(double) * param; + return TSDB_CODE_SUCCESS; + } + + if (functionId == TSDB_FUNC_SAMPLE) { + *type = (int16_t)dataType; + *bytes = (int16_t)dataBytes; + size_t size = sizeof(SSampleFuncInfo) + sizeof(tVariant) * param + sizeof(int64_t) * param; + *interBytes = (int32_t)size; + return TSDB_CODE_SUCCESS; + } + if (isSuperTable) { if (functionId < 0) { if (pUdfInfo->bufSize > 0) { @@ -4085,6 +4132,8 @@ static void irate_function(SQLFunctionCtx *pCtx) { } } +///////////////////////////////////////////////////////////////////////////////////////////////////////////// + void blockInfo_func(SQLFunctionCtx* pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo); @@ -4258,6 +4307,8 @@ void blockinfo_func_finalizer(SQLFunctionCtx* pCtx) { doFinalizer(pCtx); } +///////////////////////////////////////////////////////////////////////////////////////////////////////////// + #define CFR_SET_VAL(type, data, pCtx, func, i, step, notNullElems) \ do { \ type *pData = (type *) data; \ @@ -4484,10 +4535,7 @@ static void round_function(SQLFunctionCtx *pCtx) { #undef CFR_SET_VAL_DOUBLE ////////////////////////////////////////////////////////////////////////////////// - -typedef struct { - double cumSum; -} SCumSumInfo; +//cumulative_sum function static bool csum_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { if (!function_setup(pCtx, pResInfo)) { @@ -4503,8 +4551,6 @@ static void csum_function(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SCumSumInfo* pCumSumInfo = GET_ROWCELL_INTERBUF(pResInfo); - void* data = GET_INPUT_DATA_LIST(pCtx); - int32_t notNullElems = 0; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); int32_t i = (pCtx->order = TSDB_ORDER_ASC) ? 0 : pCtx->size -1; @@ -4512,125 +4558,37 @@ static void csum_function(SQLFunctionCtx *pCtx) { TSKEY* pTimestamp = pCtx->ptsOutputBuf; TSKEY* tsList = GET_TS_LIST(pCtx); - switch (pCtx->inputType) { - case TSDB_DATA_TYPE_INT: { - int32_t *pData = (int32_t *)data; - int32_t *pOutput = (int32_t *)pCtx->pOutput; - for (; i < pCtx->size && i >= 0; i += step) { - if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { - continue; - } - - pCumSumInfo->cumSum += pData[i]; - *pTimestamp = (tsList != NULL) ? tsList[i] : 0; - SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum); - - ++notNullElems; - pOutput += 1; - pTimestamp += 1; - } - break; - } - - case TSDB_DATA_TYPE_BIGINT: { - int64_t *pData = (int64_t *)data; - int64_t *pOutput = (int64_t *)pCtx->pOutput; - for (; i < pCtx->size && i >= 0; i += step) { - if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { - continue; - } - - pCumSumInfo->cumSum += pData[i]; - *pTimestamp = (tsList != NULL) ? tsList[i] : 0; - SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum); - - ++notNullElems; - pOutput += 1; - pTimestamp += 1; - } - break; - } - - case TSDB_DATA_TYPE_TINYINT: { - int8_t *pData = (int8_t *)data; - int8_t *pOutput = (int8_t *)pCtx->pOutput; - for (; i < pCtx->size && i >= 0; i += step) { - if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { - continue; - } - - pCumSumInfo->cumSum += pData[i]; - *pTimestamp = (tsList != NULL) ? tsList[i] : 0; - SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum); - - ++notNullElems; - pOutput += 1; - pTimestamp += 1; - } - break; - } - - case TSDB_DATA_TYPE_SMALLINT: { - int16_t *pData = (int16_t *)data; - int16_t *pOutput = (int16_t *)pCtx->pOutput; - for (; i < pCtx->size && i >= 0; i += step) { - if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { - continue; - } - - pCumSumInfo->cumSum += pData[i]; - *pTimestamp = (tsList != NULL) ? tsList[i] : 0; - SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum); + qDebug("%p csum_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull); - ++notNullElems; - pOutput += 1; - pTimestamp += 1; - } - break; + for (; i < pCtx->size && i >= 0; i += step) { + char* pData = GET_INPUT_DATA(pCtx, i); + if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { + qDebug("%p csum_function() index of null data:%d", pCtx, i); + continue; } - case TSDB_DATA_TYPE_FLOAT: { - float *pData = (float *)data; - float *pOutput = (float *)pCtx->pOutput; - for (; i < pCtx->size && i >= 0; i += step) { - if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { - continue; - } - - pCumSumInfo->cumSum += pData[i]; - *pTimestamp = (tsList != NULL) ? tsList[i] : 0; - SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum); + double v = 0; + GET_TYPED_DATA(v, double, pCtx->inputType, pData); + pCumSumInfo->cumSum += v; - ++notNullElems; - pOutput += 1; - pTimestamp += 1; - } - break; + *pTimestamp = (tsList != NULL) ? tsList[i] : 0; + if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) { + int64_t *retVal = (int64_t *)pCtx->pOutput; + *retVal = (int64_t)(pCumSumInfo->cumSum); + } else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) { + uint64_t *retVal = (uint64_t *)pCtx->pOutput; + *retVal = (uint64_t)(pCumSumInfo->cumSum); + } else if (IS_FLOAT_TYPE(pCtx->inputType)) { + double *retVal = (double*) pCtx->pOutput; + SET_DOUBLE_VAL(retVal, pCumSumInfo->cumSum); } - case TSDB_DATA_TYPE_DOUBLE: { - double *pData = (double *)data; - double *pOutput = (double *)pCtx->pOutput; - for (; i < pCtx->size && i >= 0; i += step) { - if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { - continue; - } - - pCumSumInfo->cumSum += pData[i]; - *pTimestamp = (tsList != NULL) ? tsList[i] : 0; - SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum); - - ++notNullElems; - pOutput += 1; - pTimestamp += 1; - } - break; - } - default: - qError("error input type"); + ++notNullElems; + pCtx->pOutput += pCtx->outputBytes; + pTimestamp++; } - if (notNullElems <= 0) { + if (notNullElems == 0) { assert(pCtx->hasNull); } else { GET_RES_INFO(pCtx)->numOfRes += notNullElems; @@ -4639,14 +4597,7 @@ static void csum_function(SQLFunctionCtx *pCtx) { } ////////////////////////////////////////////////////////////////////////////////// - - -typedef struct { - int32_t pos; - double sum; - int32_t numPoints; - double* points; -} SMovingAvgInfo; +// Simple Moving_average function static bool mavg_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { if (!function_setup(pCtx, pResInfo)) { @@ -4656,7 +4607,7 @@ static bool mavg_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResIn SMovingAvgInfo* mavgInfo = GET_ROWCELL_INTERBUF(pResInfo); mavgInfo->pos = 0; mavgInfo->sum = 0; - mavgInfo->numPoints = (int32_t)pCtx->param[0].i64; + mavgInfo->numPointsK = (int32_t)pCtx->param[0].i64; mavgInfo->points = (double*)((char*)mavgInfo + sizeof(mavgInfo)); return true; } @@ -4665,8 +4616,6 @@ static void mavg_function(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SMovingAvgInfo* mavgInfo = GET_ROWCELL_INTERBUF(pResInfo); - void* data = GET_INPUT_DATA_LIST(pCtx); - int32_t notNullElems = 0; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); int32_t i = (pCtx->order = TSDB_ORDER_ASC) ? 0 : pCtx->size -1; @@ -4674,210 +4623,38 @@ static void mavg_function(SQLFunctionCtx *pCtx) { TSKEY* pTimestamp = pCtx->ptsOutputBuf; TSKEY* tsList = GET_TS_LIST(pCtx); - switch (pCtx->inputType) { - case TSDB_DATA_TYPE_INT: { - int32_t *pData = (int32_t *)data; - double *pOutput = (double *)pCtx->pOutput; - for (; i < pCtx->size && i >= 0; i += step) { - if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { - continue; - } - - if (mavgInfo->pos < mavgInfo->numPoints - 1) { - mavgInfo->points[mavgInfo->pos] = (double)pData[i]; - mavgInfo->sum += pData[i]; - } else { - int32_t pos = mavgInfo->pos % mavgInfo->numPoints; - if (mavgInfo->pos != mavgInfo->numPoints -1) { - mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos]; - } else { - mavgInfo->sum += (double)pData[i]; - } - - mavgInfo->points[pos] = pData[i]; - - *pTimestamp = (tsList != NULL) ? tsList[i] : 0; - SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints) - ++notNullElems; - pOutput += 1; - pTimestamp += 1; - } - - ++mavgInfo->pos; - } - break; - } - - case TSDB_DATA_TYPE_BIGINT: { - int64_t *pData = (int64_t *)data; - double *pOutput = (double *)pCtx->pOutput; - for (; i < pCtx->size && i >= 0; i += step) { - if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { - continue; - } - - if (mavgInfo->pos < mavgInfo->numPoints - 1) { - mavgInfo->points[mavgInfo->pos] = (double)pData[i]; - mavgInfo->sum += pData[i]; - } else { - int32_t pos = mavgInfo->pos % mavgInfo->numPoints; - if (mavgInfo->pos != mavgInfo->numPoints -1) { - mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos]; - } else { - mavgInfo->sum += (double)pData[i]; - } - - mavgInfo->points[pos] = pData[i]; - - *pTimestamp = (tsList != NULL) ? tsList[i] : 0; - SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints) - ++notNullElems; - pOutput += 1; - pTimestamp += 1; - } - - ++mavgInfo->pos; - } - break; - } - - case TSDB_DATA_TYPE_TINYINT: { - int8_t *pData = (int8_t *)data; - double *pOutput = (double *)pCtx->pOutput; - for (; i < pCtx->size && i >= 0; i += step) { - if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { - continue; - } - - if (mavgInfo->pos < mavgInfo->numPoints - 1) { - mavgInfo->points[mavgInfo->pos] = (double)pData[i]; - mavgInfo->sum += pData[i]; - } else { - int32_t pos = mavgInfo->pos % mavgInfo->numPoints; - if (mavgInfo->pos != mavgInfo->numPoints -1) { - mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos]; - } else { - mavgInfo->sum += (double)pData[i]; - } - - mavgInfo->points[pos] = pData[i]; - - *pTimestamp = (tsList != NULL) ? tsList[i] : 0; - SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints) - ++notNullElems; - pOutput += 1; - pTimestamp += 1; - } - - ++mavgInfo->pos; - } - - break; - } - - case TSDB_DATA_TYPE_SMALLINT: { - int16_t *pData = (int16_t *)data; - double *pOutput = (double *)pCtx->pOutput; - for (; i < pCtx->size && i >= 0; i += step) { - if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { - continue; - } - - if (mavgInfo->pos < mavgInfo->numPoints - 1) { - mavgInfo->points[mavgInfo->pos] = (double)pData[i]; - mavgInfo->sum += pData[i]; - } else { - int32_t pos = mavgInfo->pos % mavgInfo->numPoints; - if (mavgInfo->pos != mavgInfo->numPoints -1) { - mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos]; - } else { - mavgInfo->sum += (double)pData[i]; - } - - mavgInfo->points[pos] = pData[i]; - - *pTimestamp = (tsList != NULL) ? tsList[i] : 0; - SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints) - ++notNullElems; - pOutput += 1; - pTimestamp += 1; - } - - ++mavgInfo->pos; - } - - break; + for (; i < pCtx->size && i >= 0; i += step) { + char* pData = GET_INPUT_DATA(pCtx, i); + if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { + qDebug("%p mavg_function() index of null data:%d", pCtx, i); + continue; } - case TSDB_DATA_TYPE_FLOAT: { - float *pData = (float *)data; - double *pOutput = (double *)pCtx->pOutput; - for (; i < pCtx->size && i >= 0; i += step) { - if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { - continue; - } - - if (mavgInfo->pos < mavgInfo->numPoints - 1) { - mavgInfo->points[mavgInfo->pos] = (double)pData[i]; - mavgInfo->sum += pData[i]; - } else { - int32_t pos = mavgInfo->pos % mavgInfo->numPoints; - if (mavgInfo->pos != mavgInfo->numPoints -1) { - mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos]; - } else { - mavgInfo->sum += (double)pData[i]; - } - - mavgInfo->points[pos] = pData[i]; - - *pTimestamp = (tsList != NULL) ? tsList[i] : 0; - SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints) - ++notNullElems; - pOutput += 1; - pTimestamp += 1; - } + double v = 0; + GET_TYPED_DATA(v, double, pCtx->inputType, pData); - ++mavgInfo->pos; + if (mavgInfo->pos < mavgInfo->numPointsK - 1) { + mavgInfo->points[mavgInfo->pos] = v; + mavgInfo->sum += v; + } else { + int32_t pos = mavgInfo->pos % mavgInfo->numPointsK; + if (mavgInfo->pos != mavgInfo->numPointsK -1) { + mavgInfo->sum = mavgInfo->sum + v - mavgInfo->points[pos]; + } else { + mavgInfo->sum += v; } - break; - } - - case TSDB_DATA_TYPE_DOUBLE: { - double *pData = (double *)data; - double *pOutput = (double *)pCtx->pOutput; - for (; i < pCtx->size && i >= 0; i += step) { - if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { - continue; - } - - if (mavgInfo->pos < mavgInfo->numPoints - 1) { - mavgInfo->points[mavgInfo->pos] = (double)pData[i]; - mavgInfo->sum += pData[i]; - } else { - int32_t pos = mavgInfo->pos % mavgInfo->numPoints; - if (mavgInfo->pos != mavgInfo->numPoints - 1) { - mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos]; - } else { - mavgInfo->sum += (double)pData[i]; - } - - mavgInfo->points[pos] = pData[i]; + mavgInfo->points[pos] = v; - *pTimestamp = (tsList != NULL) ? tsList[i] : 0; - SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints) - ++notNullElems; - pOutput += 1; - pTimestamp += 1; - } + *pTimestamp = (tsList != NULL) ? tsList[i] : 0; + SET_DOUBLE_VAL(pCtx->pOutput, mavgInfo->sum / mavgInfo->numPointsK) - ++mavgInfo->pos; - } - break; + ++notNullElems; + pCtx->pOutput += pCtx->outputBytes; + pCtx->ptsOutputBuf++; } - default: - qError("error input type"); + ++mavgInfo->pos; } if (notNullElems <= 0) { @@ -4888,123 +4665,64 @@ static void mavg_function(SQLFunctionCtx *pCtx) { } } - ////////////////////////////////////////////////////////////////////////////////// -typedef struct { - int32_t notNullElems; - int32_t num; - tValuePair **res; -} SSampleFuncInfo; - -static void sampleValuePairAssign(tValuePair *dst, tVariant* srcVariant, int64_t tsKey, char *pTags, SExtTagsInfo *pTagInfo) { - dst->timestamp = tsKey; - tVariantAssign(&dst->v, srcVariant); - - int32_t size = 0; +// Sample function with reservoir sampling algorithm - for (int32_t i = 0; i < pTagInfo->numOfTagCols; ++i) { - SQLFunctionCtx* ctx = pTagInfo->pTagCtxList[i]; - if (ctx->functionId == TSDB_FUNC_TS_DUMMY) { - ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; - ctx->tag.i64 = tsKey; - } - - tVariantDump(&ctx->tag, dst->pTags + size, ctx->tag.nType, true); - size += pTagInfo->pTagCtxList[i]->outputBytes; +static void assignResultSample(SSampleFuncInfo *pInfo, int32_t index, int64_t ts, void *pData, uint16_t type, int16_t bytes) { + if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { + int32_t maxLen = bytes - VARSTR_HEADER_SIZE; + int32_t len = (varDataLen(pData) > maxLen)? maxLen:varDataLen(pData); + tVariantCreateFromBinary(pInfo->values + index, varDataVal(pData), len, type); + } else { + tVariantCreateFromBinary(pInfo->values + index, pData, bytes, type); } + *(pInfo->timeStamps + pInfo->numSampled) = ts; + return; } - -static void do_sample_function_add(SSampleFuncInfo *pInfo, int32_t maxLen, void *pData, int64_t ts, uint16_t type, SExtTagsInfo *pTagInfo, char *pTags) { - tVariant val = {0}; - tVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type); - - tValuePair **pList = pInfo->res; - assert(pList != NULL); - - pInfo->notNullElems++; - if (pInfo->num < maxLen) { - sampleValuePairAssign(pList[pInfo->num], &val, ts, pTags, pTagInfo); - pInfo->num++; +static void do_reservoir_sample(SSampleFuncInfo *pInfo, int32_t samplesK, int64_t ts, void *pData, uint16_t type, int16_t bytes) { + pInfo->totalPoints++; + if (pInfo->numSampled < samplesK) { + assignResultSample(pInfo, pInfo->numSampled, ts, pData, type, bytes); + pInfo->numSampled++; } else { - int32_t j = rand() % (pInfo->notNullElems); - if (j < maxLen) { - sampleValuePairAssign(pList[j], &val, ts, pTags, pTagInfo); + int32_t j = rand() % (pInfo->totalPoints); + if (j < samplesK) { + assignResultSample(pInfo, j, ts, pData, bytes, type); + *(pInfo->timeStamps + j) = ts; } } } - static void copySampleFuncRes(SQLFunctionCtx *pCtx, int32_t type) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); - tValuePair **tvp = pRes->res; - - int32_t step = QUERY_ASC_FORWARD_STEP; - int32_t len = (int32_t)(GET_RES_INFO(pCtx)->numOfRes); - - for (int32_t i = 0; i < len; ++i) { + TSKEY* pTimestamp = pCtx->ptsOutputBuf; + for (int32_t i = 0; i < pRes->numSampled; ++i) { char* output = pCtx->pOutput; - tVariantDump(&tvp[i]->v, (char*)output, type, true); - output += step * (pCtx->outputBytes); - } - - // set the output timestamp of each record. - TSKEY *output = pCtx->ptsOutputBuf; - for (int32_t i = 0; i < len; ++i, output += step) { - *output = tvp[i]->timestamp; - } + tVariantDump(pRes->values + i, (char*)output, type, true); + *pTimestamp = *(pRes->timeStamps + i); - // set the corresponding tag data for each record - // todo check malloc failure - char **pData = calloc(pCtx->tagInfo.numOfTagCols, POINTER_BYTES); - for (int32_t i = 0; i < pCtx->tagInfo.numOfTagCols; ++i) { - pData[i] = pCtx->tagInfo.pTagCtxList[i]->pOutput; - } - - for (int32_t i = 0; i < len; ++i, output += step) { - int16_t offset = 0; - for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) { - memcpy(pData[j], tvp[i]->pTags + offset, (size_t)pCtx->tagInfo.pTagCtxList[j]->outputBytes); - offset += pCtx->tagInfo.pTagCtxList[j]->outputBytes; - pData[j] += pCtx->tagInfo.pTagCtxList[j]->outputBytes; - } + pCtx->pOutput += pCtx->outputBytes; + pTimestamp++; } - tfree(pData); -} - -/* - * +-----------------------------------+-------------one value pair-----------+------------next value pair-----------+ - * |-------------pointer area----------|----ts---+-----+-----n tags-----------|----ts---+-----+-----n tags-----------| - * +..[Value Pointer1][Value Pointer2].|timestamp|value|tags1|tags2|....|tagsn|timestamp|value|tags1|tags2|....|tagsn+ - */ -static void buildSampleFuncStruct(SSampleFuncInfo *pSampleFuncInfo, SQLFunctionCtx *pCtx) { - char *tmp = (char *)pSampleFuncInfo + sizeof(SSampleFuncInfo); - pSampleFuncInfo->res = (tValuePair**) tmp; - tmp += POINTER_BYTES * pCtx->param[0].i64; - - size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen; - - for (int32_t i = 0; i < pCtx->param[0].i64; ++i) { - pSampleFuncInfo->res[i] = (tValuePair*) tmp; - pSampleFuncInfo->res[i]->pTags = tmp + sizeof(tValuePair); - tmp += size; - } } - static bool sample_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { if (!function_setup(pCtx, pResInfo)) { return false; } - SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); - buildSampleFuncStruct(pRes, pCtx); srand(taosSafeRand()); - pRes->notNullElems = 0; + + SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); + pRes->totalPoints = 0; + pRes->numSampled = 0; + pRes->values = (tVariant*) ((char*)pRes + sizeof(SSampleFuncInfo)); + pRes->timeStamps = (int64_t*)((char*)pRes->values + sizeof(tVariant) * pCtx->param[0].i64); return true; } @@ -5014,10 +4732,9 @@ static void sample_function(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); - assert(pRes->num >= 0); - - if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(SSampleFuncInfo) + POINTER_BYTES * pCtx->param[0].i64)) { - buildSampleFuncStruct(pRes, pCtx); + if (pRes->values != (tVariant*) ((char*)pRes + sizeof(SSampleFuncInfo))) { + pRes->values = (tVariant*) ((char*)pRes + sizeof(SSampleFuncInfo)); + pRes->timeStamps = (int64_t*)((char*)pRes->values + sizeof(tVariant) * pCtx->param[0].i64); } for (int32_t i = 0; i < pCtx->size; ++i) { @@ -5028,9 +4745,8 @@ static void sample_function(SQLFunctionCtx *pCtx) { notNullElems++; - // NOTE: Set the default timestamp if it is missing [todo refactor] TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0; - do_sample_function_add(pRes, (int32_t)pCtx->param[0].i64, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL); + do_reservoir_sample(pRes, (int32_t)pCtx->param[0].i64, ts, data, pCtx->inputType, pCtx->inputBytes); } if (!pCtx->hasNull) { @@ -5050,14 +4766,17 @@ static void sample_func_finalizer(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); - if (pRes->num == 0) { // no result + if (pRes->numSampled == 0) { // no result assert(pResInfo->hasResult != DATA_SET_FLAG); } - GET_TRUE_DATA_TYPE(); copySampleFuncRes(pCtx, type); + for (int32_t i = 0; i < pRes->numSampled; ++i) { + tVariantDestroy(pRes->values + i); + } + doFinalizer(pCtx); } @@ -5079,8 +4798,10 @@ int32_t functionCompatList[] = { 4, -1, -1, 1, 1, 1, 1, 1, 1, -1, // tag, colprj, tagprj, arithm, diff, first_dist, last_dist, stddev_dst, interp rate, irate 1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1, - // tid_tag, deriv, ceil, floor, round, csum, mavg, sample, block_info - 6, 8, 1, 1, 1, -1, -1, -1, 7 + // tid_tag, deriv, ceil, floor, round, csum, mavg, sample, + 6, 8, 1, 1, 1, -1, -1, -1, + // block_info + 7 }; SAggFunctionInfo aAggs[] = {{ @@ -5544,7 +5265,7 @@ SAggFunctionInfo aAggs[] = {{ "sample", TSDB_FUNC_SAMPLE, TSDB_FUNC_INVALID_ID, - TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, + TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, sample_function_setup, sample_function, sample_func_finalizer, diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 982996d70d..1eb353ad46 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2060,7 +2060,7 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr int32_t functionId = pCtx->functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { - int32_t f = pExpr[0].base.functionId; + int32_t f = pExpr[i-1].base.functionId; assert(f == TSDB_FUNC_TS || f == TSDB_FUNC_TS_DUMMY); pCtx->param[2].i64 = pQueryAttr->order.order; @@ -3653,7 +3653,8 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i // set the timestamp output buffer for top/bottom/diff query int32_t fid = pCtx[i].functionId; - if (fid == TSDB_FUNC_TOP || fid == TSDB_FUNC_BOTTOM || fid == TSDB_FUNC_DIFF || fid == TSDB_FUNC_DERIVATIVE) { + if (fid == TSDB_FUNC_TOP || fid == TSDB_FUNC_BOTTOM || fid == TSDB_FUNC_DIFF || fid == TSDB_FUNC_DERIVATIVE || + fid == TSDB_FUNC_SAMPLE || fid == TSDB_FUNC_MAVG || fid == TSDB_FUNC_CSUM) { if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput; } } @@ -3690,7 +3691,10 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf // set the correct pointer after the memory buffer reallocated. int32_t functionId = pBInfo->pCtx[i].functionId; - if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) { + if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || + functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE || + functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_MAVG || + functionId == TSDB_FUNC_SAMPLE ) { if (i > 0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput; } } @@ -3702,7 +3706,9 @@ void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput) char *src = NULL; for (int32_t i = 0; i < numOfOutput; i++) { int32_t functionId = pCtx[i].functionId; - if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) { + if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE || + functionId == TSDB_FUNC_MAVG || functionId == TSDB_FUNC_CSUM || + functionId == TSDB_FUNC_SAMPLE) { needCopyTs = true; if (i > 0 && pCtx[i-1].functionId == TSDB_FUNC_TS_DUMMY){ SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, i - 1); // find ts data -- GitLab