diff --git a/src/client/inc/tscLocalMerge.h b/src/client/inc/tscLocalMerge.h index 5baa66a9e0229f35c431cea7a0d2dbb9e2ffb0e2..edd83ad07178e45a4fa577779cf4b62aa39f6443 100644 --- a/src/client/inc/tscLocalMerge.h +++ b/src/client/inc/tscLocalMerge.h @@ -66,7 +66,7 @@ typedef struct SLocalReducer { SFillInfo* pFillInfo; // interpolation support structure char * pFinalRes; // result data after interpo tFilePage * discardData; - SResultInfo * pResInfo; + SResultRowCellInfo * pResInfo; bool discard; int32_t offset; // limit offset value bool orderPrjOnSTable; // projection query on stable diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 12d3b7dfd38e09b30aed6b8e66e56e7eead61034..b22c178466288bedc4ab4ec4a977cb5c1fbfae3c 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -167,7 +167,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_INTERP) { *type = (int16_t)dataType; *bytes = (int16_t)dataBytes; - *interBytes = *bytes + sizeof(SResultInfo); + *interBytes = 0;//*bytes; return TSDB_CODE_SUCCESS; } @@ -175,21 +175,21 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI if (functionId == TSDB_FUNC_TID_TAG) { // todo use struct *type = TSDB_DATA_TYPE_BINARY; *bytes = (int16_t)(dataBytes + sizeof(int16_t) + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t) + VARSTR_HEADER_SIZE); - *interBytes = *bytes; + *interBytes = 0;//*bytes; return TSDB_CODE_SUCCESS; } if (functionId == TSDB_FUNC_COUNT) { *type = TSDB_DATA_TYPE_BIGINT; *bytes = sizeof(int64_t); - *interBytes = *bytes; + *interBytes = 0;//*bytes; return TSDB_CODE_SUCCESS; } if (functionId == TSDB_FUNC_ARITHM) { *type = TSDB_DATA_TYPE_DOUBLE; *bytes = sizeof(double); - *interBytes = *bytes; + *interBytes = 0;//*bytes; return TSDB_CODE_SUCCESS; } @@ -298,7 +298,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI } else if (functionId == TSDB_FUNC_FIRST || functionId == TSDB_FUNC_LAST) { *type = (int16_t)dataType; *bytes = (int16_t)dataBytes; - *interBytes = dataBytes + sizeof(SResultInfo); + *interBytes = dataBytes; } else if (functionId == TSDB_FUNC_SPREAD) { *type = (int16_t)TSDB_DATA_TYPE_DOUBLE; *bytes = sizeof(double); @@ -310,7 +310,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI } else if (functionId == TSDB_FUNC_LEASTSQR) { *type = TSDB_DATA_TYPE_BINARY; *bytes = TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE; // string - *interBytes = *bytes + sizeof(SResultInfo); + *interBytes = *bytes; } else if (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_LAST_DST) { *type = TSDB_DATA_TYPE_BINARY; *bytes = (int16_t)(dataBytes + sizeof(SFirstLastInfo)); @@ -334,28 +334,25 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI return TSDB_CODE_SUCCESS; } -void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable, char* buf) { - assert(pResInfo->interResultBuf == NULL); - - pResInfo->bufLen = size; - pResInfo->superTableQ = superTable; - pResInfo->interResultBuf = buf; -} +//void setResultInfoBuf(SResultRowCellInfo *pResInfo, char* buf) { +// assert(GET_ROWCELL_INTERBUF(pResInfo) == NULL); +// GET_ROWCELL_INTERBUF(pResInfo) = buf; +//} // set the query flag to denote that query is completed static void no_next_step(SQLFunctionCtx *pCtx) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); pResInfo->complete = true; } static bool function_setup(SQLFunctionCtx *pCtx) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); if (pResInfo->initialized) { return false; } memset(pCtx->aOutputBuf, 0, (size_t)pCtx->outputBytes); - initResultInfo(pResInfo); + initResultInfo(pResInfo, pCtx->interBufBytes); return true; } @@ -367,7 +364,7 @@ static bool function_setup(SQLFunctionCtx *pCtx) { * @param pCtx */ static void function_finalizer(SQLFunctionCtx *pCtx) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); if (pResInfo->hasResult != DATA_SET_FLAG) { if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) { setVardataNull(pCtx->aOutputBuf, pCtx->outputType); @@ -431,7 +428,7 @@ static void count_function_f(SQLFunctionCtx *pCtx, int32_t index) { *((int64_t *)pCtx->aOutputBuf) += 1; // do not need it actually - SResultInfo *pInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pInfo = GET_RES_INFO(pCtx); pInfo->hasResult = DATA_SET_FLAG; } @@ -592,8 +589,8 @@ static void sum_function(SQLFunctionCtx *pCtx) { do_sum(pCtx); // keep the result data in output buffer, not in the intermediate buffer - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - if (pResInfo->hasResult == DATA_SET_FLAG && pResInfo->superTableQ) { + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + if (pResInfo->hasResult == DATA_SET_FLAG && pCtx->stableQuery) { // set the flag for super table query SSumInfo *pSum = (SSumInfo *)pCtx->aOutputBuf; pSum->hasResult = DATA_SET_FLAG; @@ -604,8 +601,8 @@ static void sum_function_f(SQLFunctionCtx *pCtx, int32_t index) { do_sum_f(pCtx, index); // keep the result data in output buffer, not in the intermediate buffer - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - if (pResInfo->hasResult == DATA_SET_FLAG && pResInfo->superTableQ) { + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + if (pResInfo->hasResult == DATA_SET_FLAG && pCtx->stableQuery) { SSumInfo *pSum = (SSumInfo *)pCtx->aOutputBuf; pSum->hasResult = DATA_SET_FLAG; } @@ -615,8 +612,7 @@ static int32_t sum_merge_impl(const SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; GET_TRUE_DATA_TYPE(); - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - assert(pResInfo->superTableQ); + assert(pCtx->stableQuery); for (int32_t i = 0; i < pCtx->size; ++i) { char * input = GET_INPUT_CHAR_INDEX(pCtx, i); @@ -661,7 +657,7 @@ static void sum_func_second_merge(SQLFunctionCtx *pCtx) { int32_t notNullElems = sum_merge_impl(pCtx); SET_VAL(pCtx, notNullElems, 1); - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); if (notNullElems > 0) { pResInfo->hasResult = DATA_SET_FLAG; @@ -755,9 +751,9 @@ static void avg_function(SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; // NOTE: keep the intermediate result into the interResultBuf - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - SAvgInfo *pAvgInfo = (SAvgInfo *)pResInfo->interResultBuf; + SAvgInfo *pAvgInfo = (SAvgInfo *)GET_ROWCELL_INTERBUF(pResInfo); double * pVal = &pAvgInfo->sum; if (pCtx->preAggVals.isSet) { @@ -800,8 +796,8 @@ static void avg_function(SQLFunctionCtx *pCtx) { } // keep the data into the final output buffer for super table query since this execution may be the last one - if (pResInfo->superTableQ) { - memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SAvgInfo)); + if (pCtx->stableQuery) { + memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SAvgInfo)); } } @@ -814,9 +810,9 @@ static void avg_function_f(SQLFunctionCtx *pCtx, int32_t index) { SET_VAL(pCtx, 1, 1); // NOTE: keep the intermediate result into the interResultBuf - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - SAvgInfo *pAvgInfo = (SAvgInfo *)pResInfo->interResultBuf; + SAvgInfo *pAvgInfo = (SAvgInfo *)GET_ROWCELL_INTERBUF(pResInfo); if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { pAvgInfo->sum += GET_INT8_VAL(pData); @@ -839,16 +835,16 @@ static void avg_function_f(SQLFunctionCtx *pCtx, int32_t index) { pResInfo->hasResult = DATA_SET_FLAG; // keep the data into the final output buffer for super table query since this execution may be the last one - if (pResInfo->superTableQ) { - memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SAvgInfo)); + if (pCtx->stableQuery) { + memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SAvgInfo)); } } static void avg_func_merge(SQLFunctionCtx *pCtx) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - assert(pResInfo->superTableQ); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + assert(pCtx->stableQuery); - SAvgInfo *pAvgInfo = (SAvgInfo *)pResInfo->interResultBuf; + SAvgInfo *pAvgInfo = (SAvgInfo *)GET_ROWCELL_INTERBUF(pResInfo); char * input = GET_INPUT_CHAR(pCtx); for (int32_t i = 0; i < pCtx->size; ++i, input += pCtx->inputBytes) { @@ -864,12 +860,12 @@ static void avg_func_merge(SQLFunctionCtx *pCtx) { // if the data set hasResult is not set, the result is null if (pAvgInfo->num > 0) { pResInfo->hasResult = DATA_SET_FLAG; - memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SAvgInfo)); + memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SAvgInfo)); } } static void avg_func_second_merge(SQLFunctionCtx *pCtx) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); double *sum = (double*) pCtx->aOutputBuf; char * input = GET_INPUT_CHAR(pCtx); @@ -883,7 +879,7 @@ static void avg_func_second_merge(SQLFunctionCtx *pCtx) { *sum += pInput->sum; // keep the number of data into the temp buffer - *(int64_t *)pResInfo->interResultBuf += pInput->num; + *(int64_t *)GET_ROWCELL_INTERBUF(pResInfo) += pInput->num; } } @@ -891,21 +887,21 @@ static void avg_func_second_merge(SQLFunctionCtx *pCtx) { * the average value is calculated in finalize routine, since current routine does not know the exact number of points */ static void avg_finalizer(SQLFunctionCtx *pCtx) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); if (pCtx->currentStage == SECONDARY_STAGE_MERGE) { assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); - if (GET_INT64_VAL(pResInfo->interResultBuf) <= 0) { + if (GET_INT64_VAL(GET_ROWCELL_INTERBUF(pResInfo)) <= 0) { setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); return; // empty table } - *(double *)pCtx->aOutputBuf = (*(double *)pCtx->aOutputBuf) / *(int64_t *)pResInfo->interResultBuf; + *(double *)pCtx->aOutputBuf = (*(double *)pCtx->aOutputBuf) / *(int64_t *)GET_ROWCELL_INTERBUF(pResInfo); } else { // this is the secondary merge, only in the secondary merge, the input type is TSDB_DATA_TYPE_BINARY assert(pCtx->inputType >= TSDB_DATA_TYPE_TINYINT && pCtx->inputType <= TSDB_DATA_TYPE_DOUBLE); - SAvgInfo *pAvgInfo = (SAvgInfo *)pResInfo->interResultBuf; + SAvgInfo *pAvgInfo = (SAvgInfo *)GET_ROWCELL_INTERBUF(pResInfo); if (pAvgInfo->num == 0) { // all data are NULL or empty table setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); @@ -1116,11 +1112,11 @@ static void min_function(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, notNullElems, 1); if (notNullElems > 0) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); pResInfo->hasResult = DATA_SET_FLAG; // set the flag for super table query - if (pResInfo->superTableQ) { + if (pCtx->stableQuery) { *(pCtx->aOutputBuf + pCtx->inputBytes) = DATA_SET_FLAG; } } @@ -1133,11 +1129,11 @@ static void max_function(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, notNullElems, 1); if (notNullElems > 0) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); pResInfo->hasResult = DATA_SET_FLAG; // set the flag for super table query - if (pResInfo->superTableQ) { + if (pCtx->stableQuery) { *(pCtx->aOutputBuf + pCtx->inputBytes) = DATA_SET_FLAG; } } @@ -1148,8 +1144,7 @@ static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *outp GET_TRUE_DATA_TYPE(); - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - assert(pResInfo->superTableQ); + assert(pCtx->stableQuery); for (int32_t i = 0; i < pCtx->size; ++i) { char *input = GET_INPUT_CHAR_INDEX(pCtx, i); @@ -1210,7 +1205,7 @@ static void min_func_merge(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, notNullElems, 1); - if (notNullElems > 0) { // for super table query, SResultInfo is not used + if (notNullElems > 0) { // for super table query, SResultRowCellInfo is not used char *flag = pCtx->aOutputBuf + pCtx->inputBytes; *flag = DATA_SET_FLAG; } @@ -1221,7 +1216,7 @@ static void min_func_second_merge(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, notNullElems, 1); - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); if (notNullElems > 0) { pResInfo->hasResult = DATA_SET_FLAG; } @@ -1242,7 +1237,7 @@ static void max_func_second_merge(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, numOfElem, 1); - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); if (numOfElem > 0) { pResInfo->hasResult = DATA_SET_FLAG; } @@ -1297,8 +1292,8 @@ static void max_function_f(SQLFunctionCtx *pCtx, int32_t index) { SET_VAL(pCtx, 1, 1); minMax_function_f(pCtx, index, 0); - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - if (pResInfo->hasResult == DATA_SET_FLAG && pResInfo->superTableQ) { + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + if (pResInfo->hasResult == DATA_SET_FLAG && pCtx->stableQuery) { char *flag = pCtx->aOutputBuf + pCtx->inputBytes; *flag = DATA_SET_FLAG; } @@ -1313,8 +1308,8 @@ static void min_function_f(SQLFunctionCtx *pCtx, int32_t index) { SET_VAL(pCtx, 1, 1); minMax_function_f(pCtx, index, 1); - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - if (pResInfo->hasResult == DATA_SET_FLAG && pResInfo->superTableQ) { + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + if (pResInfo->hasResult == DATA_SET_FLAG && pCtx->stableQuery) { char *flag = pCtx->aOutputBuf + pCtx->inputBytes; *flag = DATA_SET_FLAG; } @@ -1330,7 +1325,7 @@ static void min_function_f(SQLFunctionCtx *pCtx, int32_t index) { static void stddev_function(SQLFunctionCtx *pCtx) { // the second stage to calculate standard deviation - SStddevInfo *pStd = GET_RES_INFO(pCtx)->interResultBuf; + SStddevInfo *pStd = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); if (pStd->stage == 0) { // the first stage is to calculate average value avg_function(pCtx); @@ -1381,8 +1376,8 @@ static void stddev_function(SQLFunctionCtx *pCtx) { static void stddev_function_f(SQLFunctionCtx *pCtx, int32_t index) { // the second stage to calculate standard deviation - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - SStddevInfo *pStd = pResInfo->interResultBuf; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SStddevInfo *pStd = GET_ROWCELL_INTERBUF(pResInfo); /* the first stage is to calculate average value */ if (pStd->stage == 0) { @@ -1433,8 +1428,8 @@ static void stddev_next_step(SQLFunctionCtx *pCtx) { * the stddevInfo and the average info struct share the same buffer area * And the position of each element in their struct is exactly the same matched */ - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - SStddevInfo *pStd = pResInfo->interResultBuf; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SStddevInfo *pStd = GET_ROWCELL_INTERBUF(pResInfo); if (pStd->stage == 0) { /* @@ -1449,7 +1444,7 @@ static void stddev_next_step(SQLFunctionCtx *pCtx) { pResInfo->initialized = true; // set it initialized to avoid re-initialization // save average value into tmpBuf, for second stage scan - SAvgInfo *pAvg = pResInfo->interResultBuf; + SAvgInfo *pAvg = GET_ROWCELL_INTERBUF(pResInfo); pStd->avg = GET_DOUBLE_VAL(pCtx->aOutputBuf); assert((isnan(pAvg->sum) && pAvg->num == 0) || (pStd->num == pAvg->num && pStd->avg == pAvg->sum)); @@ -1459,7 +1454,7 @@ static void stddev_next_step(SQLFunctionCtx *pCtx) { } static void stddev_finalizer(SQLFunctionCtx *pCtx) { - SStddevInfo *pStd = (SStddevInfo *)GET_RES_INFO(pCtx)->interResultBuf; + SStddevInfo *pStd = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); if (pStd->num <= 0) { setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); @@ -1505,7 +1500,7 @@ static void first_function(SQLFunctionCtx *pCtx) { TSKEY k = pCtx->ptsList[i]; DO_UPDATE_TAG_COLUMNS(pCtx, k); - SResultInfo *pInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pInfo = GET_RES_INFO(pCtx); pInfo->hasResult = DATA_SET_FLAG; pInfo->complete = true; @@ -1532,7 +1527,7 @@ static void first_function_f(SQLFunctionCtx *pCtx, int32_t index) { TSKEY ts = pCtx->ptsList[index]; DO_UPDATE_TAG_COLUMNS(pCtx, ts); - SResultInfo *pInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pInfo = GET_RES_INFO(pCtx); pInfo->hasResult = DATA_SET_FLAG; pInfo->complete = true; // get the first not-null data, completed } @@ -1576,7 +1571,7 @@ static void first_dist_function(SQLFunctionCtx *pCtx) { first_data_assign_impl(pCtx, data, i); - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); pResInfo->hasResult = DATA_SET_FLAG; notNullElems++; @@ -1604,8 +1599,7 @@ static void first_dist_function_f(SQLFunctionCtx *pCtx, int32_t index) { static void first_dist_func_merge(SQLFunctionCtx *pCtx) { char *pData = GET_INPUT_CHAR(pCtx); - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - assert(pCtx->size == 1 && pResInfo->superTableQ); + assert(pCtx->size == 1 && pCtx->stableQuery); SFirstLastInfo *pInput = (SFirstLastInfo *)(pData + pCtx->inputBytes); if (pInput->hasResult != DATA_SET_FLAG) { @@ -1620,8 +1614,8 @@ static void first_dist_func_merge(SQLFunctionCtx *pCtx) { } static void first_dist_func_second_merge(SQLFunctionCtx *pCtx) { - assert(pCtx->resultInfo->superTableQ); - + assert(pCtx->stableQuery); + char * pData = GET_INPUT_CHAR(pCtx); SFirstLastInfo *pInput = (SFirstLastInfo*) (pData + pCtx->outputBytes); if (pInput->hasResult != DATA_SET_FLAG) { @@ -1668,7 +1662,7 @@ static void last_function(SQLFunctionCtx *pCtx) { TSKEY ts = pCtx->ptsList[i]; DO_UPDATE_TAG_COLUMNS(pCtx, ts); - SResultInfo *pInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pInfo = GET_RES_INFO(pCtx); pInfo->hasResult = DATA_SET_FLAG; pInfo->complete = true; // set query completed on this column @@ -1691,7 +1685,7 @@ static void last_function_f(SQLFunctionCtx *pCtx, int32_t index) { TSKEY ts = pCtx->ptsList[index]; DO_UPDATE_TAG_COLUMNS(pCtx, ts); - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); pResInfo->hasResult = DATA_SET_FLAG; pResInfo->complete = true; // set query completed } @@ -1740,7 +1734,7 @@ static void last_dist_function(SQLFunctionCtx *pCtx) { last_data_assign_impl(pCtx, data, i); - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); pResInfo->hasResult = DATA_SET_FLAG; notNullElems++; @@ -1776,8 +1770,7 @@ static void last_dist_function_f(SQLFunctionCtx *pCtx, int32_t index) { static void last_dist_func_merge(SQLFunctionCtx *pCtx) { char *pData = GET_INPUT_CHAR(pCtx); - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - assert(pCtx->size == 1 && pResInfo->superTableQ); + assert(pCtx->size == 1 && pCtx->stableQuery); // the input data is null SFirstLastInfo *pInput = (SFirstLastInfo *)(pData + pCtx->inputBytes); @@ -1833,11 +1826,11 @@ static void last_row_function(SQLFunctionCtx *pCtx) { // assign the last element in current data block assignVal(pCtx->aOutputBuf, pData + (pCtx->size - 1) * pCtx->inputBytes, pCtx->inputBytes, pCtx->inputType); - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); pResInfo->hasResult = DATA_SET_FLAG; // set the result to final result buffer in case of super table query - if (pResInfo->superTableQ) { + if (pCtx->stableQuery) { SLastrowInfo *pInfo1 = (SLastrowInfo *)(pCtx->aOutputBuf + pCtx->inputBytes); pInfo1->ts = pCtx->ptsList[pCtx->size - 1]; pInfo1->hasResult = DATA_SET_FLAG; @@ -1852,7 +1845,7 @@ static void last_row_function(SQLFunctionCtx *pCtx) { static void last_row_finalizer(SQLFunctionCtx *pCtx) { // do nothing at the first stage - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); if (pResInfo->hasResult != DATA_SET_FLAG) { if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) { setVardataNull(pCtx->aOutputBuf, pCtx->outputType); @@ -2044,8 +2037,8 @@ static int32_t resDataAscComparFn(const void *pLeft, const void *pRight) { static int32_t resDataDescComparFn(const void *pLeft, const void *pRight) { return -resDataAscComparFn(pLeft, pRight); } static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - STopBotInfo *pRes = pResInfo->interResultBuf; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + STopBotInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); tValuePair **tvp = pRes->res; @@ -2135,18 +2128,18 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) { * top/bottom use the intermediate result buffer to keep the intermediate result */ static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); // only the first_stage_merge is directly written data into final output buffer - if (pResInfo->superTableQ && pCtx->currentStage != SECONDARY_STAGE_MERGE) { + if (pCtx->stableQuery && pCtx->currentStage != SECONDARY_STAGE_MERGE) { return (STopBotInfo*) pCtx->aOutputBuf; } else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer - return pResInfo->interResultBuf; + return GET_ROWCELL_INTERBUF(pResInfo); } } bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const char *minval, const char *maxval) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); if (pResInfo == NULL) { return true; } @@ -2252,7 +2245,7 @@ static void top_function(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, notNullElems, 1); if (notNullElems > 0) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); pResInfo->hasResult = DATA_SET_FLAG; } } @@ -2270,7 +2263,7 @@ static void top_function_f(SQLFunctionCtx *pCtx, int32_t index) { do_top_function_add(pRes, (int32_t)pCtx->param[0].i64Key, pData, pCtx->ptsList[index], pCtx->inputType, &pCtx->tagInfo, NULL, 0); - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); pResInfo->hasResult = DATA_SET_FLAG; } @@ -2285,8 +2278,7 @@ static void top_func_merge(SQLFunctionCtx *pCtx) { // remmap the input buffer may cause the struct pointer invalid, so rebuild the STopBotInfo is necessary buildTopBotStruct(pInput, pCtx); - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - assert(pResInfo->superTableQ && pCtx->outputType == TSDB_DATA_TYPE_BINARY && pCtx->size == 1); + assert(pCtx->stableQuery && pCtx->outputType == TSDB_DATA_TYPE_BINARY && pCtx->size == 1); STopBotInfo *pOutput = getTopBotOutputInfo(pCtx); @@ -2314,7 +2306,7 @@ static void top_func_second_merge(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, pInput->num, pOutput->num); if (pOutput->num > 0) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); pResInfo->hasResult = DATA_SET_FLAG; } } @@ -2343,7 +2335,7 @@ static void bottom_function(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, notNullElems, 1); if (notNullElems > 0) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); pResInfo->hasResult = DATA_SET_FLAG; } } @@ -2359,7 +2351,7 @@ static void bottom_function_f(SQLFunctionCtx *pCtx, int32_t index) { do_bottom_function_add(pRes, (int32_t)pCtx->param[0].i64Key, pData, pCtx->ptsList[index], pCtx->inputType, &pCtx->tagInfo, NULL, 0); - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); pResInfo->hasResult = DATA_SET_FLAG; } @@ -2374,8 +2366,7 @@ static void bottom_func_merge(SQLFunctionCtx *pCtx) { // remmap the input buffer may cause the struct pointer invalid, so rebuild the STopBotInfo is necessary buildTopBotStruct(pInput, pCtx); - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - assert(pResInfo->superTableQ && pCtx->outputType == TSDB_DATA_TYPE_BINARY && pCtx->size == 1); + assert(pCtx->stableQuery && pCtx->outputType == TSDB_DATA_TYPE_BINARY && pCtx->size == 1); STopBotInfo *pOutput = getTopBotOutputInfo(pCtx); @@ -2403,16 +2394,16 @@ static void bottom_func_second_merge(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, pInput->num, pOutput->num); if (pOutput->num > 0) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); pResInfo->hasResult = DATA_SET_FLAG; } } static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); // data in temporary list is less than the required number of results, not enough qualified number of results - STopBotInfo *pRes = pResInfo->interResultBuf; + STopBotInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); if (pRes->num == 0) { // no result assert(pResInfo->hasResult != DATA_SET_FLAG); // TODO: @@ -2443,8 +2434,8 @@ static bool percentile_function_setup(SQLFunctionCtx *pCtx) { } // in the first round, get the min-max value of all involved data - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - SPercentileInfo *pInfo = pResInfo->interResultBuf; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); SET_DOUBLE_VAL(&pInfo->minval, DBL_MAX); SET_DOUBLE_VAL(&pInfo->maxval, -DBL_MAX); pInfo->numOfElems = 0; @@ -2455,8 +2446,8 @@ static bool percentile_function_setup(SQLFunctionCtx *pCtx) { static void percentile_function(SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; - SResultInfo * pResInfo = GET_RES_INFO(pCtx); - SPercentileInfo *pInfo = pResInfo->interResultBuf; + SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); + SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); // the first stage, only acquire the min/max value if (pInfo->stage == 0) { @@ -2546,9 +2537,9 @@ static void percentile_function_f(SQLFunctionCtx *pCtx, int32_t index) { return; } - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - SPercentileInfo *pInfo = (SPercentileInfo *)pResInfo->interResultBuf; + SPercentileInfo *pInfo = (SPercentileInfo *)GET_ROWCELL_INTERBUF(pResInfo); if (pInfo->stage == 0) { // TODO extract functions @@ -2595,8 +2586,8 @@ static void percentile_function_f(SQLFunctionCtx *pCtx, int32_t index) { static void percentile_finalizer(SQLFunctionCtx *pCtx) { double v = pCtx->param[0].nType == TSDB_DATA_TYPE_INT ? pCtx->param[0].i64Key : pCtx->param[0].dKey; - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - tMemBucket * pMemBucket = ((SPercentileInfo *)pResInfo->interResultBuf)->pMemBucket; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + tMemBucket * pMemBucket = ((SPercentileInfo *)GET_ROWCELL_INTERBUF(pResInfo))->pMemBucket; if (pMemBucket->total > 0) { // check for null *(double *)pCtx->aOutputBuf = getPercentile(pMemBucket, v); @@ -2609,8 +2600,8 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) { } static void percentile_next_step(SQLFunctionCtx *pCtx) { - SResultInfo * pResInfo = GET_RES_INFO(pCtx); - SPercentileInfo *pInfo = pResInfo->interResultBuf; + SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); + SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); if (pInfo->stage == 0) { // all data are null, set it completed @@ -2627,12 +2618,12 @@ static void percentile_next_step(SQLFunctionCtx *pCtx) { ////////////////////////////////////////////////////////////////////////////////// static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - if (pResInfo->superTableQ && pCtx->currentStage != SECONDARY_STAGE_MERGE) { + if (pCtx->stableQuery && pCtx->currentStage != SECONDARY_STAGE_MERGE) { return (SAPercentileInfo*) pCtx->aOutputBuf; } else { - return pResInfo->interResultBuf; + return GET_ROWCELL_INTERBUF(pResInfo); } } @@ -2651,7 +2642,7 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx) { static void apercentile_function(SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; - SResultInfo * pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); SAPercentileInfo *pInfo = getAPerctInfo(pCtx); for (int32_t i = 0; i < pCtx->size; ++i) { @@ -2704,8 +2695,8 @@ static void apercentile_function_f(SQLFunctionCtx *pCtx, int32_t index) { return; } - SResultInfo * pResInfo = GET_RES_INFO(pCtx); - SAPercentileInfo *pInfo = getAPerctInfo(pCtx); // pResInfo->interResultBuf; + SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); + SAPercentileInfo *pInfo = getAPerctInfo(pCtx); double v = 0; switch (pCtx->inputType) { @@ -2736,8 +2727,8 @@ static void apercentile_function_f(SQLFunctionCtx *pCtx, int32_t index) { } static void apercentile_func_merge(SQLFunctionCtx *pCtx) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - assert(pResInfo->superTableQ); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + assert(pCtx->stableQuery); SAPercentileInfo *pInput = (SAPercentileInfo *)GET_INPUT_CHAR(pCtx); @@ -2794,7 +2785,7 @@ static void apercentile_func_second_merge(SQLFunctionCtx *pCtx) { pOutput->pHisto = pRes; } - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); pResInfo->hasResult = DATA_SET_FLAG; SET_VAL(pCtx, 1, 1); } @@ -2802,8 +2793,8 @@ static void apercentile_func_second_merge(SQLFunctionCtx *pCtx) { static void apercentile_finalizer(SQLFunctionCtx *pCtx) { double v = (pCtx->param[0].nType == TSDB_DATA_TYPE_INT) ? pCtx->param[0].i64Key : pCtx->param[0].dKey; - SResultInfo * pResInfo = GET_RES_INFO(pCtx); - SAPercentileInfo *pOutput = pResInfo->interResultBuf; + SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); + SAPercentileInfo *pOutput = GET_ROWCELL_INTERBUF(pResInfo); if (pCtx->currentStage == SECONDARY_STAGE_MERGE) { if (pResInfo->hasResult == DATA_SET_FLAG) { // check for null @@ -2840,8 +2831,8 @@ static bool leastsquares_function_setup(SQLFunctionCtx *pCtx) { return false; } - SResultInfo * pResInfo = GET_RES_INFO(pCtx); - SLeastsquareInfo *pInfo = pResInfo->interResultBuf; + SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); + SLeastsquareInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); // 2*3 matrix pInfo->startVal = pCtx->param[0].dKey; @@ -2867,8 +2858,8 @@ static bool leastsquares_function_setup(SQLFunctionCtx *pCtx) { } static void leastsquares_function(SQLFunctionCtx *pCtx) { - SResultInfo * pResInfo = GET_RES_INFO(pCtx); - SLeastsquareInfo *pInfo = pResInfo->interResultBuf; + SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); + SLeastsquareInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); double(*param)[3] = pInfo->mat; double x = pInfo->startVal; @@ -2938,8 +2929,8 @@ static void leastsquares_function_f(SQLFunctionCtx *pCtx, int32_t index) { return; } - SResultInfo * pResInfo = GET_RES_INFO(pCtx); - SLeastsquareInfo *pInfo = pResInfo->interResultBuf; + SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); + SLeastsquareInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); double(*param)[3] = pInfo->mat; @@ -2988,8 +2979,8 @@ static void leastsquares_function_f(SQLFunctionCtx *pCtx, int32_t index) { static void leastsquares_finalizer(SQLFunctionCtx *pCtx) { // no data in query - SResultInfo * pResInfo = GET_RES_INFO(pCtx); - SLeastsquareInfo *pInfo = pResInfo->interResultBuf; + SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); + SLeastsquareInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); if (pInfo->num == 0) { if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) { @@ -3054,7 +3045,7 @@ static void col_project_function(SQLFunctionCtx *pCtx) { } static void col_project_function_f(SQLFunctionCtx *pCtx, int32_t index) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); if (pCtx->numOfParams == 2) { // the number of output rows should not affect the final number of rows, so set it to be 0 return; } @@ -3486,7 +3477,7 @@ static bool spread_function_setup(SQLFunctionCtx *pCtx) { return false; } - SSpreadInfo *pInfo = GET_RES_INFO(pCtx)->interResultBuf; + SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); // this is the server-side setup function in client-side, the secondary merge do not need this procedure if (pCtx->currentStage == SECONDARY_STAGE_MERGE) { @@ -3501,8 +3492,8 @@ static bool spread_function_setup(SQLFunctionCtx *pCtx) { } static void spread_function(SQLFunctionCtx *pCtx) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - SSpreadInfo *pInfo = pResInfo->interResultBuf; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); int32_t numOfElems = 0; @@ -3568,8 +3559,8 @@ static void spread_function(SQLFunctionCtx *pCtx) { } // keep the data into the final output buffer for super table query since this execution may be the last one - if (pResInfo->superTableQ) { - memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SSpreadInfo)); + if (pCtx->stableQuery) { + memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SSpreadInfo)); } } @@ -3581,8 +3572,8 @@ static void spread_function_f(SQLFunctionCtx *pCtx, int32_t index) { SET_VAL(pCtx, 1, 1); - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - SSpreadInfo *pInfo = pResInfo->interResultBuf; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); double val = 0.0; if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { @@ -3611,16 +3602,16 @@ static void spread_function_f(SQLFunctionCtx *pCtx, int32_t index) { pResInfo->hasResult = DATA_SET_FLAG; pInfo->hasResult = DATA_SET_FLAG; - if (pResInfo->superTableQ) { - memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SSpreadInfo)); + if (pCtx->stableQuery) { + memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SSpreadInfo)); } } void spread_func_merge(SQLFunctionCtx *pCtx) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - assert(pResInfo->superTableQ); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + assert(pCtx->stableQuery); - SSpreadInfo *pResData = pResInfo->interResultBuf; + SSpreadInfo *pResData = GET_ROWCELL_INTERBUF(pResInfo); int32_t notNullElems = 0; for (int32_t i = 0; i < pCtx->size; ++i) { @@ -3644,7 +3635,7 @@ void spread_func_merge(SQLFunctionCtx *pCtx) { } if (notNullElems > 0) { - memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SSpreadInfo)); + memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SSpreadInfo)); pResInfo->hasResult = DATA_SET_FLAG; } } @@ -3675,7 +3666,7 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) { * here we do not check the input data types, because in case of metric query, * the type of intermediate data is binary */ - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); if (pCtx->currentStage == SECONDARY_STAGE_MERGE) { assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); @@ -3690,7 +3681,7 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) { assert((pCtx->inputType >= TSDB_DATA_TYPE_TINYINT && pCtx->inputType <= TSDB_DATA_TYPE_DOUBLE) || (pCtx->inputType == TSDB_DATA_TYPE_TIMESTAMP)); - SSpreadInfo *pInfo = GET_RES_INFO(pCtx)->interResultBuf; + SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); if (pInfo->hasResult != DATA_SET_FLAG) { setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); return; @@ -3714,8 +3705,8 @@ static bool twa_function_setup(SQLFunctionCtx *pCtx) { return false; } - SResultInfo *pResInfo = GET_RES_INFO(pCtx); //->aOutputBuf + pCtx->outputBytes; - STwaInfo * pInfo = pResInfo->interResultBuf; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); //->aOutputBuf + pCtx->outputBytes; + STwaInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo); pInfo->lastKey = INT64_MIN; pInfo->type = pCtx->inputType; @@ -3754,8 +3745,8 @@ static void twa_function(SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - STwaInfo * pInfo = pResInfo->interResultBuf; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + STwaInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo); int32_t i = 0; @@ -3808,7 +3799,7 @@ static void twa_function(SQLFunctionCtx *pCtx) { pResInfo->hasResult = DATA_SET_FLAG; } - if (pResInfo->superTableQ) { + if (pCtx->stableQuery) { memcpy(pCtx->aOutputBuf, pInfo, sizeof(STwaInfo)); } @@ -3825,8 +3816,8 @@ static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) { TSKEY *primaryKey = pCtx->ptsList; - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - STwaInfo *pInfo = pResInfo->interResultBuf; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); if (pInfo->lastKey == INT64_MIN) { pInfo->lastKey = pCtx->nStartQueryTimestamp; @@ -3848,14 +3839,13 @@ static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) { // pCtx->numOfIteratedElems += 1; pResInfo->hasResult = DATA_SET_FLAG; - if (pResInfo->superTableQ) { - memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(STwaInfo)); + if (pCtx->stableQuery) { + memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(STwaInfo)); } } static void twa_func_merge(SQLFunctionCtx *pCtx) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - assert(pResInfo->superTableQ); + assert(pCtx->stableQuery); STwaInfo *pBuf = (STwaInfo *)pCtx->aOutputBuf; char * indicator = pCtx->aInputElemBuf; @@ -3895,16 +3885,16 @@ static void twa_func_merge(SQLFunctionCtx *pCtx) { */ void twa_function_copy(SQLFunctionCtx *pCtx) { assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - memcpy(pResInfo->interResultBuf, pCtx->aInputElemBuf, (size_t)pCtx->inputBytes); + memcpy(GET_ROWCELL_INTERBUF(pResInfo), pCtx->aInputElemBuf, (size_t)pCtx->inputBytes); pResInfo->hasResult = ((STwaInfo *)pCtx->aInputElemBuf)->hasResult; } void twa_function_finalizer(SQLFunctionCtx *pCtx) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - STwaInfo *pInfo = (STwaInfo *)pResInfo->interResultBuf; + STwaInfo *pInfo = (STwaInfo *)GET_ROWCELL_INTERBUF(pResInfo); assert(pInfo->EKey >= pInfo->lastKey && pInfo->hasResult == pResInfo->hasResult); if (pInfo->hasResult != DATA_SET_FLAG) { @@ -3932,8 +3922,8 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) { */ static void interp_function(SQLFunctionCtx *pCtx) { // at this point, the value is existed, return directly - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - SInterpInfoDetail* pInfo = pResInfo->interResultBuf; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SInterpInfoDetail* pInfo = GET_ROWCELL_INTERBUF(pResInfo); if (pCtx->size == 1) { char *pData = GET_INPUT_CHAR(pCtx); @@ -4019,8 +4009,8 @@ static bool ts_comp_function_setup(SQLFunctionCtx *pCtx) { return false; // not initialized since it has been initialized } - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - STSCompInfo *pInfo = pResInfo->interResultBuf; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + STSCompInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); pInfo->pTSBuf = tsBufCreate(false, pCtx->order); pInfo->pTSBuf->tsOrder = pCtx->order; @@ -4028,8 +4018,8 @@ static bool ts_comp_function_setup(SQLFunctionCtx *pCtx) { } static void ts_comp_function(SQLFunctionCtx *pCtx) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - STSBuf * pTSbuf = ((STSCompInfo *)(pResInfo->interResultBuf))->pTSBuf; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + STSBuf * pTSbuf = ((STSCompInfo *)(GET_ROWCELL_INTERBUF(pResInfo)))->pTSBuf; const char *input = GET_INPUT_CHAR(pCtx); @@ -4053,8 +4043,8 @@ static void ts_comp_function_f(SQLFunctionCtx *pCtx, int32_t index) { return; } - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - STSCompInfo *pInfo = pResInfo->interResultBuf; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + STSCompInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); STSBuf *pTSbuf = pInfo->pTSBuf; @@ -4065,9 +4055,9 @@ static void ts_comp_function_f(SQLFunctionCtx *pCtx, int32_t index) { } static void ts_comp_finalize(SQLFunctionCtx *pCtx) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - STSCompInfo *pInfo = pResInfo->interResultBuf; + STSCompInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); STSBuf * pTSbuf = pInfo->pTSBuf; tsBufFlush(pTSbuf); @@ -4116,8 +4106,8 @@ static bool rate_function_setup(SQLFunctionCtx *pCtx) { return false; } - SResultInfo *pResInfo = GET_RES_INFO(pCtx); //->aOutputBuf + pCtx->outputBytes; - SRateInfo * pInfo = pResInfo->interResultBuf; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); //->aOutputBuf + pCtx->outputBytes; + SRateInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo); pInfo->CorrectionValue = 0; pInfo->firstKey = INT64_MIN; @@ -4136,8 +4126,8 @@ static bool rate_function_setup(SQLFunctionCtx *pCtx) { static void rate_function(SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo); TSKEY *primaryKey = pCtx->ptsList; tscDebug("%p rate_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull); @@ -4200,8 +4190,8 @@ static void rate_function(SQLFunctionCtx *pCtx) { } // keep the data into the final output buffer for super table query since this execution may be the last one - if (pResInfo->superTableQ) { - memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo)); + if (pCtx->stableQuery) { + memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SRateInfo)); } } @@ -4212,8 +4202,8 @@ static void rate_function_f(SQLFunctionCtx *pCtx, int32_t index) { } // NOTE: keep the intermediate result into the interResultBuf - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo); TSKEY *primaryKey = pCtx->ptsList; int64_t v = 0; @@ -4257,20 +4247,18 @@ static void rate_function_f(SQLFunctionCtx *pCtx, int32_t index) { pResInfo->hasResult = DATA_SET_FLAG; // keep the data into the final output buffer for super table query since this execution may be the last one - if (pResInfo->superTableQ) { - memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo)); + if (pCtx->stableQuery) { + memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SRateInfo)); } } static void rate_func_merge(SQLFunctionCtx *pCtx) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - assert(pResInfo->superTableQ); + assert(pCtx->stableQuery); tscDebug("rate_func_merge() size:%d", pCtx->size); - //SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; SRateInfo *pBuf = (SRateInfo *)pCtx->aOutputBuf; char *indicator = pCtx->aInputElemBuf; @@ -4303,8 +4291,8 @@ static void rate_func_merge(SQLFunctionCtx *pCtx) { static void rate_func_copy(SQLFunctionCtx *pCtx) { assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - memcpy(pResInfo->interResultBuf, pCtx->aInputElemBuf, (size_t)pCtx->inputBytes); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + memcpy(GET_ROWCELL_INTERBUF(pResInfo), pCtx->aInputElemBuf, (size_t)pCtx->inputBytes); pResInfo->hasResult = ((SRateInfo*)pCtx->aInputElemBuf)->hasResult; SRateInfo* pRateInfo = (SRateInfo*)pCtx->aInputElemBuf; @@ -4315,8 +4303,8 @@ static void rate_func_copy(SQLFunctionCtx *pCtx) { static void rate_finalizer(SQLFunctionCtx *pCtx) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo); tscDebug("%p isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64 " hasResult:%d", pCtx, pRateInfo->isIRate, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, pRateInfo->hasResult); @@ -4341,8 +4329,8 @@ static void rate_finalizer(SQLFunctionCtx *pCtx) { static void irate_function(SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo); TSKEY *primaryKey = pCtx->ptsList; tscDebug("%p irate_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull); @@ -4404,8 +4392,8 @@ static void irate_function(SQLFunctionCtx *pCtx) { } // keep the data into the final output buffer for super table query since this execution may be the last one - if (pResInfo->superTableQ) { - memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo)); + if (pCtx->stableQuery) { + memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SRateInfo)); } } @@ -4416,8 +4404,8 @@ static void irate_function_f(SQLFunctionCtx *pCtx, int32_t index) { } // NOTE: keep the intermediate result into the interResultBuf - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo); TSKEY *primaryKey = pCtx->ptsList; int64_t v = 0; @@ -4453,16 +4441,16 @@ static void irate_function_f(SQLFunctionCtx *pCtx, int32_t index) { pResInfo->hasResult = DATA_SET_FLAG; // keep the data into the final output buffer for super table query since this execution may be the last one - if (pResInfo->superTableQ) { - memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo)); + if (pCtx->stableQuery) { + memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SRateInfo)); } } static void do_sumrate_merge(SQLFunctionCtx *pCtx) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - assert(pResInfo->superTableQ); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + assert(pCtx->stableQuery); - SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo); char * input = GET_INPUT_CHAR(pCtx); for (int32_t i = 0; i < pCtx->size; ++i, input += pCtx->inputBytes) { @@ -4486,7 +4474,7 @@ static void do_sumrate_merge(SQLFunctionCtx *pCtx) { if (DATA_SET_FLAG == pRateInfo->hasResult) { pResInfo->hasResult = DATA_SET_FLAG; SET_VAL(pCtx, pRateInfo->num, 1); - memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo)); + memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SRateInfo)); } } @@ -4501,10 +4489,10 @@ static void sumrate_func_second_merge(SQLFunctionCtx *pCtx) { } static void sumrate_finalizer(SQLFunctionCtx *pCtx) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo); - tscDebug("%p sumrate_finalizer() superTableQ:%d num:%" PRId64 " sum:%f hasResult:%d", pCtx, pResInfo->superTableQ, pRateInfo->num, pRateInfo->sum, pRateInfo->hasResult); + tscDebug("%p sumrate_finalizer() superTableQ:%d num:%" PRId64 " sum:%f hasResult:%d", pCtx, pCtx->stableQuery, pRateInfo->num, pRateInfo->sum, pRateInfo->hasResult); if (pRateInfo->hasResult != DATA_SET_FLAG) { setNull(pCtx->aOutputBuf, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 8c835806a77732e02398b9fdbf07be135c19c0ab..58c2b76b72ff052dd59e1a721c046f713e72d17e 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -99,12 +99,12 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc pCtx->param[1].i64Key = pQueryInfo->order.orderColId; } - SResultInfo *pResInfo = &pReducer->pResInfo[i]; - pResInfo->bufLen = pExpr->interBytes; - pResInfo->interResultBuf = calloc(1, (size_t) pResInfo->bufLen); +// SResultRowCellInfo *pResInfo = &pReducer->pResInfo[i]; + pCtx->interBufBytes = pExpr->interBytes; +// pResInfo->interResultBuf = calloc(1, (size_t) pCtx->interBufBytes); pCtx->resultInfo = &pReducer->pResInfo[i]; - pCtx->resultInfo->superTableQ = true; + pCtx->stableQuery = true; } int16_t n = 0; @@ -345,7 +345,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd size_t numOfCols = tscSqlExprNumOfExprs(pQueryInfo); pReducer->pTempBuffer->num = 0; - pReducer->pResInfo = calloc(numOfCols, sizeof(SResultInfo)); + pReducer->pResInfo = calloc(numOfCols, sizeof(SResultRowCellInfo)); tscCreateResPointerInfo(pRes, pQueryInfo); tscInitSqlContext(pCmd, pReducer, pDesc); @@ -512,7 +512,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { if (pLocalReducer->pResInfo != NULL) { size_t num = tscSqlExprNumOfExprs(pQueryInfo); for (int32_t i = 0; i < num; ++i) { - taosTFree(pLocalReducer->pResInfo[i].interResultBuf); +// taosTFree(pLocalReducer->pResInfo[i].interResultBuf); } taosTFree(pLocalReducer->pResInfo); @@ -1072,7 +1072,7 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx) continue; } - SResultInfo* pResInfo = GET_RES_INFO(&pCtx[j]); + SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]); if (maxOutput < pResInfo->numOfRes) { maxOutput = pResInfo->numOfRes; } diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index ab20840cbeac2f6b3f906869aff2f33ae653d755..64a0cb1a62576bd3971408c45587fb93da2629dc 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -61,14 +61,14 @@ typedef struct SSqlGroupbyExpr { int16_t orderType; // order by type: asc/desc } SSqlGroupbyExpr; -typedef struct SWindowResult { +typedef struct SResultRow { int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer int32_t rowId:15; bool closed:1; // this result status: closed or opened uint16_t numOfRows; // number of rows of current time window - SResultInfo* resultInfo; // For each result column, there is a resultInfo + SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo union {STimeWindow win; char* key;}; // start key of current time window -} SWindowResult; +} SResultRow; /** * If the number of generated results is greater than this value, @@ -82,7 +82,7 @@ typedef struct SResultRec { } SResultRec; typedef struct SWindowResInfo { - SWindowResult** pResult; // result list + SResultRow** pResult; // result list int16_t type; // data type for hash key int32_t capacity; // max capacity int32_t curIndex; // current start active index @@ -169,11 +169,11 @@ typedef struct SQuery { typedef struct SQueryRuntimeEnv { jmp_buf env; - SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo + SResultRowCellInfo* resultInfo; // todo refactor to merge with SWindowResInfo SQuery* pQuery; SQLFunctionCtx* pCtx; int32_t numOfRowsPerPage; - int16_t offset[TSDB_MAX_COLUMNS]; + uint16_t offset[TSDB_MAX_COLUMNS]; uint16_t scanFlag; // denotes reversed scan of data or not SFillInfo* pFillInfo; SWindowResInfo windowResInfo; @@ -192,6 +192,8 @@ typedef struct SQueryRuntimeEnv { SHashObj* pWindowHashTable; // quick locate the window object for each result char* keyBuf; // window key buffer SWindowResultPool* pool; // window result object pool + + int32_t* rowCellInfoOffset;// offset value for each row result cell info } SQueryRuntimeEnv; enum { diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index 377578042394590c75d9bba67dfc5de8559777d8..698608a6896a39b21c1524cc3a75bcee69767d12 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -26,8 +26,9 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery); -void clearTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* pOneOutputRes); -void copyTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* dst, const SWindowResult* src); +void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pRow); +void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src); +SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index); int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t size, int32_t threshold, int16_t type); @@ -42,7 +43,7 @@ void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot); void closeAllTimeWindow(SWindowResInfo* pWindowResInfo); void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order); -static FORCE_INLINE SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot) { +static FORCE_INLINE SResultRow *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot) { assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size); return pWindowResInfo->pResult[slot]; } @@ -52,9 +53,9 @@ static FORCE_INLINE SWindowResult *getWindowResult(SWindowResInfo *pWindowResInf bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot); -int32_t createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, size_t interBufSize); +int32_t createQueryResultInfo(SQuery *pQuery, SResultRow *pResultRow); -static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWindowResult *pResult, +static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SResultRow *pResult, tFilePage* page) { assert(pResult != NULL && pRuntimeEnv != NULL); @@ -74,7 +75,7 @@ __filter_func_t *getValueFilterFuncArray(int32_t type); size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv); SWindowResultPool* initWindowResultPool(size_t size); -SWindowResult* getNewWindowResult(SWindowResultPool* p); +SResultRow* getNewWindowResult(SWindowResultPool* p); int64_t getWindowResultPoolMemSize(SWindowResultPool* p); void* destroyWindowResultPool(SWindowResultPool* p); int32_t getNumOfAllocatedWindowResult(SWindowResultPool* p); diff --git a/src/query/inc/tsqlfunction.h b/src/query/inc/tsqlfunction.h index 28b9a60102d41a11fa02fee1ca51cbf8e263bf3b..3bd4aad2766954709231f55ecdccfe03209a91bc 100644 --- a/src/query/inc/tsqlfunction.h +++ b/src/query/inc/tsqlfunction.h @@ -145,15 +145,14 @@ typedef struct SInterpInfoDetail { int8_t primaryCol; } SInterpInfoDetail; -typedef struct SResultInfo { +typedef struct SResultRowCellInfo { int8_t hasResult; // result generated, not NULL value - bool initialized; // output buffer has been initialized - bool complete; // query has completed - bool superTableQ; // is super table query - uint32_t bufLen; // buffer size - uint64_t numOfRes; // num of output result in current buffer - void* interResultBuf; // output result buffer -} SResultInfo; + bool initialized; // output buffer has been initialized + bool complete; // query has completed + uint16_t numOfRes; // num of output result in current buffer +} SResultRowCellInfo; + +#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowCellInfo))) struct SQLFunctionCtx; @@ -175,9 +174,11 @@ typedef struct SQLFunctionCtx { int16_t inputBytes; int16_t outputType; - int16_t outputBytes; // size of results, determined by function and input column data type - bool hasNull; // null value exist in current block + int16_t outputBytes; // size of results, determined by function and input column data type + int32_t interBufBytes; // internal buffer size + bool hasNull; // null value exist in current block bool requireNull; // require null in some function + bool stableQuery; int16_t functionId; // function id void * aInputElemBuf; char * aOutputBuf; // final result output buffer, point to sdata->data @@ -189,7 +190,8 @@ typedef struct SQLFunctionCtx { void * ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ SQLPreAggVal preAggVals; tVariant tag; - SResultInfo *resultInfo; + + SResultRowCellInfo *resultInfo; SExtTagsInfo tagInfo; } SQLFunctionCtx; @@ -274,16 +276,16 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const cha (_r)->initialized = false; \ } while (0) -void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable, char* buf); +//void setResultInfoBuf(SResultRowCellInfo *pResInfo, char* buf); -static FORCE_INLINE void initResultInfo(SResultInfo *pResInfo) { +static FORCE_INLINE void initResultInfo(SResultRowCellInfo *pResInfo, uint32_t bufLen) { pResInfo->initialized = true; // the this struct has been initialized flag pResInfo->complete = false; pResInfo->hasResult = false; pResInfo->numOfRes = 0; - memset(pResInfo->interResultBuf, 0, (size_t)pResInfo->bufLen); + memset(GET_ROWCELL_INTERBUF(pResInfo), 0, (size_t)bufLen); } #ifdef __cplusplus diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index d5e1962eeaa94a13e202222d9c825d03929169fb..a573431bfcc5b6cc642c45e5b460b44a15927bc7 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -178,9 +178,9 @@ static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { // todo move to utility static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *group); -static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult); -static void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult); -static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo); +static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult); +static void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult); +static void resetMergeResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx *pCtx, SResultRow *pRow); static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo, @@ -255,7 +255,7 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) { continue; } - SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); + SResultRowCellInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); if (pResInfo != NULL && maxOutput < pResInfo->numOfRes) { maxOutput = pResInfo->numOfRes; } @@ -272,7 +272,7 @@ void updateNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfRes) { SQuery *pQuery = pRuntimeEnv->pQuery; for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); + SResultRowCellInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); int16_t functionId = pRuntimeEnv->pCtx[j].functionId; if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ || @@ -447,7 +447,7 @@ static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis return true; } -static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, char *pData, +static SResultRow *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, char *pData, int16_t bytes, bool masterscan, uint64_t uid) { SQuery *pQuery = pRuntimeEnv->pQuery; @@ -470,31 +470,27 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin } char *t = realloc(pWindowResInfo->pResult, (size_t)(newCapacity * POINTER_BYTES)); - // pRuntimeEnv->summary.winInfoSize += (newCapacity - pWindowResInfo->capacity) * sizeof(SWindowResult); + // pRuntimeEnv->summary.winInfoSize += (newCapacity - pWindowResInfo->capacity) * sizeof(SResultRow); // pRuntimeEnv->summary.numOfTimeWindows += (newCapacity - pWindowResInfo->capacity); if (t == NULL) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - pWindowResInfo->pResult = (SWindowResult **)t; + pWindowResInfo->pResult = (SResultRow **)t; int32_t inc = (int32_t)newCapacity - pWindowResInfo->capacity; memset(&pWindowResInfo->pResult[pWindowResInfo->capacity], 0, POINTER_BYTES * inc); pWindowResInfo->capacity = (int32_t)newCapacity; } -// pRuntimeEnv->summary.winInfoSize += (pQuery->numOfOutput * sizeof(SResultInfo) + pRuntimeEnv->interBufSize) * inc; - SWindowResult* pResult = getNewWindowResult(pRuntimeEnv->pool); +// pRuntimeEnv->summary.winInfoSize += (pQuery->numOfOutput * sizeof(SResultRowCellInfo) + pRuntimeEnv->interBufSize) * inc; + SResultRow* pResult = getNewWindowResult(pRuntimeEnv->pool); pWindowResInfo->pResult[pWindowResInfo->size] = pResult; -// for (int32_t i = pWindowResInfo->capacity; i < newCapacity; ++i) { - int32_t ret = createQueryResultInfo(pQuery, pResult, pRuntimeEnv->stableQuery, pRuntimeEnv->interBufSize); - if (ret != TSDB_CODE_SUCCESS) { - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } -// } - -// } + int32_t ret = createQueryResultInfo(pQuery, pResult); + if (ret != TSDB_CODE_SUCCESS) { + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } // add a new result set for a new group pWindowResInfo->curIndex = pWindowResInfo->size++; @@ -522,7 +518,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t } } else { int32_t slot = curTimeWindowIndex(pWindowResInfo); - SWindowResult* pWindowRes = getWindowResult(pWindowResInfo, slot); + SResultRow* pWindowRes = getWindowResult(pWindowResInfo, slot); w = pWindowRes->win; } @@ -558,7 +554,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t return w; } -static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResultBuf *pResultBuf, int32_t tid, +static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf *pResultBuf, int32_t tid, int32_t numOfRowsPerPage) { if (pWindowRes->pageId != -1) { return 0; @@ -608,7 +604,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes assert(win->skey <= win->ekey); SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; - SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, pBockInfo->uid); + SResultRow *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, pBockInfo->uid); if (pWindowRes == NULL) { *newWind = false; @@ -693,7 +689,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe int64_t skey = TSKEY_INITIAL_VAL; for (i = 0; i < pWindowResInfo->size; ++i) { - SWindowResult *pResult = pWindowResInfo->pResult[i]; + SResultRow *pResult = pWindowResInfo->pResult[i]; if (pResult->closed) { numOfClosed += 1; continue; @@ -1118,7 +1114,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat } uint64_t uid = 0; // uid is always set to be 0. - SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true, uid); + SResultRow *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true, uid); if (pWindowRes == NULL) { return -1; } @@ -1232,7 +1228,7 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) { } static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SQuery* pQuery = pRuntimeEnv->pQuery; // in case of timestamp column, always generated results. @@ -1519,7 +1515,8 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY * top/bottom values emerge, so does diff function */ if (functionId == TSDB_FUNC_TWA) { - STwaInfo *pTWAInfo = GET_RES_INFO(pCtx)->interResultBuf; + SResultRowCellInfo* pInfo = GET_RES_INFO(pCtx); + STwaInfo *pTWAInfo = (STwaInfo*) GET_ROWCELL_INTERBUF(pInfo); pTWAInfo->SKey = pQuery->window.skey; pTWAInfo->EKey = pQuery->window.ekey; } @@ -1533,7 +1530,9 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY pCtx->preAggVals.statis.max = pBlockInfo->window.ekey; } } else if (functionId == TSDB_FUNC_INTERP) { - SInterpInfoDetail *pInterpInfo = GET_RES_INFO(pCtx)->interResultBuf; + SResultRowCellInfo* pInfo = GET_RES_INFO(pCtx); + + SInterpInfoDetail *pInterpInfo = (SInterpInfoDetail *)GET_ROWCELL_INTERBUF(pInfo); pInterpInfo->type = (int8_t)pQuery->fillType; pInterpInfo->ts = pQuery->window.skey; pInterpInfo->primaryCol = (colId == PRIMARYKEY_TIMESTAMP_COL_INDEX); @@ -1610,26 +1609,29 @@ static int32_t setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx return TSDB_CODE_SUCCESS; } -static FORCE_INLINE void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery, char* buf) { - char* p = buf; - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t size = pQuery->pSelectExpr[i].interBytes; - setResultInfoBuf(&pResultInfo[i], size, isStableQuery, p); - - p += size; - } +static FORCE_INLINE void setResultRowCellInfo(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pRow, char* buf) { +// SQuery* pQuery = pRuntimeEnv->pQuery; +// +// char* p = buf; +// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { +// int32_t size = pQuery->pSelectExpr[i].interBytes; +// SResultRowCellInfo* pInfo = getResultCell(pRuntimeEnv, pRow, i); +// setResultInfoBuf(pInfo, p); +// p += size; +// } } static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order) { qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv)); SQuery *pQuery = pRuntimeEnv->pQuery; - size_t size = pRuntimeEnv->interBufSize + pQuery->numOfOutput * sizeof(SResultInfo); + size_t size = pRuntimeEnv->interBufSize + pQuery->numOfOutput * sizeof(SResultRowCellInfo); pRuntimeEnv->resultInfo = calloc(1, size); pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutput, sizeof(SQLFunctionCtx)); + pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t)); - if (pRuntimeEnv->resultInfo == NULL || pRuntimeEnv->pCtx == NULL) { + if (pRuntimeEnv->resultInfo == NULL || pRuntimeEnv->pCtx == NULL || pRuntimeEnv->rowCellInfoOffset == NULL) { goto _clean; } @@ -1675,6 +1677,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order pCtx->order = pQuery->order.order; pCtx->functionId = pSqlFuncMsg->functionId; + pCtx->stableQuery = pRuntimeEnv->stableQuery; + pCtx->interBufBytes = pQuery->pSelectExpr[i].interBytes; pCtx->numOfParams = pSqlFuncMsg->numOfParams; for (int32_t j = 0; j < pCtx->numOfParams; ++j) { @@ -1704,13 +1708,15 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order if (i > 0) { pRuntimeEnv->offset[i] = pRuntimeEnv->offset[i - 1] + pRuntimeEnv->pCtx[i - 1].outputBytes; + pRuntimeEnv->rowCellInfoOffset[i] = pRuntimeEnv->rowCellInfoOffset[i - 1] + sizeof(SResultRowCellInfo) + pQuery->pSelectExpr[i - 1].interBytes; } + } - char* buf = (char*) pRuntimeEnv->resultInfo + sizeof(SResultInfo) * pQuery->numOfOutput; +// char* buf = (char*) pRuntimeEnv->resultInfo + sizeof(SResultRowCellInfo) * pQuery->numOfOutput; // set the intermediate result output buffer - setWindowResultInfo(pRuntimeEnv->resultInfo, pQuery, pRuntimeEnv->stableQuery, buf); +// setResultRowCellInfo(pRuntimeEnv, pRuntimeEnv->resultInfo, NULL); // if it is group by normal column, do not set output buffer, the output buffer is pResult if (!pRuntimeEnv->groupbyNormalCol && !pRuntimeEnv->stableQuery) { @@ -2655,7 +2661,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) { } } -static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowResult *pWindowRes, bool mergeFlag) { +static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SResultRow *pWindowRes, bool mergeFlag) { SQuery * pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; @@ -2829,14 +2835,14 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) } SWindowResInfo *pWindowResInfo1 = &supporter->pTableQueryInfo[left]->windowResInfo; - SWindowResult * pWindowRes1 = getWindowResult(pWindowResInfo1, leftPos); + SResultRow * pWindowRes1 = getWindowResult(pWindowResInfo1, leftPos); tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes1->pageId); char *b1 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes1, page1); TSKEY leftTimestamp = GET_INT64_VAL(b1); SWindowResInfo *pWindowResInfo2 = &supporter->pTableQueryInfo[right]->windowResInfo; - SWindowResult * pWindowRes2 = getWindowResult(pWindowResInfo2, rightPos); + SResultRow * pWindowRes2 = getWindowResult(pWindowResInfo2, rightPos); tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes2->pageId); char *b2 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes2, page2); @@ -2957,7 +2963,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { pQuery->rec.rows += offset; } -int64_t getNumOfResultWindowRes(SQuery *pQuery, SWindowResult *pWindowRes) { +int64_t getNumOfResultWindowRes(SQuery *pQuery, SResultRow *pResultRow) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int32_t functionId = pQuery->pSelectExpr[j].base.functionId; @@ -2969,7 +2975,7 @@ int64_t getNumOfResultWindowRes(SQuery *pQuery, SWindowResult *pWindowRes) { continue; } - SResultInfo *pResultInfo = &pWindowRes->resultInfo[j]; + SResultRowCellInfo *pResultInfo = &pResultRow->pCellInfo[j]; assert(pResultInfo != NULL); if (pResultInfo->numOfRes > 0) { @@ -3038,18 +3044,19 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { SLoserTreeInfo *pTree = NULL; tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn); - SResultInfo *pResultInfo = calloc(pQuery->numOfOutput, sizeof(SResultInfo)); - if (pResultInfo == NULL) { + SResultRow* pRow = calloc(1, getWindowResultSize(pRuntimeEnv)); + if (pRow == NULL) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - char* buf = calloc(1, pRuntimeEnv->interBufSize); - if (buf == NULL) { - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } + pRow->pCellInfo = (SResultRowCellInfo*) ((char*) pRow + sizeof(SResultRow)); +// char* buf = (char*) pRow + sizeof(SResultRowCellInfo)*; +// if (buf == NULL) { +// longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); +// } - setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery, buf); - resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo); + setResultRowCellInfo(pRuntimeEnv, pRow, NULL); + resetMergeResultBuf(pRuntimeEnv, pRuntimeEnv->pCtx, pRow); pQInfo->groupResInfo.groupId = getGroupResultId(pQInfo->groupIndex); @@ -3064,8 +3071,8 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { taosTFree(pTableList); taosTFree(posList); taosTFree(pTree); - taosTFree(pResultInfo); - taosTFree(buf); +// taosTFree(pResultInfo); +// taosTFree(buf); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -3073,7 +3080,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { int32_t pos = pTree->pNode[0].index; SWindowResInfo *pWindowResInfo = &pTableList[pos]->windowResInfo; - SWindowResult *pWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]); + SResultRow *pWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]); tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId); char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes, page); @@ -3101,7 +3108,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { return -1; } - resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo); + resetMergeResultBuf(pRuntimeEnv, pRuntimeEnv->pCtx, pRow); } doMerge(pRuntimeEnv, ts, pWindowRes, false); @@ -3123,7 +3130,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { } } else { // current page is not needed anymore - SWindowResult *pNextWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]); + SResultRow *pNextWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]); if (pNextWindowRes->pageId != currentPageId) { releaseResBufPage(pRuntimeEnv->pResultBuf, page); } @@ -3140,7 +3147,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { taosTFree(pTree); taosTFree(pTableList); taosTFree(posList); - taosTFree(pResultInfo); +// taosTFree(pResultInfo); return -1; } @@ -3158,8 +3165,8 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { taosTFree(posList); taosTFree(pTree); - taosTFree(pResultInfo); - taosTFree(buf); +// taosTFree(pResultInfo); +// taosTFree(buf); return pQInfo->groupResInfo.numOfDataPages; } @@ -3202,12 +3209,14 @@ int32_t flushFromResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupR return TSDB_CODE_SUCCESS; } -void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo) { +void resetMergeResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx *pCtx, SResultRow *pRow) { + SQuery* pQuery = pRuntimeEnv->pQuery; + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { pCtx[k].aOutputBuf = pQuery->sdata[k]->data - pCtx[k].outputBytes; pCtx[k].size = 1; pCtx[k].startOffset = 0; - pCtx[k].resultInfo = &pResultInfo[k]; + pCtx[k].resultInfo = getResultCell(pRuntimeEnv, pRow, k); pQuery->sdata[k]->num = 0; } @@ -3253,7 +3262,7 @@ static void disableFuncInReverseScanImpl(SQInfo* pQInfo, SWindowResInfo *pWindow continue; } - SWindowResult *buf = getWindowResult(pWindowResInfo, i); + SResultRow *buf = getWindowResult(pWindowResInfo, i); // open/close the specified query for each group result for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { @@ -3261,9 +3270,9 @@ static void disableFuncInReverseScanImpl(SQInfo* pQInfo, SWindowResInfo *pWindow if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) || ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) { - buf->resultInfo[j].complete = false; + buf->pCellInfo[j].complete = false; } else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) { - buf->resultInfo[j].complete = true; + buf->pCellInfo[j].complete = true; } } } @@ -3327,24 +3336,17 @@ void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) { } } -int32_t createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, size_t interBufSize) { - int32_t numOfCols = pQuery->numOfOutput; - -// size_t size = numOfCols * sizeof(SResultInfo) + interBufSize; - pResultRow->resultInfo = (SResultInfo*)((char*)pResultRow + sizeof(SWindowResult)); - -// pResultRow->resultInfo = calloc(1, size); -// if (pResultRow->resultInfo == NULL) { -// return TSDB_CODE_QRY_OUT_OF_MEMORY; -// } +int32_t createQueryResultInfo(SQuery *pQuery, SResultRow *pResultRow) { +// int32_t numOfCols = pQuery->numOfOutput; + pResultRow->pCellInfo = (SResultRowCellInfo*)((char*)pResultRow + sizeof(SResultRow)); pResultRow->pageId = -1; pResultRow->rowId = -1; - char* buf = (char*) pResultRow->resultInfo + numOfCols * sizeof(SResultInfo); +// char* buf = (char*) pResultRow->pCellInfo + numOfCols * sizeof(SResultRowCellInfo); // set the intermediate result output buffer - setWindowResultInfo(pResultRow->resultInfo, pQuery, isSTableQuery, buf); +// setResultRowCellInfo(pRunimeEnv, pResultRow, buf); return TSDB_CODE_SUCCESS; } @@ -3409,7 +3411,7 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { int32_t functionId = pQuery->pSelectExpr[j].base.functionId; pRuntimeEnv->pCtx[j].currentStage = 0; - SResultInfo* pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); + SResultRowCellInfo* pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); if (pResInfo->initialized) { continue; } @@ -3478,7 +3480,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SWindowResult *pResult = getWindowResult(pWindowResInfo, i); + SResultRow *pResult = getWindowResult(pWindowResInfo, i); if (!pResult->closed) { continue; } @@ -3492,7 +3494,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { } aAggs[functId].xNextStep(&pRuntimeEnv->pCtx[j]); - SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); + SResultRowCellInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); toContinue |= (!pResInfo->complete); } @@ -3505,7 +3507,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { } aAggs[functId].xNextStep(&pRuntimeEnv->pCtx[j]); - SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); + SResultRowCellInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); toContinue |= (!pResInfo->complete); } @@ -3704,7 +3706,7 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { } for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SWindowResult *buf = pWindowResInfo->pResult[i]; + SResultRow *buf = pWindowResInfo->pResult[i]; if (!isWindowResClosed(pWindowResInfo, i)) { continue; } @@ -3797,7 +3799,7 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) { } uint64_t uid = 0; // uid is always set to be 0 - SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex, + SResultRow *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex, sizeof(groupIndex), true, uid); if (pWindowRes == NULL) { return; @@ -3820,7 +3822,7 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) { initCtxOutputBuf(pRuntimeEnv); } -void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult) { +void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult) { SQuery *pQuery = pRuntimeEnv->pQuery; // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group @@ -3839,15 +3841,11 @@ void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult * set the output buffer information and intermediate buffer * not all queries require the interResultBuf, such as COUNT */ - pCtx->resultInfo = &pResult->resultInfo[i]; - - // set super table query flag - SResultInfo *pResInfo = GET_RES_INFO(pCtx); - pResInfo->superTableQ = pRuntimeEnv->stableQuery; + pCtx->resultInfo = &pResult->pCellInfo[i]; } } -void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult) { +void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult) { SQuery *pQuery = pRuntimeEnv->pQuery; // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group @@ -3856,7 +3854,7 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult * for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; - pCtx->resultInfo = &pResult->resultInfo[i]; + pCtx->resultInfo = &pResult->pCellInfo[i]; if (pCtx->resultInfo->initialized && pCtx->resultInfo->complete) { continue; } @@ -3869,12 +3867,6 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult * pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf; } - /* - * set the output buffer information and intermediate buffer - * not all queries require the interResultBuf, such as COUNT - */ - pCtx->resultInfo->superTableQ = pRuntimeEnv->stableQuery; // set super table query flag - if (!pCtx->resultInfo->initialized) { aAggs[functionId].init(pCtx); } @@ -4015,7 +4007,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_ qDebug("QInfo:%p start to copy data from windowResInfo to query buf", pQInfo); int32_t totalSet = numOfClosedTimeWindow(pResultInfo); - SWindowResult** result = pResultInfo->pResult; + SResultRow** result = pResultInfo->pResult; if (orderType == TSDB_ORDER_ASC) { startIdx = pQInfo->groupIndex; @@ -4102,7 +4094,7 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv) { } for (int32_t i = 0; i < pRuntimeEnv->windowResInfo.size; ++i) { - SWindowResult *pResult = pRuntimeEnv->windowResInfo.pResult[i]; + SResultRow *pResult = pRuntimeEnv->windowResInfo.pResult[i]; for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int32_t functionId = pRuntimeEnv->pCtx[j].functionId; @@ -4110,7 +4102,7 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv) { continue; } - pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pResult->resultInfo[j].numOfRes)); + pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pResult->pCellInfo[j].numOfRes)); } } } @@ -4678,7 +4670,7 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[i]); + SResultRowCellInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[i]); if (pResInfo != NULL) { pResInfo->complete = false; } @@ -5033,9 +5025,9 @@ static void sequentialTableProcess(SQInfo *pQInfo) { for (int32_t i = 0; i < pWindowResInfo->size; ++i) { pWindowResInfo->pResult[i]->closed = true; // enable return all results for group by normal columns - SWindowResult *pResult = pWindowResInfo->pResult[i]; + SResultRow *pResult = pWindowResInfo->pResult[i]; for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pResult->resultInfo[j].numOfRes)); + pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pResult->pCellInfo[j].numOfRes)); } } @@ -6297,7 +6289,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou calResultBufSize(pQuery); for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { - assert(pExprs[col].interBytes >= pExprs[col].bytes); + assert(pExprs[col].interBytes >= pExprs[col].bytes || pExprs[col].interBytes == 0); // allocate additional memory for interResults that are usually larger then final results size_t size = (size_t)((pQuery->rec.capacity + 1) * pExprs[col].bytes + pExprs[col].interBytes + sizeof(tFilePage)); diff --git a/src/query/src/qResultbuf.c b/src/query/src/qResultbuf.c index bd7c4694d0b24deb75ea80ee062df4544b46888f..edb2ca687f476faf01f1a464f80957bd3916407f 100644 --- a/src/query/src/qResultbuf.c +++ b/src/query/src/qResultbuf.c @@ -407,9 +407,9 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) { } if (pResultBuf->file != NULL) { - qDebug("QInfo:%p res output buffer closed, total:%.2f Kb, inmem size:%dbytes, file size:%"PRId64" bytes", - pResultBuf->handle, pResultBuf->totalBufSize/1024.0, listNEles(pResultBuf->lruList) * pResultBuf->pageSize, - pResultBuf->fileSize); + qDebug("QInfo:%p res output buffer closed, total:%.2f Kb, inmem size:%.2f Kb, file size:%.2f", + pResultBuf->handle, pResultBuf->totalBufSize/1024.0, listNEles(pResultBuf->lruList) * pResultBuf->pageSize / 1024.0, + pResultBuf->fileSize/1024.0); fclose(pResultBuf->file); } else { diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 7ec39cc0c895871400e5d5694dace8c0343c0235..02a81936f31a83a1adbd18e3828d6d54e938309b 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -27,7 +27,7 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) { size += pQuery->pSelectExpr[i].interBytes; } - assert(size > 0); + assert(size >= 0); return size; } @@ -41,26 +41,12 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun pWindowResInfo->size = 0; pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL; -// SQueryCostInfo* pSummary = &pRuntimeEnv->summary; - pWindowResInfo->pResult = calloc(pWindowResInfo->capacity, POINTER_BYTES); if (pWindowResInfo->pResult == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } pWindowResInfo->interval = pRuntimeEnv->pQuery->interval.interval; - -// pSummary->winInfoSize += POINTER_BYTES * pWindowResInfo->capacity; -// pSummary->winInfoSize += (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultInfo) + pRuntimeEnv->interBufSize) * pWindowResInfo->capacity; -// pSummary->numOfTimeWindows = pWindowResInfo->capacity; - -// for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) { -// int32_t code = createQueryResultInfo(pRuntimeEnv->pQuery, pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, pRuntimeEnv->interBufSize); -// if (code != TSDB_CODE_SUCCESS) { -// return code; -// } -// } - return TSDB_CODE_SUCCESS; } @@ -82,8 +68,8 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR } for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SWindowResult *pWindowRes = pWindowResInfo->pResult[i]; - clearTimeWindowResBuf(pRuntimeEnv, pWindowRes); + SResultRow *pWindowRes = pWindowResInfo->pResult[i]; + clearResultRow(pRuntimeEnv, pWindowRes); } pWindowResInfo->curIndex = -1; @@ -108,7 +94,7 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { int16_t bytes = -1; for (int32_t i = 0; i < num; ++i) { - SWindowResult *pResult = pWindowResInfo->pResult[i]; + SResultRow *pResult = pWindowResInfo->pResult[i]; if (pResult->closed) { // remove the window slot from hash table // todo refactor @@ -131,19 +117,19 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { // clear all the closed windows from the window list for (int32_t k = 0; k < remain; ++k) { - copyTimeWindowResBuf(pRuntimeEnv, pWindowResInfo->pResult[k], pWindowResInfo->pResult[num + k]); + copyResultRow(pRuntimeEnv, pWindowResInfo->pResult[k], pWindowResInfo->pResult[num + k]); } // move the unclosed window in the front of the window list for (int32_t k = remain; k < pWindowResInfo->size; ++k) { - SWindowResult *pWindowRes = pWindowResInfo->pResult[k]; - clearTimeWindowResBuf(pRuntimeEnv, pWindowRes); + SResultRow *pWindowRes = pWindowResInfo->pResult[k]; + clearResultRow(pRuntimeEnv, pWindowRes); } pWindowResInfo->size = remain; for (int32_t k = 0; k < pWindowResInfo->size; ++k) { - SWindowResult *pResult = pWindowResInfo->pResult[k]; + SResultRow *pResult = pWindowResInfo->pResult[k]; if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { key = varDataVal(pResult->key); @@ -240,7 +226,7 @@ void closeTimeWindow(SWindowResInfo *pWindowResInfo, int32_t slot) { getWindowResult(pWindowResInfo, slot)->closed = true; } -void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindowRes) { +void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pWindowRes) { if (pWindowRes == NULL) { return; } @@ -248,7 +234,7 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId); for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) { - SResultInfo *pResultInfo = &pWindowRes->resultInfo[i]; + SResultRowCellInfo *pResultInfo = &pWindowRes->pCellInfo[i]; char * s = getPosInResultPage(pRuntimeEnv, i, pWindowRes, page); size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].bytes; @@ -269,7 +255,7 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow * since the attribute of "Pos" is bound to each window result when the window result is created in the * disk-based result buffer. */ -void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, const SWindowResult *src) { +void copyResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *dst, const SResultRow *src) { dst->numOfRows = src->numOfRows; dst->win = src->win; dst->closed = src->closed; @@ -277,30 +263,35 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, con int32_t nOutputCols = pRuntimeEnv->pQuery->numOfOutput; for (int32_t i = 0; i < nOutputCols; ++i) { - SResultInfo *pDst = &dst->resultInfo[i]; - SResultInfo *pSrc = &src->resultInfo[i]; + SResultRowCellInfo *pDst = getResultCell(pRuntimeEnv, dst, i); + SResultRowCellInfo *pSrc = getResultCell(pRuntimeEnv, src, i); - char *buf = pDst->interResultBuf; - memcpy(pDst, pSrc, sizeof(SResultInfo)); - pDst->interResultBuf = buf; // restore the allocated buffer +// char *buf = pDst->interResultBuf; + memcpy(pDst, pSrc, sizeof(SResultRowCellInfo) + pRuntimeEnv->pCtx[i].interBufBytes); +// pDst->interResultBuf = buf; // restore the allocated buffer // copy the result info struct - memcpy(pDst->interResultBuf, pSrc->interResultBuf, pDst->bufLen); +// memcpy(pDst->interResultBuf, pSrc->interResultBuf, pRuntimeEnv->pCtx[i].interBufBytes); // copy the output buffer data from src to dst, the position info keep unchanged tFilePage *dstpage = getResBufPage(pRuntimeEnv->pResultBuf, dst->pageId); char * dstBuf = getPosInResultPage(pRuntimeEnv, i, dst, dstpage); tFilePage *srcpage = getResBufPage(pRuntimeEnv->pResultBuf, src->pageId); - char * srcBuf = getPosInResultPage(pRuntimeEnv, i, (SWindowResult *)src, srcpage); + char * srcBuf = getPosInResultPage(pRuntimeEnv, i, (SResultRow *)src, srcpage); size_t s = pRuntimeEnv->pQuery->pSelectExpr[i].bytes; memcpy(dstBuf, srcBuf, s); } } +SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index) { + assert(index >= 0 && index < pRuntimeEnv->pQuery->numOfOutput); + return (SResultRowCellInfo*)((char*) pRow->pCellInfo + pRuntimeEnv->rowCellInfoOffset[index]); +} + size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv) { - return (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultInfo)) + pRuntimeEnv->interBufSize + sizeof(SWindowResult); + return (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultRowCellInfo)) + pRuntimeEnv->interBufSize + sizeof(SResultRow); } SWindowResultPool* initWindowResultPool(size_t size) { @@ -320,7 +311,7 @@ SWindowResultPool* initWindowResultPool(size_t size) { return p; } -SWindowResult* getNewWindowResult(SWindowResultPool* p) { +SResultRow* getNewWindowResult(SWindowResultPool* p) { if (p == NULL) { return NULL; } diff --git a/src/util/inc/hash.h b/src/util/inc/hash.h index d4e9b1677cb36d528830d43c5e97af0b6ff87276..42bc136584d618e62826c307b698746ca387ff41 100644 --- a/src/util/inc/hash.h +++ b/src/util/inc/hash.h @@ -31,13 +31,16 @@ extern "C" { typedef void (*_hash_free_fn_t)(void *param); typedef struct SHashNode { - char *key; +// char *key; struct SHashNode *next; - uint32_t hashVal; // the hash value of key, if hashVal == HASH_VALUE_IN_TRASH, this node is moved to trash + uint32_t hashVal; // the hash value of key uint32_t keyLen; // length of the key - char *data; +// char *data; } SHashNode; +#define GET_HASH_NODE_KEY(_n) ((char*)(_n) + sizeof(SHashNode)) +#define GET_HASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHashNode) + (_n)->keyLen) + typedef enum SHashLockTypeE { HASH_NO_LOCK = 0, HASH_ENTRY_LOCK = 1, diff --git a/src/util/src/hash.c b/src/util/src/hash.c index 8ffbb1f0f61775914c9db8505f591d62aeed9ecc..10767ee30f607b484cb65334c378056b84010f92 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -22,14 +22,13 @@ #define DO_FREE_HASH_NODE(_n) \ do { \ - taosTFree((_n)->data); \ taosTFree(_n); \ } while (0) #define FREE_HASH_NODE(_h, _n) \ do { \ if ((_h)->freeFp) { \ - (_h)->freeFp((_n)->data); \ + (_h)->freeFp(GET_HASH_NODE_DATA(_n)); \ } \ \ DO_FREE_HASH_NODE(_n); \ @@ -77,7 +76,7 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { static FORCE_INLINE SHashNode *doSearchInEntryList(SHashEntry *pe, const void *key, size_t keyLen, uint32_t hashVal) { SHashNode *pNode = pe->next; while (pNode) { - if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) { + if ((pNode->keyLen == keyLen) && (memcmp(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0)) { assert(pNode->hashVal == hashVal); break; } @@ -115,11 +114,13 @@ static SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *p * @param dsize size of actual data * @return hash node */ -static FORCE_INLINE SHashNode *doUpdateHashNode(SHashNode *pNode, SHashNode *pNewNode) { +static FORCE_INLINE SHashNode *doUpdateHashNode(SHashNode* prev, SHashNode *pNode, SHashNode *pNewNode) { assert(pNode->keyLen == pNewNode->keyLen); - SWAP(pNode->key, pNewNode->key, void *); - SWAP(pNode->data, pNewNode->data, void *); + if (prev != NULL) { + prev->next = pNewNode; + } + pNewNode->next = pNode->next; return pNewNode; } @@ -208,12 +209,14 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da assert(pNode == NULL); } + SHashNode* prev = NULL; while (pNode) { - if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) { + if ((pNode->keyLen == keyLen) && (memcmp(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0)) { assert(pNode->hashVal == hashVal); break; } + prev = pNode; pNode = pNode->next; } @@ -239,7 +242,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da } else { // not support the update operation, return error if (pHashObj->enableUpdate) { - doUpdateHashNode(pNode, pNewNode); + doUpdateHashNode(prev, pNode, pNewNode); } if (pHashObj->type == HASH_ENTRY_LOCK) { @@ -293,13 +296,13 @@ void* taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void (*f SHashNode *pNode = doSearchInEntryList(pe, key, keyLen, hashVal); if (pNode != NULL) { if (fp != NULL) { - fp(pNode->data); + fp(GET_HASH_NODE_DATA(pNode)); } if (d != NULL) { - memcpy(d, pNode->data, dsize); + memcpy(d, GET_HASH_NODE_DATA(pNode), dsize); } else { - data = pNode->data; + data = GET_HASH_NODE_DATA(pNode); } } @@ -357,13 +360,13 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe SHashNode *pRes = NULL; // remove it - if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) { + if ((pNode->keyLen == keyLen) && (memcmp(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0)) { pe->num -= 1; pRes = pNode; pe->next = pNode->next; } else { while (pNode->next != NULL) { - if (((pNode->next)->keyLen == keyLen) && (memcmp((pNode->next)->key, key, keyLen) == 0)) { + if (((pNode->next)->keyLen == keyLen) && (memcmp(GET_HASH_NODE_KEY((pNode->next)), key, keyLen) == 0)) { assert((pNode->next)->hashVal == hashVal); break; } @@ -392,7 +395,7 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe __rd_unlock(&pHashObj->lock, pHashObj->type); if (data != NULL && pRes != NULL) { - memcpy(data, pRes->data, dsize); + memcpy(data, GET_HASH_NODE_DATA(pRes), dsize); } if (pRes != NULL) { @@ -426,7 +429,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi // todo remove the first node SHashNode *pNode = NULL; while((pNode = pEntry->next) != NULL) { - if (fp && (!fp(param, pNode->data))) { + if (fp && (!fp(param, GET_HASH_NODE_DATA(pNode)))) { pEntry->num -= 1; atomic_sub_fetch_64(&pHashObj->size, 1); @@ -451,7 +454,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi while ((pNext = pNode->next) != NULL) { // not qualified, remove it - if (fp && (!fp(param, pNext->data))) { + if (fp && (!fp(param, GET_HASH_NODE_DATA(pNext)))) { pNode->next = pNext->next; pEntry->num -= 1; atomic_sub_fetch_64(&pHashObj->size, 1); @@ -605,7 +608,7 @@ bool taosHashIterNext(SHashMutableIterator *pIter) { } } -void *taosHashIterGet(SHashMutableIterator *iter) { return (iter == NULL) ? NULL : iter->pCur->data; } +void *taosHashIterGet(SHashMutableIterator *iter) { return (iter == NULL) ? NULL : GET_HASH_NODE_DATA(iter->pCur); } void *taosHashDestroyIter(SHashMutableIterator *iter) { if (iter == NULL) { @@ -743,21 +746,19 @@ void taosHashTableResize(SHashObj *pHashObj) { } SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) { - SHashNode *pNewNode = calloc(1, sizeof(SHashNode)); + SHashNode *pNewNode = calloc(1, sizeof(SHashNode) + keyLen + dsize); if (pNewNode == NULL) { uError("failed to allocate memory, reason:%s", strerror(errno)); return NULL; } - pNewNode->data = malloc(dsize + keyLen); - memcpy(pNewNode->data, pData, dsize); - - pNewNode->key = pNewNode->data + dsize; - memcpy(pNewNode->key, key, keyLen); - pNewNode->keyLen = (uint32_t)keyLen; pNewNode->hashVal = hashVal; + + memcpy(GET_HASH_NODE_DATA(pNewNode), pData, dsize); + memcpy(GET_HASH_NODE_KEY(pNewNode), key, keyLen); + return pNewNode; }