From 79a6329db61c3201c7ba182c720f68f7dd7e2b40 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 31 Dec 2020 18:35:40 +0800 Subject: [PATCH] [TD-225]refactor and fix memory leaks if errors occur. --- src/client/src/tscFunctionImpl.c | 539 ++++++++++++++++--------------- src/query/inc/qExecutor.h | 2 + src/query/inc/tsqlfunction.h | 4 +- src/query/src/qExecutor.c | 123 +++---- 4 files changed, 327 insertions(+), 341 deletions(-) diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index f4b244b18a..3ff5955610 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -26,8 +26,11 @@ #include "tsqlfunction.h" #include "ttype.h" -#define GET_INPUT_CHAR(x) (((char *)((x)->aInputElemBuf)) + ((x)->startOffset) * ((x)->inputBytes)) -#define GET_INPUT_CHAR_INDEX(x, y) (GET_INPUT_CHAR(x) + (y) * (x)->inputBytes) +#define GET_INPUT_DATA_LIST(x) (((char *)((x)->aInputElemBuf)) + ((x)->startOffset) * ((x)->inputBytes)) +#define GET_INPUT_DATA(x, y) (GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes) + +#define GET_TS_LIST(x) ((TSKEY*)&((x)->ptsList[(x)->startOffset])) +#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)]) #define GET_TRUE_DATA_TYPE() \ int32_t type = 0; \ @@ -58,7 +61,7 @@ } \ aAggs[TSDB_FUNC_TAG].xFunction(__ctx); \ } \ - } while (0); + } while (0) #define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \ do { \ @@ -391,7 +394,7 @@ static void count_function(SQLFunctionCtx *pCtx) { } else { if (pCtx->hasNull) { for (int32_t i = 0; i < pCtx->size; ++i) { - char *val = GET_INPUT_CHAR_INDEX(pCtx, i); + char *val = GET_INPUT_DATA(pCtx, i); if (isNull(val, pCtx->inputType)) { continue; } @@ -416,7 +419,7 @@ static void count_function(SQLFunctionCtx *pCtx) { } static void count_function_f(SQLFunctionCtx *pCtx, int32_t index) { - char *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + char *pData = GET_INPUT_DATA(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } @@ -430,7 +433,7 @@ static void count_function_f(SQLFunctionCtx *pCtx, int32_t index) { } static void count_func_merge(SQLFunctionCtx *pCtx) { - int64_t *pData = (int64_t *)GET_INPUT_CHAR(pCtx); + int64_t *pData = (int64_t *)GET_INPUT_DATA_LIST(pCtx); for (int32_t i = 0; i < pCtx->size; ++i) { *((int64_t *)pCtx->aOutputBuf) += pData[i]; } @@ -494,7 +497,7 @@ int32_t no_data_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId if ((ctx)->hasNull && isNull((char *)&(list)[i], tsdbType)) { \ continue; \ } \ - TSKEY key = (ctx)->ptsList[i]; \ + TSKEY key = GET_TS_DATA(ctx, i); \ UPDATE_DATA(ctx, val, (list)[i], num, sign, key); \ } @@ -521,7 +524,7 @@ static void do_sum(SQLFunctionCtx *pCtx) { *retVal += GET_DOUBLE_VAL((const char*)&(pCtx->preAggVals.statis.sum)); } } else { // computing based on the true data block - void *pData = GET_INPUT_CHAR(pCtx); + void *pData = GET_INPUT_DATA_LIST(pCtx); notNullElems = 0; if (pCtx->inputType >= TSDB_DATA_TYPE_TINYINT && pCtx->inputType <= TSDB_DATA_TYPE_BIGINT) { @@ -554,7 +557,7 @@ static void do_sum(SQLFunctionCtx *pCtx) { } static void do_sum_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + void *pData = GET_INPUT_DATA(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } @@ -611,7 +614,7 @@ static int32_t sum_merge_impl(const SQLFunctionCtx *pCtx) { assert(pCtx->stableQuery); for (int32_t i = 0; i < pCtx->size; ++i) { - char * input = GET_INPUT_CHAR_INDEX(pCtx, i); + char * input = GET_INPUT_DATA(pCtx, i); SSumInfo *pInput = (SSumInfo *)input; if (pInput->hasResult != DATA_SET_FLAG) { continue; @@ -760,7 +763,7 @@ static void avg_function(SQLFunctionCtx *pCtx) { *pVal += GET_DOUBLE_VAL((const char *)&(pCtx->preAggVals.statis.sum)); } } else { - void *pData = GET_INPUT_CHAR(pCtx); + void *pData = GET_INPUT_DATA_LIST(pCtx); if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { LIST_ADD_N(*pVal, pCtx, pData, int8_t, notNullElems, pCtx->inputType); @@ -795,7 +798,7 @@ static void avg_function(SQLFunctionCtx *pCtx) { } static void avg_function_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + void *pData = GET_INPUT_DATA(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } @@ -838,7 +841,7 @@ static void avg_func_merge(SQLFunctionCtx *pCtx) { assert(pCtx->stableQuery); SAvgInfo *pAvgInfo = (SAvgInfo *)GET_ROWCELL_INTERBUF(pResInfo); - char * input = GET_INPUT_CHAR(pCtx); + char * input = GET_INPUT_DATA_LIST(pCtx); for (int32_t i = 0; i < pCtx->size; ++i, input += pCtx->inputBytes) { SAvgInfo *pInput = (SAvgInfo *)input; @@ -861,7 +864,7 @@ static void avg_func_second_merge(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); double *sum = (double*) pCtx->aOutputBuf; - char * input = GET_INPUT_CHAR(pCtx); + char * input = GET_INPUT_DATA_LIST(pCtx); for (int32_t i = 0; i < pCtx->size; ++i, input += pCtx->inputBytes) { SAvgInfo *pInput = (SAvgInfo *)input; @@ -943,7 +946,8 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, if (index < 0 || index >= pCtx->size + pCtx->startOffset) { index = 0; } - + + // the index is the original position, not the relative position key = pCtx->ptsList[index]; } @@ -994,7 +998,9 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, return; } - void *p = GET_INPUT_CHAR(pCtx); + void *p = GET_INPUT_DATA_LIST(pCtx); + TSKEY *tsList = GET_TS_LIST(pCtx); + *notNullElems = 0; if (pCtx->inputType >= TSDB_DATA_TYPE_TINYINT && pCtx->inputType <= TSDB_DATA_TYPE_BIGINT) { @@ -1013,7 +1019,7 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, if ((*retVal < pData[i]) ^ isMin) { *retVal = pData[i]; - TSKEY k = pCtx->ptsList[i]; + TSKEY k = tsList[i]; DO_UPDATE_TAG_COLUMNS(pCtx, k); } @@ -1144,7 +1150,7 @@ static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *outp assert(pCtx->stableQuery); for (int32_t i = 0; i < pCtx->size; ++i) { - char *input = GET_INPUT_CHAR_INDEX(pCtx, i); + char *input = GET_INPUT_DATA(pCtx, i); if (input[bytes] != DATA_SET_FLAG) { continue; } @@ -1241,8 +1247,8 @@ static void max_func_second_merge(SQLFunctionCtx *pCtx) { } static void minMax_function_f(SQLFunctionCtx *pCtx, int32_t index, int32_t isMin) { - char *pData = GET_INPUT_CHAR_INDEX(pCtx, index); - TSKEY key = pCtx->ptsList[index]; + char *pData = GET_INPUT_DATA(pCtx, index); + TSKEY key = GET_TS_DATA(pCtx, index); int32_t num = 0; if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { @@ -1281,7 +1287,7 @@ static void minMax_function_f(SQLFunctionCtx *pCtx, int32_t index, int32_t isMin } static void max_function_f(SQLFunctionCtx *pCtx, int32_t index) { - char *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + char *pData = GET_INPUT_DATA(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } @@ -1297,7 +1303,7 @@ static void max_function_f(SQLFunctionCtx *pCtx, int32_t index) { } static void min_function_f(SQLFunctionCtx *pCtx, int32_t index) { - char *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + char *pData = GET_INPUT_DATA(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } @@ -1330,7 +1336,7 @@ static void stddev_function(SQLFunctionCtx *pCtx) { double *retVal = &pStd->res; double avg = pStd->avg; - void *pData = GET_INPUT_CHAR(pCtx); + void *pData = GET_INPUT_DATA_LIST(pCtx); switch (pCtx->inputType) { case TSDB_DATA_TYPE_INT: { @@ -1381,7 +1387,7 @@ static void stddev_function_f(SQLFunctionCtx *pCtx, int32_t index) { avg_function_f(pCtx, index); } else { double avg = pStd->avg; - void * pData = GET_INPUT_CHAR_INDEX(pCtx, index); + void * pData = GET_INPUT_DATA(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; @@ -1487,14 +1493,14 @@ static void first_function(SQLFunctionCtx *pCtx) { // handle the null value for (int32_t i = 0; i < pCtx->size; ++i) { - char *data = GET_INPUT_CHAR_INDEX(pCtx, i); + char *data = GET_INPUT_DATA(pCtx, i); if (pCtx->hasNull && isNull(data, pCtx->inputType)) { continue; } memcpy(pCtx->aOutputBuf, data, pCtx->inputBytes); - TSKEY k = pCtx->ptsList[i]; + TSKEY k = GET_TS_DATA(pCtx, i); DO_UPDATE_TAG_COLUMNS(pCtx, k); SResultRowCellInfo *pInfo = GET_RES_INFO(pCtx); @@ -1513,7 +1519,7 @@ static void first_function_f(SQLFunctionCtx *pCtx, int32_t index) { return; } - void *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + void *pData = GET_INPUT_DATA(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } @@ -1521,7 +1527,7 @@ static void first_function_f(SQLFunctionCtx *pCtx, int32_t index) { SET_VAL(pCtx, 1, 1); memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes); - TSKEY ts = pCtx->ptsList[index]; + TSKEY ts = GET_TS_DATA(pCtx, index); DO_UPDATE_TAG_COLUMNS(pCtx, ts); SResultRowCellInfo *pInfo = GET_RES_INFO(pCtx); @@ -1530,7 +1536,7 @@ static void first_function_f(SQLFunctionCtx *pCtx, int32_t index) { } static void first_data_assign_impl(SQLFunctionCtx *pCtx, char *pData, int32_t index) { - int64_t *timestamp = pCtx->ptsList; + int64_t *timestamp = GET_TS_LIST(pCtx); SFirstLastInfo *pInfo = (SFirstLastInfo *)(pCtx->aOutputBuf + pCtx->inputBytes); @@ -1561,7 +1567,7 @@ static void first_dist_function(SQLFunctionCtx *pCtx) { // find the first not null value for (int32_t i = 0; i < pCtx->size; ++i) { - char *data = GET_INPUT_CHAR_INDEX(pCtx, i); + char *data = GET_INPUT_DATA(pCtx, i); if (pCtx->hasNull && isNull(data, pCtx->inputType)) { continue; } @@ -1579,7 +1585,7 @@ static void first_dist_function(SQLFunctionCtx *pCtx) { } static void first_dist_function_f(SQLFunctionCtx *pCtx, int32_t index) { - char *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + char *pData = GET_INPUT_DATA(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } @@ -1594,7 +1600,7 @@ static void first_dist_function_f(SQLFunctionCtx *pCtx, int32_t index) { } static void first_dist_func_merge(SQLFunctionCtx *pCtx) { - char *pData = GET_INPUT_CHAR(pCtx); + char *pData = GET_INPUT_DATA_LIST(pCtx); assert(pCtx->size == 1 && pCtx->stableQuery); @@ -1613,7 +1619,7 @@ static void first_dist_func_merge(SQLFunctionCtx *pCtx) { static void first_dist_func_second_merge(SQLFunctionCtx *pCtx) { assert(pCtx->stableQuery); - char * pData = GET_INPUT_CHAR(pCtx); + char * pData = GET_INPUT_DATA_LIST(pCtx); SFirstLastInfo *pInput = (SFirstLastInfo*) (pData + pCtx->outputBytes); if (pInput->hasResult != DATA_SET_FLAG) { return; @@ -1648,7 +1654,7 @@ static void last_function(SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; for (int32_t i = pCtx->size - 1; i >= 0; --i) { - char *data = GET_INPUT_CHAR_INDEX(pCtx, i); + char *data = GET_INPUT_DATA(pCtx, i); if (pCtx->hasNull && isNull(data, pCtx->inputType)) { if (!pCtx->requireNull) { continue; @@ -1656,7 +1662,7 @@ static void last_function(SQLFunctionCtx *pCtx) { } memcpy(pCtx->aOutputBuf, data, pCtx->inputBytes); - TSKEY ts = pCtx->ptsList[i]; + TSKEY ts = GET_TS_DATA(pCtx, i); DO_UPDATE_TAG_COLUMNS(pCtx, ts); SResultRowCellInfo *pInfo = GET_RES_INFO(pCtx); @@ -1671,7 +1677,7 @@ static void last_function(SQLFunctionCtx *pCtx) { } static void last_function_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + void *pData = GET_INPUT_DATA(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } @@ -1685,7 +1691,7 @@ static void last_function_f(SQLFunctionCtx *pCtx, int32_t index) { SET_VAL(pCtx, 1, 1); memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes); - TSKEY ts = pCtx->ptsList[index]; + TSKEY ts = GET_TS_DATA(pCtx, index); DO_UPDATE_TAG_COLUMNS(pCtx, ts); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); @@ -1693,7 +1699,7 @@ static void last_function_f(SQLFunctionCtx *pCtx, int32_t index) { pResInfo->complete = true; // set query completed } else { // in case of ascending order check, all data needs to be checked SResultRowCellInfo* pResInfo = GET_RES_INFO(pCtx); - TSKEY ts = pCtx->ptsList[index]; + TSKEY ts = GET_TS_DATA(pCtx, index); char* buf = GET_ROWCELL_INTERBUF(pResInfo); if (pResInfo->hasResult != DATA_SET_FLAG || (*(TSKEY*)buf) < ts) { @@ -1707,7 +1713,7 @@ static void last_function_f(SQLFunctionCtx *pCtx, int32_t index) { } static void last_data_assign_impl(SQLFunctionCtx *pCtx, char *pData, int32_t index) { - int64_t *timestamp = pCtx->ptsList; + int64_t *timestamp = GET_TS_LIST(pCtx); SFirstLastInfo *pInfo = (SFirstLastInfo *)(pCtx->aOutputBuf + pCtx->inputBytes); @@ -1741,7 +1747,7 @@ static void last_dist_function(SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; for (int32_t i = pCtx->size - 1; i >= 0; --i) { - char *data = GET_INPUT_CHAR_INDEX(pCtx, i); + char *data = GET_INPUT_DATA(pCtx, i); if (pCtx->hasNull && isNull(data, pCtx->inputType)) { if (!pCtx->requireNull) { continue; @@ -1765,7 +1771,7 @@ static void last_dist_function_f(SQLFunctionCtx *pCtx, int32_t index) { return; } - char *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + char *pData = GET_INPUT_DATA(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } @@ -1784,7 +1790,7 @@ static void last_dist_function_f(SQLFunctionCtx *pCtx, int32_t index) { } static void last_dist_func_merge(SQLFunctionCtx *pCtx) { - char *pData = GET_INPUT_CHAR(pCtx); + char *pData = GET_INPUT_DATA_LIST(pCtx); assert(pCtx->size == 1 && pCtx->stableQuery); @@ -1808,7 +1814,7 @@ static void last_dist_func_merge(SQLFunctionCtx *pCtx) { * is: the output data format in computing */ static void last_dist_func_second_merge(SQLFunctionCtx *pCtx) { - char *pData = GET_INPUT_CHAR(pCtx); + char *pData = GET_INPUT_DATA_LIST(pCtx); SFirstLastInfo *pInput = (SFirstLastInfo*) (pData + pCtx->outputBytes); if (pInput->hasResult != DATA_SET_FLAG) { @@ -1837,7 +1843,7 @@ static void last_dist_func_second_merge(SQLFunctionCtx *pCtx) { */ static void last_row_function(SQLFunctionCtx *pCtx) { assert(pCtx->size >= 1); - char *pData = GET_INPUT_CHAR(pCtx); + char *pData = GET_INPUT_DATA_LIST(pCtx); // assign the last element in current data block assignVal(pCtx->aOutputBuf, pData + (pCtx->size - 1) * pCtx->inputBytes, pCtx->inputBytes, pCtx->inputType); @@ -1848,12 +1854,13 @@ static void last_row_function(SQLFunctionCtx *pCtx) { // set the result to final result buffer in case of super table query if (pCtx->stableQuery) { SLastrowInfo *pInfo1 = (SLastrowInfo *)(pCtx->aOutputBuf + pCtx->inputBytes); - pInfo1->ts = pCtx->ptsList[pCtx->size - 1]; + pInfo1->ts = GET_TS_DATA(pCtx, pCtx->size - 1); pInfo1->hasResult = DATA_SET_FLAG; DO_UPDATE_TAG_COLUMNS(pCtx, pInfo1->ts); } else { - DO_UPDATE_TAG_COLUMNS(pCtx, pCtx->ptsList[pCtx->size - 1]); + TSKEY ts = GET_TS_DATA(pCtx, pCtx->size - 1); + DO_UPDATE_TAG_COLUMNS(pCtx, ts); } SET_VAL(pCtx, pCtx->size, 1); @@ -2245,13 +2252,15 @@ static void top_function(SQLFunctionCtx *pCtx) { assert(pRes->num >= 0); for (int32_t i = 0; i < pCtx->size; ++i) { - char *data = GET_INPUT_CHAR_INDEX(pCtx, i); + char *data = GET_INPUT_DATA(pCtx, i); + TSKEY ts = GET_TS_DATA(pCtx, i); + if (pCtx->hasNull && isNull(data, pCtx->inputType)) { continue; } notNullElems++; - do_top_function_add(pRes, (int32_t)pCtx->param[0].i64Key, data, pCtx->ptsList[i], pCtx->inputType, &pCtx->tagInfo, NULL, 0); + do_top_function_add(pRes, (int32_t)pCtx->param[0].i64Key, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); } if (!pCtx->hasNull) { @@ -2268,7 +2277,7 @@ static void top_function(SQLFunctionCtx *pCtx) { } static void top_function_f(SQLFunctionCtx *pCtx, int32_t index) { - char *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + char *pData = GET_INPUT_DATA(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } @@ -2277,15 +2286,16 @@ static void top_function_f(SQLFunctionCtx *pCtx, int32_t index) { assert(pRes->num >= 0); SET_VAL(pCtx, 1, 1); - do_top_function_add(pRes, (int32_t)pCtx->param[0].i64Key, pData, pCtx->ptsList[index], pCtx->inputType, &pCtx->tagInfo, NULL, - 0); + TSKEY ts = GET_TS_DATA(pCtx, index); + + do_top_function_add(pRes, (int32_t)pCtx->param[0].i64Key, pData, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); pResInfo->hasResult = DATA_SET_FLAG; } static void top_func_merge(SQLFunctionCtx *pCtx) { - char *input = GET_INPUT_CHAR(pCtx); + char *input = GET_INPUT_DATA_LIST(pCtx); STopBotInfo *pInput = (STopBotInfo *)input; if (pInput->num <= 0) { @@ -2306,7 +2316,7 @@ static void top_func_merge(SQLFunctionCtx *pCtx) { } static void top_func_second_merge(SQLFunctionCtx *pCtx) { - STopBotInfo *pInput = (STopBotInfo *)GET_INPUT_CHAR(pCtx); + STopBotInfo *pInput = (STopBotInfo *)GET_INPUT_DATA_LIST(pCtx); // construct the input data struct from binary data buildTopBotStruct(pInput, pCtx); @@ -2334,14 +2344,15 @@ static void bottom_function(SQLFunctionCtx *pCtx) { STopBotInfo *pRes = getTopBotOutputInfo(pCtx); for (int32_t i = 0; i < pCtx->size; ++i) { - char *data = GET_INPUT_CHAR_INDEX(pCtx, i); + char *data = GET_INPUT_DATA(pCtx, i); + TSKEY ts = GET_TS_DATA(pCtx, i); + if (pCtx->hasNull && isNull(data, pCtx->inputType)) { continue; } notNullElems++; - do_bottom_function_add(pRes, (int32_t)pCtx->param[0].i64Key, data, pCtx->ptsList[i], pCtx->inputType, &pCtx->tagInfo, NULL, - 0); + do_bottom_function_add(pRes, (int32_t)pCtx->param[0].i64Key, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); } if (!pCtx->hasNull) { @@ -2358,22 +2369,23 @@ static void bottom_function(SQLFunctionCtx *pCtx) { } static void bottom_function_f(SQLFunctionCtx *pCtx, int32_t index) { - char *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + char *pData = GET_INPUT_DATA(pCtx, index); + TSKEY ts = GET_TS_DATA(pCtx, index); + if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } STopBotInfo *pRes = getTopBotOutputInfo(pCtx); SET_VAL(pCtx, 1, 1); - do_bottom_function_add(pRes, (int32_t)pCtx->param[0].i64Key, pData, pCtx->ptsList[index], pCtx->inputType, &pCtx->tagInfo, - NULL, 0); + do_bottom_function_add(pRes, (int32_t)pCtx->param[0].i64Key, pData, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); pResInfo->hasResult = DATA_SET_FLAG; } static void bottom_func_merge(SQLFunctionCtx *pCtx) { - char *input = GET_INPUT_CHAR(pCtx); + char *input = GET_INPUT_DATA_LIST(pCtx); STopBotInfo *pInput = (STopBotInfo *)input; if (pInput->num <= 0) { @@ -2394,7 +2406,7 @@ static void bottom_func_merge(SQLFunctionCtx *pCtx) { } static void bottom_func_second_merge(SQLFunctionCtx *pCtx) { - STopBotInfo *pInput = (STopBotInfo *)GET_INPUT_CHAR(pCtx); + STopBotInfo *pInput = (STopBotInfo *)GET_INPUT_DATA_LIST(pCtx); // construct the input data struct from binary data buildTopBotStruct(pInput, pCtx); @@ -2490,7 +2502,7 @@ static void percentile_function(SQLFunctionCtx *pCtx) { pInfo->numOfElems += (pCtx->size - pCtx->preAggVals.statis.numOfNull); } else { for (int32_t i = 0; i < pCtx->size; ++i) { - char *data = GET_INPUT_CHAR_INDEX(pCtx, i); + char *data = GET_INPUT_DATA(pCtx, i); if (pCtx->hasNull && isNull(data, pCtx->inputType)) { continue; } @@ -2515,7 +2527,7 @@ static void percentile_function(SQLFunctionCtx *pCtx) { // the second stage, calculate the true percentile value for (int32_t i = 0; i < pCtx->size; ++i) { - char *data = GET_INPUT_CHAR_INDEX(pCtx, i); + char *data = GET_INPUT_DATA(pCtx, i); if (pCtx->hasNull && isNull(data, pCtx->inputType)) { continue; } @@ -2529,7 +2541,7 @@ static void percentile_function(SQLFunctionCtx *pCtx) { } static void percentile_function_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + void *pData = GET_INPUT_DATA(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } @@ -2637,7 +2649,7 @@ static void apercentile_function(SQLFunctionCtx *pCtx) { assert(pInfo->pHisto->elems != NULL); for (int32_t i = 0; i < pCtx->size; ++i) { - char *data = GET_INPUT_CHAR_INDEX(pCtx, i); + char *data = GET_INPUT_DATA(pCtx, i); if (pCtx->hasNull && isNull(data, pCtx->inputType)) { continue; } @@ -2661,7 +2673,7 @@ static void apercentile_function(SQLFunctionCtx *pCtx) { } static void apercentile_function_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + void *pData = GET_INPUT_DATA(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } @@ -2682,7 +2694,7 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); assert(pCtx->stableQuery); - SAPercentileInfo *pInput = (SAPercentileInfo *)GET_INPUT_CHAR(pCtx); + SAPercentileInfo *pInput = (SAPercentileInfo *)GET_INPUT_DATA_LIST(pCtx); pInput->pHisto = (SHistogramInfo*) ((char *)pInput + sizeof(SAPercentileInfo)); pInput->pHisto->elems = (SHistBin*) ((char *)pInput->pHisto + sizeof(SHistogramInfo)); @@ -2714,7 +2726,7 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) { } static void apercentile_func_second_merge(SQLFunctionCtx *pCtx) { - SAPercentileInfo *pInput = (SAPercentileInfo *)GET_INPUT_CHAR(pCtx); + SAPercentileInfo *pInput = (SAPercentileInfo *)GET_INPUT_DATA_LIST(pCtx); pInput->pHisto = (SHistogramInfo*) ((char *)pInput + sizeof(SAPercentileInfo)); pInput->pHisto->elems = (SHistBin*) ((char *)pInput->pHisto + sizeof(SHistogramInfo)); @@ -2817,7 +2829,7 @@ static void leastsquares_function(SQLFunctionCtx *pCtx) { double(*param)[3] = pInfo->mat; double x = pInfo->startVal; - void *pData = GET_INPUT_CHAR(pCtx); + void *pData = GET_INPUT_DATA_LIST(pCtx); int32_t numOfElem = 0; switch (pCtx->inputType) { @@ -2877,7 +2889,7 @@ static void leastsquares_function(SQLFunctionCtx *pCtx) { } static void leastsquares_function_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + void *pData = GET_INPUT_DATA(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } @@ -2979,7 +2991,7 @@ static void col_project_function(SQLFunctionCtx *pCtx) { INC_INIT_VAL(pCtx, pCtx->size); - char *pData = GET_INPUT_CHAR(pCtx); + char *pData = GET_INPUT_DATA_LIST(pCtx); if (pCtx->order == TSDB_ORDER_ASC) { memcpy(pCtx->aOutputBuf, pData, (size_t) pCtx->size * pCtx->inputBytes); } else { @@ -3004,7 +3016,7 @@ static void col_project_function_f(SQLFunctionCtx *pCtx, int32_t index) { } INC_INIT_VAL(pCtx, 1); - char *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + char *pData = GET_INPUT_DATA(pCtx, index); memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes); pCtx->aOutputBuf += pCtx->inputBytes; @@ -3058,7 +3070,7 @@ static void tag_function_f(SQLFunctionCtx *pCtx, int32_t index) { static void copy_function(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, pCtx->size, 1); - char *pData = GET_INPUT_CHAR(pCtx); + char *pData = GET_INPUT_DATA_LIST(pCtx); assignVal(pCtx->aOutputBuf, pData, pCtx->inputBytes, pCtx->inputType); } @@ -3078,7 +3090,7 @@ static bool diff_function_setup(SQLFunctionCtx *pCtx) { // TODO difference in date column static void diff_function(SQLFunctionCtx *pCtx) { - void *data = GET_INPUT_CHAR(pCtx); + void *data = GET_INPUT_DATA_LIST(pCtx); bool isFirstBlock = (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED); int32_t notNullElems = 0; @@ -3086,8 +3098,9 @@ static void diff_function(SQLFunctionCtx *pCtx) { int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1; - TSKEY * pTimestamp = pCtx->ptsOutputBuf; - + TSKEY* pTimestamp = pCtx->ptsOutputBuf; + TSKEY* tsList = GET_TS_LIST(pCtx); + switch (pCtx->inputType) { case TSDB_DATA_TYPE_INT: { int32_t *pData = (int32_t *)data; @@ -3103,13 +3116,13 @@ static void diff_function(SQLFunctionCtx *pCtx) { pCtx->param[1].nType = pCtx->inputType; } else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) { *pOutput = (int32_t)(pData[i] - pCtx->param[1].i64Key); - *pTimestamp = pCtx->ptsList[i]; + *pTimestamp = tsList[i]; pOutput += 1; pTimestamp += 1; } else { *pOutput = (int32_t)(pData[i] - pCtx->param[1].i64Key); // direct previous may be null - *pTimestamp = pCtx->ptsList[i]; + *pTimestamp = tsList[i]; pOutput += 1; pTimestamp += 1; @@ -3135,13 +3148,13 @@ static void diff_function(SQLFunctionCtx *pCtx) { pCtx->param[1].nType = pCtx->inputType; } else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) { *pOutput = pData[i] - pCtx->param[1].i64Key; - *pTimestamp = pCtx->ptsList[i]; + *pTimestamp = tsList[i]; pOutput += 1; pTimestamp += 1; } else { *pOutput = pData[i] - pCtx->param[1].i64Key; - *pTimestamp = pCtx->ptsList[i]; + *pTimestamp = tsList[i]; pOutput += 1; pTimestamp += 1; @@ -3167,12 +3180,12 @@ static void diff_function(SQLFunctionCtx *pCtx) { pCtx->param[1].nType = pCtx->inputType; } else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) { *pOutput = pData[i] - pCtx->param[1].dKey; - *pTimestamp = pCtx->ptsList[i]; + *pTimestamp = tsList[i]; pOutput += 1; pTimestamp += 1; } else { *pOutput = pData[i] - pCtx->param[1].dKey; - *pTimestamp = pCtx->ptsList[i]; + *pTimestamp = tsList[i]; pOutput += 1; pTimestamp += 1; } @@ -3197,13 +3210,13 @@ static void diff_function(SQLFunctionCtx *pCtx) { pCtx->param[1].nType = pCtx->inputType; } else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) { *pOutput = (float)(pData[i] - pCtx->param[1].dKey); - *pTimestamp = pCtx->ptsList[i]; + *pTimestamp = tsList[i]; pOutput += 1; pTimestamp += 1; } else { *pOutput = (float)(pData[i] - pCtx->param[1].dKey); - *pTimestamp = pCtx->ptsList[i]; + *pTimestamp = tsList[i]; pOutput += 1; pTimestamp += 1; @@ -3230,12 +3243,12 @@ static void diff_function(SQLFunctionCtx *pCtx) { pCtx->param[1].nType = pCtx->inputType; } else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) { *pOutput = (int16_t)(pData[i] - pCtx->param[1].i64Key); - *pTimestamp = pCtx->ptsList[i]; + *pTimestamp = tsList[i]; pOutput += 1; pTimestamp += 1; } else { *pOutput = (int16_t)(pData[i] - pCtx->param[1].i64Key); - *pTimestamp = pCtx->ptsList[i]; + *pTimestamp = tsList[i]; pOutput += 1; pTimestamp += 1; @@ -3261,13 +3274,13 @@ static void diff_function(SQLFunctionCtx *pCtx) { pCtx->param[1].nType = pCtx->inputType; } else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) { *pOutput = (int8_t)(pData[i] - pCtx->param[1].i64Key); - *pTimestamp = pCtx->ptsList[i]; + *pTimestamp = tsList[i]; pOutput += 1; pTimestamp += 1; } else { *pOutput = (int8_t)(pData[i] - pCtx->param[1].i64Key); - *pTimestamp = pCtx->ptsList[i]; + *pTimestamp = tsList[i]; pOutput += 1; pTimestamp += 1; @@ -3308,12 +3321,12 @@ static void diff_function(SQLFunctionCtx *pCtx) { } else { \ *(type *)(ctx)->aOutputBuf = *(type *)(d) - (*(type *)(&(ctx)->param[1].i64Key)); \ *(type *)(&(ctx)->param[1].i64Key) = *(type *)(d); \ - *(int64_t *)(ctx)->ptsOutputBuf = (ctx)->ptsList[index]; \ + *(int64_t *)(ctx)->ptsOutputBuf = GET_TS_DATA(ctx, index); \ } \ } while (0); static void diff_function_f(SQLFunctionCtx *pCtx, int32_t index) { - char *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + char *pData = GET_INPUT_DATA(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } @@ -3333,7 +3346,7 @@ static void diff_function_f(SQLFunctionCtx *pCtx, int32_t index) { } else { *(int32_t *)pCtx->aOutputBuf = *(int32_t *)pData - (int32_t)pCtx->param[1].i64Key; pCtx->param[1].i64Key = *(int32_t *)pData; - *(int64_t *)pCtx->ptsOutputBuf = pCtx->ptsList[index]; + *(int64_t *)pCtx->ptsOutputBuf = GET_TS_DATA(pCtx, index); } break; }; @@ -3477,7 +3490,7 @@ static void spread_function(SQLFunctionCtx *pCtx) { goto _spread_over; } - void *pData = GET_INPUT_CHAR(pCtx); + void *pData = GET_INPUT_DATA_LIST(pCtx); numOfElems = 0; if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { @@ -3513,7 +3526,7 @@ static void spread_function(SQLFunctionCtx *pCtx) { } static void spread_function_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + void *pData = GET_INPUT_DATA(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } @@ -3563,7 +3576,7 @@ void spread_func_merge(SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; for (int32_t i = 0; i < pCtx->size; ++i) { - SSpreadInfo *input = (SSpreadInfo *)GET_INPUT_CHAR_INDEX(pCtx, i); + SSpreadInfo *input = (SSpreadInfo *)GET_INPUT_DATA(pCtx, i); /* no assign tag, the value is null */ if (input->hasResult != DATA_SET_FLAG) { @@ -3593,7 +3606,7 @@ void spread_func_merge(SQLFunctionCtx *pCtx) { * the final result is generated in spread_function_finalizer */ void spread_func_sec_merge(SQLFunctionCtx *pCtx) { - SSpreadInfo *pData = (SSpreadInfo *)GET_INPUT_CHAR(pCtx); + SSpreadInfo *pData = (SSpreadInfo *)GET_INPUT_DATA_LIST(pCtx); if (pData->hasResult != DATA_SET_FLAG) { return; } @@ -3671,26 +3684,25 @@ static double twa_get_area(SPoint1 s, SPoint1 e) { return val; } -static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t tsIndex, int32_t index, int32_t size) { +static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t size) { int32_t notNullElems = 0; - TSKEY *primaryKey = pCtx->ptsList; - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); + TSKEY *tsList = GET_TS_LIST(pCtx); int32_t i = index; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); SPoint1* last = &pInfo->p; if (pCtx->start.key != INT64_MIN) { - assert((pCtx->start.key < primaryKey[tsIndex + i] && pCtx->order == TSDB_ORDER_ASC) || - (pCtx->start.key > primaryKey[tsIndex + i] && pCtx->order == TSDB_ORDER_DESC)); + assert((pCtx->start.key < tsList[i] && pCtx->order == TSDB_ORDER_ASC) || + (pCtx->start.key > tsList[i] && pCtx->order == TSDB_ORDER_DESC)); assert(last->key == INT64_MIN); - last->key = primaryKey[tsIndex + i]; - GET_TYPED_DATA(last->val, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, index)); + last->key = tsList[i]; + GET_TYPED_DATA(last->val, double, pCtx->inputType, GET_INPUT_DATA(pCtx, index)); pInfo->dOutput += twa_get_area(pCtx->start, *last); @@ -3699,8 +3711,8 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t tsIndex, int32_t notNullElems++; i += step; } else if (pInfo->p.key == INT64_MIN) { - last->key = primaryKey[tsIndex + i]; - GET_TYPED_DATA(last->val, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, index)); + last->key = tsList[i]; + GET_TYPED_DATA(last->val, double, pCtx->inputType, GET_INPUT_DATA(pCtx, index)); pInfo->hasResult = DATA_SET_FLAG; pInfo->win.skey = last->key; @@ -3711,78 +3723,78 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t tsIndex, int32_t // calculate the value of switch(pCtx->inputType) { case TSDB_DATA_TYPE_TINYINT: { - int8_t *val = (int8_t*) GET_INPUT_CHAR_INDEX(pCtx, 0); + int8_t *val = (int8_t*) GET_INPUT_DATA(pCtx, 0); for (; i < size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { continue; } - SPoint1 st = {.key = primaryKey[i + tsIndex], .val = val[i]}; + SPoint1 st = {.key = tsList[i], .val = val[i]}; pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->p = st; } break; } case TSDB_DATA_TYPE_SMALLINT: { - int16_t *val = (int16_t*) GET_INPUT_CHAR_INDEX(pCtx, 0); + int16_t *val = (int16_t*) GET_INPUT_DATA(pCtx, 0); for (; i < size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { continue; } - SPoint1 st = {.key = primaryKey[i + tsIndex], .val = val[i]}; + SPoint1 st = {.key = tsList[i], .val = val[i]}; pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->p = st; } break; } case TSDB_DATA_TYPE_INT: { - int32_t *val = (int32_t*) GET_INPUT_CHAR_INDEX(pCtx, 0); + int32_t *val = (int32_t*) GET_INPUT_DATA(pCtx, 0); for (; i < size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { continue; } - SPoint1 st = {.key = primaryKey[i + tsIndex], .val = val[i]}; + SPoint1 st = {.key = tsList[i], .val = val[i]}; pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->p = st; } break; } case TSDB_DATA_TYPE_BIGINT: { - int64_t *val = (int64_t*) GET_INPUT_CHAR_INDEX(pCtx, 0); + int64_t *val = (int64_t*) GET_INPUT_DATA(pCtx, 0); for (; i < size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { continue; } - SPoint1 st = {.key = primaryKey[i + tsIndex], .val = (double) val[i]}; + SPoint1 st = {.key = tsList[i], .val = (double) val[i]}; pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->p = st; } break; } case TSDB_DATA_TYPE_FLOAT: { - float *val = (float*) GET_INPUT_CHAR_INDEX(pCtx, 0); + float *val = (float*) GET_INPUT_DATA(pCtx, 0); for (; i < size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { continue; } - SPoint1 st = {.key = primaryKey[i + tsIndex], .val = val[i]}; + SPoint1 st = {.key = tsList[i], .val = val[i]}; pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->p = st; } break; } case TSDB_DATA_TYPE_DOUBLE: { - double *val = (double*) GET_INPUT_CHAR_INDEX(pCtx, 0); + double *val = (double*) GET_INPUT_DATA(pCtx, 0); for (; i < size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { continue; } - SPoint1 st = {.key = primaryKey[i + tsIndex], .val = val[i]}; + SPoint1 st = {.key = tsList[i], .val = val[i]}; pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->p = st; } @@ -3802,7 +3814,7 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t tsIndex, int32_t } static void twa_function(SQLFunctionCtx *pCtx) { - void *data = GET_INPUT_CHAR(pCtx); + void *data = GET_INPUT_DATA_LIST(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); @@ -3816,7 +3828,7 @@ static void twa_function(SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; if (i >= 0 && i < pCtx->size) { - notNullElems = twa_function_impl(pCtx, pCtx->startOffset, i, pCtx->size); + notNullElems = twa_function_impl(pCtx, i, pCtx->size); } SET_VAL(pCtx, notNullElems, 1); @@ -3830,134 +3842,133 @@ static void twa_function(SQLFunctionCtx *pCtx) { } } -//TODO refactor static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + void *pData = GET_INPUT_DATA(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } - int32_t notNullElems = 0; - TSKEY *primaryKey = pCtx->ptsList; - + int32_t notNullElems = twa_function_impl(pCtx, index, 1); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - - STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); - int32_t i = pCtx->startOffset; - int32_t size = pCtx->size; - - if (pCtx->start.key != INT64_MIN) { - assert(pInfo->p.key == INT64_MIN); - - pInfo->p.key = primaryKey[index]; - GET_TYPED_DATA(pInfo->p.val, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, index)); - - pInfo->dOutput += twa_get_area(pCtx->start, pInfo->p); - - pInfo->hasResult = DATA_SET_FLAG; - pInfo->win.skey = pCtx->start.key; - notNullElems++; - i += 1; - } else if (pInfo->p.key == INT64_MIN) { - pInfo->p.key = primaryKey[index]; - GET_TYPED_DATA(pInfo->p.val, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, index)); - - pInfo->hasResult = DATA_SET_FLAG; - pInfo->win.skey = pInfo->p.key; - notNullElems++; - i += 1; - } - - // calculate the value of - switch(pCtx->inputType) { - case TSDB_DATA_TYPE_TINYINT: { - int8_t *val = (int8_t*) GET_INPUT_CHAR_INDEX(pCtx, index); - for (; i < size; i++) { - if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { - continue; - } - - SPoint1 st = {.key = primaryKey[i + index], .val = val[i]}; - pInfo->dOutput += twa_get_area(pInfo->p, st); - pInfo->p = st; - } - break; - } - case TSDB_DATA_TYPE_SMALLINT: { - int16_t *val = (int16_t*) GET_INPUT_CHAR_INDEX(pCtx, index); - for (; i < size; i++) { - if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { - continue; - } - - SPoint1 st = {.key = primaryKey[i + index], .val = val[i]}; - pInfo->dOutput += twa_get_area(pInfo->p, st); - pInfo->p = st; - } - break; - } - case TSDB_DATA_TYPE_INT: { - int32_t *val = (int32_t*) GET_INPUT_CHAR_INDEX(pCtx, index); - for (; i < size; i++) { - if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { - continue; - } - - SPoint1 st = {.key = primaryKey[i + index], .val = val[i]}; - pInfo->dOutput += twa_get_area(pInfo->p, st); - pInfo->p = st; - } - break; - } - case TSDB_DATA_TYPE_BIGINT: { - int64_t *val = (int64_t*) GET_INPUT_CHAR_INDEX(pCtx, index); - for (; i < size; i++) { - if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { - continue; - } - - SPoint1 st = {.key = primaryKey[i + index], .val = (double) val[i]}; - pInfo->dOutput += twa_get_area(pInfo->p, st); - pInfo->p = st; - } - break; - } - case TSDB_DATA_TYPE_FLOAT: { - float *val = (float*) GET_INPUT_CHAR_INDEX(pCtx, index); - for (; i < size; i++) { - if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { - continue; - } - - SPoint1 st = {.key = primaryKey[i + index], .val = val[i]}; - pInfo->dOutput += twa_get_area(pInfo->p, st);//((val[i] + pInfo->p.val) / 2) * (primaryKey[i + index] - pInfo->p.key); - pInfo->p = st; - } - break; - } - case TSDB_DATA_TYPE_DOUBLE: { - double *val = (double*) GET_INPUT_CHAR_INDEX(pCtx, index); - for (; i < size; i++) { - if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { - continue; - } - - SPoint1 st = {.key = primaryKey[i + index], .val = val[i]}; - pInfo->dOutput += twa_get_area(pInfo->p, st);//((val[i] + pInfo->p.val) / 2) * (primaryKey[i + index] - pInfo->p.key); - pInfo->p = st; - } - break; - } - default: assert(0); - } - - // the last interpolated time window value - if (pCtx->end.key != INT64_MIN) { - pInfo->dOutput += twa_get_area(pInfo->p, pCtx->end);//((pInfo->p.val + pCtx->end.val) / 2) * (pCtx->end.key - pInfo->p.key); - pInfo->p = pCtx->end; - } - - pInfo->win.ekey = pInfo->p.key; +// TSKEY *primaryKey = GET_TS_LIST(pCtx); +// +// +// STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); +// int32_t i = pCtx->startOffset; +// int32_t size = pCtx->size; +// +// if (pCtx->start.key != INT64_MIN) { +// assert(pInfo->p.key == INT64_MIN); +// +// pInfo->p.key = primaryKey[index]; +// GET_TYPED_DATA(pInfo->p.val, double, pCtx->inputType, GET_INPUT_DATA(pCtx, index)); +// +// pInfo->dOutput += twa_get_area(pCtx->start, pInfo->p); +// +// pInfo->hasResult = DATA_SET_FLAG; +// pInfo->win.skey = pCtx->start.key; +// notNullElems++; +// i += 1; +// } else if (pInfo->p.key == INT64_MIN) { +// pInfo->p.key = primaryKey[index]; +// GET_TYPED_DATA(pInfo->p.val, double, pCtx->inputType, GET_INPUT_DATA(pCtx, index)); +// +// pInfo->hasResult = DATA_SET_FLAG; +// pInfo->win.skey = pInfo->p.key; +// notNullElems++; +// i += 1; +// } +// +// // calculate the value of +// switch(pCtx->inputType) { +// case TSDB_DATA_TYPE_TINYINT: { +// int8_t *val = (int8_t*) GET_INPUT_DATA(pCtx, index); +// for (; i < size; i++) { +// if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { +// continue; +// } +// +// SPoint1 st = {.key = i, .val = val[i]}; +// pInfo->dOutput += twa_get_area(pInfo->p, st); +// pInfo->p = st; +// } +// break; +// } +// case TSDB_DATA_TYPE_SMALLINT: { +// int16_t *val = (int16_t*) GET_INPUT_DATA(pCtx, index); +// for (; i < size; i++) { +// if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { +// continue; +// } +// +// SPoint1 st = {.key = i, .val = val[i]}; +// pInfo->dOutput += twa_get_area(pInfo->p, st); +// pInfo->p = st; +// } +// break; +// } +// case TSDB_DATA_TYPE_INT: { +// int32_t *val = (int32_t*) GET_INPUT_DATA(pCtx, index); +// for (; i < size; i++) { +// if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { +// continue; +// } +// +// SPoint1 st = {.key = i, .val = val[i]}; +// pInfo->dOutput += twa_get_area(pInfo->p, st); +// pInfo->p = st; +// } +// break; +// } +// case TSDB_DATA_TYPE_BIGINT: { +// int64_t *val = (int64_t*) GET_INPUT_DATA(pCtx, index); +// for (; i < size; i++) { +// if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { +// continue; +// } +// +// SPoint1 st = {.key = i, .val = (double) val[i]}; +// pInfo->dOutput += twa_get_area(pInfo->p, st); +// pInfo->p = st; +// } +// break; +// } +// case TSDB_DATA_TYPE_FLOAT: { +// float *val = (float*) GET_INPUT_DATA(pCtx, index); +// for (; i < size; i++) { +// if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { +// continue; +// } +// +// SPoint1 st = {.key = i, .val = val[i]}; +// pInfo->dOutput += twa_get_area(pInfo->p, st); +// pInfo->p = st; +// } +// break; +// } +// case TSDB_DATA_TYPE_DOUBLE: { +// double *val = (double*) GET_INPUT_DATA(pCtx, index); +// for (; i < size; i++) { +// if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { +// continue; +// } +// +// SPoint1 st = {.key = i, .val = val[i]}; +// pInfo->dOutput += twa_get_area(pInfo->p, st); +// pInfo->p = st; +// } +// break; +// } +// default: assert(0); +// } +// +// // the last interpolated time window value +// if (pCtx->end.key != INT64_MIN) { +// pInfo->dOutput += twa_get_area(pInfo->p, pCtx->end);//((pInfo->p.val + pCtx->end.val) / 2) * (pCtx->end.key - pInfo->p.key); +// pInfo->p = pCtx->end; +// } +// +// pInfo->win.ekey = pInfo->p.key; SET_VAL(pCtx, notNullElems, 1); @@ -4040,8 +4051,10 @@ static void interp_function(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SInterpInfoDetail* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + assert(pCtx->startOffset == 0); + if (pCtx->size == 1) { - char *pData = GET_INPUT_CHAR(pCtx); + char *pData = GET_INPUT_DATA_LIST(pCtx); assignVal(pCtx->aOutputBuf, pData, pCtx->inputBytes, pCtx->inputType); } else { /* @@ -4067,13 +4080,13 @@ static void interp_function(SQLFunctionCtx *pCtx) { } else if (pInfo->type == TSDB_FILL_SET_VALUE) { tVariantDump(&pCtx->param[1], pCtx->aOutputBuf, pCtx->inputType, true); } else if (pInfo->type == TSDB_FILL_PREV) { - char *data = GET_INPUT_CHAR_INDEX(pCtx, 0); + char *data = GET_INPUT_DATA(pCtx, 0); assignVal(pCtx->aOutputBuf, data, pCtx->outputBytes, pCtx->outputType); SET_VAL(pCtx, pCtx->size, 1); } else if (pInfo->type == TSDB_FILL_LINEAR) { - char *data1 = GET_INPUT_CHAR_INDEX(pCtx, 0); - char *data2 = GET_INPUT_CHAR_INDEX(pCtx, 1); + char *data1 = GET_INPUT_DATA(pCtx, 0); + char *data2 = GET_INPUT_DATA(pCtx, 1); TSKEY key1 = pCtx->ptsList[0]; TSKEY key2 = pCtx->ptsList[1]; @@ -4135,14 +4148,14 @@ static void ts_comp_function(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); STSBuf * pTSbuf = ((STSCompInfo *)(GET_ROWCELL_INTERBUF(pResInfo)))->pTSBuf; - const char *input = GET_INPUT_CHAR(pCtx); + const char *input = GET_INPUT_DATA_LIST(pCtx); // primary ts must be existed, so no need to check its existance if (pCtx->order == TSDB_ORDER_ASC) { tsBufAppend(pTSbuf, (int32_t)pCtx->param[0].i64Key, &pCtx->tag, input, pCtx->size * TSDB_KEYSIZE); } else { for (int32_t i = pCtx->size - 1; i >= 0; --i) { - char *d = GET_INPUT_CHAR_INDEX(pCtx, i); + char *d = GET_INPUT_DATA(pCtx, i); tsBufAppend(pTSbuf, (int32_t)pCtx->param[0].i64Key, &pCtx->tag, d, (int32_t)TSDB_KEYSIZE); } } @@ -4152,7 +4165,7 @@ static void ts_comp_function(SQLFunctionCtx *pCtx) { } static void ts_comp_function_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + void *pData = GET_INPUT_DATA(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } @@ -4238,16 +4251,16 @@ static bool rate_function_setup(SQLFunctionCtx *pCtx) { static void rate_function(SQLFunctionCtx *pCtx) { + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - int32_t notNullElems = 0; - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo); - TSKEY *primaryKey = pCtx->ptsList; + int32_t notNullElems = 0; + SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo); + TSKEY *primaryKey = GET_TS_LIST(pCtx); tscDebug("%p rate_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull); for (int32_t i = 0; i < pCtx->size; ++i) { - char *pData = GET_INPUT_CHAR_INDEX(pCtx, i); + char *pData = GET_INPUT_DATA(pCtx, i); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { tscDebug("%p rate_function() index of null data:%d", pCtx, i); continue; @@ -4295,7 +4308,7 @@ static void rate_function(SQLFunctionCtx *pCtx) { } static void rate_function_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + void *pData = GET_INPUT_DATA(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } @@ -4303,8 +4316,8 @@ static void rate_function_f(SQLFunctionCtx *pCtx, int32_t index) { // NOTE: keep the intermediate result into the interResultBuf SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo); - TSKEY *primaryKey = pCtx->ptsList; - + TSKEY *primaryKey = GET_TS_LIST(pCtx); + int64_t v = 0; GET_TYPED_DATA(v, int64_t, pCtx->inputType, pData); @@ -4415,8 +4428,8 @@ static void irate_function(SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo); - TSKEY *primaryKey = pCtx->ptsList; - + TSKEY *primaryKey = GET_TS_LIST(pCtx); + tscDebug("%p irate_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull); if (pCtx->size < 1) { @@ -4424,7 +4437,7 @@ static void irate_function(SQLFunctionCtx *pCtx) { } for (int32_t i = pCtx->size - 1; i >= 0; --i) { - char *pData = GET_INPUT_CHAR_INDEX(pCtx, i); + char *pData = GET_INPUT_DATA(pCtx, i); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { tscDebug("%p irate_function() index of null data:%d", pCtx, i); continue; @@ -4467,15 +4480,15 @@ static void irate_function(SQLFunctionCtx *pCtx) { } static void irate_function_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + void *pData = GET_INPUT_DATA(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } // NOTE: keep the intermediate result into the interResultBuf - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo); - TSKEY *primaryKey = pCtx->ptsList; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo); + TSKEY *primaryKey = GET_TS_LIST(pCtx); int64_t v = 0; GET_TYPED_DATA(v, int64_t, pCtx->inputType, pData); @@ -4505,7 +4518,7 @@ static void do_sumrate_merge(SQLFunctionCtx *pCtx) { assert(pCtx->stableQuery); SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo); - char * input = GET_INPUT_CHAR(pCtx); + char * input = GET_INPUT_DATA_LIST(pCtx); for (int32_t i = 0; i < pCtx->size; ++i, input += pCtx->inputBytes) { SRateInfo *pInput = (SRateInfo *)input; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 201b3b2abc..e41217af6e 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -204,6 +204,8 @@ typedef struct SQueryRuntimeEnv { int32_t* rowCellInfoOffset;// offset value for each row result cell info char** prevRow; char** nextRow; + + SArithmeticSupport *sasArray; } SQueryRuntimeEnv; enum { diff --git a/src/query/inc/tsqlfunction.h b/src/query/inc/tsqlfunction.h index 38bb0b8a71..b76be3d0fe 100644 --- a/src/query/inc/tsqlfunction.h +++ b/src/query/inc/tsqlfunction.h @@ -191,8 +191,8 @@ typedef struct SQLFunctionCtx { int64_t nStartQueryTimestamp; // timestamp range of current query when function is executed on a specific data block int32_t numOfParams; tVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param */ - int64_t * ptsList; // corresponding timestamp array list - void * ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ + int64_t *ptsList; // corresponding timestamp array list + void *ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ SQLPreAggVal preAggVals; tVariant tag; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 606d035898..6c9a66e3a8 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -825,9 +825,6 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1); int32_t functionId = pQuery->pExpr1[k].base.functionId; - if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) { - pCtx[k].ptsList = &tsCol[pCtx[k].startOffset]; - } // not a whole block involved in query processing, statistics data can not be used // NOTE: the original value of isSet have been changed here @@ -965,8 +962,7 @@ static void* getDataBlockImpl(SArray* pDataBlock, int32_t colId) { return NULL; } -static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int32_t col, int32_t size, - SArray *pDataBlock) { +static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int32_t col, int32_t size, SArray *pDataBlock) { if (pDataBlock == NULL) { return NULL; } @@ -977,15 +973,9 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas int32_t functionId = pQuery->pExpr1[col].base.functionId; if (functionId == TSDB_FUNC_ARITHM) { sas->pArithExpr = &pQuery->pExpr1[col]; - - sas->offset = (QUERY_IS_ASC_QUERY(pQuery))? pQuery->pos : pQuery->pos - (size - 1); - sas->colList = pQuery->colList; - sas->numOfCols = pQuery->numOfCols; - sas->data = calloc(pQuery->numOfCols, POINTER_BYTES); - - if (sas->data == NULL) { - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } + sas->offset = (QUERY_IS_ASC_QUERY(pQuery))? pQuery->pos : pQuery->pos - (size - 1); + sas->colList = pQuery->colList; + sas->numOfCols = pQuery->numOfCols; // here the pQuery->colList and sas->colList are identical int32_t numOfCols = (int32_t)taosArrayGetSize(pDataBlock); @@ -1177,15 +1167,10 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * tsCols = (TSKEY *)(pColInfo->pData); } - SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); - if (sasArray == NULL) { - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } - SQInfo *pQInfo = GET_QINFO_ADDR(pRuntimeEnv); for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); - setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k, pQInfo->vgId); + char *dataBlock = getDataBlock(pRuntimeEnv, &pRuntimeEnv->sasArray[k], k, pDataBlockInfo->rows, pDataBlock); + setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &pRuntimeEnv->sasArray[k], k, pQInfo->vgId); } int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); @@ -1280,23 +1265,9 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * int32_t rowIndex = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->rows-1:0; saveDataBlockLastRow(pRuntimeEnv, pDataBlockInfo, pDataBlock, rowIndex); } - - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - if (pQuery->pExpr1[i].base.functionId != TSDB_FUNC_ARITHM) { - continue; - } - - tfree(sasArray[i].data); - } - - tfree(sasArray); } static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) { - if (isNull(pData, type)) { // ignore the null value - return -1; - } - SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; // not assign result buffer yet, add new result buffer, TODO remove it @@ -1308,8 +1279,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat } else if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); qError("QInfo:%p group by not supported on double/float columns, abort", pQInfo); - - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); + return -1; } SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true, groupIndex); @@ -1539,11 +1509,6 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS TSKEY *tsCols = (pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (TSKEY*) pColumnInfoData->pData:NULL; bool groupbyColumnValue = pRuntimeEnv->groupbyNormalCol; - SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); - if (sasArray == NULL) { - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } - int16_t type = 0; int16_t bytes = 0; @@ -1554,8 +1519,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); - setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k, pQInfo->vgId); + char *dataBlock = getDataBlock(pRuntimeEnv, &pRuntimeEnv->sasArray[k], k, pDataBlockInfo->rows, pDataBlock); + setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &pRuntimeEnv->sasArray[k], k, pQInfo->vgId); pCtx[k].size = 1; } @@ -1640,6 +1605,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } doRowwiseApplyFunctions(pRuntimeEnv, &win, offset); + int32_t index = pWindowResInfo->curIndex; STimeWindow nextWin = win; while (1) { @@ -1663,14 +1629,19 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS doRowwiseApplyFunctions(pRuntimeEnv, &nextWin, offset); } + // restore the index, add the result row will move the index + pWindowResInfo->curIndex = index; } else { // other queries // decide which group this rows belongs to according to current state value if (groupbyColumnValue) { char *val = groupbyColumnData + bytes * offset; + if (isNull(val, type)) { // ignore the null value + continue; + } int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, val, type, bytes, item->groupIndex); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - continue; + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); } } @@ -1695,13 +1666,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } _end: - assert(offset >= 0); - assert(tsCols != NULL); - - if (tsCols != NULL) { + assert(offset >= 0 && tsCols != NULL); + if (prevTs != INT64_MIN) { + assert(prevRowIndex >= 0); item->lastKey = prevTs + step; - } else { - item->lastKey = (QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey) + step; } // In case of all rows in current block are not qualified @@ -1712,17 +1680,6 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS if (pRuntimeEnv->pTsBuf != NULL) { item->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf); } - - // todo refactor: extract method - for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { - if (pQuery->pExpr1[i].base.functionId != TSDB_FUNC_ARITHM) { - continue; - } - - tfree(sasArray[i].data); - } - - free(sasArray); } static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, @@ -1802,7 +1759,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY uint32_t status = aAggs[functionId].nStatus; if (((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) && (tsCol != NULL)) { - pCtx->ptsList = &tsCol[pCtx->startOffset]; + pCtx->ptsList = tsCol; } if (functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_LAST_DST) { @@ -1920,8 +1877,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutput, sizeof(SQLFunctionCtx)); pRuntimeEnv->offset = calloc(pQuery->numOfOutput, sizeof(int16_t)); pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t)); + pRuntimeEnv->sasArray = calloc(pQuery->numOfOutput, sizeof(SArithmeticSupport)); - if (pRuntimeEnv->offset == NULL || pRuntimeEnv->pCtx == NULL || pRuntimeEnv->rowCellInfoOffset == NULL) { + if (pRuntimeEnv->offset == NULL || pRuntimeEnv->pCtx == NULL || pRuntimeEnv->rowCellInfoOffset == NULL || pRuntimeEnv->sasArray == NULL) { goto _clean; } @@ -1997,11 +1955,17 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order pCtx->param[1].i64Key = pQuery->order.orderColId; } + if (functionId == TSDB_FUNC_ARITHM) { + pRuntimeEnv->sasArray[i].data = calloc(pQuery->numOfCols, POINTER_BYTES); + if (pRuntimeEnv->sasArray[i].data == NULL) { + goto _clean; + } + } + if (i > 0) { pRuntimeEnv->offset[i] = pRuntimeEnv->offset[i - 1] + pRuntimeEnv->pCtx[i - 1].outputBytes; pRuntimeEnv->rowCellInfoOffset[i] = pRuntimeEnv->rowCellInfoOffset[i - 1] + sizeof(SResultRowCellInfo) + pQuery->pExpr1[i - 1].interBytes; } - } *(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN; @@ -2023,6 +1987,7 @@ _clean: tfree(pRuntimeEnv->pCtx); tfree(pRuntimeEnv->offset); tfree(pRuntimeEnv->rowCellInfoOffset); + tfree(pRuntimeEnv->sasArray); return TSDB_CODE_QRY_OUT_OF_MEMORY; } @@ -2066,6 +2031,14 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { tfree(pRuntimeEnv->pCtx); } + if (pRuntimeEnv->sasArray != NULL) { + for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { + tfree(pRuntimeEnv->sasArray[i].data); + } + + tfree(pRuntimeEnv->sasArray); + } + pRuntimeEnv->pFillInfo = taosDestroyFillInfo(pRuntimeEnv->pFillInfo); destroyResultBuf(pRuntimeEnv->pResultBuf); @@ -3186,8 +3159,8 @@ int32_t mergeGroupResult(SQInfo *pQInfo) { SArray *group = GET_TABLEGROUP(pQInfo, pQInfo->groupIndex); int32_t ret = mergeIntoGroupResultImpl(pGroupResInfo, group, pQInfo); - if (ret < 0) { - return -1; + if (ret != TSDB_CODE_SUCCESS) { + return ret; } // this group generates at least one result, return results @@ -3282,7 +3255,7 @@ int32_t mergeIntoGroupResultImpl(SGroupResInfo* pGroupResInfo, SArray *pTableLis posList = calloc(size, sizeof(int32_t)); pTableQueryInfoList = malloc(POINTER_BYTES * size); - if (pTableQueryInfoList == NULL || posList == NULL) { + if (pTableQueryInfoList == NULL || posList == NULL || pGroupResInfo->pRows == NULL) { qError("QInfo:%p failed alloc memory", pQInfo); code = TSDB_CODE_QRY_OUT_OF_MEMORY; goto _end; @@ -3372,10 +3345,6 @@ int32_t mergeIntoGroupResultImpl(SGroupResInfo* pGroupResInfo, SArray *pTableLis tfree(posList); tfree(pTree); - if (code != TSDB_CODE_SUCCESS) { - longjmp(pRuntimeEnv->env, code); - } - return code; } @@ -5547,7 +5516,7 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) { static void multiTableQueryProcess(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery * pQuery = pRuntimeEnv->pQuery; + SQuery *pQuery = pRuntimeEnv->pQuery; if (pQInfo->groupIndex > 0) { /* @@ -5600,12 +5569,15 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { } if (QUERY_IS_INTERVAL_QUERY(pQuery) || isSumAvgRateQuery(pQuery)) { - if (mergeGroupResult(pQInfo) == TSDB_CODE_SUCCESS) { + int32_t code = mergeGroupResult(pQInfo); + if (code == TSDB_CODE_SUCCESS) { copyResToQueryResultBuf(pQInfo, pQuery); #ifdef _DEBUG_VIEW displayInterResult(pQuery->sdata, pRuntimeEnv, pQuery->sdata[0]->num); #endif + } else { // set the error code + pQInfo->code = code; } } else { // not a interval query copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); @@ -5615,7 +5587,6 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { qDebug("QInfo:%p points returned:%" PRId64 ", total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); } - static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) { SArithmeticSupport *pSupport = (SArithmeticSupport *) param; SExprInfo* pExprInfo = (SExprInfo*) pSupport->exprList; @@ -5905,7 +5876,7 @@ static void stableQueryImpl(SQInfo *pQInfo) { multiTableQueryProcess(pQInfo); } else { assert((pQuery->checkBuffer == 1 && pQuery->interval.interval == 0) || isPointInterpoQuery(pQuery) || - isFirstLastRowQuery(pQuery) || pRuntimeEnv->groupbyNormalCol); + pRuntimeEnv->groupbyNormalCol); sequentialTableProcess(pQInfo); } -- GitLab