提交 79b2d952 编写于 作者: H Haojun Liao

[TD-225]refactor.

上级 b9f6714c
......@@ -20,9 +20,6 @@
extern "C" {
#endif
/*
* @date 2018/09/30
*/
#include "exception.h"
#include "os.h"
#include "qExtbuffer.h"
......
......@@ -89,7 +89,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc
pCtx->startOffset = 0;
pCtx->size = 1;
pCtx->hasNull = true;
pCtx->currentStage = SECONDARY_STAGE_MERGE;
pCtx->currentStage = MERGE_STAGE;
// for top/bottom function, the output of timestamp is the first column
int32_t functionId = pExpr->functionId;
......@@ -1067,7 +1067,7 @@ static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer,
pCtx->param[0].i64Key = pExpr->param[0].i64Key;
}
pCtx->currentStage = SECONDARY_STAGE_MERGE;
pCtx->currentStage = MERGE_STAGE;
if (needInit) {
aAggs[pCtx->functionId].init(pCtx);
......@@ -1080,7 +1080,7 @@ static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer,
continue;
}
aAggs[functionId].distSecondaryMergeFunc(&pLocalReducer->pCtx[j]);
aAggs[functionId].mergeFunc(&pLocalReducer->pCtx[j]);
}
}
......@@ -1647,7 +1647,7 @@ int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_
// calculate the result from several other columns
if (pSup->pArithExprInfo != NULL) {
arithSup.pArithExpr = pSup->pArithExprInfo;
tExprTreeCalcTraverse(arithSup.pArithExpr->pExpr, (int32_t) pOutput->num, pbuf + pOutput->num*offset, &arithSup, TSDB_ORDER_ASC, getArithmeticInputSrc);
arithmeticTreeTraverse(arithSup.pArithExpr->pExpr, (int32_t) pOutput->num, pbuf + pOutput->num*offset, &arithSup, TSDB_ORDER_ASC, getArithmeticInputSrc);
} else {
SSqlExpr* pExpr = pSup->pSqlExpr;
memcpy(pbuf + pOutput->num * offset, pExpr->offset * pOutput->num + pOutput->data, (size_t)(pExpr->resBytes * pOutput->num));
......
......@@ -23,7 +23,7 @@ extern "C" {
typedef void (*_bi_consumer_fn_t)(void *left, void *right, int32_t numOfLeft, int32_t numOfRight, void *output,
int32_t order);
_bi_consumer_fn_t tGetBiConsumerFn(int32_t leftType, int32_t rightType, int32_t optr);
_bi_consumer_fn_t getArithmeticOperatorFn(int32_t leftType, int32_t rightType, int32_t optr);
#ifdef __cplusplus
}
......
......@@ -74,9 +74,7 @@ typedef struct tExprNode {
};
} tExprNode;
void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param);
void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
void arithmeticTreeTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
char *(*cb)(void *, const char*, int32_t));
tExprNode* exprTreeFromBinary(const void* data, size_t size);
......@@ -87,6 +85,8 @@ void exprTreeToBinary(SBufferWriter* bw, tExprNode* pExprTree);
void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *));
void tExprTreeDestroy(tExprNode **pExprs, void (*fp)(void*));
bool exprTreeApplayFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *param);
#ifdef __cplusplus
}
#endif
......
......@@ -190,7 +190,7 @@ typedef struct SQueryRuntimeEnv {
void* pSecQueryHandle; // another thread for
bool stableQuery; // super table query or not
bool topBotQuery; // TODO used bitwise flag
bool groupbyNormalCol; // denote if this is a groupby normal column query
bool groupbyColumn; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation
bool queryWindowIdentical; // all query time windows are identical for all tables in one group
......
......@@ -112,11 +112,10 @@ extern "C" {
#define TOP_BOTTOM_QUERY_LIMIT 100
enum {
MASTER_SCAN = 0x0u,
REVERSE_SCAN = 0x1u,
REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan
FIRST_STAGE_MERGE = 0x10u,
SECONDARY_STAGE_MERGE = 0x20u,
MASTER_SCAN = 0x0u,
REVERSE_SCAN = 0x1u,
REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan
MERGE_STAGE = 0x20u,
};
#define QUERY_IS_STABLE_QUERY(type) (((type)&TSDB_QUERY_TYPE_STABLE_QUERY) != 0)
......@@ -215,18 +214,12 @@ typedef struct SQLAggFuncElem {
void (*xFunction)(SQLFunctionCtx *pCtx); // blocks version function
void (*xFunctionF)(SQLFunctionCtx *pCtx, int32_t position); // single-row function version
// some sql function require scan data twice or more, e.g.,stddev
// some sql function require scan data twice or more, e.g.,stddev, percentile
void (*xNextStep)(SQLFunctionCtx *pCtx);
/*
* finalizer must be called after all xFunction has been executed to
* generated final result. Otherwise, the value in aOutputBuf is a intern result.
*/
// finalizer must be called after all xFunction has been executed to generated final result.
void (*xFinalize)(SQLFunctionCtx *pCtx);
void (*distMergeFunc)(SQLFunctionCtx *pCtx);
void (*distSecondaryMergeFunc)(SQLFunctionCtx *pCtx);
void (*mergeFunc)(SQLFunctionCtx *pCtx);
int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId);
} SQLAggFuncElem;
......
......@@ -34,7 +34,7 @@
#define GET_TRUE_DATA_TYPE() \
int32_t type = 0; \
if (pCtx->currentStage == SECONDARY_STAGE_MERGE) { \
if (pCtx->currentStage == MERGE_STAGE) { \
type = pCtx->outputType; \
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); \
} else { \
......@@ -607,21 +607,21 @@ static void sum_function_f(SQLFunctionCtx *pCtx, int32_t index) {
}
}
static int32_t sum_merge_impl(const SQLFunctionCtx *pCtx) {
static void sum_func_merge(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0;
GET_TRUE_DATA_TYPE();
assert(pCtx->stableQuery);
for (int32_t i = 0; i < pCtx->size; ++i) {
char * input = GET_INPUT_DATA(pCtx, i);
SSumInfo *pInput = (SSumInfo *)input;
if (pInput->hasResult != DATA_SET_FLAG) {
continue;
}
notNullElems++;
switch (type) {
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
......@@ -636,25 +636,7 @@ static int32_t sum_merge_impl(const SQLFunctionCtx *pCtx) {
}
}
}
return notNullElems;
}
static void sum_func_merge(SQLFunctionCtx *pCtx) {
int32_t notNullElems = sum_merge_impl(pCtx);
SET_VAL(pCtx, notNullElems, 1);
SSumInfo *pSumInfo = (SSumInfo *)pCtx->aOutputBuf;
if (notNullElems > 0) {
// pCtx->numOfIteratedElems += notNullElems;
pSumInfo->hasResult = DATA_SET_FLAG;
}
}
static void sum_func_second_merge(SQLFunctionCtx *pCtx) {
int32_t notNullElems = sum_merge_impl(pCtx);
SET_VAL(pCtx, notNullElems, 1);
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
......@@ -838,30 +820,6 @@ static void avg_function_f(SQLFunctionCtx *pCtx, int32_t index) {
static void avg_func_merge(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
assert(pCtx->stableQuery);
SAvgInfo *pAvgInfo = (SAvgInfo *)GET_ROWCELL_INTERBUF(pResInfo);
char * input = GET_INPUT_DATA_LIST(pCtx);
for (int32_t i = 0; i < pCtx->size; ++i, input += pCtx->inputBytes) {
SAvgInfo *pInput = (SAvgInfo *)input;
if (pInput->num == 0) { // current buffer is null
continue;
}
pAvgInfo->sum += pInput->sum;
pAvgInfo->num += pInput->num;
}
// if the data set hasResult is not set, the result is null
if (pAvgInfo->num > 0) {
pResInfo->hasResult = DATA_SET_FLAG;
memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SAvgInfo));
}
}
static void avg_func_second_merge(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
double *sum = (double*) pCtx->aOutputBuf;
char * input = GET_INPUT_DATA_LIST(pCtx);
......@@ -885,7 +843,7 @@ static void avg_func_second_merge(SQLFunctionCtx *pCtx) {
static void avg_finalizer(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
if (pCtx->currentStage == SECONDARY_STAGE_MERGE) {
if (pCtx->currentStage == MERGE_STAGE) {
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
if (GET_INT64_VAL(GET_ROWCELL_INTERBUF(pResInfo)) <= 0) {
......@@ -1208,17 +1166,6 @@ static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *outp
}
static void min_func_merge(SQLFunctionCtx *pCtx) {
int32_t notNullElems = minmax_merge_impl(pCtx, pCtx->inputBytes, pCtx->aOutputBuf, 1);
SET_VAL(pCtx, notNullElems, 1);
if (notNullElems > 0) { // for super table query, SResultRowCellInfo is not used
char *flag = pCtx->aOutputBuf + pCtx->inputBytes;
*flag = DATA_SET_FLAG;
}
}
static void min_func_second_merge(SQLFunctionCtx *pCtx) {
int32_t notNullElems = minmax_merge_impl(pCtx, pCtx->outputBytes, pCtx->aOutputBuf, 1);
SET_VAL(pCtx, notNullElems, 1);
......@@ -1230,16 +1177,6 @@ static void min_func_second_merge(SQLFunctionCtx *pCtx) {
}
static void max_func_merge(SQLFunctionCtx *pCtx) {
int32_t numOfElems = minmax_merge_impl(pCtx, pCtx->inputBytes, pCtx->aOutputBuf, 0);
SET_VAL(pCtx, numOfElems, 1);
if (numOfElems > 0) {
char *flag = pCtx->aOutputBuf + pCtx->inputBytes;
*flag = DATA_SET_FLAG;
}
}
static void max_func_second_merge(SQLFunctionCtx *pCtx) {
int32_t numOfElem = minmax_merge_impl(pCtx, pCtx->outputBytes, pCtx->aOutputBuf, 0);
SET_VAL(pCtx, numOfElem, 1);
......@@ -1604,23 +1541,6 @@ static void first_dist_function_f(SQLFunctionCtx *pCtx, int32_t index) {
}
static void first_dist_func_merge(SQLFunctionCtx *pCtx) {
char *pData = GET_INPUT_DATA_LIST(pCtx);
assert(pCtx->size == 1 && pCtx->stableQuery);
SFirstLastInfo *pInput = (SFirstLastInfo *)(pData + pCtx->inputBytes);
if (pInput->hasResult != DATA_SET_FLAG) {
return;
}
SFirstLastInfo *pOutput = (SFirstLastInfo *)(pCtx->aOutputBuf + pCtx->inputBytes);
if (pOutput->hasResult != DATA_SET_FLAG || pInput->ts < pOutput->ts) {
memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes + sizeof(SFirstLastInfo));
DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts);
}
}
static void first_dist_func_second_merge(SQLFunctionCtx *pCtx) {
assert(pCtx->stableQuery);
char * pData = GET_INPUT_DATA_LIST(pCtx);
......@@ -1793,31 +1713,12 @@ static void last_dist_function_f(SQLFunctionCtx *pCtx, int32_t index) {
SET_VAL(pCtx, 1, 1);
}
static void last_dist_func_merge(SQLFunctionCtx *pCtx) {
char *pData = GET_INPUT_DATA_LIST(pCtx);
assert(pCtx->size == 1 && pCtx->stableQuery);
// the input data is null
SFirstLastInfo *pInput = (SFirstLastInfo *)(pData + pCtx->inputBytes);
if (pInput->hasResult != DATA_SET_FLAG) {
return;
}
SFirstLastInfo *pOutput = (SFirstLastInfo *)(pCtx->aOutputBuf + pCtx->inputBytes);
if (pOutput->hasResult != DATA_SET_FLAG || pOutput->ts < pInput->ts) {
memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes + sizeof(SFirstLastInfo));
DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts);
}
}
/*
* in the secondary merge(local reduce), the output is limited by the
* final output size, so the main difference between last_dist_func_merge and second_merge
* is: the output data format in computing
*/
static void last_dist_func_second_merge(SQLFunctionCtx *pCtx) {
static void last_dist_func_merge(SQLFunctionCtx *pCtx) {
char *pData = GET_INPUT_DATA_LIST(pCtx);
SFirstLastInfo *pInput = (SFirstLastInfo*) (pData + pCtx->outputBytes);
......@@ -1895,7 +1796,7 @@ static void valuePairAssign(tValuePair *dst, int16_t type, const char *val, int6
dst->timestamp = tsKey;
int32_t size = 0;
if (stage == SECONDARY_STAGE_MERGE || stage == FIRST_STAGE_MERGE) {
if (stage == MERGE_STAGE) {
memcpy(dst->pTags, pTags, (size_t)pTagInfo->tagsLen);
} else { // the tags are dumped from the ctx tag fields
for (int32_t i = 0; i < pTagInfo->numOfTagCols; ++i) {
......@@ -2158,7 +2059,7 @@ static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
// only the first_stage_merge is directly written data into final output buffer
if (pCtx->stableQuery && pCtx->currentStage != SECONDARY_STAGE_MERGE) {
if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
return (STopBotInfo*) pCtx->aOutputBuf;
} else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer
return GET_ROWCELL_INTERBUF(pResInfo);
......@@ -2299,27 +2200,6 @@ static void top_function_f(SQLFunctionCtx *pCtx, int32_t index) {
}
static void top_func_merge(SQLFunctionCtx *pCtx) {
char *input = GET_INPUT_DATA_LIST(pCtx);
STopBotInfo *pInput = (STopBotInfo *)input;
if (pInput->num <= 0) {
return;
}
// remmap the input buffer may cause the struct pointer invalid, so rebuild the STopBotInfo is necessary
buildTopBotStruct(pInput, pCtx);
assert(pCtx->stableQuery && pCtx->outputType == TSDB_DATA_TYPE_BINARY && pCtx->size == 1);
STopBotInfo *pOutput = getTopBotOutputInfo(pCtx);
for (int32_t i = 0; i < pInput->num; ++i) {
do_top_function_add(pOutput, (int32_t)pCtx->param[0].i64Key, &pInput->res[i]->v.i64Key, pInput->res[i]->timestamp,
pCtx->inputType, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
}
}
static void top_func_second_merge(SQLFunctionCtx *pCtx) {
STopBotInfo *pInput = (STopBotInfo *)GET_INPUT_DATA_LIST(pCtx);
// construct the input data struct from binary data
......@@ -2389,27 +2269,6 @@ static void bottom_function_f(SQLFunctionCtx *pCtx, int32_t index) {
}
static void bottom_func_merge(SQLFunctionCtx *pCtx) {
char *input = GET_INPUT_DATA_LIST(pCtx);
STopBotInfo *pInput = (STopBotInfo *)input;
if (pInput->num <= 0) {
return;
}
// remmap the input buffer may cause the struct pointer invalid, so rebuild the STopBotInfo is necessary
buildTopBotStruct(pInput, pCtx);
assert(pCtx->stableQuery && pCtx->outputType == TSDB_DATA_TYPE_BINARY && pCtx->size == 1);
STopBotInfo *pOutput = getTopBotOutputInfo(pCtx);
for (int32_t i = 0; i < pInput->num; ++i) {
do_bottom_function_add(pOutput, (int32_t)pCtx->param[0].i64Key, &pInput->res[i]->v.i64Key, pInput->res[i]->timestamp,
pCtx->inputType, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
}
}
static void bottom_func_second_merge(SQLFunctionCtx *pCtx) {
STopBotInfo *pInput = (STopBotInfo *)GET_INPUT_DATA_LIST(pCtx);
// construct the input data struct from binary data
......@@ -2622,7 +2481,7 @@ static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo* pInfo = NULL;
if (pCtx->stableQuery && pCtx->currentStage != SECONDARY_STAGE_MERGE) {
if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
pInfo = (SAPercentileInfo*) pCtx->aOutputBuf;
} else {
pInfo = GET_ROWCELL_INTERBUF(pResInfo);
......@@ -2695,41 +2554,6 @@ static void apercentile_function_f(SQLFunctionCtx *pCtx, int32_t index) {
}
static void apercentile_func_merge(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
assert(pCtx->stableQuery);
SAPercentileInfo *pInput = (SAPercentileInfo *)GET_INPUT_DATA_LIST(pCtx);
pInput->pHisto = (SHistogramInfo*) ((char *)pInput + sizeof(SAPercentileInfo));
pInput->pHisto->elems = (SHistBin*) ((char *)pInput->pHisto + sizeof(SHistogramInfo));
if (pInput->pHisto->numOfElems <= 0) {
return;
}
size_t size = sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1);
SAPercentileInfo *pOutput = getAPerctInfo(pCtx); //(SAPercentileInfo *)pCtx->aOutputBuf;
SHistogramInfo * pHisto = pOutput->pHisto;
if (pHisto->numOfElems <= 0) {
memcpy(pHisto, pInput->pHisto, size);
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
} else {
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
SHistogramInfo *pRes = tHistogramMerge(pHisto, pInput->pHisto, MAX_HISTOGRAM_BIN);
memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN);
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
tHistogramDestroy(&pRes);
}
SET_VAL(pCtx, 1, 1);
pResInfo->hasResult = DATA_SET_FLAG;
}
static void apercentile_func_second_merge(SQLFunctionCtx *pCtx) {
SAPercentileInfo *pInput = (SAPercentileInfo *)GET_INPUT_DATA_LIST(pCtx);
pInput->pHisto = (SHistogramInfo*) ((char *)pInput + sizeof(SAPercentileInfo));
......@@ -2765,7 +2589,7 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) {
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo *pOutput = GET_ROWCELL_INTERBUF(pResInfo);
if (pCtx->currentStage == SECONDARY_STAGE_MERGE) {
if (pCtx->currentStage == MERGE_STAGE) {
if (pResInfo->hasResult == DATA_SET_FLAG) { // check for null
assert(pOutput->pHisto->numOfElems > 0);
......@@ -3403,7 +3227,7 @@ static void arithmetic_function(SQLFunctionCtx *pCtx) {
GET_RES_INFO(pCtx)->numOfRes += pCtx->size;
SArithmeticSupport *sas = (SArithmeticSupport *)pCtx->param[1].pz;
tExprTreeCalcTraverse(sas->pArithExpr->pExpr, pCtx->size, pCtx->aOutputBuf, sas, pCtx->order, getArithColumnData);
arithmeticTreeTraverse(sas->pArithExpr->pExpr, pCtx->size, pCtx->aOutputBuf, sas, pCtx->order, getArithColumnData);
pCtx->aOutputBuf += pCtx->outputBytes * pCtx->size;
pCtx->param[1].pz = NULL;
......@@ -3414,7 +3238,7 @@ static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) {
SArithmeticSupport *sas = (SArithmeticSupport *)pCtx->param[1].pz;
sas->offset = index;
tExprTreeCalcTraverse(sas->pArithExpr->pExpr, 1, pCtx->aOutputBuf, sas, pCtx->order, getArithColumnData);
arithmeticTreeTraverse(sas->pArithExpr->pExpr, 1, pCtx->aOutputBuf, sas, pCtx->order, getArithColumnData);
pCtx->aOutputBuf += pCtx->outputBytes;
}
......@@ -3445,7 +3269,7 @@ static bool spread_function_setup(SQLFunctionCtx *pCtx) {
SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
// this is the server-side setup function in client-side, the secondary merge do not need this procedure
if (pCtx->currentStage == SECONDARY_STAGE_MERGE) {
if (pCtx->currentStage == MERGE_STAGE) {
pCtx->param[0].dKey = DBL_MAX;
pCtx->param[3].dKey = -DBL_MAX;
} else {
......@@ -3572,39 +3396,6 @@ static void spread_function_f(SQLFunctionCtx *pCtx, int32_t index) {
}
}
void spread_func_merge(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
assert(pCtx->stableQuery);
SSpreadInfo *pResData = GET_ROWCELL_INTERBUF(pResInfo);
int32_t notNullElems = 0;
for (int32_t i = 0; i < pCtx->size; ++i) {
SSpreadInfo *input = (SSpreadInfo *)GET_INPUT_DATA(pCtx, i);
/* no assign tag, the value is null */
if (input->hasResult != DATA_SET_FLAG) {
continue;
}
if (pResData->min > input->min) {
pResData->min = input->min;
}
if (pResData->max < input->max) {
pResData->max = input->max;
}
pResData->hasResult = DATA_SET_FLAG;
notNullElems++;
}
if (notNullElems > 0) {
memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SSpreadInfo));
pResInfo->hasResult = DATA_SET_FLAG;
}
}
/*
* here we set the result value back to the intermediate buffer, to apply the finalize the function
* the final result is generated in spread_function_finalizer
......@@ -3633,7 +3424,7 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) {
*/
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
if (pCtx->currentStage == SECONDARY_STAGE_MERGE) {
if (pCtx->currentStage == MERGE_STAGE) {
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
if (pResInfo->hasResult != DATA_SET_FLAG) {
......@@ -3854,125 +3645,6 @@ static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) {
int32_t notNullElems = twa_function_impl(pCtx, index, 1);
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
// TSKEY *primaryKey = GET_TS_LIST(pCtx);
//
//
// STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
// int32_t i = pCtx->startOffset;
// int32_t size = pCtx->size;
//
// if (pCtx->start.key != INT64_MIN) {
// assert(pInfo->p.key == INT64_MIN);
//
// pInfo->p.key = primaryKey[index];
// GET_TYPED_DATA(pInfo->p.val, double, pCtx->inputType, GET_INPUT_DATA(pCtx, index));
//
// pInfo->dOutput += twa_get_area(pCtx->start, pInfo->p);
//
// pInfo->hasResult = DATA_SET_FLAG;
// pInfo->win.skey = pCtx->start.key;
// notNullElems++;
// i += 1;
// } else if (pInfo->p.key == INT64_MIN) {
// pInfo->p.key = primaryKey[index];
// GET_TYPED_DATA(pInfo->p.val, double, pCtx->inputType, GET_INPUT_DATA(pCtx, index));
//
// pInfo->hasResult = DATA_SET_FLAG;
// pInfo->win.skey = pInfo->p.key;
// notNullElems++;
// i += 1;
// }
//
// // calculate the value of
// switch(pCtx->inputType) {
// case TSDB_DATA_TYPE_TINYINT: {
// int8_t *val = (int8_t*) GET_INPUT_DATA(pCtx, index);
// for (; i < size; i++) {
// if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
// continue;
// }
//
// SPoint1 st = {.key = i, .val = val[i]};
// pInfo->dOutput += twa_get_area(pInfo->p, st);
// pInfo->p = st;
// }
// break;
// }
// case TSDB_DATA_TYPE_SMALLINT: {
// int16_t *val = (int16_t*) GET_INPUT_DATA(pCtx, index);
// for (; i < size; i++) {
// if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
// continue;
// }
//
// SPoint1 st = {.key = i, .val = val[i]};
// pInfo->dOutput += twa_get_area(pInfo->p, st);
// pInfo->p = st;
// }
// break;
// }
// case TSDB_DATA_TYPE_INT: {
// int32_t *val = (int32_t*) GET_INPUT_DATA(pCtx, index);
// for (; i < size; i++) {
// if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
// continue;
// }
//
// SPoint1 st = {.key = i, .val = val[i]};
// pInfo->dOutput += twa_get_area(pInfo->p, st);
// pInfo->p = st;
// }
// break;
// }
// case TSDB_DATA_TYPE_BIGINT: {
// int64_t *val = (int64_t*) GET_INPUT_DATA(pCtx, index);
// for (; i < size; i++) {
// if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
// continue;
// }
//
// SPoint1 st = {.key = i, .val = (double) val[i]};
// pInfo->dOutput += twa_get_area(pInfo->p, st);
// pInfo->p = st;
// }
// break;
// }
// case TSDB_DATA_TYPE_FLOAT: {
// float *val = (float*) GET_INPUT_DATA(pCtx, index);
// for (; i < size; i++) {
// if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
// continue;
// }
//
// SPoint1 st = {.key = i, .val = val[i]};
// pInfo->dOutput += twa_get_area(pInfo->p, st);
// pInfo->p = st;
// }
// break;
// }
// case TSDB_DATA_TYPE_DOUBLE: {
// double *val = (double*) GET_INPUT_DATA(pCtx, index);
// for (; i < size; i++) {
// if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
// continue;
// }
//
// SPoint1 st = {.key = i, .val = val[i]};
// pInfo->dOutput += twa_get_area(pInfo->p, st);
// pInfo->p = st;
// }
// break;
// }
// default: assert(0);
// }
//
// // the last interpolated time window value
// if (pCtx->end.key != INT64_MIN) {
// pInfo->dOutput += twa_get_area(pInfo->p, pCtx->end);//((pInfo->p.val + pCtx->end.val) / 2) * (pCtx->end.key - pInfo->p.key);
// pInfo->p = pCtx->end;
// }
//
// pInfo->win.ekey = pInfo->p.key;
SET_VAL(pCtx, notNullElems, 1);
......@@ -3985,34 +3657,6 @@ static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) {
}
}
static void twa_func_merge(SQLFunctionCtx *pCtx) {
assert(pCtx->stableQuery);
STwaInfo *pBuf = (STwaInfo *)pCtx->aOutputBuf;
char * indicator = pCtx->aInputElemBuf;
int32_t numOfNotNull = 0;
for (int32_t i = 0; i < pCtx->size; ++i, indicator += sizeof(STwaInfo)) {
STwaInfo *pInput = (STwaInfo*) indicator;
if (pInput->hasResult != DATA_SET_FLAG) {
continue;
}
numOfNotNull++;
pBuf->dOutput += pInput->dOutput;
pBuf->win = pInput->win;
pBuf->p = pInput->p;
}
SET_VAL(pCtx, numOfNotNull, 1);
if (numOfNotNull > 0) {
pBuf->hasResult = DATA_SET_FLAG;
}
}
/*
* To copy the input to interResBuf to avoid the input buffer space be over writen
* by next input data. The TWA function only applies to each table, so no merge procedure
......@@ -4231,7 +3875,6 @@ static double do_calc_rate(const SRateInfo* pRateInfo) {
return resultVal;
}
static bool rate_function_setup(SQLFunctionCtx *pCtx) {
if (!function_setup(pCtx)) {
return false;
......@@ -4253,7 +3896,6 @@ static bool rate_function_setup(SQLFunctionCtx *pCtx) {
return true;
}
static void rate_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
......@@ -4353,42 +3995,6 @@ static void rate_function_f(SQLFunctionCtx *pCtx, int32_t index) {
}
}
static void rate_func_merge(SQLFunctionCtx *pCtx) {
assert(pCtx->stableQuery);
tscDebug("rate_func_merge() size:%d", pCtx->size);
SRateInfo *pBuf = (SRateInfo *)pCtx->aOutputBuf;
char *indicator = pCtx->aInputElemBuf;
assert(1 == pCtx->size);
int32_t numOfNotNull = 0;
for (int32_t i = 0; i < pCtx->size; ++i, indicator += sizeof(SRateInfo)) {
SRateInfo *pInput = (SRateInfo *)indicator;
if (DATA_SET_FLAG != pInput->hasResult) {
continue;
}
numOfNotNull++;
memcpy(pBuf, pInput, sizeof(SRateInfo));
tscDebug("%p rate_func_merge() isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64,
pCtx, pInput->isIRate, pInput->firstKey, pInput->lastKey, pInput->firstValue, pInput->lastValue, pInput->CorrectionValue);
}
SET_VAL(pCtx, numOfNotNull, 1);
if (numOfNotNull > 0) {
pBuf->hasResult = DATA_SET_FLAG;
}
return;
}
static void rate_func_copy(SQLFunctionCtx *pCtx) {
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
......@@ -4397,12 +4003,10 @@ static void rate_func_copy(SQLFunctionCtx *pCtx) {
pResInfo->hasResult = ((SRateInfo*)pCtx->aInputElemBuf)->hasResult;
SRateInfo* pRateInfo = (SRateInfo*)pCtx->aInputElemBuf;
tscDebug("%p rate_func_second_merge() firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64 " hasResult:%d",
tscDebug("%p rate_func_merge() firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64 " hasResult:%d",
pCtx, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, pRateInfo->hasResult);
}
static void rate_finalizer(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo);
......@@ -4426,7 +4030,6 @@ static void rate_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer(pCtx);
}
static void irate_function(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0;
......@@ -4554,11 +4157,6 @@ static void sumrate_func_merge(SQLFunctionCtx *pCtx) {
do_sumrate_merge(pCtx);
}
static void sumrate_func_second_merge(SQLFunctionCtx *pCtx) {
tscDebug("%p sumrate_func_second_merge() process ...", pCtx);
do_sumrate_merge(pCtx);
}
static void sumrate_finalizer(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo);
......@@ -4622,7 +4220,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
doFinalizer,
count_func_merge,
count_func_merge,
count_load_data_info,
},
{
......@@ -4637,7 +4234,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
function_finalizer,
sum_func_merge,
sum_func_second_merge,
statisRequired,
},
{
......@@ -4652,7 +4248,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
avg_finalizer,
avg_func_merge,
avg_func_second_merge,
statisRequired,
},
{
......@@ -4667,7 +4262,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
function_finalizer,
min_func_merge,
min_func_second_merge,
statisRequired,
},
{
......@@ -4682,7 +4276,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
function_finalizer,
max_func_merge,
max_func_second_merge,
statisRequired,
},
{
......@@ -4697,7 +4290,6 @@ SQLAggFuncElem aAggs[] = {{
stddev_next_step,
stddev_finalizer,
noop1,
noop1,
dataBlockRequired,
},
{
......@@ -4712,7 +4304,6 @@ SQLAggFuncElem aAggs[] = {{
percentile_next_step,
percentile_finalizer,
noop1,
noop1,
dataBlockRequired,
},
{
......@@ -4727,7 +4318,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
apercentile_finalizer,
apercentile_func_merge,
apercentile_func_second_merge,
dataBlockRequired,
},
{
......@@ -4742,7 +4332,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
function_finalizer,
noop1,
noop1,
firstFuncRequired,
},
{
......@@ -4757,7 +4346,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
function_finalizer,
noop1,
noop1,
lastFuncRequired,
},
{
......@@ -4772,8 +4360,7 @@ SQLAggFuncElem aAggs[] = {{
noop2,
no_next_step,
last_row_finalizer,
noop1,
last_dist_func_second_merge,
last_dist_func_merge,
dataBlockRequired,
},
{
......@@ -4789,7 +4376,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
top_bottom_func_finalizer,
top_func_merge,
top_func_second_merge,
dataBlockRequired,
},
{
......@@ -4805,7 +4391,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
top_bottom_func_finalizer,
bottom_func_merge,
bottom_func_second_merge,
dataBlockRequired,
},
{
......@@ -4819,7 +4404,6 @@ SQLAggFuncElem aAggs[] = {{
spread_function_f,
no_next_step,
spread_function_finalizer,
spread_func_merge,
spread_func_sec_merge,
count_load_data_info,
},
......@@ -4834,7 +4418,6 @@ SQLAggFuncElem aAggs[] = {{
twa_function_f,
no_next_step,
twa_function_finalizer,
twa_func_merge,
twa_function_copy,
dataBlockRequired,
},
......@@ -4850,7 +4433,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
leastsquares_finalizer,
noop1,
noop1,
dataBlockRequired,
},
{
......@@ -4865,7 +4447,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
doFinalizer,
copy_function,
copy_function,
no_data_info,
},
{
......@@ -4880,7 +4461,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
doFinalizer,
copy_function,
copy_function,
dataBlockRequired,
},
{
......@@ -4895,7 +4475,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
doFinalizer,
copy_function,
copy_function,
no_data_info,
},
{
......@@ -4910,7 +4489,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
ts_comp_finalize,
copy_function,
copy_function,
dataBlockRequired,
},
{
......@@ -4925,7 +4503,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
doFinalizer,
copy_function,
copy_function,
no_data_info,
},
{
......@@ -4940,7 +4517,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
doFinalizer,
copy_function,
copy_function,
dataBlockRequired,
},
{
......@@ -4955,7 +4531,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
doFinalizer,
copy_function,
copy_function,
no_data_info,
},
{
......@@ -4970,7 +4545,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
doFinalizer,
copy_function,
copy_function,
dataBlockRequired,
},
{
......@@ -4985,7 +4559,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
doFinalizer,
noop1,
noop1,
dataBlockRequired,
},
// distributed version used in two-stage aggregation processes
......@@ -5001,7 +4574,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
function_finalizer,
first_dist_func_merge,
first_dist_func_second_merge,
firstDistFuncRequired,
},
{
......@@ -5016,7 +4588,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
function_finalizer,
last_dist_func_merge,
last_dist_func_second_merge,
lastDistFuncRequired,
},
{
......@@ -5030,7 +4601,6 @@ SQLAggFuncElem aAggs[] = {{
do_sum_f, // todo filter handle
no_next_step,
doFinalizer,
noop1,
copy_function,
dataBlockRequired,
},
......@@ -5045,7 +4615,6 @@ SQLAggFuncElem aAggs[] = {{
rate_function_f,
no_next_step,
rate_finalizer,
rate_func_merge,
rate_func_copy,
dataBlockRequired,
},
......@@ -5060,7 +4629,6 @@ SQLAggFuncElem aAggs[] = {{
irate_function_f,
no_next_step,
rate_finalizer,
rate_func_merge,
rate_func_copy,
dataBlockRequired,
},
......@@ -5076,7 +4644,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
sumrate_finalizer,
sumrate_func_merge,
sumrate_func_second_merge,
dataBlockRequired,
},
{
......@@ -5091,7 +4658,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
sumrate_finalizer,
sumrate_func_merge,
sumrate_func_second_merge,
dataBlockRequired,
},
{
......@@ -5106,7 +4672,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
sumrate_finalizer,
sumrate_func_merge,
sumrate_func_second_merge,
dataBlockRequired,
},
{
......@@ -5121,7 +4686,6 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
sumrate_finalizer,
sumrate_func_merge,
sumrate_func_second_merge,
dataBlockRequired,
},
{
......@@ -5136,6 +4700,5 @@ SQLAggFuncElem aAggs[] = {{
no_next_step,
noop1,
noop1,
noop1,
dataBlockRequired,
}};
......@@ -15,7 +15,7 @@
#include "os.h"
#include "qSyntaxtreefunction.h"
#include "qArithmeticOperator.h"
#include "taosdef.h"
#include "tutil.h"
......@@ -1234,7 +1234,7 @@ _bi_consumer_fn_t rem_function_arraylist[8][10] = {
////////////////////////////////////////////////////////////////////////////////////////////////////////////
_bi_consumer_fn_t tGetBiConsumerFn(int32_t leftType, int32_t rightType, int32_t optr) {
_bi_consumer_fn_t getArithmeticOperatorFn(int32_t leftType, int32_t rightType, int32_t optr) {
switch (optr) {
case TSDB_BINARY_OP_ADD:
return add_function_arraylist[leftType][rightType];
......
......@@ -16,29 +16,18 @@
#include "os.h"
#include "exception.h"
#include "qArithmeticOperator.h"
#include "qAst.h"
#include "qSyntaxtreefunction.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tarray.h"
#include "tbuffer.h"
#include "tcompare.h"
#include "tname.h"
#include "tschemautil.h"
#include "tsdb.h"
#include "tskiplist.h"
#include "tsqlfunction.h"
#include "tstoken.h"
#include "tschemautil.h"
typedef struct {
char* v;
int32_t optr;
} SEndPoint;
typedef struct {
SEndPoint* start;
SEndPoint* end;
} SQueryCond;
static uint8_t UNUSED_FUNC isQueryOnPrimaryKey(const char *primaryColumnName, const tExprNode *pLeft, const tExprNode *pRight) {
if (pLeft->nodeType == TSQL_NODE_COL) {
......@@ -53,323 +42,6 @@ static uint8_t UNUSED_FUNC isQueryOnPrimaryKey(const char *primaryColumnName, co
}
}
void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *)) {
if (pNode == NULL) {
return;
}
if (pNode->nodeType == TSQL_NODE_EXPR) {
tExprTreeDestroy(&pNode, fp);
} else if (pNode->nodeType == TSQL_NODE_VALUE) {
tVariantDestroy(pNode->pVal);
} else if (pNode->nodeType == TSQL_NODE_COL) {
free(pNode->pSchema);
}
free(pNode);
}
void tExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
if (*pExpr == NULL) {
return;
}
if ((*pExpr)->nodeType == TSQL_NODE_EXPR) {
tExprTreeDestroy(&(*pExpr)->_node.pLeft, fp);
tExprTreeDestroy(&(*pExpr)->_node.pRight, fp);
if (fp != NULL) {
fp((*pExpr)->_node.info);
}
} else if ((*pExpr)->nodeType == TSQL_NODE_VALUE) {
tVariantDestroy((*pExpr)->pVal);
free((*pExpr)->pVal);
} else if ((*pExpr)->nodeType == TSQL_NODE_COL) {
free((*pExpr)->pSchema);
}
free(*pExpr);
*pExpr = NULL;
}
// todo check for malloc failure
static int32_t setQueryCond(tQueryInfo *queryColInfo, SQueryCond* pCond) {
int32_t optr = queryColInfo->optr;
if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL ||
optr == TSDB_RELATION_EQUAL || optr == TSDB_RELATION_NOT_EQUAL) {
pCond->start = calloc(1, sizeof(SEndPoint));
pCond->start->optr = queryColInfo->optr;
pCond->start->v = queryColInfo->q;
} else if (optr == TSDB_RELATION_LESS || optr == TSDB_RELATION_LESS_EQUAL) {
pCond->end = calloc(1, sizeof(SEndPoint));
pCond->end->optr = queryColInfo->optr;
pCond->end->v = queryColInfo->q;
} else if (optr == TSDB_RELATION_IN || optr == TSDB_RELATION_LIKE) {
assert(0);
}
return TSDB_CODE_SUCCESS;
}
static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* result) {
SSkipListIterator* iter = NULL;
SQueryCond cond = {0};
if (setQueryCond(pQueryInfo, &cond) != TSDB_CODE_SUCCESS) {
//todo handle error
}
if (cond.start != NULL) {
iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_ASC);
} else {
iter = tSkipListCreateIterFromVal(pSkipList, (char*)(cond.end ? cond.end->v: NULL), pSkipList->type, TSDB_ORDER_DESC);
}
if (cond.start != NULL) {
int32_t optr = cond.start->optr;
if (optr == TSDB_RELATION_EQUAL) { // equals
while(tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter);
int32_t ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
if (ret != 0) {
break;
}
STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
}
} else if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL) { // greater equal
bool comp = true;
int32_t ret = 0;
while(tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter);
if (comp) {
ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
assert(ret >= 0);
}
if (ret == 0 && optr == TSDB_RELATION_GREATER) {
continue;
} else {
STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
comp = false;
}
}
} else if (optr == TSDB_RELATION_NOT_EQUAL) { // not equal
bool comp = true;
while(tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter);
comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0);
if (comp) {
continue;
}
STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
}
tSkipListDestroyIter(iter);
comp = true;
iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_DESC);
while(tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter);
comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0);
if (comp) {
continue;
}
STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
}
} else {
assert(0);
}
} else {
int32_t optr = cond.end ? cond.end->optr : TSDB_RELATION_INVALID;
if (optr == TSDB_RELATION_LESS || optr == TSDB_RELATION_LESS_EQUAL) {
bool comp = true;
int32_t ret = 0;
while (tSkipListIterNext(iter)) {
SSkipListNode *pNode = tSkipListIterGet(iter);
if (comp) {
ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.end->v);
assert(ret <= 0);
}
if (ret == 0 && optr == TSDB_RELATION_LESS) {
continue;
} else {
STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
comp = false; // no need to compare anymore
}
}
} else {
assert(pQueryInfo->optr == TSDB_RELATION_ISNULL || pQueryInfo->optr == TSDB_RELATION_NOTNULL);
while (tSkipListIterNext(iter)) {
SSkipListNode *pNode = tSkipListIterGet(iter);
bool isnull = isNull(SL_GET_NODE_KEY(pSkipList, pNode), pQueryInfo->sch.type);
if ((pQueryInfo->optr == TSDB_RELATION_ISNULL && isnull) ||
(pQueryInfo->optr == TSDB_RELATION_NOTNULL && (!isnull))) {
STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
}
}
}
}
free(cond.start);
free(cond.end);
tSkipListDestroyIter(iter);
}
static bool filterItem(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *param) {
tExprNode *pLeft = pExpr->_node.pLeft;
tExprNode *pRight = pExpr->_node.pRight;
//non-leaf nodes, recursively traverse the expression tree in the post-root order
if (pLeft->nodeType == TSQL_NODE_EXPR && pRight->nodeType == TSQL_NODE_EXPR) {
if (pExpr->_node.optr == TSDB_RELATION_OR) { // or
if (filterItem(pLeft, pItem, param)) {
return true;
}
// left child does not satisfy the query condition, try right child
return filterItem(pRight, pItem, param);
} else { // and
if (!filterItem(pLeft, pItem, param)) {
return false;
}
return filterItem(pRight, pItem, param);
}
}
// handle the leaf node
param->setupInfoFn(pExpr, param->pExtInfo);
return param->nodeFilterFn(pItem, pExpr->_node.info);
}
static void tSQLBinaryTraverseOnSkipList(tExprNode *pExpr, SArray *pResult, SSkipList *pSkipList, SExprTraverseSupp *param ) {
SSkipListIterator* iter = tSkipListCreateIter(pSkipList);
while (tSkipListIterNext(iter)) {
SSkipListNode *pNode = tSkipListIterGet(iter);
if (filterItem(pExpr, pNode, param)) {
taosArrayPush(pResult, &(SL_GET_NODE_DATA(pNode)));
}
}
tSkipListDestroyIter(iter);
}
static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* res, __result_filter_fn_t filterFp) {
SSkipListIterator* iter = tSkipListCreateIter(pSkipList);
while (tSkipListIterNext(iter)) {
bool addToResult = false;
SSkipListNode *pNode = tSkipListIterGet(iter);
char * pData = SL_GET_NODE_DATA(pNode);
tstr *name = (tstr*) tsdbGetTableName((void*) pData);
// todo speed up by using hash
if (pQueryInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
if (pQueryInfo->optr == TSDB_RELATION_IN) {
addToResult = pQueryInfo->compare(name, pQueryInfo->q);
} else if (pQueryInfo->optr == TSDB_RELATION_LIKE) {
addToResult = !pQueryInfo->compare(name, pQueryInfo->q);
}
} else {
addToResult = filterFp(pNode, pQueryInfo);
}
if (addToResult) {
STableKeyInfo info = {.pTable = (void*)pData, .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(res, &info);
}
}
tSkipListDestroyIter(iter);
}
// post-root order traverse syntax tree
void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param) {
if (pExpr == NULL) {
return;
}
tExprNode *pLeft = pExpr->_node.pLeft;
tExprNode *pRight = pExpr->_node.pRight;
// column project
if (pLeft->nodeType != TSQL_NODE_EXPR && pRight->nodeType != TSQL_NODE_EXPR) {
assert(pLeft->nodeType == TSQL_NODE_COL && (pRight->nodeType == TSQL_NODE_VALUE || pRight->nodeType == TSQL_NODE_DUMMY));
param->setupInfoFn(pExpr, param->pExtInfo);
tQueryInfo *pQueryInfo = pExpr->_node.info;
if (pQueryInfo->indexed && pQueryInfo->optr != TSDB_RELATION_LIKE) {
tQueryIndexColumn(pSkipList, pQueryInfo, result);
} else {
tQueryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn);
}
return;
}
// The value of hasPK is always 0.
uint8_t weight = pLeft->_node.hasPK + pRight->_node.hasPK;
assert(weight == 0 && pSkipList != NULL && taosArrayGetSize(result) == 0);
//apply the hierarchical expression to every node in skiplist for find the qualified nodes
tSQLBinaryTraverseOnSkipList(pExpr, result, pSkipList, param);
#if 0
/*
* (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) is handled here
*
* first, we filter results based on the skiplist index, which is the initial filter stage,
* then, we conduct the secondary filter operation based on the result from the initial filter stage.
*/
assert(pExpr->_node.optr == TSDB_RELATION_AND);
tExprNode *pFirst = NULL;
tExprNode *pSecond = NULL;
if (pLeft->_node.hasPK == 1) {
pFirst = pLeft;
pSecond = pRight;
} else {
pFirst = pRight;
pSecond = pLeft;
}
assert(pFirst != pSecond && pFirst != NULL && pSecond != NULL);
// we filter the result based on the skiplist index in the first place
tExprTreeTraverse(pFirst, pSkipList, result, param);
/*
* recursively perform the filter operation based on the initial results,
* So, we do not set the skip list index as a parameter
*/
tExprTreeTraverse(pSecond, NULL, result, param);
#endif
}
static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOfRows) {
switch(type) {
case TSDB_DATA_TYPE_TINYINT: {
......@@ -430,7 +102,73 @@ static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOf
}
}
void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *)) {
if (pNode == NULL) {
return;
}
if (pNode->nodeType == TSQL_NODE_EXPR) {
tExprTreeDestroy(&pNode, fp);
} else if (pNode->nodeType == TSQL_NODE_VALUE) {
tVariantDestroy(pNode->pVal);
} else if (pNode->nodeType == TSQL_NODE_COL) {
free(pNode->pSchema);
}
free(pNode);
}
void tExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
if (*pExpr == NULL) {
return;
}
if ((*pExpr)->nodeType == TSQL_NODE_EXPR) {
tExprTreeDestroy(&(*pExpr)->_node.pLeft, fp);
tExprTreeDestroy(&(*pExpr)->_node.pRight, fp);
if (fp != NULL) {
fp((*pExpr)->_node.info);
}
} else if ((*pExpr)->nodeType == TSQL_NODE_VALUE) {
tVariantDestroy((*pExpr)->pVal);
free((*pExpr)->pVal);
} else if ((*pExpr)->nodeType == TSQL_NODE_COL) {
free((*pExpr)->pSchema);
}
free(*pExpr);
*pExpr = NULL;
}
bool exprTreeApplayFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *param) {
tExprNode *pLeft = pExpr->_node.pLeft;
tExprNode *pRight = pExpr->_node.pRight;
//non-leaf nodes, recursively traverse the expression tree in the post-root order
if (pLeft->nodeType == TSQL_NODE_EXPR && pRight->nodeType == TSQL_NODE_EXPR) {
if (pExpr->_node.optr == TSDB_RELATION_OR) { // or
if (exprTreeApplayFilter(pLeft, pItem, param)) {
return true;
}
// left child does not satisfy the query condition, try right child
return exprTreeApplayFilter(pRight, pItem, param);
} else { // and
if (!exprTreeApplayFilter(pLeft, pItem, param)) {
return false;
}
return exprTreeApplayFilter(pRight, pItem, param);
}
}
// handle the leaf node
param->setupInfoFn(pExpr, param->pExtInfo);
return param->nodeFilterFn(pItem, pExpr->_node.info);
}
void arithmeticTreeTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
char *(*getSourceDataBlock)(void *, const char*, int32_t)) {
if (pExprs == NULL) {
return;
......@@ -442,7 +180,7 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
/* the left output has result from the left child syntax tree */
char *pLeftOutput = (char*)malloc(sizeof(int64_t) * numOfRows);
if (pLeft->nodeType == TSQL_NODE_EXPR) {
tExprTreeCalcTraverse(pLeft, numOfRows, pLeftOutput, param, order, getSourceDataBlock);
arithmeticTreeTraverse(pLeft, numOfRows, pLeftOutput, param, order, getSourceDataBlock);
}
/* the right output has result from the right child syntax tree */
......@@ -450,7 +188,7 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
char *pdata = malloc(sizeof(int64_t) * numOfRows);
if (pRight->nodeType == TSQL_NODE_EXPR) {
tExprTreeCalcTraverse(pRight, numOfRows, pRightOutput, param, order, getSourceDataBlock);
arithmeticTreeTraverse(pRight, numOfRows, pRightOutput, param, order, getSourceDataBlock);
}
if (pLeft->nodeType == TSQL_NODE_EXPR) {
......@@ -459,11 +197,11 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
* exprLeft + exprRight
* the type of returned value of one expression is always double float precious
*/
_bi_consumer_fn_t fp = tGetBiConsumerFn(TSDB_DATA_TYPE_DOUBLE, TSDB_DATA_TYPE_DOUBLE, pExprs->_node.optr);
_bi_consumer_fn_t fp = getArithmeticOperatorFn(TSDB_DATA_TYPE_DOUBLE, TSDB_DATA_TYPE_DOUBLE, pExprs->_node.optr);
fp(pLeftOutput, pRightOutput, numOfRows, numOfRows, pOutput, TSDB_ORDER_ASC);
} else if (pRight->nodeType == TSQL_NODE_COL) { // exprLeft + columnRight
_bi_consumer_fn_t fp = tGetBiConsumerFn(TSDB_DATA_TYPE_DOUBLE, pRight->pSchema->type, pExprs->_node.optr);
_bi_consumer_fn_t fp = getArithmeticOperatorFn(TSDB_DATA_TYPE_DOUBLE, pRight->pSchema->type, pExprs->_node.optr);
// set input buffer
char *pInputData = getSourceDataBlock(param, pRight->pSchema->name, pRight->pSchema->colId);
......@@ -475,14 +213,14 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
}
} else if (pRight->nodeType == TSQL_NODE_VALUE) { // exprLeft + 12
_bi_consumer_fn_t fp = tGetBiConsumerFn(TSDB_DATA_TYPE_DOUBLE, pRight->pVal->nType, pExprs->_node.optr);
_bi_consumer_fn_t fp = getArithmeticOperatorFn(TSDB_DATA_TYPE_DOUBLE, pRight->pVal->nType, pExprs->_node.optr);
fp(pLeftOutput, &pRight->pVal->i64Key, numOfRows, 1, pOutput, TSDB_ORDER_ASC);
}
} else if (pLeft->nodeType == TSQL_NODE_COL) {
// column data specified on left-hand-side
char *pLeftInputData = getSourceDataBlock(param, pLeft->pSchema->name, pLeft->pSchema->colId);
if (pRight->nodeType == TSQL_NODE_EXPR) { // columnLeft + expr2
_bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pSchema->type, TSDB_DATA_TYPE_DOUBLE, pExprs->_node.optr);
_bi_consumer_fn_t fp = getArithmeticOperatorFn(pLeft->pSchema->type, TSDB_DATA_TYPE_DOUBLE, pExprs->_node.optr);
if (order == TSDB_ORDER_DESC) {
reverseCopy(pdata, pLeftInputData, pLeft->pSchema->type, numOfRows);
......@@ -494,12 +232,12 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
} else if (pRight->nodeType == TSQL_NODE_COL) { // columnLeft + columnRight
// column data specified on right-hand-side
char *pRightInputData = getSourceDataBlock(param, pRight->pSchema->name, pRight->pSchema->colId);
_bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pSchema->type, pRight->pSchema->type, pExprs->_node.optr);
_bi_consumer_fn_t fp = getArithmeticOperatorFn(pLeft->pSchema->type, pRight->pSchema->type, pExprs->_node.optr);
// both columns are descending order, do not reverse the source data
fp(pLeftInputData, pRightInputData, numOfRows, numOfRows, pOutput, order);
} else if (pRight->nodeType == TSQL_NODE_VALUE) { // columnLeft + 12
_bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pSchema->type, pRight->pVal->nType, pExprs->_node.optr);
_bi_consumer_fn_t fp = getArithmeticOperatorFn(pLeft->pSchema->type, pRight->pVal->nType, pExprs->_node.optr);
if (order == TSDB_ORDER_DESC) {
reverseCopy(pdata, pLeftInputData, pLeft->pSchema->type, numOfRows);
......@@ -511,13 +249,13 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
} else {
// column data specified on left-hand-side
if (pRight->nodeType == TSQL_NODE_EXPR) { // 12 + expr2
_bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pVal->nType, TSDB_DATA_TYPE_DOUBLE, pExprs->_node.optr);
_bi_consumer_fn_t fp = getArithmeticOperatorFn(pLeft->pVal->nType, TSDB_DATA_TYPE_DOUBLE, pExprs->_node.optr);
fp(&pLeft->pVal->i64Key, pRightOutput, 1, numOfRows, pOutput, TSDB_ORDER_ASC);
} else if (pRight->nodeType == TSQL_NODE_COL) { // 12 + columnRight
// column data specified on right-hand-side
char *pRightInputData = getSourceDataBlock(param, pRight->pSchema->name, pRight->pSchema->colId);
_bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pVal->nType, pRight->pSchema->type, pExprs->_node.optr);
_bi_consumer_fn_t fp = getArithmeticOperatorFn(pLeft->pVal->nType, pRight->pSchema->type, pExprs->_node.optr);
if (order == TSDB_ORDER_DESC) {
reverseCopy(pdata, pRightInputData, pRight->pSchema->type, numOfRows);
......@@ -527,7 +265,7 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
}
} else if (pRight->nodeType == TSQL_NODE_VALUE) { // 12 + 12
_bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pVal->nType, pRight->pVal->nType, pExprs->_node.optr);
_bi_consumer_fn_t fp = getArithmeticOperatorFn(pLeft->pVal->nType, pRight->pVal->nType, pExprs->_node.optr);
fp(&pLeft->pVal->i64Key, &pRight->pVal->i64Key, 1, 1, pOutput, TSDB_ORDER_ASC);
}
}
......
......@@ -1507,7 +1507,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
SColumnInfoData* pColumnInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock, 0);
TSKEY *tsCols = (pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (TSKEY*) pColumnInfoData->pData:NULL;
bool groupbyColumnValue = pRuntimeEnv->groupbyNormalCol;
bool groupbyColumnValue = pRuntimeEnv->groupbyColumn;
int16_t type = 0;
int16_t bytes = 0;
......@@ -1689,7 +1689,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
STableQueryInfo* pTableQueryInfo = pQuery->current;
SResultRowInfo* pResultRowInfo = &pRuntimeEnv->windowResInfo;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || pRuntimeEnv->groupbyNormalCol) {
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || pRuntimeEnv->groupbyColumn) {
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, pDataBlock);
} else {
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, searchFn, pDataBlock);
......@@ -1701,7 +1701,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
// interval query with limit applied
int32_t numOfRes = 0;
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyColumn) {
numOfRes = pResultRowInfo->size;
updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery), pRuntimeEnv->timeWindowInterpo);
} else { // projection query
......@@ -1972,7 +1972,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
// if it is group by normal column, do not set output buffer, the output buffer is pResult
// fixed output query/multi-output query for normal table
if (!pRuntimeEnv->groupbyNormalCol && !pRuntimeEnv->stableQuery && !QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) {
if (!pRuntimeEnv->groupbyColumn && !pRuntimeEnv->stableQuery && !QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) {
resetDefaultResInfoOutputBuf(pRuntimeEnv);
}
......@@ -2091,7 +2091,7 @@ static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) {
}
// Note:top/bottom query is fixed output query
if (pRuntimeEnv->topBotQuery || pRuntimeEnv->groupbyNormalCol) {
if (pRuntimeEnv->topBotQuery || pRuntimeEnv->groupbyColumn) {
return true;
}
......@@ -2732,7 +2732,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa
static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) {
// in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block
SQuery* pQuery = pRuntimeEnv->pQuery;
if (!QUERY_IS_INTERVAL_QUERY(pQuery) && !pRuntimeEnv->groupbyNormalCol && !isFixedOutputQuery(pRuntimeEnv) && !isTSCompQuery(pQuery)) {
if (!QUERY_IS_INTERVAL_QUERY(pQuery) && !pRuntimeEnv->groupbyColumn && !isFixedOutputQuery(pRuntimeEnv) && !isTSCompQuery(pQuery)) {
SResultRec *pRec = &pQuery->rec;
if (pQuery->rec.capacity - pQuery->rec.rows < pBlockInfo->rows) {
......@@ -2956,50 +2956,6 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) {
}
}
static UNUSED_FUNC void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SResultRow *pWindowRes, bool mergeFlag) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (!mergeFlag) {
pCtx[i].aOutputBuf = pCtx[i].aOutputBuf + pCtx[i].outputBytes;
pCtx[i].currentStage = FIRST_STAGE_MERGE;
RESET_RESULT_INFO(pCtx[i].resultInfo);
aAggs[functionId].init(&pCtx[i]);
}
pCtx[i].hasNull = true;
pCtx[i].nStartQueryTimestamp = timestamp;
pCtx[i].aInputElemBuf = getPosInResultPage(pRuntimeEnv, i, pWindowRes, page);
// in case of tag column, the tag information should be extracted from input buffer
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG) {
tVariantDestroy(&pCtx[i].tag);
int32_t type = pCtx[i].outputType;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
tVariantCreateFromBinary(&pCtx[i].tag, varDataVal(pCtx[i].aInputElemBuf), varDataLen(pCtx[i].aInputElemBuf), type);
} else {
tVariantCreateFromBinary(&pCtx[i].tag, pCtx[i].aInputElemBuf, pCtx[i].inputBytes, pCtx[i].inputType);
}
}
}
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY) {
continue;
}
aAggs[functionId].distMergeFunc(&pCtx[i]);
}
}
static UNUSED_FUNC void printBinaryData(int32_t functionId, char *data, int32_t srcDataType) {
if (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_LAST_DST) {
switch (srcDataType) {
......@@ -3396,7 +3352,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
// group by normal columns and interval query on normal table
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
if (pRuntimeEnv->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) {
disableFuncInReverseScanImpl(pRuntimeEnv, pWindowResInfo, order);
} else { // for simple result of table query,
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { // todo refactor
......@@ -3584,7 +3540,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
bool toContinue = false;
if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
if (pRuntimeEnv->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) {
// for each group result, call the finalize function for each column
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
......@@ -3779,10 +3735,10 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
if (pRuntimeEnv->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) {
// for each group result, call the finalize function for each column
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (pRuntimeEnv->groupbyNormalCol) {
if (pRuntimeEnv->groupbyColumn) {
closeAllResultRows(pWindowResInfo);
}
......@@ -3836,7 +3792,7 @@ static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void
pTableQueryInfo->cur.vgroupIndex = -1;
// set more initial size of interval/groupby query
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyColumn) {
int32_t initialSize = 128;
int32_t code = initResultRowInfo(&pTableQueryInfo->windowResInfo, initialSize, TSDB_DATA_TYPE_INT);
if (code != TSDB_CODE_SUCCESS) {
......@@ -4189,7 +4145,7 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc
SResultRowInfo * pResultRowInfo = &pTableQueryInfo->windowResInfo;
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || pRuntimeEnv->groupbyNormalCol) {
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || pRuntimeEnv->groupbyColumn) {
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, pDataBlock);
} else {
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, searchFn, pDataBlock);
......@@ -4232,7 +4188,7 @@ bool queryHasRemainResForTableQuery(SQueryRuntimeEnv* pRuntimeEnv) {
} else {
// there are results waiting for returned to client.
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) &&
(pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) &&
(pRuntimeEnv->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) &&
(pRuntimeEnv->windowResInfo.size > 0)) {
return true;
}
......@@ -4714,7 +4670,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
pRuntimeEnv->cur.vgroupIndex = -1;
pRuntimeEnv->stableQuery = isSTableQuery;
pRuntimeEnv->prevGroupId = INT32_MIN;
pRuntimeEnv->groupbyNormalCol = isGroupbyNormalCol(pQuery->pGroupbyExpr);
pRuntimeEnv->groupbyColumn = isGroupbyNormalCol(pQuery->pGroupbyExpr);
if (pTsBuf != NULL) {
int16_t order = (pQuery->order.order == pRuntimeEnv->pTsBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
......@@ -4734,7 +4690,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
int16_t type = TSDB_DATA_TYPE_NULL;
if (pRuntimeEnv->groupbyNormalCol) { // group by columns not tags;
if (pRuntimeEnv->groupbyColumn) { // group by columns not tags;
type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
} else {
type = TSDB_DATA_TYPE_INT; // group id
......@@ -4745,7 +4701,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
return code;
}
}
} else if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery) || (!isSTableQuery)) {
} else if (pRuntimeEnv->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery) || (!isSTableQuery)) {
int32_t numOfResultRows = getInitialPageNum(pQInfo);
getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TENMB, pQInfo);
......@@ -4754,7 +4710,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
}
int16_t type = TSDB_DATA_TYPE_NULL;
if (pRuntimeEnv->groupbyNormalCol) {
if (pRuntimeEnv->groupbyColumn) {
type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
} else {
type = TSDB_DATA_TYPE_TIMESTAMP;
......@@ -4860,7 +4816,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
pQuery->current = *pTableQueryInfo;
doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo);
if (!pRuntimeEnv->groupbyNormalCol) {
if (!pRuntimeEnv->groupbyColumn) {
setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo);
}
......@@ -5118,7 +5074,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
break;
}
}
} else if (pRuntimeEnv->groupbyNormalCol) { // group-by on normal columns query
} else if (pRuntimeEnv->groupbyColumn) { // group-by on normal columns query
while (pQInfo->groupIndex < numOfGroups) {
SArray *group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex);
......@@ -5639,7 +5595,7 @@ static void doSecondaryArithmeticProcess(SQuery* pQuery) {
}
} else {
arithSup.pArithExpr = pExpr;
tExprTreeCalcTraverse(arithSup.pArithExpr->pExpr, (int32_t)pQuery->rec.rows, data[i]->data, &arithSup, TSDB_ORDER_ASC,
arithmeticTreeTraverse(arithSup.pArithExpr->pExpr, (int32_t)pQuery->rec.rows, data[i]->data, &arithSup, TSDB_ORDER_ASC,
getArithemicInputSrc);
}
}
......@@ -5662,7 +5618,7 @@ static void doSecondaryArithmeticProcess(SQuery* pQuery) {
* select count(*)/top(field,k)/avg(field name) from table_name [where ts>now-1a];
* select count(*) from table_name group by status_column;
*/
static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
......@@ -5687,7 +5643,7 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
limitResults(pRuntimeEnv);
}
static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
......@@ -5752,7 +5708,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
TSKEY newStartKey = QUERY_IS_ASC_QUERY(pQuery)? INT64_MIN:INT64_MAX;
// skip blocks without load the actual data block from file if no filter condition present
if (!pRuntimeEnv->groupbyNormalCol) {
if (!pRuntimeEnv->groupbyColumn) {
skipTimeInterval(pRuntimeEnv, &newStartKey);
if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0 && pRuntimeEnv->pFillInfo == NULL) {
setQueryStatus(pQuery, QUERY_COMPLETED);
......@@ -5849,13 +5805,13 @@ static void tableQueryImpl(SQInfo *pQInfo) {
STableQueryInfo* item = taosArrayGetP(g, 0);
// group by normal column, sliding window query, interval query are handled by interval query processor
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { // interval (down sampling operation)
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyColumn) { // interval (down sampling operation)
tableIntervalProcess(pQInfo, item);
} else if (isFixedOutputQuery(pRuntimeEnv)) {
tableFixedOutputProcess(pQInfo, item);
tableAggregationProcess(pQInfo, item);
} else { // diff/add/multiply/subtract/division
assert(pQuery->checkBuffer == 1);
tableMultiOutputProcess(pQInfo, item);
tableProjectionProcess(pQInfo, item);
}
// record the total elapsed time
......@@ -5871,11 +5827,11 @@ static void stableQueryImpl(SQInfo *pQInfo) {
int64_t st = taosGetTimestampUs();
if (QUERY_IS_INTERVAL_QUERY(pQuery) ||
(isFixedOutputQuery(pRuntimeEnv) && (!isPointInterpoQuery(pQuery)) && (!pRuntimeEnv->groupbyNormalCol))) {
(isFixedOutputQuery(pRuntimeEnv) && (!isPointInterpoQuery(pQuery)) && (!pRuntimeEnv->groupbyColumn))) {
multiTableQueryProcess(pQInfo);
} else {
assert((pQuery->checkBuffer == 1 && pQuery->interval.interval == 0) || isPointInterpoQuery(pQuery) ||
pRuntimeEnv->groupbyNormalCol);
pRuntimeEnv->groupbyColumn);
sequentialTableProcess(pQInfo);
}
......
......@@ -2645,13 +2645,12 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
return pTableGroup;
}
static bool indexedNodeFilterFp(const void* pNode, void* param) {
static bool tableFilterFp(const void* pNode, void* param) {
tQueryInfo* pInfo = (tQueryInfo*) param;
STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
char* val = NULL;
char* val = NULL;
if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
val = (char*) TABLE_NAME(pTable);
} else {
......@@ -2706,15 +2705,17 @@ static bool indexedNodeFilterFp(const void* pNode, void* param) {
return true;
}
static void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param);
static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) {
// query according to the expression tree
SExprTraverseSupp supp = {
.nodeFilterFn = (__result_filter_fn_t) indexedNodeFilterFp,
.nodeFilterFn = (__result_filter_fn_t) tableFilterFp,
.setupInfoFn = filterPrepare,
.pExtInfo = pSTable->tagSchema,
};
tExprTreeTraverse(pExpr, pSTable->pIndex, pRes, &supp);
getTableListfromSkipList(pExpr, pSTable->pIndex, pRes, &supp);
tExprTreeDestroy(&pExpr, destroyHelper);
return TSDB_CODE_SUCCESS;
}
......@@ -2956,3 +2957,235 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) {
taosArrayDestroy(pGroupList->pGroupList);
pGroupList->numOfTables = 0;
}
static void applyFilterToSkipListNode(SSkipList *pSkipList, tExprNode *pExpr, SArray *pResult, SExprTraverseSupp *param) {
SSkipListIterator* iter = tSkipListCreateIter(pSkipList);
// Scan each node in the skiplist by using iterator
while (tSkipListIterNext(iter)) {
SSkipListNode *pNode = tSkipListIterGet(iter);
if (exprTreeApplayFilter(pExpr, pNode, param)) {
taosArrayPush(pResult, &(SL_GET_NODE_DATA(pNode)));
}
}
tSkipListDestroyIter(iter);
}
typedef struct {
char* v;
int32_t optr;
} SEndPoint;
typedef struct {
SEndPoint* start;
SEndPoint* end;
} SQueryCond;
// todo check for malloc failure
static int32_t setQueryCond(tQueryInfo *queryColInfo, SQueryCond* pCond) {
int32_t optr = queryColInfo->optr;
if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL ||
optr == TSDB_RELATION_EQUAL || optr == TSDB_RELATION_NOT_EQUAL) {
pCond->start = calloc(1, sizeof(SEndPoint));
pCond->start->optr = queryColInfo->optr;
pCond->start->v = queryColInfo->q;
} else if (optr == TSDB_RELATION_LESS || optr == TSDB_RELATION_LESS_EQUAL) {
pCond->end = calloc(1, sizeof(SEndPoint));
pCond->end->optr = queryColInfo->optr;
pCond->end->v = queryColInfo->q;
} else if (optr == TSDB_RELATION_IN || optr == TSDB_RELATION_LIKE) {
assert(0);
}
return TSDB_CODE_SUCCESS;
}
static void queryIndexedColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* result) {
SSkipListIterator* iter = NULL;
SQueryCond cond = {0};
if (setQueryCond(pQueryInfo, &cond) != TSDB_CODE_SUCCESS) {
//todo handle error
}
if (cond.start != NULL) {
iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_ASC);
} else {
iter = tSkipListCreateIterFromVal(pSkipList, (char*)(cond.end ? cond.end->v: NULL), pSkipList->type, TSDB_ORDER_DESC);
}
if (cond.start != NULL) {
int32_t optr = cond.start->optr;
if (optr == TSDB_RELATION_EQUAL) { // equals
while(tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter);
int32_t ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
if (ret != 0) {
break;
}
STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
}
} else if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL) { // greater equal
bool comp = true;
int32_t ret = 0;
while(tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter);
if (comp) {
ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
assert(ret >= 0);
}
if (ret == 0 && optr == TSDB_RELATION_GREATER) {
continue;
} else {
STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
comp = false;
}
}
} else if (optr == TSDB_RELATION_NOT_EQUAL) { // not equal
bool comp = true;
while(tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter);
comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0);
if (comp) {
continue;
}
STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
}
tSkipListDestroyIter(iter);
comp = true;
iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_DESC);
while(tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter);
comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0);
if (comp) {
continue;
}
STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
}
} else {
assert(0);
}
} else {
int32_t optr = cond.end ? cond.end->optr : TSDB_RELATION_INVALID;
if (optr == TSDB_RELATION_LESS || optr == TSDB_RELATION_LESS_EQUAL) {
bool comp = true;
int32_t ret = 0;
while (tSkipListIterNext(iter)) {
SSkipListNode *pNode = tSkipListIterGet(iter);
if (comp) {
ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.end->v);
assert(ret <= 0);
}
if (ret == 0 && optr == TSDB_RELATION_LESS) {
continue;
} else {
STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
comp = false; // no need to compare anymore
}
}
} else {
assert(pQueryInfo->optr == TSDB_RELATION_ISNULL || pQueryInfo->optr == TSDB_RELATION_NOTNULL);
while (tSkipListIterNext(iter)) {
SSkipListNode *pNode = tSkipListIterGet(iter);
bool isnull = isNull(SL_GET_NODE_KEY(pSkipList, pNode), pQueryInfo->sch.type);
if ((pQueryInfo->optr == TSDB_RELATION_ISNULL && isnull) ||
(pQueryInfo->optr == TSDB_RELATION_NOTNULL && (!isnull))) {
STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
}
}
}
}
free(cond.start);
free(cond.end);
tSkipListDestroyIter(iter);
}
static void queryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* res, __result_filter_fn_t filterFp) {
SSkipListIterator* iter = tSkipListCreateIter(pSkipList);
while (tSkipListIterNext(iter)) {
bool addToResult = false;
SSkipListNode *pNode = tSkipListIterGet(iter);
char *pData = SL_GET_NODE_DATA(pNode);
tstr *name = (tstr*) tsdbGetTableName((void*) pData);
// todo speed up by using hash
if (pQueryInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
if (pQueryInfo->optr == TSDB_RELATION_IN) {
addToResult = pQueryInfo->compare(name, pQueryInfo->q);
} else if (pQueryInfo->optr == TSDB_RELATION_LIKE) {
addToResult = !pQueryInfo->compare(name, pQueryInfo->q);
}
} else {
addToResult = filterFp(pNode, pQueryInfo);
}
if (addToResult) {
STableKeyInfo info = {.pTable = (void*)pData, .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(res, &info);
}
}
tSkipListDestroyIter(iter);
}
// Apply the filter expression to each node in the skiplist to acquire the qualified nodes in skip list
void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param) {
if (pExpr == NULL) {
return;
}
tExprNode *pLeft = pExpr->_node.pLeft;
tExprNode *pRight = pExpr->_node.pRight;
// column project
if (pLeft->nodeType != TSQL_NODE_EXPR && pRight->nodeType != TSQL_NODE_EXPR) {
assert(pLeft->nodeType == TSQL_NODE_COL && (pRight->nodeType == TSQL_NODE_VALUE || pRight->nodeType == TSQL_NODE_DUMMY));
param->setupInfoFn(pExpr, param->pExtInfo);
tQueryInfo *pQueryInfo = pExpr->_node.info;
if (pQueryInfo->indexed && pQueryInfo->optr != TSDB_RELATION_LIKE) {
queryIndexedColumn(pSkipList, pQueryInfo, result);
} else {
queryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn);
}
return;
}
// The value of hasPK is always 0.
uint8_t weight = pLeft->_node.hasPK + pRight->_node.hasPK;
assert(weight == 0 && pSkipList != NULL && taosArrayGetSize(result) == 0);
//apply the hierarchical filter expression to every node in skiplist to find the qualified nodes
applyFilterToSkipListNode(pSkipList, pExpr, result, param);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册