提交 f17bf6d7 编写于 作者: H Haojun Liao

[td-3941]<enhance>: refactor having operator.

上级 d497b525
......@@ -113,13 +113,13 @@ typedef struct SExprFilter {
SExprInfo *pExprInfo;
SArray *fp;
SColumn *pFilters; //having filter info
}SExprFilter;
} SExprFilter;
typedef struct SInternalField {
TAOS_FIELD field;
bool visible;
SExprInfo *pExpr;
SExprFilter *pFieldFilters;
// SExprFilter *pFieldFilters;
} SInternalField;
typedef struct SFieldInfo {
......@@ -128,7 +128,6 @@ typedef struct SFieldInfo {
SArray *internalField; // SArray<SInternalField>
} SFieldInfo;
typedef struct SCond {
uint64_t uid;
int32_t len; // length of tag query condition data
......@@ -246,7 +245,7 @@ typedef struct SQueryInfo {
struct SQueryInfo *sibling; // sibling
SArray *pUpstream; // SArray<struct SQueryInfo>
struct SQueryInfo *pDownstream;
int32_t havingFieldNum;
int32_t havingFieldNum;
} SQueryInfo;
typedef struct {
......
......@@ -102,32 +102,8 @@ void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchem
}
pCtx[i].interBufBytes = pExpr->base.interBytes;
// pCtx[i].resultInfo = calloc(1, pCtx[i].interBufBytes + sizeof(SResultRowCellInfo));
pCtx[i].stableQuery = true;
}
// int16_t n = 0;
// int16_t tagLen = 0;
// SQLFunctionCtx **pTagCtx = calloc(pQueryInfo->fieldsInfo.numOfOutput, POINTER_BYTES);
//
// SQLFunctionCtx *pCtx1 = NULL;
// for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
// SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i);
// if (pExpr->base.functionId == TSDB_FUNC_TAG_DUMMY || pExpr->base.functionId == TSDB_FUNC_TS_DUMMY) {
// tagLen += pExpr->base.resBytes;
// pTagCtx[n++] = &pCtx[i];
// } else if ((aAggs[pExpr->base.functionId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
// pCtx1 = &pCtx[i];
// }
// }
//
// if (n == 0 || pCtx == NULL) {
// free(pTagCtx);
// } else {
// pCtx1->tagInfo.pTagCtxList = pTagCtx;
// pCtx1->tagInfo.numOfTagCols = n;
// pCtx1->tagInfo.tagsLen = tagLen;
// }
}
void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
......@@ -892,6 +868,7 @@ bool needToMergeRv(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index,
if (size > 0) {
ret = compare_aRv(pBlock, columnIndexList, size, index, buf, TSDB_ORDER_ASC);
}
// if ret == 0, means the result belongs to the same group
return (ret == 0);
}
......
......@@ -1988,7 +1988,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if ((pToken->z == NULL || pToken->n == 0)
&& (TK_INTEGER != sqlOptr)) /*select count(1) from table*/ {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
}
if (sqlOptr == TK_ALL) {
// select table.*
// check if the table name is valid or not
......@@ -3067,23 +3068,19 @@ int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd
}
static SColumnFilterInfo* addColumnFilterInfo(SColumn* pColumn) {
if (pColumn == NULL) {
return NULL;
}
static SColumnFilterInfo* addColumnFilterInfo(SColumnFilterList* filterList) {
int32_t size = (filterList->numOfFilters) + 1;
int32_t size = pColumn->info.numOfFilters + 1;
char* tmp = (char*) realloc((void*)(pColumn->info.filterInfo), sizeof(SColumnFilterInfo) * (size));
char* tmp = (char*) realloc((void*)(filterList->filterInfo), sizeof(SColumnFilterInfo) * (size));
if (tmp != NULL) {
pColumn->info.filterInfo = (SColumnFilterInfo*)tmp;
filterList->filterInfo = (SColumnFilterInfo*)tmp;
} else {
return NULL;
}
pColumn->info.numOfFilters++;
filterList->numOfFilters = size;
SColumnFilterInfo* pColFilterInfo = &pColumn->info.filterInfo[pColumn->info.numOfFilters - 1];
SColumnFilterInfo* pColFilterInfo = &(filterList->filterInfo[size - 1]);
memset(pColFilterInfo, 0, sizeof(SColumnFilterInfo));
return pColFilterInfo;
......@@ -3254,10 +3251,10 @@ static int32_t extractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SC
*/
if (sqlOptr == TK_AND) {
// this is a new filter condition on this column
if (pColumn->info.numOfFilters == 0) {
pColFilter = addColumnFilterInfo(pColumn);
if (pColumn->info.flist.numOfFilters == 0) {
pColFilter = addColumnFilterInfo(&pColumn->info.flist);
} else { // update the existed column filter information, find the filter info here
pColFilter = &pColumn->info.filterInfo[0];
pColFilter = &pColumn->info.flist.filterInfo[0];
}
if (pColFilter == NULL) {
......@@ -3265,7 +3262,7 @@ static int32_t extractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SC
}
} else if (sqlOptr == TK_OR) {
// TODO fixme: failed to invalid the filter expression: "col1 = 1 OR col2 = 2"
pColFilter = addColumnFilterInfo(pColumn);
pColFilter = addColumnFilterInfo(&pColumn->info.flist);
if (pColFilter == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -4099,8 +4096,8 @@ static bool validateFilterExpr(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < num; ++i) {
SColumn* pCol = taosArrayGetP(pColList, i);
for (int32_t j = 0; j < pCol->info.numOfFilters; ++j) {
SColumnFilterInfo* pColFilter = &pCol->info.filterInfo[j];
for (int32_t j = 0; j < pCol->info.flist.numOfFilters; ++j) {
SColumnFilterInfo* pColFilter = &pCol->info.flist.filterInfo[j];
int32_t lowerOptr = pColFilter->lowerRelOptr;
int32_t upperOptr = pColFilter->upperRelOptr;
......@@ -6794,142 +6791,98 @@ int32_t checkQueryRangeForFill(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
return TSDB_CODE_SUCCESS;
}
int32_t tscInsertExprFields(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SInternalField** interField) {
tSqlExprItem item = {.pNode = pExpr, .aliasName = NULL, .distinct = false};
int32_t outputIndex = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
// ADD TRUE FOR TEST
if (addExprAndResultField(pCmd, pQueryInfo, outputIndex, &item, true) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
++pQueryInfo->havingFieldNum;
size_t n = tscSqlExprNumOfExprs(pQueryInfo);
SExprInfo* pExprInfo = tscSqlExprGet(pQueryInfo, (int32_t)n - 1);
int32_t slot = tscNumOfFields(pQueryInfo) - 1;
SInternalField* pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, slot);
pInfo->visible = false;
if (pInfo->pFieldFilters == NULL) {
SExprFilter* pFieldFilters = calloc(1, sizeof(SExprFilter));
if (pFieldFilters == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SColumn* pFilters = calloc(1, sizeof(SColumn));
if (pFilters == NULL) {
tfree(pFieldFilters);
// TODO normalize the function expression and compare it
int32_t tscGetExprFilters(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pSelectNodeList, tSqlExpr* pSqlExpr,
SExprInfo** pExpr) {
*pExpr = NULL;
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
size_t num = taosArrayGetSize(pSelectNodeList);
for(int32_t i = 0; i < num; ++i) {
tSqlExprItem* pItem = taosArrayGet(pSelectNodeList, i);
if (tSqlExprCompare(pItem->pNode, pSqlExpr) == 0) { // exists, not added it,
pFieldFilters->pFilters = pFilters;
pFieldFilters->pExprInfo = pExprInfo;
pExprInfo->base.pFilter = pFilters->info.filterInfo;
pInfo->pFieldFilters = pFieldFilters;
}
int32_t functionId = isValidFunction(pSqlExpr->operand.z, pSqlExpr->operand.n);
tSqlExprItem* pParamElem = taosArrayGet(pSqlExpr->pParam, 0);
SStrToken* pToken = &pParamElem->pNode->colInfo;
pInfo->pFieldFilters->pExpr = pExpr;
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
getColumnIndexByName(pCmd, pToken, pQueryInfo, &index);
*interField = pInfo;
size_t numOfNodeInSel = tscSqlExprNumOfExprs(pQueryInfo);
for(int32_t k = 0; k < numOfNodeInSel; ++k) {
SExprInfo* pExpr1 = tscSqlExprGet(pQueryInfo, k);
return TSDB_CODE_SUCCESS;
}
if (pExpr1->base.functionId != functionId) {
continue;
}
int32_t tscGetExprFilters(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SInternalField** pField) {
SInternalField* pInfo = NULL;
if (pExpr1->base.colInfo.colIndex != index.columnIndex) {
continue;
}
for (int32_t i = pQueryInfo->havingFieldNum - 1; i >= 0; --i) {
pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, pQueryInfo->fieldsInfo.numOfOutput - 1 - i);
++pQueryInfo->havingFieldNum;
*pExpr = pExpr1;
break;
}
if (pInfo->pFieldFilters && 0 == tSqlExprCompare(pInfo->pFieldFilters->pExpr, pExpr)) {
*pField = pInfo;
assert(*pExpr != NULL);
return TSDB_CODE_SUCCESS;
}
}
int32_t ret = tscInsertExprFields(pCmd, pQueryInfo, pExpr, &pInfo);
if (ret) {
return ret;
}
*pField = pInfo;
tSqlExprItem item = {.pNode = pSqlExpr, .aliasName = NULL, .distinct = false};
return TSDB_CODE_SUCCESS;
}
int32_t outputIndex = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
static int32_t genExprFilter(SExprFilter* exprFilter) {
exprFilter->fp = taosArrayInit(4, sizeof(__filter_func_t));
if (exprFilter->fp == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
// ADD TRUE FOR TEST
if (addExprAndResultField(pCmd, pQueryInfo, outputIndex, &item, true) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
for (int32_t i = 0; i < exprFilter->pFilters->info.numOfFilters; ++i) {
SColumnFilterInfo *filterInfo = &exprFilter->pFilters->info.filterInfo[i];
int32_t lower = filterInfo->lowerRelOptr;
int32_t upper = filterInfo->upperRelOptr;
if (lower == TSDB_RELATION_INVALID && upper == TSDB_RELATION_INVALID) {
tscError("invalid rel optr");
return TSDB_CODE_TSC_APP_ERROR;
}
__filter_func_t ffp = getFilterOperator(lower, upper);
if (ffp == NULL) {
tscError("invalid filter info");
return TSDB_CODE_TSC_APP_ERROR;
}
taosArrayPush(exprFilter->fp, &ffp);
}
++pQueryInfo->havingFieldNum;
size_t n = tscSqlExprNumOfExprs(pQueryInfo);
*pExpr = tscSqlExprGet(pQueryInfo, (int32_t)n - 1);
return TSDB_CODE_SUCCESS;
}
static int32_t handleExprInHavingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, int32_t sqlOptr) {
static int32_t handleExprInHavingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pSelectNodeList, tSqlExpr* pExpr, int32_t sqlOptr) {
const char* msg1 = "non binary column not support like operator";
const char* msg2 = "invalid operator for binary column in having clause";
const char* msg3 = "invalid operator for bool column in having clause";
SColumn* pColumn = NULL;
SColumnFilterInfo* pColFilter = NULL;
SInternalField* pInfo = NULL;
/*
* in case of TK_AND filter condition, we first find the corresponding column and build the query condition together
* the already existed condition.
*/
SExprInfo *expr = NULL;
if (sqlOptr == TK_AND) {
int32_t ret = tscGetExprFilters(pCmd, pQueryInfo, pExpr->pLeft, &pInfo);
int32_t ret = tscGetExprFilters(pCmd, pQueryInfo, pSelectNodeList, pExpr->pLeft, &expr);
if (ret) {
return ret;
}
pColumn = pInfo->pFieldFilters->pFilters;
// this is a new filter condition on this column
if (pColumn->info.numOfFilters == 0) {
pColFilter = addColumnFilterInfo(pColumn);
if (expr->base.flist.numOfFilters == 0) {
pColFilter = addColumnFilterInfo(&expr->base.flist);
} else { // update the existed column filter information, find the filter info here
pColFilter = &pColumn->info.filterInfo[0];
pColFilter = &expr->base.flist.filterInfo[0];
}
if (pColFilter == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
} else if (sqlOptr == TK_OR) {
int32_t ret = tscGetExprFilters(pCmd, pQueryInfo, pExpr->pLeft, &pInfo);
int32_t ret = tscGetExprFilters(pCmd, pQueryInfo, pSelectNodeList, pExpr->pLeft, &expr);
if (ret) {
return ret;
}
pColumn = pInfo->pFieldFilters->pFilters;
// TODO fixme: failed to invalid the filter expression: "col1 = 1 OR col2 = 2"
pColFilter = addColumnFilterInfo(pColumn);
// TODO refactor
pColFilter = addColumnFilterInfo(&expr->base.flist);
if (pColFilter == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -6938,7 +6891,7 @@ static int32_t handleExprInHavingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
}
pColFilter->filterstr =
((pInfo->field.type == TSDB_DATA_TYPE_BINARY || pInfo->field.type == TSDB_DATA_TYPE_NCHAR) ? 1 : 0);
((expr->base.resType == TSDB_DATA_TYPE_BINARY || expr->base.resType == TSDB_DATA_TYPE_NCHAR) ? 1 : 0);
if (pColFilter->filterstr) {
if (pExpr->tokenId != TK_EQ
......@@ -6954,22 +6907,23 @@ static int32_t handleExprInHavingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pInfo->field.type == TSDB_DATA_TYPE_BOOL) {
if (expr->base.resType == TSDB_DATA_TYPE_BOOL) {
if (pExpr->tokenId != TK_EQ && pExpr->tokenId != TK_NE) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
}
}
int32_t ret = doExtractColumnFilterInfo(pCmd, pQueryInfo, pColFilter, pInfo->field.type, pExpr);
int32_t ret = doExtractColumnFilterInfo(pCmd, pQueryInfo, pColFilter, expr->base.resType, pExpr);
if (ret) {
return ret;
}
return genExprFilter(pInfo->pFieldFilters);
return TSDB_CODE_SUCCESS;
// return genExprFilter(pInfo->pFieldFilters);
}
int32_t getHavingExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, int32_t parentOptr) {
int32_t getHavingExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pSelectNodeList, tSqlExpr* pExpr, int32_t parentOptr) {
if (pExpr == NULL) {
return TSDB_CODE_SUCCESS;
}
......@@ -6980,12 +6934,12 @@ int32_t getHavingExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, in
tSqlExpr* pRight = pExpr->pRight;
if (pExpr->tokenId == TK_AND || pExpr->tokenId == TK_OR) {
int32_t ret = getHavingExpr(pCmd, pQueryInfo, pExpr->pLeft, pExpr->tokenId);
int32_t ret = getHavingExpr(pCmd, pQueryInfo, pSelectNodeList, pExpr->pLeft, pExpr->tokenId);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
return getHavingExpr(pCmd, pQueryInfo, pExpr->pRight, pExpr->tokenId);
return getHavingExpr(pCmd, pQueryInfo, pSelectNodeList, pExpr->pRight, pExpr->tokenId);
}
if (pLeft == NULL || pRight == NULL) {
......@@ -6998,7 +6952,7 @@ int32_t getHavingExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, in
exchangeExpr(pExpr);
pLeft = pExpr->pLeft;
pLeft = pExpr->pLeft;
pRight = pExpr->pRight;
......@@ -7014,29 +6968,27 @@ int32_t getHavingExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, in
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
//if (pLeft->pParam == NULL || pLeft->pParam->nExpr < 1) {
// return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
//}
if (pLeft->pParam) {
size_t size = taosArrayGetSize(pLeft->pParam);
for (int32_t i = 0; i < size; i++) {
tSqlExprItem* pParamElem = taosArrayGet(pLeft->pParam, i);
if (pParamElem->pNode->tokenId != TK_ALL &&
pParamElem->pNode->tokenId != TK_ID &&
pParamElem->pNode->tokenId != TK_STRING &&
pParamElem->pNode->tokenId != TK_INTEGER &&
pParamElem->pNode->tokenId != TK_FLOAT) {
tSqlExprItem* pParamItem = taosArrayGet(pLeft->pParam, i);
tSqlExpr* pExpr1 = pParamItem->pNode;
if (pExpr1->tokenId != TK_ALL &&
pExpr1->tokenId != TK_ID &&
pExpr1->tokenId != TK_STRING &&
pExpr1->tokenId != TK_INTEGER &&
pExpr1->tokenId != TK_FLOAT) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pParamElem->pNode->tokenId == TK_ID && (pParamElem->pNode->colInfo.z == NULL && pParamElem->pNode->colInfo.n == 0)) {
if (pExpr1->tokenId == TK_ID && (pExpr1->colInfo.z == NULL && pExpr1->colInfo.n == 0)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pParamElem->pNode->tokenId == TK_ID) {
if (pExpr1->tokenId == TK_ID) {
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if ((getColumnIndexByName(pCmd, &pParamElem->pNode->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS)) {
if ((getColumnIndexByName(pCmd, &pExpr1->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
......@@ -7056,10 +7008,11 @@ int32_t getHavingExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, in
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
return handleExprInHavingClause(pCmd, pQueryInfo, pExpr, parentOptr);
return handleExprInHavingClause(pCmd, pQueryInfo, pSelectNodeList, pExpr, parentOptr);
}
int32_t validateHavingClause(SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SSqlCmd* pCmd, bool isSTable, int32_t joinQuery, int32_t timeWindowQuery) {
int32_t validateHavingClause(SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SSqlCmd* pCmd, SArray* pSelectNodeList,
int32_t joinQuery, int32_t timeWindowQuery) {
const char* msg1 = "having only works with group by";
const char* msg2 = "functions or others can not be mixed up";
const char* msg3 = "invalid expression in having clause";
......@@ -7082,7 +7035,7 @@ int32_t validateHavingClause(SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SSqlCmd* p
int32_t ret = 0;
if ((ret = getHavingExpr(pCmd, pQueryInfo, pExpr, TK_AND)) != TSDB_CODE_SUCCESS) {
if ((ret = getHavingExpr(pCmd, pQueryInfo, pSelectNodeList, pExpr, TK_AND)) != TSDB_CODE_SUCCESS) {
return ret;
}
......@@ -7328,7 +7281,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) {
}
// parse the having clause in the first place
if (validateHavingClause(pQueryInfo, pSqlNode->pHaving, pCmd, isSTable, joinQuery, timeWindowQuery) !=
if (validateHavingClause(pQueryInfo, pSqlNode->pHaving, pCmd, pSqlNode->pSelNodeList, joinQuery, timeWindowQuery) !=
TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
......@@ -7515,7 +7468,7 @@ bool hasNormalColumnFilter(SQueryInfo* pQueryInfo) {
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumn* pCol = taosArrayGetP(pQueryInfo->colList, i);
if (pCol->info.numOfFilters > 0) {
if (pCol->info.flist.numOfFilters > 0) {
return true;
}
}
......
......@@ -728,6 +728,38 @@ static char *doSerializeTableInfo(SQueryTableMsg *pQueryMsg, SSqlObj *pSql, STab
return pMsg;
}
// TODO refactor
static int32_t serializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t numOfFilters, char** pMsg) {
// append the filter information after the basic column information
for (int32_t f = 0; f < numOfFilters; ++f) {
SColumnFilterInfo *pColFilter = &pColFilters[f];
SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)(*pMsg);
pFilterMsg->filterstr = htons(pColFilter->filterstr);
(*pMsg) += sizeof(SColumnFilterInfo);
if (pColFilter->filterstr) {
pFilterMsg->len = htobe64(pColFilter->len);
memcpy(*pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1));
(*pMsg) += (pColFilter->len + 1); // append the additional filter binary info
} else {
pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
}
pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
tscError("invalid filter info");
return TSDB_CODE_TSC_INVALID_SQL;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo, char** pMsg, void* addr) {
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
......@@ -760,6 +792,7 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo,
pSqlExpr->functionId = htons(pExpr->functionId);
pSqlExpr->numOfParams = htons(pExpr->numOfParams);
pSqlExpr->resColId = htons(pExpr->resColId);
pSqlExpr->flist.numOfFilters = htons(pExpr->flist.numOfFilters);
(*pMsg) += sizeof(SSqlExpr);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log
......@@ -774,6 +807,8 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo,
}
}
serializeColFilterInfo(pExpr->flist.filterInfo, pExpr->flist.numOfFilters, pMsg);
return TSDB_CODE_SUCCESS;
}
......@@ -864,34 +899,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tableCols[i].colId = htons(pCol->colId);
pQueryMsg->tableCols[i].bytes = htons(pCol->bytes);
pQueryMsg->tableCols[i].type = htons(pCol->type);
pQueryMsg->tableCols[i].numOfFilters = htons(pCol->numOfFilters);
pQueryMsg->tableCols[i].flist.numOfFilters = htons(pCol->flist.numOfFilters);
// append the filter information after the basic column information
for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];
SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
pFilterMsg->filterstr = htons(pColFilter->filterstr);
pMsg += sizeof(SColumnFilterInfo);
if (pColFilter->filterstr) {
pFilterMsg->len = htobe64(pColFilter->len);
memcpy(pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1));
pMsg += (pColFilter->len + 1); // append the additional filter binary info
} else {
pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
}
pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
tscError("invalid filter info");
return TSDB_CODE_TSC_INVALID_SQL;
}
}
serializeColFilterInfo(pCol->flist.filterInfo, pCol->flist.numOfFilters, &pMsg);
}
for (int32_t i = 0; i < query.numOfOutput; ++i) {
......@@ -953,7 +964,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pTagCol->colId = htons(pTag->colId);
pTagCol->bytes = htons(pTag->bytes);
pTagCol->type = htons(pTag->type);
pTagCol->numOfFilters = 0;
pTagCol->flist.numOfFilters = 0;
pMsg += sizeof(SColumnInfo);
}
......
......@@ -842,7 +842,7 @@ static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
for (int32_t i = 0; i < s; ++i) {
SColumn *pCol = taosArrayGetP(pSupporter->colList, i);
if (pCol->info.numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered.
if (pCol->info.flist.numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered.
SColumn *p = tscColumnClone(pCol);
taosArrayPush(pQueryInfo->colList, &p);
}
......@@ -1939,7 +1939,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
for (int32_t i = 0; i < s; ++i) {
SColumn *pCol = taosArrayGetP(pSupporter->colList, i);
if (pCol->info.numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered.
if (pCol->info.flist.numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered.
SColumn *p = tscColumnClone(pCol);
taosArrayPush(pNewQueryInfo->colList, &p);
}
......@@ -3490,8 +3490,8 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST
// to make sure third party won't overwrite this structure
pQInfo->signature = pQInfo;
SQueryAttr *pQueryAttr = &pQInfo->query;
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryAttr *pQueryAttr = &pQInfo->query;
pRuntimeEnv->pQueryAttr = pQueryAttr;
tscCreateQueryFromQueryInfo(pQueryInfo, pQueryAttr, NULL);
......
......@@ -1473,14 +1473,15 @@ int32_t tscGetResRowLength(SArray* pExprList) {
return size;
}
static void destroyFilterInfo(SColumnFilterInfo* pFilterInfo, int32_t numOfFilters) {
for(int32_t i = 0; i < numOfFilters; ++i) {
if (pFilterInfo[i].filterstr) {
tfree(pFilterInfo[i].pz);
static void destroyFilterInfo(SColumnFilterList* pFilterList) {
for(int32_t i = 0; i < pFilterList->numOfFilters; ++i) {
if (pFilterList->filterInfo[i].filterstr) {
tfree(pFilterList->filterInfo[i].pz);
}
}
tfree(pFilterInfo);
tfree(pFilterList->filterInfo);
pFilterList->numOfFilters = 0;
}
void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
......@@ -1714,6 +1715,9 @@ void tscSqlExprAssign(SExprInfo* dst, const SExprInfo* src) {
assert(dst != NULL && src != NULL);
*dst = *src;
dst->base.flist.filterInfo = calloc(src->base.flist.numOfFilters, sizeof(SColumnFilterInfo));
memcpy(dst->base.flist.filterInfo, src->base.flist.filterInfo, sizeof(SColumnFilterInfo) * src->base.flist.numOfFilters);
dst->pExpr = exprdup(src->pExpr);
memset(dst->base.param, 0, sizeof(tVariant) * tListLen(dst->base.param));
......@@ -1789,8 +1793,8 @@ SColumn* tscColumnClone(const SColumn* src) {
dst->columnIndex = src->columnIndex;
dst->tableUid = src->tableUid;
dst->info.numOfFilters = src->info.numOfFilters;
dst->info.filterInfo = tFilterInfoDup(src->info.filterInfo, src->info.numOfFilters);
dst->info.flist.numOfFilters = src->info.flist.numOfFilters;
dst->info.flist.filterInfo = tFilterInfoDup(src->info.flist.filterInfo, src->info.flist.numOfFilters);
dst->info.type = src->info.type;
dst->info.colId = src->info.colId;
dst->info.bytes = src->info.bytes;
......@@ -1798,7 +1802,7 @@ SColumn* tscColumnClone(const SColumn* src) {
}
static void tscColumnDestroy(SColumn* pCol) {
destroyFilterInfo(pCol->info.filterInfo, pCol->info.numOfFilters);
destroyFilterInfo(&pCol->info.flist);
free(pCol);
}
......@@ -3427,28 +3431,12 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu
SExprInfo* pExpr = &pQueryAttr->pExpr1[i];
SSqlExpr* pse = &pQueryAttr->pExpr3[i].base;
memcpy(pse->aliasName, pExpr->base.aliasName, tListLen(pse->aliasName));
pse->uid = pExpr->base.uid;
pse->functionId = pExpr->base.functionId;
pse->resType = pExpr->base.resType;
pse->resBytes = pExpr->base.resBytes;
pse->interBytes = pExpr->base.interBytes;
pse->resColId = pExpr->base.resColId;
pse->offset = pExpr->base.offset;
pse->numOfParams = pExpr->base.numOfParams;
pse->colInfo = pExpr->base.colInfo;
tscSqlExprAssign(&pQueryAttr->pExpr3[i], pExpr);
pse->colInfo.colId = pExpr->base.resColId;
pse->colInfo.colIndex = i;
pse->colType = pExpr->base.resType;
pse->colBytes = pExpr->base.resBytes;
pse->colInfo.flag = pExpr->base.colInfo.flag;
for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) {
tVariantAssign(&pse->param[j], &pExpr->base.param[j]);
}
}
{
......@@ -3515,7 +3503,7 @@ static int32_t createTagColumnInfo(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf
pTagCol->colId = pColSchema->colId;
pTagCol->bytes = pColSchema->bytes;
pTagCol->type = pColSchema->type;
pTagCol->numOfFilters = 0;
pTagCol->flist.numOfFilters = 0;
}
return TSDB_CODE_SUCCESS;
......@@ -3546,6 +3534,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->order = pQueryInfo->order;
pQueryAttr->fillType = pQueryInfo->fillType;
pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQueryAttr->havingNum = pQueryInfo->havingFieldNum;
if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor
pQueryAttr->window = pQueryInfo->window;
......@@ -3587,7 +3576,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
}
pQueryAttr->tableCols[i] = pCol->info;
pQueryAttr->tableCols[i].filterInfo = tFilterInfoDup(pCol->info.filterInfo, pQueryAttr->tableCols[i].numOfFilters);
pQueryAttr->tableCols[i].flist.filterInfo = tFilterInfoDup(pCol->info.flist.filterInfo, pQueryAttr->tableCols[i].flist.numOfFilters);
}
// global aggregate query
......
......@@ -62,8 +62,7 @@ typedef struct SSqlExpr {
int32_t offset; // sub result column value of arithmetic expression.
int16_t resColId; // result column id
int32_t filterNum;
SColumnFilterInfo *pFilter;
SColumnFilterList flist;
} SSqlExpr;
typedef struct SExprInfo {
......
......@@ -68,6 +68,7 @@ bool tscValidateTableNameLength(size_t len) {
return len < TSDB_TABLE_NAME_LEN;
}
// TODO refactor
SColumnFilterInfo* tFilterInfoDup(const SColumnFilterInfo* src, int32_t numOfFilters) {
if (numOfFilters == 0) {
assert(src == NULL);
......
......@@ -420,6 +420,13 @@ typedef struct SColumnFilterInfo {
};
} SColumnFilterInfo;
typedef struct SColumnFilterList {
int16_t numOfFilters;
union{
int64_t placeholder;
SColumnFilterInfo *filterInfo;
};
} SColumnFilterList;
/*
* for client side struct, we only need the column id, type, bytes are not necessary
* But for data in vnode side, we need all the following information.
......@@ -428,11 +435,7 @@ typedef struct SColumnInfo {
int16_t colId;
int16_t type;
int16_t bytes;
int16_t numOfFilters;
union{
int64_t placeholder;
SColumnFilterInfo *filterInfo;
};
SColumnFilterList flist;
} SColumnInfo;
typedef struct STableIdInfo {
......
......@@ -300,7 +300,7 @@ enum OPERATOR_TYPE_E {
OP_DummyInput = 16, //TODO remove it after fully refactor.
OP_MultiwaySort = 17, // multi-way data merge into one input stream.
OP_GlobalAggregate = 18, // global merge for the multi-way data sources.
OP_Having = 19,
OP_Condition = 19,
};
typedef struct SOperatorInfo {
......@@ -440,10 +440,10 @@ typedef struct SSLimitOperatorInfo {
SArray *orderColumnList;
} SSLimitOperatorInfo;
typedef struct SHavingOperatorInfo {
SArray* fp;
} SHavingOperatorInfo;
typedef struct SConditionOperatorInfo {
SSingleColumnFilterInfo *pFilterInfo;
int32_t numOfFilterCols;
} SConditionOperatorInfo;
typedef struct SFillOperatorInfo {
SFillInfo *pFillInfo;
......@@ -507,7 +507,7 @@ SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SEx
int32_t numOfRows, void* merger, bool groupMix);
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param);
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger);
SOperatorInfo* createHavingOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createConditionOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
......
此差异已折叠。
......@@ -86,6 +86,11 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
op = OP_Groupby;
taosArrayPush(plan, &op);
if (!pQueryAttr->stableQuery && pQueryAttr->havingNum > 0) {
op = OP_Condition;
taosArrayPush(plan, &op);
}
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
taosArrayPush(plan, &op);
......@@ -107,6 +112,11 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
taosArrayPush(plan, &op);
if (!pQueryAttr->stableQuery && pQueryAttr->havingNum > 0) {
op = OP_Condition;
taosArrayPush(plan, &op);
}
if (pQueryAttr->pExpr2 != NULL && !pQueryAttr->stableQuery) {
op = OP_Arithmetic;
taosArrayPush(plan, &op);
......@@ -139,6 +149,11 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) {
op = OP_GlobalAggregate;
taosArrayPush(plan, &op);
if (pQueryAttr->havingNum > 0) {
op = OP_Condition;
taosArrayPush(plan, &op);
}
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
taosArrayPush(plan, &op);
......
......@@ -349,7 +349,6 @@ int32_t tSqlExprCompare(tSqlExpr *left, tSqlExpr *right) {
return 1;
}
if (right->pParam && left->pParam) {
size_t size = taosArrayGetSize(right->pParam);
if (left->pParam && taosArrayGetSize(left->pParam) != size) {
......
......@@ -186,7 +186,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
for (int32_t i = 0; i < pQueryMsg->numOfCols; i++) {
SColumnInfo* column = pQueryMsg->tableCols + i;
freeColumnFilterInfo(column->filterInfo, column->numOfFilters);
freeColumnFilterInfo(column->flist.filterInfo, column->flist.numOfFilters);
}
//pQInfo already freed in initQInfo, but *pQInfo may not pointer to null;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册