diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index c23536f63170876bb4b3998fa38fafbc8c3cd23d..8e1fdf998a66cec5cbb59105686be6348d3a99a4 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -63,6 +63,14 @@ } \ } while (0); +#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \ +do {\ +for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \ + SQLFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[i]; \ + aAggs[TSDB_FUNC_TAG].xFunction(__ctx); \ + } \ +} while(0); + void noop(SQLFunctionCtx *UNUSED_PARAM(pCtx)) {} typedef struct tValuePair { @@ -104,7 +112,9 @@ typedef struct SFirstLastInfo { } SFirstLastInfo; typedef struct SFirstLastInfo SLastrowInfo; -typedef struct SPercentileInfo { tMemBucket *pMemBucket; } SPercentileInfo; +typedef struct SPercentileInfo { + tMemBucket *pMemBucket; +} SPercentileInfo; typedef struct STopBotInfo { int32_t num; @@ -118,9 +128,13 @@ typedef struct SLeastsquareInfo { int64_t num; } SLeastsquareInfo; -typedef struct SAPercentileInfo { SHistogramInfo *pHisto; } SAPercentileInfo; +typedef struct SAPercentileInfo { + SHistogramInfo *pHisto; +} SAPercentileInfo; -typedef struct STSCompInfo { STSBuf *pTSBuf; } STSCompInfo; +typedef struct STSCompInfo { + STSBuf *pTSBuf; +} STSCompInfo; int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type, int16_t *bytes, int16_t *intermediateResBytes, int16_t extLength, bool isSuperTable) { @@ -451,21 +465,32 @@ int32_t no_data_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId } \ }; -#define UPDATE_DATA(ctx, left, right, num, sign) \ - do { \ - if (((left) < (right)) ^ (sign)) { \ - (left) = right; \ - DO_UPDATE_TAG_COLUMNS(ctx, 0); \ - (num) += 1; \ - } \ - } while (0) +#define UPDATE_DATA(ctx, left, right, num, sign, k) \ + do { \ + if (((left) < (right)) ^ (sign)) { \ + (left) = (right); \ + DO_UPDATE_TAG_COLUMNS(ctx, k); \ + (num) += 1; \ + } \ + } while (0); + +#define DUPATE_DATA_WITHOUT_TS(ctx, left, right, num, sign) \ +do { \ + if (((left) < (right)) ^ (sign)) { \ + (left) = (right); \ + DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx); \ + (num) += 1; \ + } \ + } while (0); + #define LOOPCHECK_N(val, list, ctx, tsdbType, sign, num) \ for (int32_t i = 0; i < ((ctx)->size); ++i) { \ if ((ctx)->hasNull && isNull((char *)&(list)[i], tsdbType)) { \ continue; \ } \ - UPDATE_DATA(ctx, val, (list)[i], num, sign); \ + TSKEY key = (ctx)->ptsList[i]; \ + UPDATE_DATA(ctx, val, (list)[i], num, sign, key); \ } #define TYPED_LOOPCHECK_N(type, data, list, ctx, tsdbType, sign, notNullElems) \ @@ -886,16 +911,18 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, index = pCtx->preAggVals.maxIndex; } + TSKEY key = pCtx->ptsList[index]; + if (pCtx->inputType >= TSDB_DATA_TYPE_TINYINT && pCtx->inputType <= TSDB_DATA_TYPE_BIGINT) { int64_t val = GET_INT64_VAL(tval); if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { int8_t *data = (int8_t *)pOutput; - UPDATE_DATA(pCtx, *data, val, notNullElems, isMin); + UPDATE_DATA(pCtx, *data, val, notNullElems, isMin, key); } else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) { int16_t *data = (int16_t *)pOutput; - UPDATE_DATA(pCtx, *data, val, notNullElems, isMin); + UPDATE_DATA(pCtx, *data, val, notNullElems, isMin, key); } else if (pCtx->inputType == TSDB_DATA_TYPE_INT) { int32_t *data = (int32_t *)pOutput; #if defined(_DEBUG_VIEW) @@ -906,27 +933,27 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, *data = val; for (int32_t i = 0; i < (pCtx)->tagInfo.numOfTagCols; ++i) { SQLFunctionCtx *__ctx = pCtx->tagInfo.pTagCtxList[i]; - if (__ctx->functionId == TSDB_FUNC_TAG_DUMMY) { - aAggs[TSDB_FUNC_TAG].xFunction(__ctx); - } else if (__ctx->functionId == TSDB_FUNC_TS_DUMMY) { - *((int64_t *)__ctx->aOutputBuf) = pCtx->ptsList[index]; + if (__ctx->functionId == TSDB_FUNC_TS_DUMMY) { + __ctx->tag = (tVariant){.i64Key = key, .nType = TSDB_DATA_TYPE_BIGINT}; } + + aAggs[TSDB_FUNC_TAG].xFunction(__ctx); } } } else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) { int64_t *data = (int64_t *)pOutput; - UPDATE_DATA(pCtx, *data, val, notNullElems, isMin); + UPDATE_DATA(pCtx, *data, val, notNullElems, isMin, key); } } else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) { double *data = (double *)pOutput; double val = GET_DOUBLE_VAL(tval); - UPDATE_DATA(pCtx, *data, val, notNullElems, isMin); + UPDATE_DATA(pCtx, *data, val, notNullElems, isMin, key); } else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) { float *data = (float *)pOutput; double val = GET_DOUBLE_VAL(tval); - UPDATE_DATA(pCtx, *data, val, notNullElems, isMin); + UPDATE_DATA(pCtx, *data, val, notNullElems, isMin, key); } return; @@ -951,7 +978,9 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, if ((*retVal < pData[i]) ^ isMin) { *retVal = pData[i]; - DO_UPDATE_TAG_COLUMNS(pCtx, pCtx->ptsList[i]); + TSKEY k = pCtx->ptsList[i]; + + DO_UPDATE_TAG_COLUMNS(pCtx, k); } *notNullElems += 1; @@ -1089,12 +1118,12 @@ static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *outp switch (type) { case TSDB_DATA_TYPE_TINYINT: { int8_t v = GET_INT8_VAL(input); - UPDATE_DATA(pCtx, *(int8_t *)output, v, notNullElems, isMin); + DUPATE_DATA_WITHOUT_TS(pCtx, *(int8_t *)output, v, notNullElems, isMin); break; }; case TSDB_DATA_TYPE_SMALLINT: { int16_t v = GET_INT16_VAL(input); - UPDATE_DATA(pCtx, *(int16_t *)output, v, notNullElems, isMin); + DUPATE_DATA_WITHOUT_TS(pCtx, *(int16_t *)output, v, notNullElems, isMin); break; } case TSDB_DATA_TYPE_INT: { @@ -1104,7 +1133,7 @@ static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *outp for (int32_t i = 0; i < pCtx->tagInfo.numOfTagCols; ++i) { SQLFunctionCtx *__ctx = pCtx->tagInfo.pTagCtxList[i]; - aAggs[TSDB_FUNC_TAG].xFunction(__ctx); + aAggs[TSDB_FUNC_TAG].xFunction(__ctx); } notNullElems++; @@ -1113,17 +1142,17 @@ static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *outp } case TSDB_DATA_TYPE_FLOAT: { float v = GET_FLOAT_VAL(input); - UPDATE_DATA(pCtx, *(float *)output, v, notNullElems, isMin); + DUPATE_DATA_WITHOUT_TS(pCtx, *(float *)output, v, notNullElems, isMin); break; } case TSDB_DATA_TYPE_DOUBLE: { double v = GET_DOUBLE_VAL(input); - UPDATE_DATA(pCtx, *(double *)output, v, notNullElems, isMin); + DUPATE_DATA_WITHOUT_TS(pCtx, *(double *)output, v, notNullElems, isMin); break; } case TSDB_DATA_TYPE_BIGINT: { int64_t v = GET_INT64_VAL(input); - UPDATE_DATA(pCtx, *(int64_t *)output, v, notNullElems, isMin); + DUPATE_DATA_WITHOUT_TS(pCtx, *(int64_t *)output, v, notNullElems, isMin); break; }; default: @@ -1179,38 +1208,39 @@ static void max_func_second_merge(SQLFunctionCtx *pCtx) { static void minMax_function_f(SQLFunctionCtx *pCtx, int32_t index, int32_t isMin) { char *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + TSKEY key = pCtx->ptsList[index]; int32_t num = 0; if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { int8_t *output = (int8_t *)pCtx->aOutputBuf; int8_t i = GET_INT8_VAL(pData); - UPDATE_DATA(pCtx, *output, i, num, isMin); + UPDATE_DATA(pCtx, *output, i, num, isMin, key); } else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) { int16_t *output = pCtx->aOutputBuf; int16_t i = GET_INT16_VAL(pData); - UPDATE_DATA(pCtx, *output, i, num, isMin); + UPDATE_DATA(pCtx, *output, i, num, isMin, key); } else if (pCtx->inputType == TSDB_DATA_TYPE_INT) { int32_t *output = pCtx->aOutputBuf; int32_t i = GET_INT32_VAL(pData); - UPDATE_DATA(pCtx, *output, i, num, isMin); + UPDATE_DATA(pCtx, *output, i, num, isMin, key); } else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) { int64_t *output = pCtx->aOutputBuf; int64_t i = GET_INT64_VAL(pData); - UPDATE_DATA(pCtx, *output, i, num, isMin); + UPDATE_DATA(pCtx, *output, i, num, isMin, key); } else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) { float *output = pCtx->aOutputBuf; float i = GET_FLOAT_VAL(pData); - UPDATE_DATA(pCtx, *output, i, num, isMin); + UPDATE_DATA(pCtx, *output, i, num, isMin, key); } else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) { double *output = pCtx->aOutputBuf; double i = GET_DOUBLE_VAL(pData); - UPDATE_DATA(pCtx, *output, i, num, isMin); + UPDATE_DATA(pCtx, *output, i, num, isMin, key); } GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; @@ -1804,6 +1834,11 @@ static void valuePairAssign(tValuePair *dst, int16_t type, const char *val, int6 memcpy(dst->pTags, pTags, (size_t)pTagInfo->tagsLen); } else { // the tags are dumped from the ctx tag fields for (int32_t i = 0; i < pTagInfo->numOfTagCols; ++i) { + SQLFunctionCtx* __ctx = pTagInfo->pTagCtxList[i]; + if (__ctx->functionId == TSDB_FUNC_TS_DUMMY) { + __ctx->tag = (tVariant) {.nType = TSDB_DATA_TYPE_BIGINT, .i64Key = tsKey}; + } + tVariantDump(&pTagInfo->pTagCtxList[i]->tag, dst->pTags + size, pTagInfo->pTagCtxList[i]->tag.nType); size += pTagInfo->pTagCtxList[i]->outputBytes; } @@ -1825,8 +1860,9 @@ static void do_top_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pData, tValuePair **pList = pInfo->res; if (pInfo->num < maxLen) { - if (pInfo->num == 0 || ((type >= TSDB_DATA_TYPE_TINYINT && type <= TSDB_DATA_TYPE_BIGINT) && - val.i64Key >= pList[pInfo->num - 1]->v.i64Key) || + if (pInfo->num == 0 || + ((type >= TSDB_DATA_TYPE_TINYINT && type <= TSDB_DATA_TYPE_BIGINT) && + val.i64Key >= pList[pInfo->num - 1]->v.i64Key) || ((type >= TSDB_DATA_TYPE_FLOAT && type <= TSDB_DATA_TYPE_DOUBLE) && val.dKey >= pList[pInfo->num - 1]->v.dKey)) { valuePairAssign(pList[pInfo->num], type, &val.i64Key, ts, pTags, pTagInfo, stage); @@ -4324,173 +4360,427 @@ int32_t funcCompatDefList[28] = { */ 1, 1, 1, -1, 1, 1, 5}; -SQLAggFuncElem aAggs[28] = { - { - // 0, count function does not invoke the finalize function - "count", TSDB_FUNC_COUNT, TSDB_FUNC_COUNT, TSDB_BASE_FUNC_SO, function_setup, count_function, count_function_f, - no_next_step, noop, count_func_merge, count_func_merge, count_load_data_info, - }, - { - // 1 - "sum", TSDB_FUNC_SUM, TSDB_FUNC_SUM, TSDB_BASE_FUNC_SO, function_setup, sum_function, sum_function_f, - no_next_step, function_finalizer, sum_func_merge, sum_func_second_merge, precal_req_load_info, - }, - { - // 2 - "avg", TSDB_FUNC_AVG, TSDB_FUNC_AVG, TSDB_BASE_FUNC_SO, function_setup, avg_function, avg_function_f, - no_next_step, avg_finalizer, avg_func_merge, avg_func_second_merge, precal_req_load_info, - }, - { - // 3 - "min", TSDB_FUNC_MIN, TSDB_FUNC_MIN, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, min_func_setup, - min_function, min_function_f, no_next_step, function_finalizer, min_func_merge, min_func_second_merge, - precal_req_load_info, - }, - { - // 4 - "max", TSDB_FUNC_MAX, TSDB_FUNC_MAX, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, max_func_setup, - max_function, max_function_f, no_next_step, function_finalizer, max_func_merge, max_func_second_merge, - precal_req_load_info, - }, - { - // 5 - "stddev", TSDB_FUNC_STDDEV, TSDB_FUNC_INVALID_ID, TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF, - function_setup, stddev_function, stddev_function_f, stddev_next_step, stddev_finalizer, noop, noop, - data_req_load_info, - }, - { - // 6 - "percentile", TSDB_FUNC_PERCT, TSDB_FUNC_INVALID_ID, - TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF, percentile_function_setup, percentile_function, - percentile_function_f, no_next_step, percentile_finalizer, noop, noop, data_req_load_info, - }, - { - // 7 - "apercentile", TSDB_FUNC_APERCT, TSDB_FUNC_APERCT, - TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC, - apercentile_function_setup, apercentile_function, apercentile_function_f, no_next_step, apercentile_finalizer, - apercentile_func_merge, apercentile_func_second_merge, data_req_load_info, - }, - { - // 8 - "first", TSDB_FUNC_FIRST, TSDB_FUNC_FIRST_DST, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, function_setup, - first_function, first_function_f, no_next_step, function_finalizer, noop, noop, first_data_req_info, - }, - { - // 9 - "last", TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, function_setup, - last_function, last_function_f, no_next_step, function_finalizer, noop, noop, last_data_req_info, - }, - { - // 10 - "last_row", TSDB_FUNC_LAST_ROW, TSDB_FUNC_LAST_ROW, - TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS | - TSDB_FUNCSTATE_SELECTIVITY, - first_last_function_setup, last_row_function, noop, no_next_step, last_row_finalizer, noop, - last_dist_func_second_merge, data_req_load_info, - }, - { - // 11 - "top", TSDB_FUNC_TOP, TSDB_FUNC_TOP, TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF | - TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, - top_bottom_function_setup, top_function, top_function_f, no_next_step, top_bottom_func_finalizer, - top_func_merge, top_func_second_merge, data_req_load_info, - }, - { - // 12 - "bottom", TSDB_FUNC_BOTTOM, TSDB_FUNC_BOTTOM, TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF | - TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, - top_bottom_function_setup, bottom_function, bottom_function_f, no_next_step, top_bottom_func_finalizer, - bottom_func_merge, bottom_func_second_merge, data_req_load_info, - }, - { - // 13 - "spread", TSDB_FUNC_SPREAD, TSDB_FUNC_SPREAD, TSDB_BASE_FUNC_SO, spread_function_setup, spread_function, - spread_function_f, no_next_step, spread_function_finalizer, spread_func_merge, spread_func_sec_merge, - count_load_data_info, - }, - { - // 14 - "twa", TSDB_FUNC_TWA, TSDB_FUNC_TWA, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, twa_function_setup, - twa_function, twa_function_f, no_next_step, twa_function_finalizer, twa_func_merge, twa_function_copy, - data_req_load_info, - }, - { - // 15 - "leastsquares", TSDB_FUNC_LEASTSQR, TSDB_FUNC_INVALID_ID, - TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF, leastsquares_function_setup, - leastsquares_function, leastsquares_function_f, no_next_step, leastsquares_finalizer, noop, noop, - data_req_load_info, - }, - { - // 16 - "ts", TSDB_FUNC_TS, TSDB_FUNC_TS, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, function_setup, - date_col_output_function, date_col_output_function, no_next_step, noop, copy_function, copy_function, - no_data_info, - }, - { - // 17 - "ts", TSDB_FUNC_TS_DUMMY, TSDB_FUNC_TS_DUMMY, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, function_setup, noop, - noop, no_next_step, noop, copy_function, copy_function, data_req_load_info, - }, - { - // 18 - "tag", TSDB_FUNC_TAG_DUMMY, TSDB_FUNC_TAG_DUMMY, TSDB_BASE_FUNC_SO, function_setup, tag_function, noop, - no_next_step, noop, copy_function, copy_function, no_data_info, - }, - { - // 19 - "ts", TSDB_FUNC_TS_COMP, TSDB_FUNC_TS_COMP, TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_NEED_TS, ts_comp_function_setup, - ts_comp_function, ts_comp_function_f, no_next_step, ts_comp_finalize, copy_function, copy_function, - data_req_load_info, - }, - { - // 20 - "tag", TSDB_FUNC_TAG, TSDB_FUNC_TAG, TSDB_BASE_FUNC_SO, function_setup, tag_function, tag_function_f, - no_next_step, noop, copy_function, copy_function, no_data_info, - }, - { - // 21, column project sql function - "colprj", TSDB_FUNC_PRJ, TSDB_FUNC_PRJ, TSDB_BASE_FUNC_MO | TSDB_FUNCSTATE_NEED_TS, function_setup, - col_project_function, col_project_function_f, no_next_step, noop, copy_function, copy_function, - data_req_load_info, - }, - { - // 22, multi-output, tag function has only one result - "tagprj", TSDB_FUNC_TAGPRJ, TSDB_FUNC_TAGPRJ, TSDB_BASE_FUNC_MO, function_setup, tag_project_function, - tag_project_function_f, no_next_step, noop, copy_function, copy_function, no_data_info, - }, - { - // 23 - "arithmetic", TSDB_FUNC_ARITHM, TSDB_FUNC_ARITHM, - TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS, function_setup, arithmetic_function, - arithmetic_function_f, no_next_step, noop, copy_function, copy_function, data_req_load_info, - }, - { - // 24 - "diff", TSDB_FUNC_DIFF, TSDB_FUNC_INVALID_ID, TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_NEED_TS, diff_function_setup, - diff_function, diff_function_f, no_next_step, noop, noop, noop, data_req_load_info, - }, - // distributed version used in two-stage aggregation processes - { - // 25 - "first_dist", TSDB_FUNC_FIRST_DST, TSDB_FUNC_FIRST_DST, - TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, first_last_function_setup, - first_dist_function, first_dist_function_f, no_next_step, function_finalizer, first_dist_func_merge, - first_dist_func_second_merge, first_dist_data_req_info, - }, - { - // 26 - "last_dist", TSDB_FUNC_LAST_DST, TSDB_FUNC_LAST_DST, - TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, first_last_function_setup, - last_dist_function, last_dist_function_f, no_next_step, function_finalizer, last_dist_func_merge, - last_dist_func_second_merge, last_dist_data_req_info, - }, - { - // 27 - "interp", TSDB_FUNC_INTERP, TSDB_FUNC_INTERP, - TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS, function_setup, - interp_function, - do_sum_f, // todo filter handle - no_next_step, noop, noop, copy_function, no_data_info, - }}; +SQLAggFuncElem aAggs[28] = {{ + // 0, count function does not invoke the finalize function + "count", + TSDB_FUNC_COUNT, + TSDB_FUNC_COUNT, + TSDB_BASE_FUNC_SO, + function_setup, + count_function, + count_function_f, + no_next_step, + noop, + count_func_merge, + count_func_merge, + count_load_data_info, + }, + { + // 1 + "sum", + TSDB_FUNC_SUM, + TSDB_FUNC_SUM, + TSDB_BASE_FUNC_SO, + function_setup, + sum_function, + sum_function_f, + no_next_step, + function_finalizer, + sum_func_merge, + sum_func_second_merge, + precal_req_load_info, + }, + { + // 2 + "avg", + TSDB_FUNC_AVG, + TSDB_FUNC_AVG, + TSDB_BASE_FUNC_SO, + function_setup, + avg_function, + avg_function_f, + no_next_step, + avg_finalizer, + avg_func_merge, + avg_func_second_merge, + precal_req_load_info, + }, + { + // 3 + "min", + TSDB_FUNC_MIN, + TSDB_FUNC_MIN, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, + min_func_setup, + min_function, + min_function_f, + no_next_step, + function_finalizer, + min_func_merge, + min_func_second_merge, + precal_req_load_info, + }, + { + // 4 + "max", + TSDB_FUNC_MAX, + TSDB_FUNC_MAX, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, + max_func_setup, + max_function, + max_function_f, + no_next_step, + function_finalizer, + max_func_merge, + max_func_second_merge, + precal_req_load_info, + }, + { + // 5 + "stddev", + TSDB_FUNC_STDDEV, + TSDB_FUNC_INVALID_ID, + TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF, + function_setup, + stddev_function, + stddev_function_f, + stddev_next_step, + stddev_finalizer, + noop, + noop, + data_req_load_info, + }, + { + // 6 + "percentile", + TSDB_FUNC_PERCT, + TSDB_FUNC_INVALID_ID, + TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF, + percentile_function_setup, + percentile_function, + percentile_function_f, + no_next_step, + percentile_finalizer, + noop, + noop, + data_req_load_info, + }, + { + // 7 + "apercentile", + TSDB_FUNC_APERCT, + TSDB_FUNC_APERCT, + TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC, + apercentile_function_setup, + apercentile_function, + apercentile_function_f, + no_next_step, + apercentile_finalizer, + apercentile_func_merge, + apercentile_func_second_merge, + data_req_load_info, + }, + { + // 8 + "first", + TSDB_FUNC_FIRST, + TSDB_FUNC_FIRST_DST, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, + function_setup, + first_function, + first_function_f, + no_next_step, + function_finalizer, + noop, + noop, + first_data_req_info, + }, + { + // 9 + "last", + TSDB_FUNC_LAST, + TSDB_FUNC_LAST_DST, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, + function_setup, + last_function, + last_function_f, + no_next_step, + function_finalizer, + noop, + noop, + last_data_req_info, + }, + { + // 10 + "last_row", + TSDB_FUNC_LAST_ROW, + TSDB_FUNC_LAST_ROW, + TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS | + TSDB_FUNCSTATE_SELECTIVITY, + first_last_function_setup, + last_row_function, + noop, + no_next_step, + last_row_finalizer, + noop, + last_dist_func_second_merge, + data_req_load_info, + }, + { + // 11 + "top", + TSDB_FUNC_TOP, + TSDB_FUNC_TOP, + TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_NEED_TS | + TSDB_FUNCSTATE_SELECTIVITY, + top_bottom_function_setup, + top_function, + top_function_f, + no_next_step, + top_bottom_func_finalizer, + top_func_merge, + top_func_second_merge, + data_req_load_info, + }, + { + // 12 + "bottom", + TSDB_FUNC_BOTTOM, + TSDB_FUNC_BOTTOM, + TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_NEED_TS | + TSDB_FUNCSTATE_SELECTIVITY, + top_bottom_function_setup, + bottom_function, + bottom_function_f, + no_next_step, + top_bottom_func_finalizer, + bottom_func_merge, + bottom_func_second_merge, + data_req_load_info, + }, + { + // 13 + "spread", + TSDB_FUNC_SPREAD, + TSDB_FUNC_SPREAD, + TSDB_BASE_FUNC_SO, + spread_function_setup, + spread_function, + spread_function_f, + no_next_step, + spread_function_finalizer, + spread_func_merge, + spread_func_sec_merge, + count_load_data_info, + }, + { + // 14 + "twa", + TSDB_FUNC_TWA, + TSDB_FUNC_TWA, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + twa_function_setup, + twa_function, + twa_function_f, + no_next_step, + twa_function_finalizer, + twa_func_merge, + twa_function_copy, + data_req_load_info, + }, + { + // 15 + "leastsquares", + TSDB_FUNC_LEASTSQR, + TSDB_FUNC_INVALID_ID, + TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF, + leastsquares_function_setup, + leastsquares_function, + leastsquares_function_f, + no_next_step, + leastsquares_finalizer, + noop, + noop, + data_req_load_info, + }, + { + // 16 + "ts", + TSDB_FUNC_TS, + TSDB_FUNC_TS, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + function_setup, + date_col_output_function, + date_col_output_function, + no_next_step, + noop, + copy_function, + copy_function, + no_data_info, + }, + { + // 17 + "ts", + TSDB_FUNC_TS_DUMMY, + TSDB_FUNC_TS_DUMMY, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + function_setup, + noop, + noop, + no_next_step, + noop, + copy_function, + copy_function, + data_req_load_info, + }, + { + // 18 + "tag", + TSDB_FUNC_TAG_DUMMY, + TSDB_FUNC_TAG_DUMMY, + TSDB_BASE_FUNC_SO, + function_setup, + tag_function, + noop, + no_next_step, + noop, + copy_function, + copy_function, + no_data_info, + }, + { + // 19 + "ts", + TSDB_FUNC_TS_COMP, + TSDB_FUNC_TS_COMP, + TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_NEED_TS, + ts_comp_function_setup, + ts_comp_function, + ts_comp_function_f, + no_next_step, + ts_comp_finalize, + copy_function, + copy_function, + data_req_load_info, + }, + { + // 20 + "tag", + TSDB_FUNC_TAG, + TSDB_FUNC_TAG, + TSDB_BASE_FUNC_SO, + function_setup, + tag_function, + tag_function_f, + no_next_step, + noop, + copy_function, + copy_function, + no_data_info, + }, + { + // 21, column project sql function + "colprj", + TSDB_FUNC_PRJ, + TSDB_FUNC_PRJ, + TSDB_BASE_FUNC_MO | TSDB_FUNCSTATE_NEED_TS, + function_setup, + col_project_function, + col_project_function_f, + no_next_step, + noop, + copy_function, + copy_function, + data_req_load_info, + }, + { + // 22, multi-output, tag function has only one result + "tagprj", + TSDB_FUNC_TAGPRJ, + TSDB_FUNC_TAGPRJ, + TSDB_BASE_FUNC_MO, + function_setup, + tag_project_function, + tag_project_function_f, + no_next_step, + noop, + copy_function, + copy_function, + no_data_info, + }, + { + // 23 + "arithmetic", + TSDB_FUNC_ARITHM, + TSDB_FUNC_ARITHM, + TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS, + function_setup, + arithmetic_function, + arithmetic_function_f, + no_next_step, + noop, + copy_function, + copy_function, + data_req_load_info, + }, + { + // 24 + "diff", + TSDB_FUNC_DIFF, + TSDB_FUNC_INVALID_ID, + TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_NEED_TS, + diff_function_setup, + diff_function, + diff_function_f, + no_next_step, + noop, + noop, + noop, + data_req_load_info, + }, + // distributed version used in two-stage aggregation processes + { + // 25 + "first_dist", + TSDB_FUNC_FIRST_DST, + TSDB_FUNC_FIRST_DST, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, + first_last_function_setup, + first_dist_function, + first_dist_function_f, + no_next_step, + function_finalizer, + first_dist_func_merge, + first_dist_func_second_merge, + first_dist_data_req_info, + }, + { + // 26 + "last_dist", + TSDB_FUNC_LAST_DST, + TSDB_FUNC_LAST_DST, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, + first_last_function_setup, + last_dist_function, + last_dist_function_f, + no_next_step, + function_finalizer, + last_dist_func_merge, + last_dist_func_second_merge, + last_dist_data_req_info, + }, + { + // 27 + "interp", + TSDB_FUNC_INTERP, + TSDB_FUNC_INTERP, + TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS, + function_setup, + interp_function, + do_sum_f, // todo filter handle + no_next_step, + noop, + noop, + copy_function, + no_data_info, + }}; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index ae9704effa601a5594c59732a526ad5375f60520..1bc0163e5f3487153ab3baca02025d8d6ead3170 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -81,7 +81,7 @@ static bool validateIpAddress(char* ip); static bool hasUnsupportFunctionsForMetricQuery(SSqlCmd* pCmd); static bool functionCompatibleCheck(SSqlCmd* pCmd); -static void setColumnOffsetValueInResultset(SSqlCmd* pCmd); +static void setColumnOffsetValueInResultset(SSqlCmd* pCmd); static int32_t parseGroupbyClause(SSqlCmd* pCmd, tVariantList* pList); static int32_t parseIntervalClause(SSqlCmd* pCmd, SQuerySQL* pQuerySql); @@ -94,7 +94,7 @@ static int32_t parseFillClause(SSqlCmd* pCmd, SQuerySQL* pQuerySQL); static int32_t parseOrderbyClause(SSqlCmd* pCmd, SQuerySQL* pQuerySql, SSchema* pSchema, int32_t numOfCols); static int32_t tsRewriteFieldNameIfNecessary(SSqlCmd* pCmd); -static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField); +static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField); static int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo); static int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd); static int32_t buildArithmeticExprString(tSQLExpr* pExpr, char** exprString); @@ -105,8 +105,8 @@ static int32_t validateLocalConfig(tDCLSQL* pOptions); static int32_t validateColumnName(char* name); static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo); -static bool hasTimestampForPointInterpQuery(SSqlCmd* pCmd); -static void updateTagColumnIndex(SSqlCmd* pCmd, int32_t tableIndex); +static bool hasTimestampForPointInterpQuery(SSqlCmd* pCmd); +static void updateTagColumnIndex(SSqlCmd* pCmd, int32_t tableIndex); static int32_t parseLimitClause(SSqlObj* pSql, SQuerySQL* pQuerySql); static int32_t parseCreateDBOptions(SCreateDBInfo* pCreateDbSql, SSqlCmd* pCmd); @@ -115,13 +115,12 @@ static int32_t getTableIndexByName(SSQLToken* pToken, SSqlCmd* pCmd, SColumnInde static int32_t optrToString(tSQLExpr* pExpr, char** exprString); static SColumnList getColumnList(int32_t num, int16_t tableIndex, int32_t columnIndex); -static int32_t getMeterIndex(SSQLToken* pTableToken, SSqlCmd* pCmd, SColumnIndex* pIndex); -static int32_t doFunctionsCompatibleCheck(SSqlObj* pSql); +static int32_t getMeterIndex(SSQLToken* pTableToken, SSqlCmd* pCmd, SColumnIndex* pIndex); +static int32_t doFunctionsCompatibleCheck(SSqlObj* pSql); static int32_t tscQueryOnlyMetricTags(SSqlCmd* pCmd, bool* queryOnMetricTags) { assert(QUERY_IS_STABLE_QUERY(pCmd->type)); - // here colIdx == -1 means the special column tbname that is the name of each table *queryOnMetricTags = true; for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pCmd, i); @@ -1151,7 +1150,7 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQuerySQL* pQuerySql) { } // check the invalid sql expresssion: select count(tbname)/count(tag1)/count(tag2) from super_table interval(1d); - for(int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pCmd, i); if (pExpr->functionId == TSDB_FUNC_COUNT && TSDB_COL_IS_TAG(pExpr->colInfo.flag)) { setErrMsg(pCmd, msg1); @@ -2791,10 +2790,14 @@ static bool functionCompatibleCheck(SSqlCmd* pCmd) { // diff function cannot be executed with other function // arithmetic function can be executed with other arithmetic functions for (int32_t i = startIdx + 1; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { - int16_t functionId = tscSqlExprGet(pCmd, i)->functionId; - if (functionId == TSDB_FUNC_TAGPRJ || - functionId == TSDB_FUNC_TAG || - functionId == TSDB_FUNC_TS) { + SSqlExpr* pExpr = tscSqlExprGet(pCmd, i); + + int16_t functionId = pExpr->functionId; + if (functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS) { + continue; + } + + if (functionId == TSDB_FUNC_PRJ && pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { continue; } @@ -2809,9 +2812,7 @@ static bool functionCompatibleCheck(SSqlCmd* pCmd) { for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { int16_t functionId = tscSqlExprGet(pCmd, i)->functionId; - if (functionId == TSDB_FUNC_PRJ || - functionId == TSDB_FUNC_TAGPRJ || - functionId == TSDB_FUNC_TS || + if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_ARITHM) { continue; } @@ -4986,19 +4987,37 @@ int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd) { int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd) { bool isProjectionFunction = false; - const char* msg = "column projection is not compatible with interval"; - + const char* msg1 = "column projection is not compatible with interval"; + const char* msg2 = "interval not allowed for tag queries"; + // multi-output set/ todo refactor for (int32_t k = 0; k < pCmd->fieldsInfo.numOfOutputCols; ++k) { SSqlExpr* pExpr = tscSqlExprGet(pCmd, k); + + // projection query on primary timestamp, the selectivity function needs to be present. + if (pExpr->functionId == TSDB_FUNC_PRJ && pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + bool hasSelectivity = false; + for(int32_t j = 0; j < pCmd->fieldsInfo.numOfOutputCols; ++j) { + SSqlExpr* pEx = tscSqlExprGet(pCmd, j); + if ((aAggs[pEx->functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) == TSDB_FUNCSTATE_SELECTIVITY) { + hasSelectivity = true; + break; + } + } + + if (hasSelectivity) { + continue; + } + } + if (pExpr->functionId == TSDB_FUNC_PRJ || pExpr->functionId == TSDB_FUNC_DIFF || - pExpr->functionId == TSDB_FUNC_ARITHM) { + pExpr->functionId == TSDB_FUNC_ARITHM) { isProjectionFunction = true; } } if (isProjectionFunction) { - setErrMsg(pCmd, msg); + setErrMsg(pCmd, msg1); } return isProjectionFunction == true ? TSDB_CODE_INVALID_SQL : TSDB_CODE_SUCCESS; @@ -5164,8 +5183,7 @@ int32_t parseLimitClause(SSqlObj* pSql, SQuerySQL* pQuerySql) { if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) { bool queryOnTags = false; - int32_t ret = tscQueryOnlyMetricTags(pCmd, &queryOnTags); - if (ret != TSDB_CODE_SUCCESS) { + if (tscQueryOnlyMetricTags(pCmd, &queryOnTags) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; } @@ -5382,8 +5400,8 @@ static void doUpdateSqlFunctionForTagPrj(SSqlCmd* pCmd) { } } - int16_t resType = 0; - int16_t resBytes = 0; + int16_t resType = 0; + int16_t resBytes = 0; SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); SSchema* pSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta); @@ -5464,7 +5482,7 @@ static int32_t checkUpdateTagPrjFunctions(SSqlCmd* pCmd) { const char* msg2 = "functions not allowed"; bool tagColExists = false; - int16_t numOfTimestamp = 0; // primary timestamp column + int16_t numOfTimestamp = 0; // primary timestamp column int16_t numOfSelectivity = 0; int16_t numOfAggregation = 0; @@ -5493,7 +5511,7 @@ static int32_t checkUpdateTagPrjFunctions(SSqlCmd* pCmd) { // When the tag projection function on tag column that is not in the group by clause, aggregation function and // selectivity function exist in select clause is not allowed. - if(numOfAggregation > 0) { + if (numOfAggregation > 0) { setErrMsg(pCmd, msg1); return TSDB_CODE_INVALID_SQL; } @@ -5598,14 +5616,14 @@ int32_t doFunctionsCompatibleCheck(SSqlObj* pSql) { const char* msg2 = "interval not allowed in group by normal column"; const char* msg3 = "group by not allowed on projection query"; const char* msg4 = "tags retrieve not compatible with group by"; - const char* msg5 = "retrieve tags not compatible with group by "; + const char* msg5 = "retrieve tags not compatible with group by or interval query"; SSqlCmd* pCmd = &pSql->cmd; SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); // only retrieve tags, group by is not supportted if (pCmd->command == TSDB_SQL_RETRIEVE_TAGS) { - if (pCmd->groupbyExpr.numOfGroupCols > 0) { + if (pCmd->groupbyExpr.numOfGroupCols > 0 || pCmd->nAggTimeInterval > 0) { setErrMsg(pCmd, msg5); return TSDB_CODE_INVALID_SQL; } else { @@ -5634,7 +5652,7 @@ int32_t doFunctionsCompatibleCheck(SSqlObj* pSql) { * group by normal columns. * Check if the column projection is identical to the group by column or not */ - if (functId == TSDB_FUNC_PRJ) { + if (functId == TSDB_FUNC_PRJ && pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) { bool qualified = false; for (int32_t j = 0; j < pCmd->groupbyExpr.numOfGroupCols; ++j) { SColIndexEx* pColIndex = &pCmd->groupbyExpr.columnInfo[j]; @@ -5650,7 +5668,8 @@ int32_t doFunctionsCompatibleCheck(SSqlObj* pSql) { } if (IS_MULTIOUTPUT(aAggs[functId].nStatus) && functId != TSDB_FUNC_TOP && functId != TSDB_FUNC_BOTTOM && - functId != TSDB_FUNC_TAGPRJ) { + functId != TSDB_FUNC_TAGPRJ && + (functId == TSDB_FUNC_PRJ && pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX)) { setErrMsg(pCmd, msg1); return TSDB_CODE_INVALID_SQL; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index aa80e505d5f4938a746f8ce4c255fa6f2df7426a..67118b50e2db7994734d7ebb0245b0140f5e21a2 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1332,7 +1332,7 @@ int tscBuildRetrieveMsg(SSqlObj *pSql) { *((uint64_t *)pMsg) = pSql->res.qhandle; pMsg += sizeof(pSql->res.qhandle); - *pMsg = htons(pSql->cmd.type); + *((uint16_t*)pMsg) = htons(pSql->cmd.type); pMsg += sizeof(pSql->cmd.type); msgLen = pMsg - pStart; @@ -3450,7 +3450,12 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { tscSetResultPointer(pCmd, pRes); pRes->row = 0; - if (pRes->numOfRows == 0 && !(tscProjectionQueryOnMetric(pCmd) && pRes->offset > 0)) { + /** + * If the query result is exhausted, the connection will be recycled. + * If current query is to free resource at server side, the connection will be recycle. + */ + if ((pRes->numOfRows == 0 && !(tscProjectionQueryOnMetric(pCmd) && pRes->offset > 0)) || + ((pCmd->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE)) { taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user); pSql->thandle = NULL; } else { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 547171cffeb3639ebe53d78d21704ff914db614e..fd23b183b1065e96ebb82b044219648b2be90e42 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -142,7 +142,6 @@ bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd) { return false; } - void tscGetDBInfoFromMeterId(char* meterId, char* db) { char* st = strstr(meterId, TS_PATH_DELIMITER); if (st != NULL) { @@ -265,7 +264,7 @@ bool tscIsPointInterpQuery(SSqlCmd* pCmd) { } bool tscIsTWAQuery(SSqlCmd* pCmd) { - for(int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) { + for (int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pCmd, i); if (pExpr == NULL) { continue; @@ -450,7 +449,8 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) { tfree(pDataBlock); } -SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, uint32_t offset) { +SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, + uint32_t offset) { uint32_t needed = pDataBlock->numOfParams + 1; if (needed > pDataBlock->numOfAllocedParams) { needed *= 2; @@ -490,13 +490,13 @@ SDataBlockList* tscCreateBlockArrayList() { return pDataBlockArrayList; } -void tscAppendDataBlock(SDataBlockList *pList, STableDataBlocks *pBlocks) { +void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks) { if (pList->nSize >= pList->nAlloc) { pList->nAlloc = pList->nAlloc << 1; - pList->pData = realloc(pList->pData, sizeof(void *) * (size_t)pList->nAlloc); + pList->pData = realloc(pList->pData, sizeof(void*) * (size_t)pList->nAlloc); // reset allocated memory - memset(pList->pData + pList->nSize, 0, sizeof(void *) * (pList->nAlloc - pList->nSize)); + memset(pList->pData + pList->nSize, 0, sizeof(void*) * (pList->nAlloc - pList->nSize)); } pList->pData[pList->nSize++] = pBlocks; @@ -553,7 +553,7 @@ void tscFreeUnusedDataBlocks(SDataBlockList* pList) { } STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name) { - STableDataBlocks *dataBuf = tscCreateDataBlock(size); + STableDataBlocks* dataBuf = tscCreateDataBlock(size); dataBuf->rowSize = rowSize; dataBuf->size = startOffset; @@ -573,7 +573,7 @@ STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pData } if (dataBuf == NULL) { - dataBuf = tscCreateDataBlockEx((size_t) size, rowSize, startOffset, tableId); + dataBuf = tscCreateDataBlockEx((size_t)size, rowSize, startOffset, tableId); dataBuf = *(STableDataBlocks**)taosAddIntHash(pHashList, id, (char*)&dataBuf); tscAppendDataBlock(pDataBlockList, dataBuf); } @@ -604,7 +604,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi if (tmp != NULL) { dataBuf->pData = tmp; memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size); - } else { // failed to allocate memory, free already allocated memory and return error code + } else { // failed to allocate memory, free already allocated memory and return error code tscError("%p failed to allocate memory for merging submit block, size:%d", pSql, dataBuf->nAllocSize); taosCleanUpIntHash(pVnodeDataBlockHashList); @@ -673,7 +673,7 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) { pCmd->allocSize = size; } else { if (pCmd->allocSize < size) { - char* b = realloc(pCmd->payload, size); + char* b = realloc(pCmd->payload, size); if (b == NULL) return TSDB_CODE_CLI_OUT_OF_MEMORY; pCmd->payload = b; pCmd->allocSize = size; @@ -869,11 +869,11 @@ void tscClearFieldInfo(SFieldInfo* pFieldInfo) { static void _exprCheckSpace(SSqlExprInfo* pExprInfo, int32_t size) { if (size > pExprInfo->numOfAlloc) { - int32_t oldSize = pExprInfo->numOfAlloc; + uint32_t oldSize = pExprInfo->numOfAlloc; - int32_t newSize = (oldSize <= 0) ? 8 : (oldSize << 1); + uint32_t newSize = (oldSize <= 0) ? 8 : (oldSize << 1U); while (newSize < size) { - newSize = (newSize << 1); + newSize = (newSize << 1U); } if (newSize > TSDB_MAX_COLUMNS) { @@ -1161,7 +1161,7 @@ void tscColumnBaseInfoDestroy(SColumnBaseInfo* pColumnBaseInfo) { assert(pColumnBaseInfo->numOfCols <= TSDB_MAX_COLUMNS); for (int32_t i = 0; i < pColumnBaseInfo->numOfCols; ++i) { - SColumnBase *pColBase = &(pColumnBaseInfo->pColList[i]); + SColumnBase* pColBase = &(pColumnBaseInfo->pColList[i]); if (pColBase->numOfFilters > 0) { for (int32_t j = 0; j < pColBase->numOfFilters; ++j) { @@ -1179,8 +1179,9 @@ void tscColumnBaseInfoDestroy(SColumnBaseInfo* pColumnBaseInfo) { tfree(pColumnBaseInfo->pColList); } - -void tscColumnBaseInfoReserve(SColumnBaseInfo* pColumnBaseInfo, int32_t size) { _cf_ensureSpace(pColumnBaseInfo, size); } +void tscColumnBaseInfoReserve(SColumnBaseInfo* pColumnBaseInfo, int32_t size) { + _cf_ensureSpace(pColumnBaseInfo, size); +} /* * 1. normal name, not a keyword or number @@ -1228,16 +1229,16 @@ int32_t tscValidateName(SSQLToken* pToken) { int len = tSQLGetToken(pToken->z, &pToken->type); // single token, validate it - if (len == pToken->n){ + if (len == pToken->n) { return validateQuoteToken(pToken); } else { - sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true); - if (sep == NULL) { - return TSDB_CODE_INVALID_SQL; - } + sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true); + if (sep == NULL) { + return TSDB_CODE_INVALID_SQL; + } return tscValidateName(pToken); - } + } } else { if (isNumber(pToken)) { return TSDB_CODE_INVALID_SQL; @@ -1616,8 +1617,8 @@ int32_t SStringAlloc(SString* pStr, int32_t size) { #ifdef WINDOWS LPVOID lpMsgBuf; FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, - GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language - (LPTSTR)&lpMsgBuf, 0, NULL); + GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language + (LPTSTR)&lpMsgBuf, 0, NULL); tscTrace("failed to allocate memory, reason:%s", lpMsgBuf); LocalFree(lpMsgBuf); #else @@ -1652,12 +1653,11 @@ int32_t SStringEnsureRemain(SString* pStr, int32_t size) { char* tmp = realloc(pStr->z, newsize); if (tmp == NULL) { - #ifdef WINDOWS LPVOID lpMsgBuf; FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, - GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language - (LPTSTR)&lpMsgBuf, 0, NULL); + GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language + (LPTSTR)&lpMsgBuf, 0, NULL); tscTrace("failed to allocate memory, reason:%s", lpMsgBuf); LocalFree(lpMsgBuf); #else @@ -1728,7 +1728,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex if (pPrevSql != NULL) { pNew->cmd.type = pPrevSql->cmd.type; } else { - pNew->cmd.type |= TSDB_QUERY_TYPE_SUBQUERY; // it must be the subquery + pNew->cmd.type |= TSDB_QUERY_TYPE_SUBQUERY; // it must be the subquery } uint64_t uid = pMeterMetaInfo->pMeterMeta->uid; @@ -1760,7 +1760,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex char key[TSDB_MAX_TAGS_LEN + 1] = {0}; tscGetMetricMetaCacheKey(pCmd, key, pMetermetaInfo->pMeterMeta->uid); - char* name = pMeterMetaInfo->name; + char* name = pMeterMetaInfo->name; SMeterMetaInfo* pFinalInfo = NULL; if (pPrevSql == NULL) { @@ -1768,11 +1768,11 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex SMetricMeta* pMetricMeta = taosGetDataFromCache(tscCacheHandle, key); pFinalInfo = tscAddMeterMetaInfo(&pNew->cmd, name, pMeterMeta, pMetricMeta, pMeterMetaInfo->numOfTags, - pMeterMetaInfo->tagColumnIndex); + pMeterMetaInfo->tagColumnIndex); } else { SMeterMetaInfo* pPrevInfo = tscGetMeterMetaInfo(&pPrevSql->cmd, 0); - pFinalInfo = tscAddMeterMetaInfo(&pNew->cmd, name, pPrevInfo->pMeterMeta, pPrevInfo->pMetricMeta, pMeterMetaInfo->numOfTags, - pMeterMetaInfo->tagColumnIndex); + pFinalInfo = tscAddMeterMetaInfo(&pNew->cmd, name, pPrevInfo->pMeterMeta, pPrevInfo->pMetricMeta, + pMeterMetaInfo->numOfTags, pMeterMetaInfo->tagColumnIndex); pPrevInfo->pMeterMeta = NULL; pPrevInfo->pMetricMeta = NULL; @@ -1783,13 +1783,14 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex assert(pFinalInfo->pMetricMeta != NULL); } - tscTrace("%p new subquery %p, vnodeIdx:%d, tableIndex:%d, type:%d", pSql, pNew, vnodeIndex, tableIndex, pNew->cmd.type); + tscTrace("%p new subquery %p, vnodeIdx:%d, tableIndex:%d, type:%d", pSql, pNew, vnodeIndex, tableIndex, + pNew->cmd.type); return pNew; } void tscDoQuery(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; - void* fp = pSql->fp; + void* fp = pSql->fp; if (pCmd->command > TSDB_SQL_LOCAL) { tscProcessLocalCmd(pSql); diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index de4e430a462f851b3b34ff32fc8acc738a40ad8b..4c43171283b5341fd8aff41c3c8c9637803c1f1d 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -221,20 +221,20 @@ enum _syncstatus { #define TSDB_MAX_RPC_THREADS 5 #define TSDB_QUERY_TYPE_QUERY 0 // normal query -#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x1 // free qhandle at vnode +#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01U // free qhandle at vnode /* * 1. ordinary sub query for select * from super_table * 2. all sqlobj generated by createSubqueryObj with this flag */ -#define TSDB_QUERY_TYPE_SUBQUERY 0x2 -#define TSDB_QUERY_TYPE_STABLE_SUBQUERY 0x4 // two-stage subquery for super table - -#define TSDB_QUERY_TYPE_TABLE_QUERY 0x8 // query ordinary table; below only apply to client side -#define TSDB_QUERY_TYPE_STABLE_QUERY 0x10 // query on super table -#define TSDB_QUERY_TYPE_JOIN_QUERY 0x20 // join query -#define TSDB_QUERY_TYPE_PROJECTION_QUERY 0x40 // select *,columns... query -#define TSDB_QUERY_TYPE_JOIN_SEC_STAGE 0x80 // join sub query at the second stage +#define TSDB_QUERY_TYPE_SUBQUERY 0x02U +#define TSDB_QUERY_TYPE_STABLE_SUBQUERY 0x04U // two-stage subquery for super table + +#define TSDB_QUERY_TYPE_TABLE_QUERY 0x08U // query ordinary table; below only apply to client side +#define TSDB_QUERY_TYPE_STABLE_QUERY 0x10U // query on super table +#define TSDB_QUERY_TYPE_JOIN_QUERY 0x20U // join query +#define TSDB_QUERY_TYPE_PROJECTION_QUERY 0x40U // select *,columns... query +#define TSDB_QUERY_TYPE_JOIN_SEC_STAGE 0x80U // join sub query at the second stage #ifdef __cplusplus } diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index f9fe9efa33a3cc6e9073e1acb596bb269afbd90e..47ed92e2f7fd7be1c83f181d2fbcaeb00cc2dbe0 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -179,7 +179,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { fprintf(stderr, "Invalid path %s\n", arg); return -1; } - strcpy(configDir, full_path.we_wordv[0]); + taos_options(TSDB_OPTION_CONFIGDIR, full_path.we_wordv[0]); wordfree(&full_path); break; case OPT_ABORT: diff --git a/src/system/detail/src/mgmtUtil.c b/src/system/detail/src/mgmtUtil.c index be90180f681be75396c208ce078c9025c687d286..3be2e1288d225147b95f511c865d0c425c1dee23 100644 --- a/src/system/detail/src/mgmtUtil.c +++ b/src/system/detail/src/mgmtUtil.c @@ -92,5 +92,6 @@ bool mgmtCheckIsMonitorDB(char *db, char *monitordb) { char dbName[TSDB_DB_NAME_LEN + 1] = {0}; extractDBName(db, dbName); - return (strncasecmp(dbName, monitordb, strlen(dbName)) == 0); + size_t len = strlen(dbName); + return (strncasecmp(dbName, monitordb, len) == 0 && len == strlen(monitordb)); } diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 9c0d7883a5242987b46909b98f7d0081ba82fc92..654f9537e4d8a8ec7b3ff2d0ce278ecd3cd38273 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -3122,9 +3122,11 @@ static bool onlyOneQueryType(SQuery *pQuery, int32_t functId, int32_t functIdDst for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; - if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG) { + if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || + functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAG_DUMMY) { continue; } + if (functionId != functId && functionId != functIdDst) { return false; } @@ -3137,10 +3139,9 @@ static bool onlyFirstQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSD static bool onlyLastQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST); } -static void rewriteExecOrder(SQuery *pQuery, bool metricQuery) { +static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) { // in case of point-interpolation query, use asc order scan - char msg[] = - "QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%lld-%lld, " + char msg[] = "QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%lld-%lld, " "new qrange:%lld-%lld"; // descending order query @@ -3614,7 +3615,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete } setScanLimitationByResultBuffer(pQuery); - rewriteExecOrder(pQuery, false); + changeExecuteScanOrder(pQuery, false); pQInfo->over = 0; pQInfo->pointsRead = 0; @@ -3790,7 +3791,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) pQInfo->pointsRead = 0; pQuery->pointsRead = 0; - rewriteExecOrder(pQuery, true); + changeExecuteScanOrder(pQuery, true); vnodeInitDataBlockInfo(&pSupporter->runtimeEnv.loadBlockInfo); vnodeInitLoadCompBlockInfo(&pSupporter->runtimeEnv.loadCompBlockInfo); diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index 353b2668ee94c1da31b005f84beb63e5036b6341..bbe2f60274c5e17a884b842322cd9e6ca4f756df 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -385,14 +385,21 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { pRetrieve = (SRetrieveMeterMsg *)pMsg; pRetrieve->free = htons(pRetrieve->free); + if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { + dTrace("retrieve msg, handle:%p, free:%d", pRetrieve->qhandle, pRetrieve->free); + } else { + dTrace("retrieve msg to free resource from client, handle:%p, free:%d", pRetrieve->qhandle, pRetrieve->free); + } + /* * in case of server restart, apps may hold qhandle created by server before restart, * which is actually invalid, therefore, signature check is required. */ if (pRetrieve->qhandle == (uint64_t)pObj->qhandle) { // if free flag is set, client wants to clean the resources - if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) + if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { code = vnodeRetrieveQueryInfo((void *)(pRetrieve->qhandle), &numOfRows, &rowSize, &timePrec); + } } else { dError("QInfo:%p, qhandle:%p is not matched with saved:%p", pObj->qhandle, pRetrieve->qhandle, pObj->qhandle); code = TSDB_CODE_INVALID_QHANDLE; diff --git a/src/system/detail/src/vnodeUtil.c b/src/system/detail/src/vnodeUtil.c index 790ad1c4a10b5d63987a6f1eeebc184605b773d3..7a2fb5530cef467e7dc52fa3d2d92069d890a9b8 100644 --- a/src/system/detail/src/vnodeUtil.c +++ b/src/system/detail/src/vnodeUtil.c @@ -289,7 +289,7 @@ SSqlFunctionExpr* vnodeCreateSqlFunctionExpr(SQueryMeterMsg* pQueryMsg, int32_t* return NULL; } - if (pExprs[i].pBase.functionId == TSDB_FUNC_TAG_DUMMY) { + if (pExprs[i].pBase.functionId == TSDB_FUNC_TAG_DUMMY || pExprs[i].pBase.functionId == TSDB_FUNC_TS_DUMMY) { tagLen += pExprs[i].resBytes; } assert(isValidDataType(pExprs[i].resType, pExprs[i].resBytes));