From fa0f056f192ea72d74d734d63b372319a64573f1 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 12 Nov 2019 17:21:36 +0800 Subject: [PATCH] refactor some codes. fix bugs in selectivity+tags/ts query. --- src/client/src/tscFunctionImpl.c | 702 +++++++++++++++++-------- src/client/src/tscSQLParser.c | 77 ++- src/client/src/tscUtil.c | 69 +-- src/system/detail/src/vnodeQueryImpl.c | 13 +- src/system/detail/src/vnodeUtil.c | 2 +- 5 files changed, 587 insertions(+), 276 deletions(-) diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 35011a259b..a29f8f1589 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -63,6 +63,14 @@ } \ } while (0); +#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \ +do {\ +for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \ + SQLFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[i]; \ + aAggs[TSDB_FUNC_TAG].xFunction(__ctx); \ + } \ +} while(0); + void noop(SQLFunctionCtx *UNUSED_PARAM(pCtx)) {} typedef struct tValuePair { @@ -104,7 +112,9 @@ typedef struct SFirstLastInfo { } SFirstLastInfo; typedef struct SFirstLastInfo SLastrowInfo; -typedef struct SPercentileInfo { tMemBucket *pMemBucket; } SPercentileInfo; +typedef struct SPercentileInfo { + tMemBucket *pMemBucket; +} SPercentileInfo; typedef struct STopBotInfo { int32_t num; @@ -118,9 +128,13 @@ typedef struct SLeastsquareInfo { int64_t num; } SLeastsquareInfo; -typedef struct SAPercentileInfo { SHistogramInfo *pHisto; } SAPercentileInfo; +typedef struct SAPercentileInfo { + SHistogramInfo *pHisto; +} SAPercentileInfo; -typedef struct STSCompInfo { STSBuf *pTSBuf; } STSCompInfo; +typedef struct STSCompInfo { + STSBuf *pTSBuf; +} STSCompInfo; int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type, int16_t *bytes, int16_t *intermediateResBytes, int16_t extLength, bool isSuperTable) { @@ -451,21 +465,32 @@ int32_t no_data_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId } \ }; -#define UPDATE_DATA(ctx, left, right, num, sign) \ - do { \ - if (((left) < (right)) ^ (sign)) { \ - (left) = right; \ - DO_UPDATE_TAG_COLUMNS(ctx, 0); \ - (num) += 1; \ - } \ - } while (0) +#define UPDATE_DATA(ctx, left, right, num, sign, k) \ + do { \ + if (((left) < (right)) ^ (sign)) { \ + (left) = (right); \ + DO_UPDATE_TAG_COLUMNS(ctx, k); \ + (num) += 1; \ + } \ + } while (0); + +#define DUPATE_DATA_WITHOUT_TS(ctx, left, right, num, sign) \ +do { \ + if (((left) < (right)) ^ (sign)) { \ + (left) = (right); \ + DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx); \ + (num) += 1; \ + } \ + } while (0); + #define LOOPCHECK_N(val, list, ctx, tsdbType, sign, num) \ for (int32_t i = 0; i < ((ctx)->size); ++i) { \ if ((ctx)->hasNull && isNull((char *)&(list)[i], tsdbType)) { \ continue; \ } \ - UPDATE_DATA(ctx, val, (list)[i], num, sign); \ + TSKEY key = (ctx)->ptsList[i]; \ + UPDATE_DATA(ctx, val, (list)[i], num, sign, key); \ } #define TYPED_LOOPCHECK_N(type, data, list, ctx, tsdbType, sign, notNullElems) \ @@ -886,16 +911,18 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, index = pCtx->preAggVals.maxIndex; } + TSKEY key = pCtx->ptsList[index]; + if (pCtx->inputType >= TSDB_DATA_TYPE_TINYINT && pCtx->inputType <= TSDB_DATA_TYPE_BIGINT) { int64_t val = GET_INT64_VAL(tval); if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { int8_t *data = (int8_t *)pOutput; - UPDATE_DATA(pCtx, *data, val, notNullElems, isMin); + UPDATE_DATA(pCtx, *data, val, notNullElems, isMin, key); } else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) { int16_t *data = (int16_t *)pOutput; - UPDATE_DATA(pCtx, *data, val, notNullElems, isMin); + UPDATE_DATA(pCtx, *data, val, notNullElems, isMin, key); } else if (pCtx->inputType == TSDB_DATA_TYPE_INT) { int32_t *data = (int32_t *)pOutput; #if defined(_DEBUG_VIEW) @@ -906,27 +933,27 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, *data = val; for (int32_t i = 0; i < (pCtx)->tagInfo.numOfTagCols; ++i) { SQLFunctionCtx *__ctx = pCtx->tagInfo.pTagCtxList[i]; - if (__ctx->functionId == TSDB_FUNC_TAG_DUMMY) { - aAggs[TSDB_FUNC_TAG].xFunction(__ctx); - } else if (__ctx->functionId == TSDB_FUNC_TS_DUMMY) { - *((int64_t *)__ctx->aOutputBuf) = pCtx->ptsList[index]; + if (__ctx->functionId == TSDB_FUNC_TS_DUMMY) { + __ctx->tag = (tVariant){.i64Key = key, .nType = TSDB_DATA_TYPE_BIGINT}; } + + aAggs[TSDB_FUNC_TAG].xFunction(__ctx); } } } else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) { int64_t *data = (int64_t *)pOutput; - UPDATE_DATA(pCtx, *data, val, notNullElems, isMin); + UPDATE_DATA(pCtx, *data, val, notNullElems, isMin, key); } } else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) { double *data = (double *)pOutput; double val = GET_DOUBLE_VAL(tval); - UPDATE_DATA(pCtx, *data, val, notNullElems, isMin); + UPDATE_DATA(pCtx, *data, val, notNullElems, isMin, key); } else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) { float *data = (float *)pOutput; double val = GET_DOUBLE_VAL(tval); - UPDATE_DATA(pCtx, *data, val, notNullElems, isMin); + UPDATE_DATA(pCtx, *data, val, notNullElems, isMin, key); } return; @@ -951,7 +978,9 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, if ((*retVal < pData[i]) ^ isMin) { *retVal = pData[i]; - DO_UPDATE_TAG_COLUMNS(pCtx, pCtx->ptsList[i]); + TSKEY k = pCtx->ptsList[i]; + + DO_UPDATE_TAG_COLUMNS(pCtx, k); } *notNullElems += 1; @@ -1089,12 +1118,12 @@ static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *outp switch (type) { case TSDB_DATA_TYPE_TINYINT: { int8_t v = GET_INT8_VAL(input); - UPDATE_DATA(pCtx, *(int8_t *)output, v, notNullElems, isMin); + DUPATE_DATA_WITHOUT_TS(pCtx, *(int8_t *)output, v, notNullElems, isMin); break; }; case TSDB_DATA_TYPE_SMALLINT: { int16_t v = GET_INT16_VAL(input); - UPDATE_DATA(pCtx, *(int16_t *)output, v, notNullElems, isMin); + DUPATE_DATA_WITHOUT_TS(pCtx, *(int16_t *)output, v, notNullElems, isMin); break; } case TSDB_DATA_TYPE_INT: { @@ -1104,7 +1133,7 @@ static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *outp for (int32_t i = 0; i < pCtx->tagInfo.numOfTagCols; ++i) { SQLFunctionCtx *__ctx = pCtx->tagInfo.pTagCtxList[i]; - aAggs[TSDB_FUNC_TAG].xFunction(__ctx); + aAggs[TSDB_FUNC_TAG].xFunction(__ctx); } notNullElems++; @@ -1113,17 +1142,17 @@ static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *outp } case TSDB_DATA_TYPE_FLOAT: { float v = GET_FLOAT_VAL(input); - UPDATE_DATA(pCtx, *(float *)output, v, notNullElems, isMin); + DUPATE_DATA_WITHOUT_TS(pCtx, *(float *)output, v, notNullElems, isMin); break; } case TSDB_DATA_TYPE_DOUBLE: { double v = GET_DOUBLE_VAL(input); - UPDATE_DATA(pCtx, *(double *)output, v, notNullElems, isMin); + DUPATE_DATA_WITHOUT_TS(pCtx, *(double *)output, v, notNullElems, isMin); break; } case TSDB_DATA_TYPE_BIGINT: { int64_t v = GET_INT64_VAL(input); - UPDATE_DATA(pCtx, *(int64_t *)output, v, notNullElems, isMin); + DUPATE_DATA_WITHOUT_TS(pCtx, *(int64_t *)output, v, notNullElems, isMin); break; }; default: @@ -1179,38 +1208,39 @@ static void max_func_second_merge(SQLFunctionCtx *pCtx) { static void minMax_function_f(SQLFunctionCtx *pCtx, int32_t index, int32_t isMin) { char *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + TSKEY key = pCtx->ptsList[index]; int32_t num = 0; if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { int8_t *output = (int8_t *)pCtx->aOutputBuf; int8_t i = GET_INT8_VAL(pData); - UPDATE_DATA(pCtx, *output, i, num, isMin); + UPDATE_DATA(pCtx, *output, i, num, isMin, key); } else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) { int16_t *output = pCtx->aOutputBuf; int16_t i = GET_INT16_VAL(pData); - UPDATE_DATA(pCtx, *output, i, num, isMin); + UPDATE_DATA(pCtx, *output, i, num, isMin, key); } else if (pCtx->inputType == TSDB_DATA_TYPE_INT) { int32_t *output = pCtx->aOutputBuf; int32_t i = GET_INT32_VAL(pData); - UPDATE_DATA(pCtx, *output, i, num, isMin); + UPDATE_DATA(pCtx, *output, i, num, isMin, key); } else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) { int64_t *output = pCtx->aOutputBuf; int64_t i = GET_INT64_VAL(pData); - UPDATE_DATA(pCtx, *output, i, num, isMin); + UPDATE_DATA(pCtx, *output, i, num, isMin, key); } else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) { float *output = pCtx->aOutputBuf; float i = GET_FLOAT_VAL(pData); - UPDATE_DATA(pCtx, *output, i, num, isMin); + UPDATE_DATA(pCtx, *output, i, num, isMin, key); } else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) { double *output = pCtx->aOutputBuf; double i = GET_DOUBLE_VAL(pData); - UPDATE_DATA(pCtx, *output, i, num, isMin); + UPDATE_DATA(pCtx, *output, i, num, isMin, key); } GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; @@ -1804,6 +1834,11 @@ static void valuePairAssign(tValuePair *dst, int16_t type, const char *val, int6 memcpy(dst->pTags, pTags, (size_t)pTagInfo->tagsLen); } else { // the tags are dumped from the ctx tag fields for (int32_t i = 0; i < pTagInfo->numOfTagCols; ++i) { + SQLFunctionCtx* __ctx = pTagInfo->pTagCtxList[i]; + if (__ctx->functionId == TSDB_FUNC_TS_DUMMY) { + __ctx->tag = (tVariant) {.nType = TSDB_DATA_TYPE_BIGINT, .i64Key = tsKey}; + } + tVariantDump(&pTagInfo->pTagCtxList[i]->tag, dst->pTags + size, pTagInfo->pTagCtxList[i]->tag.nType); size += pTagInfo->pTagCtxList[i]->outputBytes; } @@ -1825,8 +1860,9 @@ static void do_top_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pData, tValuePair **pList = pInfo->res; if (pInfo->num < maxLen) { - if (pInfo->num == 0 || ((type >= TSDB_DATA_TYPE_TINYINT && type <= TSDB_DATA_TYPE_BIGINT) && - val.i64Key >= pList[pInfo->num - 1]->v.i64Key) || + if (pInfo->num == 0 || + ((type >= TSDB_DATA_TYPE_TINYINT && type <= TSDB_DATA_TYPE_BIGINT) && + val.i64Key >= pList[pInfo->num - 1]->v.i64Key) || ((type >= TSDB_DATA_TYPE_FLOAT && type <= TSDB_DATA_TYPE_DOUBLE) && val.dKey >= pList[pInfo->num - 1]->v.dKey)) { valuePairAssign(pList[pInfo->num], type, &val.i64Key, ts, pTags, pTagInfo, stage); @@ -4293,173 +4329,427 @@ int32_t funcCompatDefList[28] = { */ 1, 1, 1, -1, 1, 1, 5}; -SQLAggFuncElem aAggs[28] = { - { - // 0, count function does not invoke the finalize function - "count", TSDB_FUNC_COUNT, TSDB_FUNC_COUNT, TSDB_BASE_FUNC_SO, function_setup, count_function, count_function_f, - no_next_step, noop, count_func_merge, count_func_merge, count_load_data_info, - }, - { - // 1 - "sum", TSDB_FUNC_SUM, TSDB_FUNC_SUM, TSDB_BASE_FUNC_SO, function_setup, sum_function, sum_function_f, - no_next_step, function_finalizer, sum_func_merge, sum_func_second_merge, precal_req_load_info, - }, - { - // 2 - "avg", TSDB_FUNC_AVG, TSDB_FUNC_AVG, TSDB_BASE_FUNC_SO, function_setup, avg_function, avg_function_f, - no_next_step, avg_finalizer, avg_func_merge, avg_func_second_merge, precal_req_load_info, - }, - { - // 3 - "min", TSDB_FUNC_MIN, TSDB_FUNC_MIN, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, min_func_setup, - min_function, min_function_f, no_next_step, function_finalizer, min_func_merge, min_func_second_merge, - precal_req_load_info, - }, - { - // 4 - "max", TSDB_FUNC_MAX, TSDB_FUNC_MAX, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, max_func_setup, - max_function, max_function_f, no_next_step, function_finalizer, max_func_merge, max_func_second_merge, - precal_req_load_info, - }, - { - // 5 - "stddev", TSDB_FUNC_STDDEV, TSDB_FUNC_INVALID_ID, TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF, - function_setup, stddev_function, stddev_function_f, stddev_next_step, stddev_finalizer, noop, noop, - data_req_load_info, - }, - { - // 6 - "percentile", TSDB_FUNC_PERCT, TSDB_FUNC_INVALID_ID, - TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF, percentile_function_setup, percentile_function, - percentile_function_f, no_next_step, percentile_finalizer, noop, noop, data_req_load_info, - }, - { - // 7 - "apercentile", TSDB_FUNC_APERCT, TSDB_FUNC_APERCT, - TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC, - apercentile_function_setup, apercentile_function, apercentile_function_f, no_next_step, apercentile_finalizer, - apercentile_func_merge, apercentile_func_second_merge, data_req_load_info, - }, - { - // 8 - "first", TSDB_FUNC_FIRST, TSDB_FUNC_FIRST_DST, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, function_setup, - first_function, first_function_f, no_next_step, function_finalizer, noop, noop, first_data_req_info, - }, - { - // 9 - "last", TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, function_setup, - last_function, last_function_f, no_next_step, function_finalizer, noop, noop, last_data_req_info, - }, - { - // 10 - "last_row", TSDB_FUNC_LAST_ROW, TSDB_FUNC_LAST_ROW, - TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS | - TSDB_FUNCSTATE_SELECTIVITY, - first_last_function_setup, last_row_function, noop, no_next_step, last_row_finalizer, noop, - last_dist_func_second_merge, data_req_load_info, - }, - { - // 11 - "top", TSDB_FUNC_TOP, TSDB_FUNC_TOP, TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF | - TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, - top_bottom_function_setup, top_function, top_function_f, no_next_step, top_bottom_func_finalizer, - top_func_merge, top_func_second_merge, data_req_load_info, - }, - { - // 12 - "bottom", TSDB_FUNC_BOTTOM, TSDB_FUNC_BOTTOM, TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF | - TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, - top_bottom_function_setup, bottom_function, bottom_function_f, no_next_step, top_bottom_func_finalizer, - bottom_func_merge, bottom_func_second_merge, data_req_load_info, - }, - { - // 13 - "spread", TSDB_FUNC_SPREAD, TSDB_FUNC_SPREAD, TSDB_BASE_FUNC_SO, spread_function_setup, spread_function, - spread_function_f, no_next_step, spread_function_finalizer, spread_func_merge, spread_func_sec_merge, - count_load_data_info, - }, - { - // 14 - "twa", TSDB_FUNC_TWA, TSDB_FUNC_TWA, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, twa_function_setup, - twa_function, twa_function_f, no_next_step, twa_function_finalizer, twa_func_merge, twa_function_copy, - data_req_load_info, - }, - { - // 15 - "leastsquares", TSDB_FUNC_LEASTSQR, TSDB_FUNC_INVALID_ID, - TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF, leastsquares_function_setup, - leastsquares_function, leastsquares_function_f, no_next_step, leastsquares_finalizer, noop, noop, - data_req_load_info, - }, - { - // 16 - "ts", TSDB_FUNC_TS, TSDB_FUNC_TS, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, function_setup, - date_col_output_function, date_col_output_function, no_next_step, noop, copy_function, copy_function, - no_data_info, - }, - { - // 17 - "ts", TSDB_FUNC_TS_DUMMY, TSDB_FUNC_TS_DUMMY, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, function_setup, noop, - noop, no_next_step, noop, copy_function, copy_function, data_req_load_info, - }, - { - // 18 - "tag", TSDB_FUNC_TAG_DUMMY, TSDB_FUNC_TAG_DUMMY, TSDB_BASE_FUNC_SO, function_setup, tag_function, noop, - no_next_step, noop, copy_function, copy_function, no_data_info, - }, - { - // 19 - "ts", TSDB_FUNC_TS_COMP, TSDB_FUNC_TS_COMP, TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_NEED_TS, ts_comp_function_setup, - ts_comp_function, ts_comp_function_f, no_next_step, ts_comp_finalize, copy_function, copy_function, - data_req_load_info, - }, - { - // 20 - "tag", TSDB_FUNC_TAG, TSDB_FUNC_TAG, TSDB_BASE_FUNC_SO, function_setup, tag_function, tag_function_f, - no_next_step, noop, copy_function, copy_function, no_data_info, - }, - { - // 21, column project sql function - "colprj", TSDB_FUNC_PRJ, TSDB_FUNC_PRJ, TSDB_BASE_FUNC_MO | TSDB_FUNCSTATE_NEED_TS, function_setup, - col_project_function, col_project_function_f, no_next_step, noop, copy_function, copy_function, - data_req_load_info, - }, - { - // 22, multi-output, tag function has only one result - "tagprj", TSDB_FUNC_TAGPRJ, TSDB_FUNC_TAGPRJ, TSDB_BASE_FUNC_MO, function_setup, tag_project_function, - tag_project_function_f, no_next_step, noop, copy_function, copy_function, no_data_info, - }, - { - // 23 - "arithmetic", TSDB_FUNC_ARITHM, TSDB_FUNC_ARITHM, - TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS, function_setup, arithmetic_function, - arithmetic_function_f, no_next_step, noop, copy_function, copy_function, data_req_load_info, - }, - { - // 24 - "diff", TSDB_FUNC_DIFF, TSDB_FUNC_INVALID_ID, TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_NEED_TS, diff_function_setup, - diff_function, diff_function_f, no_next_step, noop, noop, noop, data_req_load_info, - }, - // distributed version used in two-stage aggregation processes - { - // 25 - "first_dist", TSDB_FUNC_FIRST_DST, TSDB_FUNC_FIRST_DST, - TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, first_last_function_setup, - first_dist_function, first_dist_function_f, no_next_step, function_finalizer, first_dist_func_merge, - first_dist_func_second_merge, first_dist_data_req_info, - }, - { - // 26 - "last_dist", TSDB_FUNC_LAST_DST, TSDB_FUNC_LAST_DST, - TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, first_last_function_setup, - last_dist_function, last_dist_function_f, no_next_step, function_finalizer, last_dist_func_merge, - last_dist_func_second_merge, last_dist_data_req_info, - }, - { - // 27 - "interp", TSDB_FUNC_INTERP, TSDB_FUNC_INTERP, - TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS, function_setup, - interp_function, - do_sum_f, // todo filter handle - no_next_step, noop, noop, copy_function, no_data_info, - }}; +SQLAggFuncElem aAggs[28] = {{ + // 0, count function does not invoke the finalize function + "count", + TSDB_FUNC_COUNT, + TSDB_FUNC_COUNT, + TSDB_BASE_FUNC_SO, + function_setup, + count_function, + count_function_f, + no_next_step, + noop, + count_func_merge, + count_func_merge, + count_load_data_info, + }, + { + // 1 + "sum", + TSDB_FUNC_SUM, + TSDB_FUNC_SUM, + TSDB_BASE_FUNC_SO, + function_setup, + sum_function, + sum_function_f, + no_next_step, + function_finalizer, + sum_func_merge, + sum_func_second_merge, + precal_req_load_info, + }, + { + // 2 + "avg", + TSDB_FUNC_AVG, + TSDB_FUNC_AVG, + TSDB_BASE_FUNC_SO, + function_setup, + avg_function, + avg_function_f, + no_next_step, + avg_finalizer, + avg_func_merge, + avg_func_second_merge, + precal_req_load_info, + }, + { + // 3 + "min", + TSDB_FUNC_MIN, + TSDB_FUNC_MIN, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, + min_func_setup, + min_function, + min_function_f, + no_next_step, + function_finalizer, + min_func_merge, + min_func_second_merge, + precal_req_load_info, + }, + { + // 4 + "max", + TSDB_FUNC_MAX, + TSDB_FUNC_MAX, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, + max_func_setup, + max_function, + max_function_f, + no_next_step, + function_finalizer, + max_func_merge, + max_func_second_merge, + precal_req_load_info, + }, + { + // 5 + "stddev", + TSDB_FUNC_STDDEV, + TSDB_FUNC_INVALID_ID, + TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF, + function_setup, + stddev_function, + stddev_function_f, + stddev_next_step, + stddev_finalizer, + noop, + noop, + data_req_load_info, + }, + { + // 6 + "percentile", + TSDB_FUNC_PERCT, + TSDB_FUNC_INVALID_ID, + TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF, + percentile_function_setup, + percentile_function, + percentile_function_f, + no_next_step, + percentile_finalizer, + noop, + noop, + data_req_load_info, + }, + { + // 7 + "apercentile", + TSDB_FUNC_APERCT, + TSDB_FUNC_APERCT, + TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC, + apercentile_function_setup, + apercentile_function, + apercentile_function_f, + no_next_step, + apercentile_finalizer, + apercentile_func_merge, + apercentile_func_second_merge, + data_req_load_info, + }, + { + // 8 + "first", + TSDB_FUNC_FIRST, + TSDB_FUNC_FIRST_DST, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, + function_setup, + first_function, + first_function_f, + no_next_step, + function_finalizer, + noop, + noop, + first_data_req_info, + }, + { + // 9 + "last", + TSDB_FUNC_LAST, + TSDB_FUNC_LAST_DST, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, + function_setup, + last_function, + last_function_f, + no_next_step, + function_finalizer, + noop, + noop, + last_data_req_info, + }, + { + // 10 + "last_row", + TSDB_FUNC_LAST_ROW, + TSDB_FUNC_LAST_ROW, + TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS | + TSDB_FUNCSTATE_SELECTIVITY, + first_last_function_setup, + last_row_function, + noop, + no_next_step, + last_row_finalizer, + noop, + last_dist_func_second_merge, + data_req_load_info, + }, + { + // 11 + "top", + TSDB_FUNC_TOP, + TSDB_FUNC_TOP, + TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_NEED_TS | + TSDB_FUNCSTATE_SELECTIVITY, + top_bottom_function_setup, + top_function, + top_function_f, + no_next_step, + top_bottom_func_finalizer, + top_func_merge, + top_func_second_merge, + data_req_load_info, + }, + { + // 12 + "bottom", + TSDB_FUNC_BOTTOM, + TSDB_FUNC_BOTTOM, + TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_NEED_TS | + TSDB_FUNCSTATE_SELECTIVITY, + top_bottom_function_setup, + bottom_function, + bottom_function_f, + no_next_step, + top_bottom_func_finalizer, + bottom_func_merge, + bottom_func_second_merge, + data_req_load_info, + }, + { + // 13 + "spread", + TSDB_FUNC_SPREAD, + TSDB_FUNC_SPREAD, + TSDB_BASE_FUNC_SO, + spread_function_setup, + spread_function, + spread_function_f, + no_next_step, + spread_function_finalizer, + spread_func_merge, + spread_func_sec_merge, + count_load_data_info, + }, + { + // 14 + "twa", + TSDB_FUNC_TWA, + TSDB_FUNC_TWA, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + twa_function_setup, + twa_function, + twa_function_f, + no_next_step, + twa_function_finalizer, + twa_func_merge, + twa_function_copy, + data_req_load_info, + }, + { + // 15 + "leastsquares", + TSDB_FUNC_LEASTSQR, + TSDB_FUNC_INVALID_ID, + TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF, + leastsquares_function_setup, + leastsquares_function, + leastsquares_function_f, + no_next_step, + leastsquares_finalizer, + noop, + noop, + data_req_load_info, + }, + { + // 16 + "ts", + TSDB_FUNC_TS, + TSDB_FUNC_TS, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + function_setup, + date_col_output_function, + date_col_output_function, + no_next_step, + noop, + copy_function, + copy_function, + no_data_info, + }, + { + // 17 + "ts", + TSDB_FUNC_TS_DUMMY, + TSDB_FUNC_TS_DUMMY, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + function_setup, + noop, + noop, + no_next_step, + noop, + copy_function, + copy_function, + data_req_load_info, + }, + { + // 18 + "tag", + TSDB_FUNC_TAG_DUMMY, + TSDB_FUNC_TAG_DUMMY, + TSDB_BASE_FUNC_SO, + function_setup, + tag_function, + noop, + no_next_step, + noop, + copy_function, + copy_function, + no_data_info, + }, + { + // 19 + "ts", + TSDB_FUNC_TS_COMP, + TSDB_FUNC_TS_COMP, + TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_NEED_TS, + ts_comp_function_setup, + ts_comp_function, + ts_comp_function_f, + no_next_step, + ts_comp_finalize, + copy_function, + copy_function, + data_req_load_info, + }, + { + // 20 + "tag", + TSDB_FUNC_TAG, + TSDB_FUNC_TAG, + TSDB_BASE_FUNC_SO, + function_setup, + tag_function, + tag_function_f, + no_next_step, + noop, + copy_function, + copy_function, + no_data_info, + }, + { + // 21, column project sql function + "colprj", + TSDB_FUNC_PRJ, + TSDB_FUNC_PRJ, + TSDB_BASE_FUNC_MO | TSDB_FUNCSTATE_NEED_TS, + function_setup, + col_project_function, + col_project_function_f, + no_next_step, + noop, + copy_function, + copy_function, + data_req_load_info, + }, + { + // 22, multi-output, tag function has only one result + "tagprj", + TSDB_FUNC_TAGPRJ, + TSDB_FUNC_TAGPRJ, + TSDB_BASE_FUNC_MO, + function_setup, + tag_project_function, + tag_project_function_f, + no_next_step, + noop, + copy_function, + copy_function, + no_data_info, + }, + { + // 23 + "arithmetic", + TSDB_FUNC_ARITHM, + TSDB_FUNC_ARITHM, + TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS, + function_setup, + arithmetic_function, + arithmetic_function_f, + no_next_step, + noop, + copy_function, + copy_function, + data_req_load_info, + }, + { + // 24 + "diff", + TSDB_FUNC_DIFF, + TSDB_FUNC_INVALID_ID, + TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_NEED_TS, + diff_function_setup, + diff_function, + diff_function_f, + no_next_step, + noop, + noop, + noop, + data_req_load_info, + }, + // distributed version used in two-stage aggregation processes + { + // 25 + "first_dist", + TSDB_FUNC_FIRST_DST, + TSDB_FUNC_FIRST_DST, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, + first_last_function_setup, + first_dist_function, + first_dist_function_f, + no_next_step, + function_finalizer, + first_dist_func_merge, + first_dist_func_second_merge, + first_dist_data_req_info, + }, + { + // 26 + "last_dist", + TSDB_FUNC_LAST_DST, + TSDB_FUNC_LAST_DST, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, + first_last_function_setup, + last_dist_function, + last_dist_function_f, + no_next_step, + function_finalizer, + last_dist_func_merge, + last_dist_func_second_merge, + last_dist_data_req_info, + }, + { + // 27 + "interp", + TSDB_FUNC_INTERP, + TSDB_FUNC_INTERP, + TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS, + function_setup, + interp_function, + do_sum_f, // todo filter handle + no_next_step, + noop, + noop, + copy_function, + no_data_info, + }}; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index ae9704effa..1bc0163e5f 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -81,7 +81,7 @@ static bool validateIpAddress(char* ip); static bool hasUnsupportFunctionsForMetricQuery(SSqlCmd* pCmd); static bool functionCompatibleCheck(SSqlCmd* pCmd); -static void setColumnOffsetValueInResultset(SSqlCmd* pCmd); +static void setColumnOffsetValueInResultset(SSqlCmd* pCmd); static int32_t parseGroupbyClause(SSqlCmd* pCmd, tVariantList* pList); static int32_t parseIntervalClause(SSqlCmd* pCmd, SQuerySQL* pQuerySql); @@ -94,7 +94,7 @@ static int32_t parseFillClause(SSqlCmd* pCmd, SQuerySQL* pQuerySQL); static int32_t parseOrderbyClause(SSqlCmd* pCmd, SQuerySQL* pQuerySql, SSchema* pSchema, int32_t numOfCols); static int32_t tsRewriteFieldNameIfNecessary(SSqlCmd* pCmd); -static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField); +static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField); static int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo); static int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd); static int32_t buildArithmeticExprString(tSQLExpr* pExpr, char** exprString); @@ -105,8 +105,8 @@ static int32_t validateLocalConfig(tDCLSQL* pOptions); static int32_t validateColumnName(char* name); static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo); -static bool hasTimestampForPointInterpQuery(SSqlCmd* pCmd); -static void updateTagColumnIndex(SSqlCmd* pCmd, int32_t tableIndex); +static bool hasTimestampForPointInterpQuery(SSqlCmd* pCmd); +static void updateTagColumnIndex(SSqlCmd* pCmd, int32_t tableIndex); static int32_t parseLimitClause(SSqlObj* pSql, SQuerySQL* pQuerySql); static int32_t parseCreateDBOptions(SCreateDBInfo* pCreateDbSql, SSqlCmd* pCmd); @@ -115,13 +115,12 @@ static int32_t getTableIndexByName(SSQLToken* pToken, SSqlCmd* pCmd, SColumnInde static int32_t optrToString(tSQLExpr* pExpr, char** exprString); static SColumnList getColumnList(int32_t num, int16_t tableIndex, int32_t columnIndex); -static int32_t getMeterIndex(SSQLToken* pTableToken, SSqlCmd* pCmd, SColumnIndex* pIndex); -static int32_t doFunctionsCompatibleCheck(SSqlObj* pSql); +static int32_t getMeterIndex(SSQLToken* pTableToken, SSqlCmd* pCmd, SColumnIndex* pIndex); +static int32_t doFunctionsCompatibleCheck(SSqlObj* pSql); static int32_t tscQueryOnlyMetricTags(SSqlCmd* pCmd, bool* queryOnMetricTags) { assert(QUERY_IS_STABLE_QUERY(pCmd->type)); - // here colIdx == -1 means the special column tbname that is the name of each table *queryOnMetricTags = true; for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pCmd, i); @@ -1151,7 +1150,7 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQuerySQL* pQuerySql) { } // check the invalid sql expresssion: select count(tbname)/count(tag1)/count(tag2) from super_table interval(1d); - for(int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pCmd, i); if (pExpr->functionId == TSDB_FUNC_COUNT && TSDB_COL_IS_TAG(pExpr->colInfo.flag)) { setErrMsg(pCmd, msg1); @@ -2791,10 +2790,14 @@ static bool functionCompatibleCheck(SSqlCmd* pCmd) { // diff function cannot be executed with other function // arithmetic function can be executed with other arithmetic functions for (int32_t i = startIdx + 1; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { - int16_t functionId = tscSqlExprGet(pCmd, i)->functionId; - if (functionId == TSDB_FUNC_TAGPRJ || - functionId == TSDB_FUNC_TAG || - functionId == TSDB_FUNC_TS) { + SSqlExpr* pExpr = tscSqlExprGet(pCmd, i); + + int16_t functionId = pExpr->functionId; + if (functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS) { + continue; + } + + if (functionId == TSDB_FUNC_PRJ && pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { continue; } @@ -2809,9 +2812,7 @@ static bool functionCompatibleCheck(SSqlCmd* pCmd) { for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { int16_t functionId = tscSqlExprGet(pCmd, i)->functionId; - if (functionId == TSDB_FUNC_PRJ || - functionId == TSDB_FUNC_TAGPRJ || - functionId == TSDB_FUNC_TS || + if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_ARITHM) { continue; } @@ -4986,19 +4987,37 @@ int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd) { int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd) { bool isProjectionFunction = false; - const char* msg = "column projection is not compatible with interval"; - + const char* msg1 = "column projection is not compatible with interval"; + const char* msg2 = "interval not allowed for tag queries"; + // multi-output set/ todo refactor for (int32_t k = 0; k < pCmd->fieldsInfo.numOfOutputCols; ++k) { SSqlExpr* pExpr = tscSqlExprGet(pCmd, k); + + // projection query on primary timestamp, the selectivity function needs to be present. + if (pExpr->functionId == TSDB_FUNC_PRJ && pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + bool hasSelectivity = false; + for(int32_t j = 0; j < pCmd->fieldsInfo.numOfOutputCols; ++j) { + SSqlExpr* pEx = tscSqlExprGet(pCmd, j); + if ((aAggs[pEx->functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) == TSDB_FUNCSTATE_SELECTIVITY) { + hasSelectivity = true; + break; + } + } + + if (hasSelectivity) { + continue; + } + } + if (pExpr->functionId == TSDB_FUNC_PRJ || pExpr->functionId == TSDB_FUNC_DIFF || - pExpr->functionId == TSDB_FUNC_ARITHM) { + pExpr->functionId == TSDB_FUNC_ARITHM) { isProjectionFunction = true; } } if (isProjectionFunction) { - setErrMsg(pCmd, msg); + setErrMsg(pCmd, msg1); } return isProjectionFunction == true ? TSDB_CODE_INVALID_SQL : TSDB_CODE_SUCCESS; @@ -5164,8 +5183,7 @@ int32_t parseLimitClause(SSqlObj* pSql, SQuerySQL* pQuerySql) { if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) { bool queryOnTags = false; - int32_t ret = tscQueryOnlyMetricTags(pCmd, &queryOnTags); - if (ret != TSDB_CODE_SUCCESS) { + if (tscQueryOnlyMetricTags(pCmd, &queryOnTags) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; } @@ -5382,8 +5400,8 @@ static void doUpdateSqlFunctionForTagPrj(SSqlCmd* pCmd) { } } - int16_t resType = 0; - int16_t resBytes = 0; + int16_t resType = 0; + int16_t resBytes = 0; SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); SSchema* pSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta); @@ -5464,7 +5482,7 @@ static int32_t checkUpdateTagPrjFunctions(SSqlCmd* pCmd) { const char* msg2 = "functions not allowed"; bool tagColExists = false; - int16_t numOfTimestamp = 0; // primary timestamp column + int16_t numOfTimestamp = 0; // primary timestamp column int16_t numOfSelectivity = 0; int16_t numOfAggregation = 0; @@ -5493,7 +5511,7 @@ static int32_t checkUpdateTagPrjFunctions(SSqlCmd* pCmd) { // When the tag projection function on tag column that is not in the group by clause, aggregation function and // selectivity function exist in select clause is not allowed. - if(numOfAggregation > 0) { + if (numOfAggregation > 0) { setErrMsg(pCmd, msg1); return TSDB_CODE_INVALID_SQL; } @@ -5598,14 +5616,14 @@ int32_t doFunctionsCompatibleCheck(SSqlObj* pSql) { const char* msg2 = "interval not allowed in group by normal column"; const char* msg3 = "group by not allowed on projection query"; const char* msg4 = "tags retrieve not compatible with group by"; - const char* msg5 = "retrieve tags not compatible with group by "; + const char* msg5 = "retrieve tags not compatible with group by or interval query"; SSqlCmd* pCmd = &pSql->cmd; SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); // only retrieve tags, group by is not supportted if (pCmd->command == TSDB_SQL_RETRIEVE_TAGS) { - if (pCmd->groupbyExpr.numOfGroupCols > 0) { + if (pCmd->groupbyExpr.numOfGroupCols > 0 || pCmd->nAggTimeInterval > 0) { setErrMsg(pCmd, msg5); return TSDB_CODE_INVALID_SQL; } else { @@ -5634,7 +5652,7 @@ int32_t doFunctionsCompatibleCheck(SSqlObj* pSql) { * group by normal columns. * Check if the column projection is identical to the group by column or not */ - if (functId == TSDB_FUNC_PRJ) { + if (functId == TSDB_FUNC_PRJ && pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) { bool qualified = false; for (int32_t j = 0; j < pCmd->groupbyExpr.numOfGroupCols; ++j) { SColIndexEx* pColIndex = &pCmd->groupbyExpr.columnInfo[j]; @@ -5650,7 +5668,8 @@ int32_t doFunctionsCompatibleCheck(SSqlObj* pSql) { } if (IS_MULTIOUTPUT(aAggs[functId].nStatus) && functId != TSDB_FUNC_TOP && functId != TSDB_FUNC_BOTTOM && - functId != TSDB_FUNC_TAGPRJ) { + functId != TSDB_FUNC_TAGPRJ && + (functId == TSDB_FUNC_PRJ && pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX)) { setErrMsg(pCmd, msg1); return TSDB_CODE_INVALID_SQL; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 547171cffe..fd23b183b1 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -142,7 +142,6 @@ bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd) { return false; } - void tscGetDBInfoFromMeterId(char* meterId, char* db) { char* st = strstr(meterId, TS_PATH_DELIMITER); if (st != NULL) { @@ -265,7 +264,7 @@ bool tscIsPointInterpQuery(SSqlCmd* pCmd) { } bool tscIsTWAQuery(SSqlCmd* pCmd) { - for(int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) { + for (int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pCmd, i); if (pExpr == NULL) { continue; @@ -450,7 +449,8 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) { tfree(pDataBlock); } -SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, uint32_t offset) { +SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, + uint32_t offset) { uint32_t needed = pDataBlock->numOfParams + 1; if (needed > pDataBlock->numOfAllocedParams) { needed *= 2; @@ -490,13 +490,13 @@ SDataBlockList* tscCreateBlockArrayList() { return pDataBlockArrayList; } -void tscAppendDataBlock(SDataBlockList *pList, STableDataBlocks *pBlocks) { +void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks) { if (pList->nSize >= pList->nAlloc) { pList->nAlloc = pList->nAlloc << 1; - pList->pData = realloc(pList->pData, sizeof(void *) * (size_t)pList->nAlloc); + pList->pData = realloc(pList->pData, sizeof(void*) * (size_t)pList->nAlloc); // reset allocated memory - memset(pList->pData + pList->nSize, 0, sizeof(void *) * (pList->nAlloc - pList->nSize)); + memset(pList->pData + pList->nSize, 0, sizeof(void*) * (pList->nAlloc - pList->nSize)); } pList->pData[pList->nSize++] = pBlocks; @@ -553,7 +553,7 @@ void tscFreeUnusedDataBlocks(SDataBlockList* pList) { } STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name) { - STableDataBlocks *dataBuf = tscCreateDataBlock(size); + STableDataBlocks* dataBuf = tscCreateDataBlock(size); dataBuf->rowSize = rowSize; dataBuf->size = startOffset; @@ -573,7 +573,7 @@ STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pData } if (dataBuf == NULL) { - dataBuf = tscCreateDataBlockEx((size_t) size, rowSize, startOffset, tableId); + dataBuf = tscCreateDataBlockEx((size_t)size, rowSize, startOffset, tableId); dataBuf = *(STableDataBlocks**)taosAddIntHash(pHashList, id, (char*)&dataBuf); tscAppendDataBlock(pDataBlockList, dataBuf); } @@ -604,7 +604,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi if (tmp != NULL) { dataBuf->pData = tmp; memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size); - } else { // failed to allocate memory, free already allocated memory and return error code + } else { // failed to allocate memory, free already allocated memory and return error code tscError("%p failed to allocate memory for merging submit block, size:%d", pSql, dataBuf->nAllocSize); taosCleanUpIntHash(pVnodeDataBlockHashList); @@ -673,7 +673,7 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) { pCmd->allocSize = size; } else { if (pCmd->allocSize < size) { - char* b = realloc(pCmd->payload, size); + char* b = realloc(pCmd->payload, size); if (b == NULL) return TSDB_CODE_CLI_OUT_OF_MEMORY; pCmd->payload = b; pCmd->allocSize = size; @@ -869,11 +869,11 @@ void tscClearFieldInfo(SFieldInfo* pFieldInfo) { static void _exprCheckSpace(SSqlExprInfo* pExprInfo, int32_t size) { if (size > pExprInfo->numOfAlloc) { - int32_t oldSize = pExprInfo->numOfAlloc; + uint32_t oldSize = pExprInfo->numOfAlloc; - int32_t newSize = (oldSize <= 0) ? 8 : (oldSize << 1); + uint32_t newSize = (oldSize <= 0) ? 8 : (oldSize << 1U); while (newSize < size) { - newSize = (newSize << 1); + newSize = (newSize << 1U); } if (newSize > TSDB_MAX_COLUMNS) { @@ -1161,7 +1161,7 @@ void tscColumnBaseInfoDestroy(SColumnBaseInfo* pColumnBaseInfo) { assert(pColumnBaseInfo->numOfCols <= TSDB_MAX_COLUMNS); for (int32_t i = 0; i < pColumnBaseInfo->numOfCols; ++i) { - SColumnBase *pColBase = &(pColumnBaseInfo->pColList[i]); + SColumnBase* pColBase = &(pColumnBaseInfo->pColList[i]); if (pColBase->numOfFilters > 0) { for (int32_t j = 0; j < pColBase->numOfFilters; ++j) { @@ -1179,8 +1179,9 @@ void tscColumnBaseInfoDestroy(SColumnBaseInfo* pColumnBaseInfo) { tfree(pColumnBaseInfo->pColList); } - -void tscColumnBaseInfoReserve(SColumnBaseInfo* pColumnBaseInfo, int32_t size) { _cf_ensureSpace(pColumnBaseInfo, size); } +void tscColumnBaseInfoReserve(SColumnBaseInfo* pColumnBaseInfo, int32_t size) { + _cf_ensureSpace(pColumnBaseInfo, size); +} /* * 1. normal name, not a keyword or number @@ -1228,16 +1229,16 @@ int32_t tscValidateName(SSQLToken* pToken) { int len = tSQLGetToken(pToken->z, &pToken->type); // single token, validate it - if (len == pToken->n){ + if (len == pToken->n) { return validateQuoteToken(pToken); } else { - sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true); - if (sep == NULL) { - return TSDB_CODE_INVALID_SQL; - } + sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true); + if (sep == NULL) { + return TSDB_CODE_INVALID_SQL; + } return tscValidateName(pToken); - } + } } else { if (isNumber(pToken)) { return TSDB_CODE_INVALID_SQL; @@ -1616,8 +1617,8 @@ int32_t SStringAlloc(SString* pStr, int32_t size) { #ifdef WINDOWS LPVOID lpMsgBuf; FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, - GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language - (LPTSTR)&lpMsgBuf, 0, NULL); + GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language + (LPTSTR)&lpMsgBuf, 0, NULL); tscTrace("failed to allocate memory, reason:%s", lpMsgBuf); LocalFree(lpMsgBuf); #else @@ -1652,12 +1653,11 @@ int32_t SStringEnsureRemain(SString* pStr, int32_t size) { char* tmp = realloc(pStr->z, newsize); if (tmp == NULL) { - #ifdef WINDOWS LPVOID lpMsgBuf; FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, - GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language - (LPTSTR)&lpMsgBuf, 0, NULL); + GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language + (LPTSTR)&lpMsgBuf, 0, NULL); tscTrace("failed to allocate memory, reason:%s", lpMsgBuf); LocalFree(lpMsgBuf); #else @@ -1728,7 +1728,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex if (pPrevSql != NULL) { pNew->cmd.type = pPrevSql->cmd.type; } else { - pNew->cmd.type |= TSDB_QUERY_TYPE_SUBQUERY; // it must be the subquery + pNew->cmd.type |= TSDB_QUERY_TYPE_SUBQUERY; // it must be the subquery } uint64_t uid = pMeterMetaInfo->pMeterMeta->uid; @@ -1760,7 +1760,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex char key[TSDB_MAX_TAGS_LEN + 1] = {0}; tscGetMetricMetaCacheKey(pCmd, key, pMetermetaInfo->pMeterMeta->uid); - char* name = pMeterMetaInfo->name; + char* name = pMeterMetaInfo->name; SMeterMetaInfo* pFinalInfo = NULL; if (pPrevSql == NULL) { @@ -1768,11 +1768,11 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex SMetricMeta* pMetricMeta = taosGetDataFromCache(tscCacheHandle, key); pFinalInfo = tscAddMeterMetaInfo(&pNew->cmd, name, pMeterMeta, pMetricMeta, pMeterMetaInfo->numOfTags, - pMeterMetaInfo->tagColumnIndex); + pMeterMetaInfo->tagColumnIndex); } else { SMeterMetaInfo* pPrevInfo = tscGetMeterMetaInfo(&pPrevSql->cmd, 0); - pFinalInfo = tscAddMeterMetaInfo(&pNew->cmd, name, pPrevInfo->pMeterMeta, pPrevInfo->pMetricMeta, pMeterMetaInfo->numOfTags, - pMeterMetaInfo->tagColumnIndex); + pFinalInfo = tscAddMeterMetaInfo(&pNew->cmd, name, pPrevInfo->pMeterMeta, pPrevInfo->pMetricMeta, + pMeterMetaInfo->numOfTags, pMeterMetaInfo->tagColumnIndex); pPrevInfo->pMeterMeta = NULL; pPrevInfo->pMetricMeta = NULL; @@ -1783,13 +1783,14 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex assert(pFinalInfo->pMetricMeta != NULL); } - tscTrace("%p new subquery %p, vnodeIdx:%d, tableIndex:%d, type:%d", pSql, pNew, vnodeIndex, tableIndex, pNew->cmd.type); + tscTrace("%p new subquery %p, vnodeIdx:%d, tableIndex:%d, type:%d", pSql, pNew, vnodeIndex, tableIndex, + pNew->cmd.type); return pNew; } void tscDoQuery(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; - void* fp = pSql->fp; + void* fp = pSql->fp; if (pCmd->command > TSDB_SQL_LOCAL) { tscProcessLocalCmd(pSql); diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 9c0d7883a5..654f9537e4 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -3122,9 +3122,11 @@ static bool onlyOneQueryType(SQuery *pQuery, int32_t functId, int32_t functIdDst for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; - if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG) { + if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || + functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAG_DUMMY) { continue; } + if (functionId != functId && functionId != functIdDst) { return false; } @@ -3137,10 +3139,9 @@ static bool onlyFirstQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSD static bool onlyLastQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST); } -static void rewriteExecOrder(SQuery *pQuery, bool metricQuery) { +static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) { // in case of point-interpolation query, use asc order scan - char msg[] = - "QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%lld-%lld, " + char msg[] = "QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%lld-%lld, " "new qrange:%lld-%lld"; // descending order query @@ -3614,7 +3615,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete } setScanLimitationByResultBuffer(pQuery); - rewriteExecOrder(pQuery, false); + changeExecuteScanOrder(pQuery, false); pQInfo->over = 0; pQInfo->pointsRead = 0; @@ -3790,7 +3791,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) pQInfo->pointsRead = 0; pQuery->pointsRead = 0; - rewriteExecOrder(pQuery, true); + changeExecuteScanOrder(pQuery, true); vnodeInitDataBlockInfo(&pSupporter->runtimeEnv.loadBlockInfo); vnodeInitLoadCompBlockInfo(&pSupporter->runtimeEnv.loadCompBlockInfo); diff --git a/src/system/detail/src/vnodeUtil.c b/src/system/detail/src/vnodeUtil.c index 790ad1c4a1..7a2fb5530c 100644 --- a/src/system/detail/src/vnodeUtil.c +++ b/src/system/detail/src/vnodeUtil.c @@ -289,7 +289,7 @@ SSqlFunctionExpr* vnodeCreateSqlFunctionExpr(SQueryMeterMsg* pQueryMsg, int32_t* return NULL; } - if (pExprs[i].pBase.functionId == TSDB_FUNC_TAG_DUMMY) { + if (pExprs[i].pBase.functionId == TSDB_FUNC_TAG_DUMMY || pExprs[i].pBase.functionId == TSDB_FUNC_TS_DUMMY) { tagLen += pExprs[i].resBytes; } assert(isValidDataType(pExprs[i].resType, pExprs[i].resBytes)); -- GitLab