diff --git a/src/client/src/tscGlobalmerge.c b/src/client/src/tscGlobalmerge.c index 6acbfe3e8929c9a5a46ed0370f6cfb883988ef3e..14e426ee69f1b11fe09ef23d66190c75a2628e10 100644 --- a/src/client/src/tscGlobalmerge.c +++ b/src/client/src/tscGlobalmerge.c @@ -648,7 +648,8 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD for(int32_t j = 0; j < numOfExpr; ++j) { pCtx[j].pOutput += (pCtx[j].outputBytes * numOfRows); - if (pCtx[j].functionId == TSDB_FUNC_TOP || pCtx[j].functionId == TSDB_FUNC_BOTTOM) { + if (pCtx[j].functionId == TSDB_FUNC_TOP || pCtx[j].functionId == TSDB_FUNC_BOTTOM || + pCtx[j].functionId == TSDB_FUNC_SAMPLE) { if(j > 0) pCtx[j].ptsOutputBuf = pCtx[j - 1].pOutput; } } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 24b59d205f36bbdbb53d36dc9d8e47a67589acbf..c90f6062b7c274f26de64fcb8bf9681d0a9ee882 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -3343,7 +3343,8 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) { if ((functionId >= TSDB_FUNC_SUM && functionId <= TSDB_FUNC_TWA) || (functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_STDDEV_DST) || - (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_IRATE)) { + (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_IRATE) || + (functionId == TSDB_FUNC_SAMPLE)) { if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->base.param[0].i64, &type, &bytes, &interBytes, 0, true, NULL) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_OPERATION; diff --git a/src/query/inc/qAggMain.h b/src/query/inc/qAggMain.h index de0ed97db8dbb87db159aeb5dbc6d52885b7ac59..e3e5ccbce9f5c49871a7f0f5378fc56845e1b410 100644 --- a/src/query/inc/qAggMain.h +++ b/src/query/inc/qAggMain.h @@ -74,9 +74,7 @@ extern "C" { #define TSDB_FUNC_ROUND 35 #define TSDB_FUNC_CSUM 36 - #define TSDB_FUNC_MAVG 37 - #define TSDB_FUNC_SAMPLE 38 #define TSDB_FUNC_BLKINFO 39 diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 32c8630ff2ff04bb6fe453aebf49344adea6fc7a..f8fd18d90a38b18f49e6822e9c1c666a2553c4fa 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -183,7 +183,8 @@ typedef struct { typedef struct { int32_t totalPoints; int32_t numSampled; - tVariant *values; + int16_t colBytes; + char *values; int64_t *timeStamps; } SSampleFuncInfo; @@ -276,14 +277,6 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI return TSDB_CODE_SUCCESS; } - if (functionId == TSDB_FUNC_SAMPLE) { - *type = (int16_t)dataType; - *bytes = (int16_t)dataBytes; - size_t size = sizeof(SSampleFuncInfo) + sizeof(tVariant) * param + sizeof(int64_t) * param; - *interBytes = (int32_t)size; - return TSDB_CODE_SUCCESS; - } - if (isSuperTable) { if (functionId < 0) { if (pUdfInfo->bufSize > 0) { @@ -328,6 +321,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI *interBytes = *bytes; return TSDB_CODE_SUCCESS; + } else if (functionId == TSDB_FUNC_SAMPLE) { + *type = TSDB_DATA_TYPE_BINARY; + *bytes = (int16_t)(sizeof(SSampleFuncInfo) + sizeof(dataBytes) * param + sizeof(int64_t) * param); + *interBytes = *bytes; } else if (functionId == TSDB_FUNC_SPREAD) { *type = TSDB_DATA_TYPE_BINARY; *bytes = sizeof(SSpreadInfo); @@ -436,6 +433,11 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI // the output column may be larger than sizeof(STopBotInfo) *interBytes = (int32_t)size; + } else if (functionId == TSDB_FUNC_SAMPLE) { + *type = (int16_t)dataType; + *bytes = (int16_t)dataBytes; + size_t size = sizeof(SSampleFuncInfo) + sizeof(dataBytes) * param + sizeof(int64_t) * param; + *interBytes = (int32_t)size; } else if (functionId == TSDB_FUNC_LAST_ROW) { *type = (int16_t)dataType; *bytes = (int16_t)dataBytes; @@ -4669,15 +4671,19 @@ static void mavg_function(SQLFunctionCtx *pCtx) { ////////////////////////////////////////////////////////////////////////////////// // Sample function with reservoir sampling algorithm -static void assignResultSample(SSampleFuncInfo *pInfo, int32_t index, int64_t ts, void *pData, uint16_t type, int16_t bytes) { - if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { - int32_t maxLen = bytes - VARSTR_HEADER_SIZE; - int32_t len = (varDataLen(pData) > maxLen)? maxLen:varDataLen(pData); - tVariantCreateFromBinary(pInfo->values + index, varDataVal(pData), len, type); - } else { - tVariantCreateFromBinary(pInfo->values + index, pData, bytes, type); +static SSampleFuncInfo* getSampleFuncOutputInfo(SQLFunctionCtx *pCtx) { + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + + // only the first_stage stable is directly written data into final output buffer + if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) { + return (SSampleFuncInfo *) pCtx->pOutput; + } else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer + return GET_ROWCELL_INTERBUF(pResInfo); } +} +static void assignResultSample(SSampleFuncInfo *pInfo, int32_t index, int64_t ts, void *pData, uint16_t type, int16_t bytes) { + assignVal(pInfo->values + index*bytes, pData, bytes, type); *(pInfo->timeStamps + index) = ts; return; } @@ -4702,8 +4708,7 @@ static void copySampleFuncRes(SQLFunctionCtx *pCtx, int32_t type) { TSKEY* pTimestamp = pCtx->ptsOutputBuf; char* pOutput = pCtx->pOutput; for (int32_t i = 0; i < pRes->numSampled; ++i) { - - tVariantDump(pRes->values + i, (char*)pOutput, type, true); + assignVal(pOutput, pRes->values + i*pCtx->inputBytes, pRes->colBytes, type); *pTimestamp = *(pRes->timeStamps + i); pOutput += pCtx->outputBytes; @@ -4719,11 +4724,12 @@ static bool sample_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pRes srand(taosSafeRand()); - SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); + SSampleFuncInfo *pRes = getSampleFuncOutputInfo(pCtx); pRes->totalPoints = 0; pRes->numSampled = 0; - pRes->values = (tVariant*) ((char*)pRes + sizeof(SSampleFuncInfo)); - pRes->timeStamps = (int64_t*)((char*)pRes->values + sizeof(tVariant) * pCtx->param[0].i64); + pRes->values = ((char*)pRes + sizeof(SSampleFuncInfo)); + pRes->colBytes = (pCtx->currentStage != MERGE_STAGE) ? pCtx->inputBytes : pCtx->outputBytes; + pRes->timeStamps = (int64_t *)((char *)pRes->values + pRes->colBytes * pCtx->param[0].i64); return true; } @@ -4731,11 +4737,11 @@ static void sample_function(SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); + SSampleFuncInfo *pRes = getSampleFuncOutputInfo(pCtx); - if (pRes->values != (tVariant*) ((char*)pRes + sizeof(SSampleFuncInfo))) { - pRes->values = (tVariant*) ((char*)pRes + sizeof(SSampleFuncInfo)); - pRes->timeStamps = (int64_t*)((char*)pRes->values + sizeof(tVariant) * pCtx->param[0].i64); + if (pRes->values != ((char*)pRes + sizeof(SSampleFuncInfo))) { + pRes->values = ((char*)pRes + sizeof(SSampleFuncInfo)); + pRes->timeStamps = (int64_t*)((char*)pRes->values + pRes->colBytes * pCtx->param[0].i64); } for (int32_t i = 0; i < pCtx->size; ++i) { @@ -4747,7 +4753,7 @@ static void sample_function(SQLFunctionCtx *pCtx) { notNullElems++; TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0; - do_reservoir_sample(pRes, (int32_t)pCtx->param[0].i64, ts, data, pCtx->inputType, pCtx->inputBytes); + do_reservoir_sample(pRes, (int32_t)pCtx->param[0].i64, ts, data, pCtx->inputType, pRes->colBytes); } if (!pCtx->hasNull) { @@ -4762,6 +4768,23 @@ static void sample_function(SQLFunctionCtx *pCtx) { } } +static void sample_func_merge(SQLFunctionCtx *pCtx) { + SSampleFuncInfo* pInput = (SSampleFuncInfo*)GET_INPUT_DATA_LIST(pCtx); + pInput->values = ((char*)pInput + sizeof(SSampleFuncInfo)); + pInput->timeStamps = (int64_t*)((char*)pInput->values + pInput->colBytes * pCtx->param[0].i64); + + SSampleFuncInfo *pOutput = getSampleFuncOutputInfo(pCtx); + for (int32_t i = 0; i < pInput->numSampled; ++i) { + assignResultSample(pOutput, i, pInput->timeStamps[i], + pInput->values + i * pInput->colBytes, pCtx->outputType, pInput->colBytes); + } + + SET_VAL(pCtx, pInput->numSampled, pOutput->numSampled); + if (pOutput->numSampled > 0) { + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + pResInfo->hasResult = DATA_SET_FLAG; + } +} static void sample_func_finalizer(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); @@ -4775,10 +4798,6 @@ static void sample_func_finalizer(SQLFunctionCtx *pCtx) { GET_TRUE_DATA_TYPE(); copySampleFuncRes(pCtx, type); - for (int32_t i = 0; i < pRes->numSampled; ++i) { - tVariantDestroy(pRes->values + i); - } - doFinalizer(pCtx); } @@ -5266,12 +5285,12 @@ SAggFunctionInfo aAggs[] = {{ // 38 "sample", TSDB_FUNC_SAMPLE, - TSDB_FUNC_INVALID_ID, + TSDB_FUNC_SAMPLE, TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, sample_function_setup, sample_function, sample_func_finalizer, - noop1, + sample_func_merge, dataBlockRequired, }, {