提交 94b2e2dd 编写于 作者: H Haojun Liao

[td-10564] Fix bug in generating sql plan.

上级 addb829f
......@@ -86,10 +86,8 @@ typedef struct SOrder {
} SOrder;
typedef struct SGroupbyExpr {
int16_t tableIndex;
SArray* columnInfo; // SArray<SColIndex>, group by columns information
int16_t orderIndex; // order by column index
int16_t orderType; // order by type: asc/desc
bool groupbyTag; // group by tag or column
} SGroupbyExpr;
// the structure for sql function in select clause
......
......@@ -179,6 +179,7 @@ int32_t copyExprInfoList(SArray* dst, const SArray* src, uint64_t uid, bool deep
STableMetaInfo* getMetaInfo(SQueryStmtInfo* pQueryInfo, int32_t tableIndex);
SSchema *getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex);
SSchema createSchema(uint8_t type, int16_t bytes, int16_t colId, const char* name);
int32_t getNewResColId();
void addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn);
......
......@@ -7851,9 +7851,6 @@ SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pCo
return NULL;
}
pGroupbyExpr->orderType = pQueryMsg->orderType;
pGroupbyExpr->orderIndex = pQueryMsg->orderByIdx;
pGroupbyExpr->columnInfo = taosArrayInit(pQueryMsg->numOfGroupCols, sizeof(SColIndex));
for(int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) {
taosArrayPush(pGroupbyExpr->columnInfo, &pColIndex[i]);
......
......@@ -37,7 +37,6 @@ extern "C" {
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
TAOS_FIELD createField(const SSchema* pSchema);
SSchema createSchema(uint8_t type, int16_t bytes, int16_t colId, const char* name);
void setColumn(SColumn* pColumn, uint64_t uid, const char* tableName, int8_t flag, const SSchema* pSchema);
SColumn createColumn(uint64_t uid, const char* tableName, int8_t flag, const SSchema* pSchema);
......@@ -54,7 +53,7 @@ STableMetaInfo* addEmptyMetaInfo(SQueryStmtInfo* pQueryInfo);
void columnListCopyAll(SArray* dst, const SArray* src);
SColumn* columnListInsert(SArray* pColumnList, uint64_t uid, SSchema* pSchema, int32_t flag);
SColumn* insertPrimaryTsColumn(SArray* pColumnList, uint64_t tableUid);
SColumn* insertPrimaryTsColumn(SArray* pColumnList, const char* colName, uint64_t tableUid);
void cleanupTagCond(STagCond* pTagCond);
void cleanupColumnCond(SArray** pCond);
......
......@@ -547,29 +547,20 @@ int32_t validateGroupbyNode(SQueryStmtInfo* pQueryInfo, SArray* pList, SMsgBuf*
groupbyTag = true;
int32_t relIndex = index.columnIndex;
if (index.columnIndex != TSDB_TBNAME_COLUMN_INDEX) {
relIndex -= getNumOfColumns(pTableMeta);
}
SColIndex colIndex = { .colIndex = relIndex, .flag = TSDB_COL_TAG, .colId = pSchema->colId, };
strncpy(colIndex.name, pSchema->name, tListLen(colIndex.name));
taosArrayPush(pGroupExpr->columnInfo, &colIndex);
SColumn c = createColumn(pTableMeta->uid, pTableMetaInfo->aliasName, TSDB_COL_TAG, pSchema);
taosArrayPush(pGroupExpr->columnInfo, &c);
index.columnIndex = relIndex;
columnListInsert(pTableMetaInfo->tagColList, pTableMeta->uid, pSchema, colIndex.flag);
columnListInsert(pTableMetaInfo->tagColList, pTableMeta->uid, pSchema, TSDB_COL_TAG);
} else {
// check if the column type is valid, here only support the bool/tinyint/smallint/bigint group by
if (pSchema->type == TSDB_DATA_TYPE_FLOAT || pSchema->type == TSDB_DATA_TYPE_DOUBLE) {
return buildInvalidOperationMsg(pMsgBuf, msg5);
}
columnListInsert(pQueryInfo->colList, pTableMeta->uid, pSchema, TSDB_COL_NORMAL);
SColumn c = createColumn(pTableMeta->uid, pTableMetaInfo->aliasName, TSDB_COL_NORMAL, pSchema);
taosArrayPush(pGroupExpr->columnInfo, &c);
SColIndex colIndex = { .colIndex = index.columnIndex, .flag = TSDB_COL_NORMAL, .colId = pSchema->colId };
strncpy(colIndex.name, pSchema->name, tListLen(colIndex.name));
taosArrayPush(pGroupExpr->columnInfo, &colIndex);
columnListInsert(pQueryInfo->colList, pTableMeta->uid, pSchema, TSDB_COL_NORMAL);
numOfGroupbyCols++;
pQueryInfo->info.groupbyColumn = true;
......@@ -589,8 +580,7 @@ int32_t validateGroupbyNode(SQueryStmtInfo* pQueryInfo, SArray* pList, SMsgBuf*
}
}
pGroupExpr->orderType = TSDB_ORDER_ASC;
pGroupExpr->tableIndex = tableIndex;
pGroupExpr->groupbyTag = groupbyTag;
return TSDB_CODE_SUCCESS;
}
......@@ -860,7 +850,6 @@ int32_t validateStateWindowNode(SQueryStmtInfo *pQueryInfo, SWindowStateVal* pWi
//TODO use group by routine? state window query not support stable query.
taosArrayPush(pGroupExpr->columnInfo, &colIndex);
pGroupExpr->orderType = TSDB_ORDER_ASC;
pQueryInfo->info.stateWindow = true;
return TSDB_CODE_SUCCESS;
......@@ -1732,7 +1721,8 @@ SExprInfo* doAddOneExprInfo(SQueryStmtInfo* pQueryInfo, const char* funcName, SS
}
if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) {
insertPrimaryTsColumn(pQueryInfo->colList, uid);
char* colName = pTableMetaInfo->pTableMeta->schema[0].name;
insertPrimaryTsColumn(pQueryInfo->colList, colName, uid);
}
}
......@@ -2809,7 +2799,7 @@ int32_t addProjectionExprAndResColumn(SQueryStmtInfo* pQueryInfo, tSqlExprItem*
// add the primary timestamp column even though it is not required by user
STableMeta* pTableMeta = getMetaInfo(pQueryInfo, index.tableIndex)->pTableMeta;
if (pTableMeta->tableType != TSDB_TEMP_TABLE) {
insertPrimaryTsColumn(pQueryInfo->colList, pTableMeta->uid);
insertPrimaryTsColumn(pQueryInfo->colList, pTableMeta->schema[0].name, pTableMeta->uid);
}
} else if (tokenId == TK_STRING || tokenId == TK_INTEGER || tokenId == TK_FLOAT) { //constant value column
SColumnIndex index = createConstantColumnIndex(&pQueryInfo->udColumnId);
......@@ -2848,7 +2838,8 @@ int32_t addProjectionExprAndResColumn(SQueryStmtInfo* pQueryInfo, tSqlExprItem*
// add the primary timestamp column even though it is not required by user
STableMetaInfo* pTableMetaInfo = getMetaInfo(pQueryInfo, index.tableIndex);
if (!UTIL_TABLE_IS_TMP_TABLE(pTableMetaInfo)) {
insertPrimaryTsColumn(pQueryInfo->colList, pTableMetaInfo->pTableMeta->uid);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
insertPrimaryTsColumn(pQueryInfo->colList, pTableMeta->schema[0].name, pTableMeta->uid);
}
} else {
return TSDB_CODE_TSC_INVALID_OPERATION;
......@@ -2967,7 +2958,143 @@ static uint64_t findTmpSourceColumnInNextLevel(SQueryStmtInfo* pQueryInfo, tExpr
return uid;
}
int32_t sqlExprToExprNode(tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryStmtInfo* pQueryInfo, SArray* pCols, bool* keepTableCols, SMsgBuf* pMsgBuf) {
static tExprNode* doCreateColumnNode(SQueryStmtInfo* pQueryInfo, SColumnIndex* pIndex, bool keepTableCols, SArray* pCols) {
STableMetaInfo* pTableMetaInfo = getMetaInfo(pQueryInfo, pIndex->tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
tExprNode* pExpr = calloc(1, sizeof(tExprNode));
pExpr->nodeType = TEXPR_COL_NODE;
pExpr->pSchema = calloc(1, sizeof(SSchema));
SSchema* pSchema = getOneColumnSchema(pTableMeta, pIndex->columnIndex);
*(SSchema*)(pExpr->pSchema) = *pSchema;
if (keepTableCols) {
SColumn c = createColumn(pTableMeta->uid, pTableMetaInfo->aliasName, pIndex->type, pExpr->pSchema);
taosArrayPush(pCols, &c);
}
columnListInsert(pQueryInfo->colList, pTableMeta->uid, pSchema, TSDB_COL_NORMAL);
SSchema* pTsSchema = getOneColumnSchema(pTableMeta, 0);
insertPrimaryTsColumn(pQueryInfo->colList, pTsSchema->name, pTableMeta->uid);
return pExpr;
}
static int32_t validateSqlExpr(const tSqlExpr* pSqlExpr, SQueryStmtInfo *pQueryInfo, SMsgBuf* pMsgBuf);
static int32_t doProcessFunctionLeafNodeParam(SQueryStmtInfo* pQueryInfo, int32_t* num, tExprNode** p, SArray* pCols,
bool* keepTableCols, const tSqlExpr* pSqlExpr, SMsgBuf* pMsgBuf) {
SArray* pParamList = pSqlExpr->Expr.paramList;
if (pParamList != NULL) {
*num = taosArrayGetSize(pParamList);
p = calloc((*num), POINTER_BYTES);
for (int32_t i = 0; i < (*num); ++i) {
tSqlExprItem* pItem = taosArrayGet(pParamList, i);
int32_t ret = validateSqlExpr(pItem->pNode, pQueryInfo, pMsgBuf);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
int32_t code = sqlExprToExprNode(&p[i], pItem->pNode, pQueryInfo, pCols, keepTableCols, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
} else { // handle the case: count(*) + 22
if (strncasecmp(pSqlExpr->Expr.operand.z, "count", pSqlExpr->Expr.operand.n) != 0) {
return buildInvalidOperationMsg(pMsgBuf, "invalid expression");
}
*num = 1;
p = calloc(*num, POINTER_BYTES);
SColumnIndex index = {.type = TSDB_COL_NORMAL, .tableIndex = 0, .columnIndex = 0};
p[0] = doCreateColumnNode(pQueryInfo, &index, *keepTableCols, pCols);
}
return TSDB_CODE_SUCCESS;
}
static int32_t doValidateExpr(SQueryStmtInfo *pQueryInfo, tSqlExpr* pFuncNode, tSqlExpr* pTableColumnNode, SMsgBuf* pMsgBuf) {
char token[FUNCTIONS_NAME_MAX_LENGTH] = {0};
strncpy(token, pFuncNode->Expr.operand.z, pFuncNode->Expr.operand.n);
bool isAgg = qIsAggregateFunction(token);
// count(*) + column is a invalid expression.
if (isAgg) {
return buildInvalidOperationMsg(pMsgBuf, "invalid expression");
}
return TSDB_CODE_SUCCESS;
}
int32_t validateSqlExpr(const tSqlExpr* pSqlExpr, SQueryStmtInfo *pQueryInfo, SMsgBuf* pMsgBuf) {
assert(pSqlExpr);
if (pSqlExpr->type == SQL_NODE_EXPR) {
int32_t valid = validateSqlExpr(pSqlExpr->pLeft, pQueryInfo, pMsgBuf);
if (valid != TSDB_CODE_SUCCESS) {
return valid;
}
valid = validateSqlExpr(pSqlExpr->pRight, pQueryInfo, pMsgBuf);
if (valid != TSDB_CODE_SUCCESS) {
return valid;
}
tSqlExpr* pLeft = pSqlExpr->pLeft, *pRight = pSqlExpr->pRight;
if (pLeft->type == SQL_NODE_SQLFUNCTION && pRight->type == SQL_NODE_SQLFUNCTION) {
char token[FUNCTIONS_NAME_MAX_LENGTH] = {0};
tstrncpy(token, pLeft->Expr.operand.z, pLeft->Expr.operand.n);
bool agg1 = qIsAggregateFunction(token);
tstrncpy(token, pRight->Expr.operand.z, pRight->Expr.operand.n);
bool agg2 = qIsAggregateFunction(token);
if (agg1 != agg2) {
return buildInvalidOperationMsg(pMsgBuf, "invalid expression");
}
}
if (pLeft->type == SQL_NODE_SQLFUNCTION && pRight->type == SQL_NODE_TABLE_COLUMN) {
int32_t code = doValidateExpr(pQueryInfo, pLeft, pRight, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else if (pRight->type == SQL_NODE_SQLFUNCTION && pLeft->type == SQL_NODE_TABLE_COLUMN) {
int32_t code = doValidateExpr(pQueryInfo, pRight, pLeft, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
int32_t tokenId = pSqlExpr->tokenId;
if (pRight->type == SQL_NODE_VALUE && pRight->value.nType == TSDB_DATA_TYPE_DOUBLE && pRight->value.d == 0 && tokenId == TK_DIVIDE) {
return buildInvalidOperationMsg(pMsgBuf, "invalid expression (divided by 0)");
}
if (tokenId == TK_DIVIDE || tokenId == TK_TIMES || tokenId == TK_MINUS || tokenId == TK_PLUS || tokenId == TK_MODULES) {
if ((pRight->type == SQL_NODE_VALUE && pRight->value.nType == TSDB_DATA_TYPE_BINARY) ||
(pLeft->type == SQL_NODE_VALUE && pLeft->value.nType == TSDB_DATA_TYPE_BINARY)) {
return buildInvalidOperationMsg(pMsgBuf, "invalid expression (string in arithmetic expression)");
}
}
} else if (pSqlExpr->type == SQL_NODE_TABLE_COLUMN) {
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
int32_t ret = getColumnIndexByName(&pSqlExpr->columnName, pQueryInfo, &index, pMsgBuf);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t sqlExprToExprNode(tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryStmtInfo* pQueryInfo, SArray* pCols, bool* keepTableCols, SMsgBuf* pMsgBuf) {
tExprNode* pLeft = NULL;
tExprNode* pRight= NULL;
......@@ -2993,60 +3120,49 @@ int32_t sqlExprToExprNode(tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQuerySt
return TSDB_CODE_SUCCESS;
}
} else if (pSqlExpr->type == SQL_NODE_SQLFUNCTION) {
SArray* pParamList = pSqlExpr->Expr.paramList;
if (pParamList != NULL && taosArrayGetSize(pParamList) > 0) {
bool scalar = false;
int32_t functionId = qIsBuiltinFunction(pSqlExpr->Expr.operand.z, pSqlExpr->Expr.operand.n, &scalar);
if (functionId < 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (!scalar) {
pQueryInfo->exprListLevelIndex += 1;
}
*keepTableCols = false;
size_t num = taosArrayGetSize(pParamList);
tExprNode** p = calloc(num, POINTER_BYTES);
bool scalar = false;
int32_t functionId = qIsBuiltinFunction(pSqlExpr->Expr.operand.z, pSqlExpr->Expr.operand.n, &scalar);
if (functionId < 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (!scalar) {
pQueryInfo->exprListLevelIndex += 1;
}
for(int32_t i = 0; i < num; ++i) {
tSqlExprItem* pItem = taosArrayGet(pParamList, i);
int32_t code = sqlExprToExprNode(&p[i], pItem->pNode, pQueryInfo, pCols, keepTableCols, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
*keepTableCols = false;
pQueryInfo->exprListLevelIndex -= 1;
int32_t outputIndex = (int32_t)getNumOfExprs(pQueryInfo);
int32_t num = 0;
tExprNode** p = NULL;
int32_t code = doProcessFunctionLeafNodeParam(pQueryInfo, &num, p, pCols, keepTableCols, pSqlExpr, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (scalar) {
printf("scalar function found! %s\n", pSqlExpr->exprToken.z);
int32_t outputIndex = (int32_t)getNumOfExprs(pQueryInfo);
// Expression on the results of aggregation functions
*pExpr = calloc(1, sizeof(tExprNode));
(*pExpr)->nodeType = TEXPR_FUNCTION_NODE;
if (scalar) {
printf("scalar function found! %s\n", pSqlExpr->exprToken.z);
(*pExpr)->_function.pChild = p;
(*pExpr)->_function.num = num;
strncpy((*pExpr)->_function.functionName, pSqlExpr->Expr.operand.z, pSqlExpr->Expr.operand.n);
return TSDB_CODE_SUCCESS;
} else {
printf("agg function found, %s\n", pSqlExpr->exprToken.z);
tSqlExprItem item = {.pNode = (tSqlExpr*)pSqlExpr, .aliasName = NULL, .functionId = functionId};
if (addAggExprAndResColumn(pQueryInfo, outputIndex, &item, false, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
// Expression on the results of aggregation functions
*pExpr = calloc(1, sizeof(tExprNode));
(*pExpr)->nodeType = TEXPR_FUNCTION_NODE;
pQueryInfo->exprListLevelIndex -= 1;
// convert the aggregate function to be the input data columns for the outer function.
(*pExpr)->_function.pChild = p;
(*pExpr)->_function.num = num;
strncpy((*pExpr)->_function.functionName, pSqlExpr->Expr.operand.z, pSqlExpr->Expr.operand.n);
return TSDB_CODE_SUCCESS;
} else {
printf("agg function found, %s\n", pSqlExpr->exprToken.z);
tSqlExprItem item = {.pNode = (tSqlExpr*)pSqlExpr, .aliasName = NULL, .functionId = functionId};
if (addAggExprAndResColumn(pQueryInfo, outputIndex, &item, false, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
pQueryInfo->exprListLevelIndex -= 1;
// convert the aggregate function to be the input data columns for the outer function.
}
}
}
if (pSqlExpr->pLeft == NULL) { // it is the leaf node
assert(pSqlExpr->pRight == NULL);
......@@ -3089,23 +3205,7 @@ int32_t sqlExprToExprNode(tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQuerySt
return ret;
}
pQueryInfo->curTableIdx = index.tableIndex;
STableMetaInfo* pTableMetaInfo = getMetaInfo(pQueryInfo, index.tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
*pExpr = calloc(1, sizeof(tExprNode));
(*pExpr)->nodeType = TEXPR_COL_NODE;
(*pExpr)->pSchema = calloc(1, sizeof(SSchema));
SSchema* pSchema = getOneColumnSchema(pTableMeta, index.columnIndex);
*(*pExpr)->pSchema = *pSchema;
if (*keepTableCols) {
SColumn c = createColumn(pTableMeta->uid, pTableMetaInfo->aliasName, index.type, (*pExpr)->pSchema);
taosArrayPush(pCols, &c);
}
columnListInsert(pQueryInfo->colList, pTableMeta->uid, pSchema, TSDB_COL_NORMAL);
*pExpr = doCreateColumnNode(pQueryInfo, &index, *keepTableCols, pCols);
return TSDB_CODE_SUCCESS;
} else if (pSqlExpr->tokenId == TK_SET) {
int32_t colType = -1;
......@@ -3141,7 +3241,6 @@ int32_t sqlExprToExprNode(tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQuerySt
} else {
return buildInvalidOperationMsg(pMsgBuf, "not support filter expression");
}
} else {
*pExpr = (tExprNode*)calloc(1, sizeof(tExprNode));
(*pExpr)->nodeType = TEXPR_BINARYEXPR_NODE;
......@@ -3153,26 +3252,6 @@ int32_t sqlExprToExprNode(tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQuerySt
(*pExpr)->_node.optr = convertRelationalOperator(&t);
assert((*pExpr)->_node.optr != 0);
// NOTE: binary|nchar data allows the >|< type filter
if ((*pExpr)->_node.optr != TSDB_RELATION_EQUAL && (*pExpr)->_node.optr != TSDB_RELATION_NOT_EQUAL) {
if (pRight != NULL && pRight->nodeType == TEXPR_VALUE_NODE) {
if (pRight->pVal->nType == TSDB_DATA_TYPE_BOOL && pLeft->pSchema->type == TSDB_DATA_TYPE_BOOL) {
return buildInvalidOperationMsg(pMsgBuf, "invalid operator for bool");
}
}
}
// scalar op aggregate check
if (pLeft->nodeType == TEXPR_FUNCTION_NODE && pRight->nodeType != TEXPR_FUNCTION_NODE) {
return buildInvalidOperationMsg(pMsgBuf, "invalid expression");
}
if (pLeft->nodeType == TEXPR_FUNCTION_NODE && pRight->nodeType == TEXPR_FUNCTION_NODE) {
if (qIsAggregateFunction(pLeft->_function.functionName) != qIsAggregateFunction(pRight->_function.functionName)) {
return buildInvalidOperationMsg(pMsgBuf, "invalid expression");
}
}
}
return TSDB_CODE_SUCCESS;
}
......@@ -3193,7 +3272,7 @@ static int32_t multiColumnListInsert(SQueryStmtInfo* pQueryInfo, SArray* pColumn
columnListInsert(pQueryInfo->colList, p->uid, &s, p->flag);
}
insertPrimaryTsColumn(pQueryInfo->colList, p1->uid);
insertPrimaryTsColumn(pQueryInfo->colList, NULL, p1->uid);
return TSDB_CODE_SUCCESS;
}
......@@ -3201,9 +3280,14 @@ static int32_t addScalarExprAndResColumn(SQueryStmtInfo* pQueryInfo, int32_t exp
SArray* pColumnList = taosArrayInit(4, sizeof(SColumn));
SSchema s = createSchema(TSDB_DATA_TYPE_DOUBLE, sizeof(double), getNewResColId(), "");
int32_t ret = validateSqlExpr(pItem->pNode, pQueryInfo, pMsgBuf);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
tExprNode* pNode = NULL;
bool keepTableCols = true;
int32_t ret = sqlExprToExprNode(&pNode, pItem->pNode, pQueryInfo, pColumnList, &keepTableCols, pMsgBuf);
ret = sqlExprToExprNode(&pNode, pItem->pNode, pQueryInfo, pColumnList, &keepTableCols, pMsgBuf);
if (ret != TSDB_CODE_SUCCESS) {
tExprTreeDestroy(pNode, NULL);
return buildInvalidOperationMsg(pMsgBuf, "invalid expression in select clause");
......
......@@ -815,13 +815,15 @@ SColumn* columnListInsert(SArray* pColumnList, uint64_t uid, SSchema* pSchema, i
b->info.bytes = pSchema->bytes;
b->info.type = pSchema->type;
tstrncpy(b->name, pSchema->name, tListLen(b->name));
taosArrayInsert(pColumnList, i, &b);
return b;
}
SColumn* insertPrimaryTsColumn(SArray* pColumnList, uint64_t tableUid) {
SColumn* insertPrimaryTsColumn(SArray* pColumnList, const char* colName, uint64_t tableUid) {
SSchema s = {.type = TSDB_DATA_TYPE_TIMESTAMP, .bytes = TSDB_KEYSIZE, .colId = PRIMARYKEY_TIMESTAMP_COL_ID};
strncpy(s.name, colName, tListLen(s.name));
return columnListInsert(pColumnList, tableUid, &s, TSDB_COL_NORMAL);
}
......
......@@ -96,12 +96,13 @@ void generateLogicplan(const char* sql) {
char* str = NULL;
qQueryPlanToString(n, &str);
printf("--------SQL:%s\n", sql);
printf("%s\n", str);
destroyQueryInfo(pQueryInfo);
qParserClearupMetaRequestInfo(&req);
destroySqlInfo(&info1);
}
}
......@@ -164,10 +165,17 @@ TEST(testCase, planner_test) {
}
TEST(testCase, displayPlan) {
generateLogicplan("select count(*) from `t.1abc`");
generateLogicplan("select count(*) from `t.1abc` group by a");
generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s)");
generateLogicplan("select count(*),sum(a),avg(b),min(a+b) from `t.1abc`");
// generateLogicplan("select count(*) from `t.1abc`");
// generateLogicplan("select count(*)+ 22 from `t.1abc`");
// generateLogicplan("select count(*)+ 22 from `t.1abc` interval(1h)");
// generateLogicplan("select count(*) from `t.1abc` group by a");
// generateLogicplan("select count(A+B) from `t.1abc` group by a");
// generateLogicplan("select count(length(a)+b) from `t.1abc` group by a");
// generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s)");
// generateLogicplan("select count(*),sum(a),avg(b),min(a+b)+99 from `t.1abc`");
// generateLogicplan("select count(*), min(a) + 99 from `t.1abc`");
// generateLogicplan("select count(length(count(*) + 22)) from `t.1abc`");
generateLogicplan("select concat(concat(a,b), concat(a,b)) from `t.1abc`");
// order by + group by column + limit offset + fill
......@@ -178,4 +186,9 @@ TEST(testCase, displayPlan) {
// union
// Aggregate(count(*) [count(*) #5056], sum(a) [sum(a) #5057], avg(b) [avg(b) #5058], min(a+b) [min(a+b) #5060])
// Projection(cols: [a+b #5059]) filters:(nil)
// Projection(cols: [ts #0], [a #1], [b #2]) filters:(nil)
// TableScan(t.1abc #110) time_range: -9223372036854775808 - 9223372036854775807
}
\ No newline at end of file
......@@ -46,12 +46,14 @@ typedef struct SJoinCond {
static SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo);
static void doDestroyQueryNode(SQueryPlanNode* pQueryNode);
static void exprInfoPushDown(SQueryStmtInfo* pQueryInfo);
int32_t qOptimizeQueryPlan(struct SQueryPlanNode* pQueryNode) {
return 0;
}
int32_t qCreateQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode) {
exprInfoPushDown((struct SQueryStmtInfo*) pQueryInfo);
SArray* upstream = createQueryPlanImpl((struct SQueryStmtInfo*) pQueryInfo);
assert(taosArrayGetSize(upstream) == 1);
......@@ -134,12 +136,11 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
case QNODE_GROUPBY: {
SGroupbyExpr* p = (SGroupbyExpr*) pExtInfo;
SGroupbyExpr* pGroupbyExpr = calloc(1, sizeof(SGroupbyExpr));
pGroupbyExpr->tableIndex = p->tableIndex;
pGroupbyExpr->orderType = p->orderType;
pGroupbyExpr->orderIndex = p->orderIndex;
SGroupbyExpr* pGroupbyExpr = calloc(1, sizeof(SGroupbyExpr));
pGroupbyExpr->groupbyTag = p->groupbyTag;
pGroupbyExpr->columnInfo = taosArrayDup(p->columnInfo);
pNode->pExtInfo = pGroupbyExpr;
break;
}
......@@ -155,7 +156,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
break;
}
default:
assert(0);
break;
}
return pNode;
......@@ -188,12 +189,11 @@ static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMe
SExprInfo** pExpr = calloc(numOfCols, POINTER_BYTES);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumn* pCol = taosArrayGetP(tableCols, i);
SSchema* pSchema = getOneColumnSchema(pTableMetaInfo1->pTableMeta, i);
SSourceParam param = {0};
addIntoSourceParam(&param, NULL, pCol);
SExprInfo* p = createExprInfo(pTableMetaInfo1, "project", &param, pSchema, 0);
SSchema s = createSchema(pCol->info.type, pCol->info.bytes, pCol->info.colId, pCol->name);
SExprInfo* p = createExprInfo(pTableMetaInfo1, "project", &param, &s, 0);
pExpr[i] = p;
}
......@@ -234,36 +234,47 @@ static SQueryPlanNode* createOneQueryPlanNode(SArray* p, SQueryPlanNode* pNode,
}
}
static SQueryPlanNode* doCreateQueryPlanForOneTableImpl(SQueryStmtInfo* pQueryInfo, SQueryPlanNode* pNode, SQueryTableInfo* info, SArray** pExprs) {
// check for aggregation
static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(SQueryStmtInfo* pQueryInfo, SQueryPlanNode* pNode, SQueryTableInfo* info) {
// group by column not by tag
size_t numOfGroupCols = taosArrayGetSize(pQueryInfo->groupbyExpr.columnInfo);
// check for aggregation
int32_t level = getFunctionLevel(pQueryInfo);
for(int32_t i = level - 1; i >= 0; --i) {
SArray* p = pQueryInfo->exprList[i];
SExprInfo* pExpr = (SExprInfo*)taosArrayGetP(p, 0);
if (i == 0) {
size_t num = taosArrayGetSize(p);
bool aggregateFunc = false;
for(int32_t j = 0; j < num; ++j) {
SExprInfo* pExpr = (SExprInfo*)taosArrayGetP(p, 0);
if (pExpr->pExpr->nodeType != TEXPR_FUNCTION_NODE) {
continue;
}
aggregateFunc = qIsAggregateFunction(pExpr->pExpr->_function.functionName);
if (aggregateFunc) {
break;
}
}
if (aggregateFunc) {
if (pQueryInfo->interval.interval > 0) {
int32_t numOfOutput = (int32_t)taosArrayGetSize(p);
pNode = createQueryNode(QNODE_TIMEWINDOW, "TimeWindowAgg", &pNode, 1, p->pData, numOfOutput, info, &pQueryInfo->interval);
pNode = createQueryNode(QNODE_TIMEWINDOW, "TimeWindowAgg", &pNode, 1, p->pData, num, info, &pQueryInfo->interval);
} else if (pQueryInfo->sessionWindow.gap > 0) {
pNode = createQueryNode(QNODE_SESSIONWINDOW, "SessionWindowAgg", &pNode, 1, NULL, 0, info, NULL);
} else if (pQueryInfo->stateWindow.columnId > 0) {
pNode = createQueryNode(QNODE_STATEWINDOW, "StateWindowAgg", &pNode, 1, NULL, 0, info, NULL);
} else if (numOfGroupCols != 0 && !pQueryInfo->groupbyExpr.groupbyTag) {
pNode = createQueryNode(QNODE_GROUPBY, "Groupby", &pNode, 1, p->pData, num, info, &pQueryInfo->groupbyExpr);
} else {
pNode = createOneQueryPlanNode(p, pNode, pExpr, info);
pNode = createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, p->pData, num, info, NULL);
}
} else {
pNode = createOneQueryPlanNode(p, pNode, pExpr, info);
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, info, NULL);
}
}
// group by column not by tag
if (numOfGroupCols != 0) {
pNode = createQueryNode(QNODE_GROUPBY, "Groupby", &pNode, 1, NULL, 0, info, &pQueryInfo->groupbyExpr);
}
if (pQueryInfo->havingFieldNum > 0) {
// int32_t numOfExpr = (int32_t)taosArrayGetSize(pQueryInfo->exprList1);
// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pQueryInfo->exprList1->pData, numOfExpr, info, NULL);
......@@ -285,7 +296,7 @@ static SQueryPlanNode* doCreateQueryPlanForOneTableImpl(SQueryStmtInfo* pQueryIn
return pNode;
}
static SQueryPlanNode* doCreateQueryPlanForOneTable(SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SArray* pExprs,
static SQueryPlanNode* doCreateQueryPlanForSingleTable(SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SArray* pExprs,
SArray* tableCols) {
char name[TSDB_TABLE_FNAME_LEN] = {0};
tstrncpy(name, pTableMetaInfo->name.tname, TSDB_TABLE_FNAME_LEN);
......@@ -299,11 +310,62 @@ static SQueryPlanNode* doCreateQueryPlanForOneTable(SQueryStmtInfo* pQueryInfo,
return pNode;
}
SQueryPlanNode* pNode1 = doCreateQueryPlanForOneTableImpl(pQueryInfo, pNode, &info, pExprs);
SQueryPlanNode* pNode1 = doCreateQueryPlanForSingleTableImpl(pQueryInfo, pNode, &info);
tfree(info.tableName);
return pNode1;
}
static bool isAllAggExpr(SArray* pList) {
assert(pList != NULL);
for (int32_t k = 0; k < taosArrayGetSize(pList); ++k) {
SExprInfo* p = taosArrayGetP(pList, k);
if (p->pExpr->nodeType != TEXPR_FUNCTION_NODE || !qIsAggregateFunction(p->pExpr->_function.functionName)) {
return false;
}
}
return true;
}
static void exprInfoPushDown(SQueryStmtInfo* pQueryInfo) {
assert(pQueryInfo != NULL);
size_t level = getFunctionLevel(pQueryInfo);
for(int32_t i = 0; i < level - 1; ++i) {
SArray* p = pQueryInfo->exprList[i];
SArray* pNext = pQueryInfo->exprList[i + 1];
if (!isAllAggExpr(pNext)) {
continue;
}
for (int32_t j = 0; j < taosArrayGetSize(p); ++j) {
SExprInfo* pExpr = taosArrayGetP(p, j);
bool canPushDown = true;
if (pExpr->pExpr->nodeType == TEXPR_FUNCTION_NODE && qIsAggregateFunction(pExpr->pExpr->_function.functionName)) {
for (int32_t k = 0; k < taosArrayGetSize(pNext); ++k) {
SExprInfo* pNextLevelExpr = taosArrayGetP(pNext, k);
if (pExpr->base.pColumns->info.colId == pNextLevelExpr->base.resSchema.colId) {
// pExpr is dependent on the output of the under layer, so it can not be push downwards
canPushDown = false;
break;
}
}
}
if (canPushDown) {
taosArrayInsert(pNext, j, &pExpr);
taosArrayRemove(p, j);
// add the project function in level of "i"
}
}
}
}
SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo) {
SArray* upstream = NULL;
......@@ -357,12 +419,12 @@ SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo) {
pQueryInfo->exprList[0]->pData, num, &info, NULL);
// 4. add the aggregation or projection execution node
pNode = doCreateQueryPlanForOneTableImpl(pQueryInfo, pNode, &info, pQueryInfo->exprList);
pNode = doCreateQueryPlanForSingleTableImpl(pQueryInfo, pNode, &info);
upstream = taosArrayInit(5, POINTER_BYTES);
taosArrayPush(upstream, &pNode);
} else { // only one table, normal query process
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
SQueryPlanNode* pNode = doCreateQueryPlanForOneTable(pQueryInfo, pTableMetaInfo, pQueryInfo->exprList[0], pQueryInfo->colList);
SQueryPlanNode* pNode = doCreateQueryPlanForSingleTable(pQueryInfo, pTableMetaInfo, pQueryInfo->exprList[0], pQueryInfo->colList);
upstream = taosArrayInit(5, POINTER_BYTES);
taosArrayPush(upstream, &pNode);
}
......@@ -403,14 +465,30 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level,
switch(pQueryNode->info.type) {
case QNODE_TABLESCAN: {
STimeWindow* win = (STimeWindow*)pQueryNode->pExtInfo;
len1 = sprintf(buf + len, "%s #%" PRIu64 ") time_range: %" PRId64 " - %" PRId64 "\n",
len1 = sprintf(buf + len, "%s #%" PRIu64 ") time_range: %" PRId64 " - %" PRId64 " cols: ",
pQueryNode->tableInfo.tableName, pQueryNode->tableInfo.uid, win->skey, win->ekey);
assert(len1 > 0);
len += len1;
for(int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
SColumn* pCol = taosArrayGetP(pQueryNode->pExpr, i);
len1 = sprintf(buf + len, " [%s #%d] ", pCol->name, pCol->info.colId);
assert(len1 > 0);
len += len1;
}
len1 = sprintf(buf + len, "\n");
assert(len1 > 0);
len += len1;
break;
}
case QNODE_PROJECT: {
len1 = sprintf(buf + len, "cols: ");
assert(len1 > 0);
len += len1;
for(int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
......@@ -418,6 +496,8 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level,
SSqlExpr* p = &pExprInfo->base;
len1 = sprintf(buf + len, "[%s #%d]", p->resSchema.name, p->resSchema.colId);
assert(len1 > 0);
len += len1;
if (i < pQueryNode->numOfExpr - 1) {
......@@ -493,11 +573,16 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level,
}
SGroupbyExpr* pGroupbyExpr = pQueryNode->pExtInfo;
SColIndex* pIndex = taosArrayGet(pGroupbyExpr->columnInfo, 0);
len1 = sprintf(buf + len,") groupby_col: [%s #%d]\n", pIndex->name, pIndex->colId);
len1 = sprintf(buf + len,") groupby_col: ");
len += len1;
for(int32_t i = 0; i < taosArrayGetSize(pGroupbyExpr->columnInfo); ++i) {
SColumn* pCol = taosArrayGet(pGroupbyExpr->columnInfo, i);
len1 = sprintf(buf + len, "[%s #%d] ", pCol->name, pCol->info.colId);
len += len1;
}
len += sprintf(buf + len, "\n");
break;
}
......
......@@ -189,7 +189,7 @@ static SQueryNode* doCreateQueryPlanForOneTableImpl(SQueryInfo* pQueryInfo, SQue
return pNode;
}
static SQueryNode* doCreateQueryPlanForOneTable(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SArray* pExprs,
static SQueryNode* doCreateQueryPlanForSingleTable(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SArray* pExprs,
SArray* tableCols) {
char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, name);
......@@ -266,7 +266,7 @@ SArray* createQueryPlanImpl(SQueryInfo* pQueryInfo) {
taosArrayPush(upstream, &pNode);
} else { // only one table, normal query process
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
SQueryNode* pNode = doCreateQueryPlanForOneTable(pQueryInfo, pTableMetaInfo, pQueryInfo->exprList, pQueryInfo->colList);
SQueryNode* pNode = doCreateQueryPlanForSingleTable(pQueryInfo, pTableMetaInfo, pQueryInfo->exprList, pQueryInfo->colList);
upstream = taosArrayInit(5, POINTER_BYTES);
taosArrayPush(upstream, &pNode);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册