diff --git a/src/query/inc/qAggMain.h b/src/query/inc/qAggMain.h index 380e37645d083682ecbcd5865dce934878317d5d..a804d18b63ae6971cca74e687cf4393d60e19e23 100644 --- a/src/query/inc/qAggMain.h +++ b/src/query/inc/qAggMain.h @@ -71,14 +71,13 @@ extern "C" { #define TSDB_FUNC_BLKINFO 33 #define TSDB_FUNC_CSUM 34 - -#define TSDB_FUNC_HLL 35 -#define TSDB_FUNC_MODE 36 -#define TSDB_FUNC_SAMPLE 37 +#define TSDB_FUNC_MAVG 35 +#define TSDB_FUNC_SAMPLE 36 +#define TSDB_FUNC_MODE 37 #define TSDB_FUNC_CEIL 38 #define TSDB_FUNC_FLOOR 39 #define TSDB_FUNC_ROUND 40 -#define TSDB_FUNC_MAVG 41 +#define TSDB_FUNC_HLL 41 #define TSDB_FUNC_HISTOGRAM 42 #define TSDB_FUNCSTATE_SO 0x1u // single output diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 926b65f64b52680f8039cec0387bb8aa3b0ce280..9a95e41336a74a4b07dec8765892bb9f377a5c9c 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -4413,7 +4413,7 @@ typedef struct { int32_t pos; double sum; int32_t numPoints; - int64_t* points; + double* points; } SMovingAvgInfo; static bool mavg_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { @@ -4422,10 +4422,10 @@ static bool mavg_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResIn } SMovingAvgInfo* mavgInfo = GET_ROWCELL_INTERBUF(pResInfo); - mavgInfo->pos = -1; + mavgInfo->pos = 0; mavgInfo->sum = 0; mavgInfo->numPoints = (int32_t)pCtx->param[0].i64; - mavgInfo->points = (int64_t*)((char*)mavgInfo + sizeof(mavgInfo)); + mavgInfo->points = (double*)((char*)mavgInfo + sizeof(mavgInfo)); return true; } @@ -4445,96 +4445,169 @@ static void mavg_function(SQLFunctionCtx *pCtx) { switch (pCtx->inputType) { case TSDB_DATA_TYPE_INT: { int32_t *pData = (int32_t *)data; - int32_t *pOutput = (int32_t *)pCtx->pOutput; + double *pOutput = (double *)pCtx->pOutput; for (; i < pCtx->size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { continue; } - pCumSumInfo->cumSum += pData[i]; - *pTimestamp = (tsList != NULL) ? tsList[i] : 0; - SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum); + if (mavgInfo->pos < mavgInfo->numPoints - 1) { + mavgInfo->points[mavgInfo->pos] = (double)pData[i]; + mavgInfo->sum += pData[i]; + } else { + int32_t pos = mavgInfo->pos % mavgInfo->numPoints; + if (mavgInfo->pos != mavgInfo->numPoints -1) { + mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos]; + } else { + mavgInfo->sum += (double)pData[i]; + } - ++notNullElems; - pOutput += 1; - pTimestamp += 1; + mavgInfo->points[pos] = pData[i]; + + *pTimestamp = (tsList != NULL) ? tsList[i] : 0; + SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints) + ++notNullElems; + pOutput += 1; + pTimestamp += 1; + } + + ++mavgInfo->pos; } break; } case TSDB_DATA_TYPE_BIGINT: { int64_t *pData = (int64_t *)data; - int64_t *pOutput = (int64_t *)pCtx->pOutput; + double *pOutput = (double *)pCtx->pOutput; for (; i < pCtx->size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { continue; } - pCumSumInfo->cumSum += pData[i]; - *pTimestamp = (tsList != NULL) ? tsList[i] : 0; - SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum); + if (mavgInfo->pos < mavgInfo->numPoints - 1) { + mavgInfo->points[mavgInfo->pos] = (double)pData[i]; + mavgInfo->sum += pData[i]; + } else { + int32_t pos = mavgInfo->pos % mavgInfo->numPoints; + if (mavgInfo->pos != mavgInfo->numPoints -1) { + mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos]; + } else { + mavgInfo->sum += (double)pData[i]; + } - ++notNullElems; - pOutput += 1; - pTimestamp += 1; + mavgInfo->points[pos] = pData[i]; + + *pTimestamp = (tsList != NULL) ? tsList[i] : 0; + SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints) + ++notNullElems; + pOutput += 1; + pTimestamp += 1; + } + + ++mavgInfo->pos; } break; } case TSDB_DATA_TYPE_TINYINT: { int8_t *pData = (int8_t *)data; - int8_t *pOutput = (int8_t *)pCtx->pOutput; + double *pOutput = (double *)pCtx->pOutput; for (; i < pCtx->size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { continue; } - pCumSumInfo->cumSum += pData[i]; - *pTimestamp = (tsList != NULL) ? tsList[i] : 0; - SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum); + if (mavgInfo->pos < mavgInfo->numPoints - 1) { + mavgInfo->points[mavgInfo->pos] = (double)pData[i]; + mavgInfo->sum += pData[i]; + } else { + int32_t pos = mavgInfo->pos % mavgInfo->numPoints; + if (mavgInfo->pos != mavgInfo->numPoints -1) { + mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos]; + } else { + mavgInfo->sum += (double)pData[i]; + } - ++notNullElems; - pOutput += 1; - pTimestamp += 1; + mavgInfo->points[pos] = pData[i]; + + *pTimestamp = (tsList != NULL) ? tsList[i] : 0; + SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints) + ++notNullElems; + pOutput += 1; + pTimestamp += 1; + } + + ++mavgInfo->pos; } + break; } case TSDB_DATA_TYPE_SMALLINT: { int16_t *pData = (int16_t *)data; - int16_t *pOutput = (int16_t *)pCtx->pOutput; + double *pOutput = (double *)pCtx->pOutput; for (; i < pCtx->size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { continue; } - pCumSumInfo->cumSum += pData[i]; - *pTimestamp = (tsList != NULL) ? tsList[i] : 0; - SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum); + if (mavgInfo->pos < mavgInfo->numPoints - 1) { + mavgInfo->points[mavgInfo->pos] = (double)pData[i]; + mavgInfo->sum += pData[i]; + } else { + int32_t pos = mavgInfo->pos % mavgInfo->numPoints; + if (mavgInfo->pos != mavgInfo->numPoints -1) { + mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos]; + } else { + mavgInfo->sum += (double)pData[i]; + } - ++notNullElems; - pOutput += 1; - pTimestamp += 1; + mavgInfo->points[pos] = pData[i]; + + *pTimestamp = (tsList != NULL) ? tsList[i] : 0; + SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints) + ++notNullElems; + pOutput += 1; + pTimestamp += 1; + } + + ++mavgInfo->pos; } + break; } case TSDB_DATA_TYPE_FLOAT: { float *pData = (float *)data; - float *pOutput = (float *)pCtx->pOutput; + double *pOutput = (double *)pCtx->pOutput; for (; i < pCtx->size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { continue; } - pCumSumInfo->cumSum += pData[i]; - *pTimestamp = (tsList != NULL) ? tsList[i] : 0; - SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum); + if (mavgInfo->pos < mavgInfo->numPoints - 1) { + mavgInfo->points[mavgInfo->pos] = (double)pData[i]; + mavgInfo->sum += pData[i]; + } else { + int32_t pos = mavgInfo->pos % mavgInfo->numPoints; + if (mavgInfo->pos != mavgInfo->numPoints -1) { + mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos]; + } else { + mavgInfo->sum += (double)pData[i]; + } - ++notNullElems; - pOutput += 1; - pTimestamp += 1; + mavgInfo->points[pos] = pData[i]; + + *pTimestamp = (tsList != NULL) ? tsList[i] : 0; + SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints) + ++notNullElems; + pOutput += 1; + pTimestamp += 1; + } + + ++mavgInfo->pos; } + break; } @@ -4546,16 +4619,31 @@ static void mavg_function(SQLFunctionCtx *pCtx) { continue; } - pCumSumInfo->cumSum += pData[i]; - *pTimestamp = (tsList != NULL) ? tsList[i] : 0; - SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum); + if (mavgInfo->pos < mavgInfo->numPoints - 1) { + mavgInfo->points[mavgInfo->pos] = (double)pData[i]; + mavgInfo->sum += pData[i]; + } else { + int32_t pos = mavgInfo->pos % mavgInfo->numPoints; + if (mavgInfo->pos != mavgInfo->numPoints - 1) { + mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos]; + } else { + mavgInfo->sum += (double)pData[i]; + } - ++notNullElems; - pOutput += 1; - pTimestamp += 1; + mavgInfo->points[pos] = pData[i]; + + *pTimestamp = (tsList != NULL) ? tsList[i] : 0; + SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints) + ++notNullElems; + pOutput += 1; + pTimestamp += 1; + } + + ++mavgInfo->pos; } break; } + default: qError("error input type"); } @@ -4568,7 +4656,179 @@ static void mavg_function(SQLFunctionCtx *pCtx) { } } +////////////////////////////////////////////////////////////////////////////////// +typedef struct { + int32_t notNullElems; + int32_t num; + tValuePair **res; +} SSampleFuncInfo; + +static void sampleValuePairAssign(tValuePair *dst, tVariant* srcVariant, int64_t tsKey, char *pTags, SExtTagsInfo *pTagInfo) { + dst->timestamp = tsKey; + tVariantAssign(&dst->v, srcVariant); + + int32_t size = 0; + + for (int32_t i = 0; i < pTagInfo->numOfTagCols; ++i) { + SQLFunctionCtx* ctx = pTagInfo->pTagCtxList[i]; + if (ctx->functionId == TSDB_FUNC_TS_DUMMY) { + ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; + ctx->tag.i64 = tsKey; + } + + tVariantDump(&ctx->tag, dst->pTags + size, ctx->tag.nType, true); + size += pTagInfo->pTagCtxList[i]->outputBytes; + } + +} + + +static void do_sample_function_add(SSampleFuncInfo *pInfo, int32_t maxLen, void *pData, int64_t ts, uint16_t type, SExtTagsInfo *pTagInfo, char *pTags) { + tVariant val = {0}; + tVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type); + + tValuePair **pList = pInfo->res; + assert(pList != NULL); + + pInfo->notNullElems++; + if (pInfo->num < maxLen) { + sampleValuePairAssign(pList[pInfo->num], &val, ts, pTags, pTagInfo); + pInfo->num++; + } else { + int32_t j = rand() % (pInfo->notNullElems); + if (j < maxLen) { + sampleValuePairAssign(pList[j], &val, ts, pTags, pTagInfo); + } + } +} + + +static void copySampleFuncRes(SQLFunctionCtx *pCtx, int32_t type) { + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); + + tValuePair **tvp = pRes->res; + + int32_t step = QUERY_ASC_FORWARD_STEP; + int32_t len = (int32_t)(GET_RES_INFO(pCtx)->numOfRes); + + for (int32_t i = 0; i < len; ++i) { + char* output = pCtx->pOutput; + tVariantDump(&tvp[i]->v, (char*)output, type, true); + output += step * (pCtx->outputBytes); + } + + // set the output timestamp of each record. + TSKEY *output = pCtx->ptsOutputBuf; + for (int32_t i = 0; i < len; ++i, output += step) { + *output = tvp[i]->timestamp; + } + // set the corresponding tag data for each record + // todo check malloc failure + char **pData = calloc(pCtx->tagInfo.numOfTagCols, POINTER_BYTES); + for (int32_t i = 0; i < pCtx->tagInfo.numOfTagCols; ++i) { + pData[i] = pCtx->tagInfo.pTagCtxList[i]->pOutput; + } + + for (int32_t i = 0; i < len; ++i, output += step) { + int16_t offset = 0; + for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) { + memcpy(pData[j], tvp[i]->pTags + offset, (size_t)pCtx->tagInfo.pTagCtxList[j]->outputBytes); + offset += pCtx->tagInfo.pTagCtxList[j]->outputBytes; + pData[j] += pCtx->tagInfo.pTagCtxList[j]->outputBytes; + } + } + + tfree(pData); +} + +/* + * +-----------------------------------+-------------one value pair-----------+------------next value pair-----------+ + * |-------------pointer area----------|----ts---+-----+-----n tags-----------|----ts---+-----+-----n tags-----------| + * +..[Value Pointer1][Value Pointer2].|timestamp|value|tags1|tags2|....|tagsn|timestamp|value|tags1|tags2|....|tagsn+ + */ +static void buildSampleFuncStruct(SSampleFuncInfo *pSampleFuncInfo, SQLFunctionCtx *pCtx) { + char *tmp = (char *)pSampleFuncInfo + sizeof(SSampleFuncInfo); + pSampleFuncInfo->res = (tValuePair**) tmp; + tmp += POINTER_BYTES * pCtx->param[0].i64; + + size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen; + + for (int32_t i = 0; i < pCtx->param[0].i64; ++i) { + pSampleFuncInfo->res[i] = (tValuePair*) tmp; + pSampleFuncInfo->res[i]->pTags = tmp + sizeof(tValuePair); + tmp += size; + } +} + + +static bool sample_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { + if (!function_setup(pCtx, pResInfo)) { + return false; + } + + SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); + buildSampleFuncStruct(pRes, pCtx); + srand(taosSafeRand()); + pRes->notNullElems = 0; + return true; +} + +static void sample_function(SQLFunctionCtx *pCtx) { + int32_t notNullElems = 0; + + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); + + assert(pRes->num >= 0); + + if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(SSampleFuncInfo) + POINTER_BYTES * pCtx->param[0].i64)) { + buildSampleFuncStruct(pRes, pCtx); + } + + for (int32_t i = 0; i < pCtx->size; ++i) { + char *data = GET_INPUT_DATA(pCtx, i); + if (pCtx->hasNull && isNull(data, pCtx->inputType)) { + continue; + } + + notNullElems++; + + // NOTE: Set the default timestamp if it is missing [todo refactor] + TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0; + do_sample_function_add(pRes, (int32_t)pCtx->param[0].i64, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL); + } + + if (!pCtx->hasNull) { + assert(pCtx->size == notNullElems); + } + + // treat the result as only one result + SET_VAL(pCtx, notNullElems, 1); + + if (notNullElems > 0) { + pResInfo->hasResult = DATA_SET_FLAG; + } +} + + +static void sample_func_finalizer(SQLFunctionCtx *pCtx) { + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); + + if (pRes->num == 0) { // no result + assert(pResInfo->hasResult != DATA_SET_FLAG); + } + + + GET_TRUE_DATA_TYPE(); + copySampleFuncRes(pCtx, type); + + doFinalizer(pCtx); +} + +////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////// /* @@ -5015,4 +5275,28 @@ SAggFunctionInfo aAggs[] = {{ noop1, dataBlockRequired, }, + { + // 35 + "mavg", + TSDB_FUNC_MAVG, + TSDB_FUNC_INVALID_ID, + TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, + mavg_function_setup, + mavg_function, + doFinalizer, + noop1, + dataBlockRequired, + }, + { + // 36 + "sample", + TSDB_FUNC_SAMPLE, + TSDB_FUNC_INVALID_ID, + TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, + sample_function_setup, + sample_function, + sample_func_finalizer, + noop1, + dataBlockRequired, + }, };