提交 8ba07803 编写于 作者: H Haojun Liao

[td-10564] Fix bug in parse nest sql query.

上级 4c6dad1f
......@@ -58,6 +58,7 @@ typedef struct SDataBlockInfo {
typedef struct SSDataBlock {
SColumnDataAgg *pBlockAgg;
SArray *pDataBlock; // SArray<SColumnInfoData>
SArray *pTagsList; // SArray<SVariant> for tag value
SDataBlockInfo info;
} SSDataBlock;
......
......@@ -23,6 +23,35 @@ extern "C" {
#define QUERY_TYPE_MERGE 1
#define QUERY_TYPE_PARTIAL 2
enum OPERATOR_TYPE_E {
OP_TableScan = 1,
OP_DataBlocksOptScan = 2,
OP_TableSeqScan = 3,
OP_TagScan = 4,
OP_TableBlockInfoScan= 5,
OP_Aggregate = 6,
OP_Project = 7,
OP_Groupby = 8,
OP_Limit = 9,
OP_SLimit = 10,
OP_TimeWindow = 11,
OP_SessionWindow = 12,
OP_StateWindow = 22,
OP_Fill = 13,
OP_MultiTableAggregate = 14,
OP_MultiTableTimeInterval = 15,
// OP_DummyInput = 16, //TODO remove it after fully refactor.
// OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream.
// OP_GlobalAggregate = 18, // global merge for the multi-way data sources.
OP_Filter = 19,
OP_Distinct = 20,
OP_Join = 21,
OP_AllTimeWindow = 23,
OP_AllMultiTableTimeInterval = 24,
OP_Order = 25,
OP_Exchange = 26,
};
struct SEpSet;
struct SQueryPlanNode;
struct SQueryDistPlanNode;
......
......@@ -279,34 +279,6 @@ enum {
OP_EXEC_DONE = 3,
};
enum OPERATOR_TYPE_E {
OP_TableScan = 1,
OP_DataBlocksOptScan = 2,
OP_TableSeqScan = 3,
OP_TagScan = 4,
OP_TableBlockInfoScan= 5,
OP_Aggregate = 6,
OP_Project = 7,
OP_Groupby = 8,
OP_Limit = 9,
OP_SLimit = 10,
OP_TimeWindow = 11,
OP_SessionWindow = 12,
OP_Fill = 13,
OP_MultiTableAggregate = 14,
OP_MultiTableTimeInterval = 15,
OP_DummyInput = 16, //TODO remove it after fully refactor.
OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream.
OP_GlobalAggregate = 18, // global merge for the multi-way data sources.
OP_Filter = 19,
OP_Distinct = 20,
OP_Join = 21,
OP_StateWindow = 22,
OP_AllTimeWindow = 23,
OP_AllMultiTableTimeInterval = 24,
OP_Order = 25,
};
typedef struct SOperatorInfo {
uint8_t operatorType;
bool blockingOptr; // block operator or not
......
......@@ -182,25 +182,14 @@ bool isArithmeticQueryOnAggResult(SArray* pFunctionIdList) {
return false;
}
bool isGroupbyColumn(SArray* pFunctionIdList) {
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// int32_t numOfCols = getNumOfColumns(pTableMetaInfo->pTableMeta);
//
// SGroupbyExpr* pGroupbyExpr = &pQueryInfo->groupbyExpr;
// for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) {
// SColIndex* pIndex = taosArrayGet(pGroupbyExpr->columnInfo, k);
// if (!TSDB_COL_IS_TAG(pIndex->flag) && pIndex->colIndex < numOfCols) { // group by normal columns
// return true;
// }
// }
return false;
bool isGroupbyColumn(SGroupbyExpr* pGroupby) {
return !pGroupby->groupbyTag;
}
bool isTopBotQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
char* f = *(char**) taosArrayGet(pFunctionIdList, i);
if (f == FUNCTION_TS) {
continue;
}
......@@ -432,7 +421,6 @@ bool hasTagValOutput(SArray* pFunctionIdList) {
void extractFunctionDesc(SArray* pFunctionIdList, SMultiFunctionsDesc* pDesc) {
assert(pFunctionIdList != NULL);
pDesc->blockDistribution = isBlockDistQuery(pFunctionIdList);
if (pDesc->blockDistribution) {
return;
......@@ -441,4 +429,5 @@ void extractFunctionDesc(SArray* pFunctionIdList, SMultiFunctionsDesc* pDesc) {
pDesc->projectionQuery = isProjectionQuery(pFunctionIdList);
pDesc->onlyTagQuery = isTagsQuery(pFunctionIdList);
pDesc->interpQuery = isInterpQuery(pFunctionIdList);
pDesc->topbotQuery = isTopBotQuery(pFunctionIdList);
}
......@@ -294,7 +294,10 @@ SCreateTableSql *tSetCreateTableInfo(SArray *pCols, SArray *pTags, SSqlNode *pSe
SAlterTableInfo * tSetAlterTableInfo(SToken *pTableName, SArray *pCols, SArray *pVals, int32_t type, int16_t tableType);
SCreatedTableInfo createNewChildTableInfo(SToken *pTableName, SArray *pTagNames, SArray *pTagVals, SToken *pToken,
SToken *igExists);
/*!
* test
* @param pSqlNode
*/
void destroyAllSqlNode(struct SSubclause *pSqlNode);
void destroySqlNode(SSqlNode *pSql);
void freeCreateTableInfo(void* p);
......
......@@ -1607,6 +1607,30 @@ int32_t checkForInvalidExpr(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf) {
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
return buildInvalidOperationMsg(pMsgBuf, msg4);
}
// select top(col, k), count(*) from table_name
int32_t num = 0;
SExprInfo* pMain = NULL;
size_t size = getNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
SExprInfo* pExpr = getExprInfo(pQueryInfo, i);
const char* functionName = pExpr->pExpr->_function.functionName;
if (strcmp(functionName, "top") != 0 && strcmp(functionName, "bottom") != 0) {
if (qIsAggregateFunction(functionName)) {
return buildInvalidOperationMsg(pMsgBuf, "invalid expression in select clause");
}
// the primary key is valid
if (pExpr->pExpr->nodeType == TEXPR_COL_NODE) {
if (pExpr->pExpr->pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
continue;
}
}
continue;
}
}
}
/*
......@@ -2628,35 +2652,51 @@ static int32_t validateScalarFunctionParamNum(tSqlExpr* pSqlExpr, int32_t functi
return code;
}
SExprInfo* doAddProjectCol(SQueryStmtInfo* pQueryInfo, int32_t outputColIndex, SColumnIndex* pColIndex, const char* aliasName, int32_t colId) {
int32_t doAddProjectCol(SQueryStmtInfo* pQueryInfo, int32_t outputColIndex, SColumnIndex* pColIndex,
const char* aliasName, int32_t colId, SMsgBuf* pMsgBuf) {
STableMeta* pTableMeta = getMetaInfo(pQueryInfo, pColIndex->tableIndex)->pTableMeta;
SSchema* pSchema = getOneColumnSchema(pTableMeta, pColIndex->columnIndex);
SColumnIndex index = *pColIndex;
char* funcName = NULL;
if (TSDB_COL_IS_TAG(index.type)) {
int32_t numOfCols = getNumOfColumns(pTableMeta);
index.columnIndex = pColIndex->columnIndex - numOfCols;
funcName = "project_tag";
} else {
index.columnIndex = pColIndex->columnIndex;
funcName = "project_col";
}
const char* name = (aliasName == NULL)? pSchema->name:aliasName;
SSchema s = createSchema(pSchema->type, pSchema->bytes, colId, name);
STableMetaInfo* pTableMetaInfo = getMetaInfo(pQueryInfo, index.tableIndex);
SColumn c = createColumn(pTableMetaInfo->pTableMeta->uid, pTableMetaInfo->aliasName, index.type, pSchema);
tExprNode *pNode = NULL;
bool keepTableCols = true;
SArray* pColumnList = taosArrayInit(4, sizeof(SColumn));
SSourceParam param = {0};
addIntoSourceParam(&param, NULL, &c);
tSqlExpr sqlNode = {0};
sqlNode.type = SQL_NODE_TABLE_COLUMN;
SToken colNameToken = {.z = pSchema->name, .n = strlen(pSchema->name)};
sqlNode.columnName = colNameToken;
int32_t ret = sqlExprToExprNode(&pNode, &sqlNode, pQueryInfo, pColumnList, &keepTableCols, pMsgBuf);
if (ret != TSDB_CODE_SUCCESS) {
tExprTreeDestroy(pNode, NULL);
return buildInvalidOperationMsg(pMsgBuf, "invalid expression in select clause");
}
SExprInfo* pExpr = createBinaryExprInfo(pNode, &s);
tstrncpy(pExpr->base.resSchema.name, name, tListLen(pExpr->base.resSchema.name));
tstrncpy(pExpr->base.token, name, tListLen(pExpr->base.token));
SArray* pExprList = getCurrentExprList(pQueryInfo);
addExprInfo(pExprList, outputColIndex, pExpr, pQueryInfo->exprListLevelIndex);
return doAddOneExprInfo(pQueryInfo, funcName, &param, outputColIndex, pTableMetaInfo, &s, 0, s.name, true);
// extract columns according to the tExprNode tree
size_t num = taosArrayGetSize(pColumnList);
pExpr->base.pColumns = calloc(num, sizeof(SColumn));
for (int32_t i = 0; i < num; ++i) {
SColumn* pCol = taosArrayGet(pColumnList, i);
pExpr->base.pColumns[i] = *pCol;
}
pExpr->base.numOfCols = num;
return TSDB_CODE_SUCCESS;
}
static int32_t doAddProjectionExprAndResColumn(SQueryStmtInfo* pQueryInfo, SColumnIndex* pIndex, int32_t startPos) {
static int32_t doAddProjectionExprAndResColumn(SQueryStmtInfo* pQueryInfo, SColumnIndex* pIndex, int32_t startPos, SMsgBuf* pMsgBuf) {
STableMetaInfo* pTableMetaInfo = getMetaInfo(pQueryInfo, pIndex->tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
......@@ -2669,7 +2709,7 @@ static int32_t doAddProjectionExprAndResColumn(SQueryStmtInfo* pQueryInfo, SColu
for (int32_t j = 0; j < numOfTotalColumns; ++j) {
pIndex->columnIndex = j;
doAddProjectCol(pQueryInfo, startPos + j, pIndex, NULL, getNewResColId());
doAddProjectCol(pQueryInfo, startPos + j, pIndex, NULL, getNewResColId(), pMsgBuf);
}
return numOfTotalColumns;
......@@ -2778,11 +2818,11 @@ int32_t addProjectionExprAndResColumn(SQueryStmtInfo* pQueryInfo, tSqlExprItem*
if (index.tableIndex == COLUMN_INDEX_INITIAL_VAL) { // all table columns are required.
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
index.tableIndex = i;
int32_t inc = doAddProjectionExprAndResColumn(pQueryInfo, &index, startPos);
int32_t inc = doAddProjectionExprAndResColumn(pQueryInfo, &index, startPos, pMsgBuf);
startPos += inc;
}
} else {
doAddProjectionExprAndResColumn(pQueryInfo, &index, startPos);
doAddProjectionExprAndResColumn(pQueryInfo, &index, startPos, pMsgBuf);
}
// add the primary timestamp column even though it is not required by user
......@@ -2821,7 +2861,7 @@ int32_t addProjectionExprAndResColumn(SQueryStmtInfo* pQueryInfo, tSqlExprItem*
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
doAddProjectCol(pQueryInfo, startPos, &index, pItem->aliasName, getNewResColId());
doAddProjectCol(pQueryInfo, startPos, &index, pItem->aliasName, getNewResColId(), pMsgBuf);
}
// add the primary timestamp column even though it is not required by user
......@@ -2920,12 +2960,12 @@ static tExprNode* doCreateColumnNode(SQueryStmtInfo* pQueryInfo, SColumnIndex* p
static int32_t validateSqlExpr(const tSqlExpr* pSqlExpr, SQueryStmtInfo *pQueryInfo, SMsgBuf* pMsgBuf);
static int32_t doProcessFunctionLeafNodeParam(SQueryStmtInfo* pQueryInfo, int32_t* num, tExprNode** p, SArray* pCols,
static int32_t doProcessFunctionLeafNodeParam(SQueryStmtInfo* pQueryInfo, int32_t* num, tExprNode*** p, SArray* pCols,
bool* keepTableCols, const tSqlExpr* pSqlExpr, SMsgBuf* pMsgBuf) {
SArray* pParamList = pSqlExpr->Expr.paramList;
if (pParamList != NULL) {
*num = taosArrayGetSize(pParamList);
p = calloc((*num), POINTER_BYTES);
(*p) = calloc((*num), POINTER_BYTES);
for (int32_t i = 0; i < (*num); ++i) {
tSqlExprItem* pItem = taosArrayGet(pParamList, i);
......@@ -2935,7 +2975,7 @@ static int32_t doProcessFunctionLeafNodeParam(SQueryStmtInfo* pQueryInfo, int32_
return ret;
}
int32_t code = sqlExprToExprNode(&p[i], pItem->pNode, pQueryInfo, pCols, keepTableCols, pMsgBuf);
int32_t code = sqlExprToExprNode(&(*p)[i], pItem->pNode, pQueryInfo, pCols, keepTableCols, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2946,10 +2986,10 @@ static int32_t doProcessFunctionLeafNodeParam(SQueryStmtInfo* pQueryInfo, int32_
}
*num = 1;
p = calloc(*num, POINTER_BYTES);
(*p) = calloc(*num, POINTER_BYTES);
SColumnIndex index = {.type = TSDB_COL_NORMAL, .tableIndex = 0, .columnIndex = 0};
p[0] = doCreateColumnNode(pQueryInfo, &index, *keepTableCols, pCols);
(*p)[0] = doCreateColumnNode(pQueryInfo, &index, *keepTableCols, pCols);
}
return TSDB_CODE_SUCCESS;
......@@ -3037,7 +3077,7 @@ int32_t validateSqlExpr(const tSqlExpr* pSqlExpr, SQueryStmtInfo *pQueryInfo, SM
// do check the parameter number for scalar function
if (scalar) {
int32_t ret = validateScalarFunctionParamNum(pSqlExpr, functionId, pMsgBuf);
int32_t ret = validateScalarFunctionParamNum((tSqlExpr*) pSqlExpr, functionId, pMsgBuf);
if (ret != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, "invalid number of function parameters");
}
......@@ -3087,7 +3127,7 @@ int32_t sqlExprToExprNode(tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryStm
int32_t num = 0;
tExprNode** p = NULL;
int32_t code = doProcessFunctionLeafNodeParam(pQueryInfo, &num, p, pCols, keepTableCols, pSqlExpr, pMsgBuf);
int32_t code = doProcessFunctionLeafNodeParam(pQueryInfo, &num, &p, pCols, keepTableCols, pSqlExpr, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -3147,6 +3187,9 @@ int32_t sqlExprToExprNode(tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryStm
(*pExpr)->pSchema = calloc(1, sizeof(SSchema));
strncpy((*pExpr)->pSchema->name, pSqlExpr->exprToken.z, pSqlExpr->exprToken.n);
// it must be the aggregate function
assert(qIsAggregateFunction((*pExpr)->pSchema->name));
uint64_t uid = findTmpSourceColumnInNextLevel(pQueryInfo, *pExpr);
if (!(*keepTableCols)) {
SColumn c = createColumn(uid, NULL, TSDB_COL_TMP, (*pExpr)->pSchema);
......@@ -3345,7 +3388,8 @@ int32_t validateSelectNodeList(SQueryStmtInfo* pQueryInfo, SArray* pSelNodeList,
}
} else if (type == SQL_NODE_TABLE_COLUMN || type == SQL_NODE_VALUE) {
// use the dynamic array list to decide if the function is valid or not
// select table_name1.field_name1, table_name2.field_name2 from table_name1, table_name2
// select table_name1.field_name1, table_name2.field_name2 from table_name1, table_name2
// todo refacto to remove this function
if ((code = addProjectionExprAndResColumn(pQueryInfo, pItem, outerQuery, pMsgBuf)) != TSDB_CODE_SUCCESS) {
return code;
}
......
......@@ -324,10 +324,14 @@ SArray* extractFunctionList(SArray* pExprInfoList) {
assert(pExprInfoList != NULL);
size_t len = taosArrayGetSize(pExprInfoList);
SArray* p = taosArrayInit(len, sizeof(int32_t));
SArray* p = taosArrayInit(len, POINTER_BYTES);
for(int32_t i = 0; i < len; ++i) {
SExprInfo* pExprInfo = taosArrayGetP(pExprInfoList, i);
taosArrayPush(p, &pExprInfo->pExpr->_function.functionName);
if (pExprInfo->pExpr->nodeType == TEXPR_FUNCTION_NODE) {
taosArrayPush(p, &pExprInfo->pExpr->_function.functionName);
} else {
taosArrayPush(p, "");
}
}
return p;
......
......@@ -398,6 +398,7 @@ TEST(testCase, function_Test5) {
TEST(testCase, function_Test10) {
sqlCheck("select c from `t.1abc`", true);
sqlCheck("select length(c) from `t.1abc`", true);
sqlCheck("select length(sum(col)) from `t.1abc`", true);
sqlCheck("select sum(length(a+b)) from `t.1abc`", true);
sqlCheck("select sum(sum(a+b)) from `t.1abc`", false);
sqlCheck("select sum(length(a) + length(b)) from `t.1abc`", true);
......@@ -406,6 +407,8 @@ TEST(testCase, function_Test10) {
sqlCheck("select cov(a, b) from `t.1abc`", true);
sqlCheck("select sum(length(a) + count(b)) from `t.1abc`", false);
sqlCheck("select concat(sum(a), count(b)) from `t.1abc`", true);
sqlCheck("select concat(concat(a,b), concat(a,b)) from `t.1abc`", true);
sqlCheck("select length(length(length(a))) from `t.1abc`", true);
sqlCheck("select count() from `t.1abc`", false);
......@@ -415,13 +418,16 @@ TEST(testCase, function_Test10) {
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
sqlCheck("select length119(a,b) from `t.1abc`", false);
sqlCheck("select length(a,b) from `t.1abc`", false);
sqlCheck("select block_dist() + 20 from `t.1abc`", false);
sqlCheck("select top(a, 20), count(b) from `t.1abc`", false);
sqlCheck("select length(a, b) from `t.1abc`", false);
sqlCheck("select block_dist() + 20 from `t.1abc`", true);
sqlCheck("select count(b), c from `t.1abc`", false);
sqlCheck("select last_row(*), count(b) from `t.1abc`", false);
sqlCheck("select last_row(a, b) + 20 from `t.1abc`", false);
sqlCheck("select last_row(count(*)) from `t.1abc`", false);
sqlCheck("select top(a, 20), count(b) from `t.1abc`", false);
// sqlCheck("select top(a, 20), b from `t.1abc`", false);
// sqlCheck("select top(a, 20), a+20 from `t.1abc`", true);
// sqlCheck("select top(a, 20), bottom(a, 10) from `t.1abc`", false);
// sqlCheck("select last_row(*), count(b) from `t.1abc`", false);
// sqlCheck("select last_row(a, b) + 20 from `t.1abc`", false);
// sqlCheck("select last_row(count(*)) from `t.1abc`", false);
}
TEST(testCase, function_Test6) {
......
......@@ -26,18 +26,26 @@ extern "C" {
#include "taosmsg.h"
typedef struct SQueryNodeBasicInfo {
int32_t type;
char *name;
int32_t type; // operator type
char *name; // operator name
} SQueryNodeBasicInfo;
typedef struct SQueryDistPlanNodeInfo {
bool stableQuery; // super table query or not
int32_t phase; // merge|partial
int32_t type; // operator type
char *name; // operator name
SEpSet *sourceEp; // data source epset
} SQueryDistPlanNodeInfo;
typedef struct SQueryTableInfo {
char *tableName;
uint64_t uid;
char *tableName;
uint64_t uid;
STimeWindow window;
} SQueryTableInfo;
typedef struct SQueryPlanNode {
SQueryNodeBasicInfo info;
SQueryTableInfo tableInfo;
SSchema *pSchema; // the schema of the input SSDatablock
int32_t numOfCols; // number of input columns
SArray *pExpr; // the query functions or sql aggregations
......@@ -51,9 +59,49 @@ typedef struct SQueryPlanNode {
typedef struct SQueryDistPlanNode {
SQueryNodeBasicInfo info;
SSchema *pSchema; // the schema of the input SSDatablock
int32_t numOfCols; // number of input columns
SArray *pExpr; // the query functions or sql aggregations
int32_t numOfExpr; // number of result columns, which is also the number of pExprs
void *pExtInfo; // additional information
// previous operator to generated result for current node to process
// in case of join, multiple prev nodes exist.
SArray *pPrevNodes; // upstream nodes, or exchange operator to load data from multiple sources.
} SQueryDistPlanNode;
typedef struct SQueryCostSummary {
int64_t startTs; // Object created and added into the message queue
int64_t endTs; // the timestamp when the task is completed
int64_t cputime; // total cpu cost, not execute elapsed time
int64_t loadRemoteDataDuration; // remote io time
int64_t loadNativeDataDuration; // native disk io time
uint64_t loadNativeData; // blocks + SMA + header files
uint64_t loadRemoteData; // remote data acquired by exchange operator.
uint64_t waitDuration; // the time to waiting to be scheduled in queue does matter, so we need to record it
int64_t addQTs; // the time to be added into the message queue, used to calculate the waiting duration in queue.
uint64_t totalRows;
uint64_t loadRows;
uint32_t totalBlocks;
uint32_t loadBlocks;
uint32_t loadBlockAgg;
uint32_t skipBlocks;
uint64_t resultSize; // generated result size in Kb.
} SQueryCostSummary;
typedef struct SQueryTask {
uint64_t queryId; // query id
uint64_t taskId; // task id
SQueryDistPlanNode *pNode; // operator tree
uint64_t status; // task status
SQueryCostSummary summary; // task execution summary
void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage
} SQueryTask;
#ifdef __cplusplus
}
#endif
......
......@@ -95,17 +95,12 @@ int32_t qCreateQueryJob(const struct SQueryDistPlanNode* pPhyNode, struct SQuery
//======================================================================================================================
static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** prev, int32_t numOfPrev,
SExprInfo** pExpr, int32_t numOfOutput, SQueryTableInfo* pTableInfo, void* pExtInfo) {
SExprInfo** pExpr, int32_t numOfOutput, void* pExtInfo) {
SQueryPlanNode* pNode = calloc(1, sizeof(SQueryPlanNode));
pNode->info.type = type;
pNode->info.name = strdup(name);
if (pTableInfo->uid != 0 && pTableInfo->tableName) { // it is a true table
pNode->tableInfo.uid = pTableInfo->uid;
pNode->tableInfo.tableName = strdup(pTableInfo->tableName);
}
pNode->numOfExpr = numOfOutput;
pNode->pExpr = taosArrayInit(numOfOutput, POINTER_BYTES);
......@@ -120,9 +115,10 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
switch(type) {
case QNODE_TABLESCAN: {
STimeWindow* window = calloc(1, sizeof(STimeWindow));
memcpy(window, pExtInfo, sizeof(STimeWindow));
pNode->pExtInfo = window;
SQueryTableInfo* info = calloc(1, sizeof(SQueryTableInfo));
memcpy(info, pExtInfo, sizeof(SQueryTableInfo));
info->tableName = strdup(((SQueryTableInfo*) pExtInfo)->tableName);
pNode->pExtInfo = info;
break;
}
......@@ -179,21 +175,20 @@ static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMe
SArray* pExprs, SArray* tableCols) {
if (pQueryInfo->info.onlyTagQuery) {
int32_t num = (int32_t) taosArrayGetSize(pExprs);
SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info, NULL);
SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, NULL);
if (pQueryInfo->info.distinct) {
pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, info, NULL);
pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, NULL);
}
return pNode;
}
STimeWindow* window = &pQueryInfo->window;
SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info, window);
SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info);
if (pQueryInfo->info.projectionQuery) {
int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprs);
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExprs->pData, numOfOutput, info, NULL);
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExprs->pData, numOfOutput, NULL);
} else {
STableMetaInfo* pTableMetaInfo1 = getMetaInfo(pQueryInfo, 0);
......@@ -210,7 +205,7 @@ static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMe
pExpr[i] = p;
}
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExpr, numOfCols, info, NULL);
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExpr, numOfCols, NULL);
tfree(pExpr);
}
......@@ -243,24 +238,24 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(SQueryStmtInfo* pQuer
if (aggregateFunc) {
if (pQueryInfo->interval.interval > 0) {
pNode = createQueryNode(QNODE_TIMEWINDOW, "TimeWindowAgg", &pNode, 1, p->pData, num, info, &pQueryInfo->interval);
pNode = createQueryNode(QNODE_TIMEWINDOW, "TimeWindowAgg", &pNode, 1, p->pData, num, &pQueryInfo->interval);
} else if (pQueryInfo->sessionWindow.gap > 0) {
pNode = createQueryNode(QNODE_SESSIONWINDOW, "SessionWindowAgg", &pNode, 1, p->pData, num, info, &pQueryInfo->sessionWindow);
pNode = createQueryNode(QNODE_SESSIONWINDOW, "SessionWindowAgg", &pNode, 1, p->pData, num, &pQueryInfo->sessionWindow);
} else if (pQueryInfo->stateWindow.col.info.colId > 0) {
pNode = createQueryNode(QNODE_STATEWINDOW, "StateWindowAgg", &pNode, 1, p->pData, num, info, &pQueryInfo->stateWindow);
pNode = createQueryNode(QNODE_STATEWINDOW, "StateWindowAgg", &pNode, 1, p->pData, num, &pQueryInfo->stateWindow);
} else if (numOfGroupCols != 0 && !pQueryInfo->groupbyExpr.groupbyTag) {
pNode = createQueryNode(QNODE_GROUPBY, "Groupby", &pNode, 1, p->pData, num, info, &pQueryInfo->groupbyExpr);
pNode = createQueryNode(QNODE_GROUPBY, "Groupby", &pNode, 1, p->pData, num, &pQueryInfo->groupbyExpr);
} else {
pNode = createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, p->pData, num, info, NULL);
pNode = createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, p->pData, num, NULL);
}
} else {
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, info, NULL);
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL);
}
}
if (pQueryInfo->havingFieldNum > 0) {
// int32_t numOfExpr = (int32_t)taosArrayGetSize(pQueryInfo->exprList1);
// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pQueryInfo->exprList1->pData, numOfExpr, info, NULL);
// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pQueryInfo->exprList1->pData, numOfExpr, NULL);
}
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
......@@ -269,11 +264,11 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(SQueryStmtInfo* pQuer
pInfo->val = calloc(pNode->numOfExpr, sizeof(int64_t));
memcpy(pInfo->val, pQueryInfo->fillVal, pNode->numOfExpr);
pNode = createQueryNode(QNODE_FILL, "Fill", &pNode, 1, NULL, 0, info, pInfo);
pNode = createQueryNode(QNODE_FILL, "Fill", &pNode, 1, NULL, 0, pInfo);
}
if (pQueryInfo->limit.limit != -1 || pQueryInfo->limit.offset != 0) {
pNode = createQueryNode(QNODE_LIMIT, "Limit", &pNode, 1, NULL, 0, info, &pQueryInfo->limit);
pNode = createQueryNode(QNODE_LIMIT, "Limit", &pNode, 1, NULL, 0, &pQueryInfo->limit);
}
return pNode;
......@@ -399,7 +394,7 @@ SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo) {
SQueryTableInfo info = {0};
int32_t num = (int32_t) taosArrayGetSize(pQueryInfo->exprList[0]);
SQueryPlanNode* pNode = createQueryNode(QNODE_JOIN, "Join", upstream->pData, pQueryInfo->numOfTables,
pQueryInfo->exprList[0]->pData, num, &info, NULL);
pQueryInfo->exprList[0]->pData, num, NULL);
// 4. add the aggregation or projection execution node
pNode = doCreateQueryPlanForSingleTableImpl(pQueryInfo, pNode, &info);
......@@ -419,8 +414,6 @@ static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) {
tfree(pQueryNode->pExtInfo);
tfree(pQueryNode->pSchema);
tfree(pQueryNode->info.name);
tfree(pQueryNode->tableInfo.tableName);
// dropAllExprInfo(pQueryNode->pExpr);
if (pQueryNode->pPrevNodes != NULL) {
......@@ -447,9 +440,9 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level,
switch(pQueryNode->info.type) {
case QNODE_TABLESCAN: {
STimeWindow* win = (STimeWindow*)pQueryNode->pExtInfo;
len1 = sprintf(buf + len, "%s #%" PRIu64 ") time_range: %" PRId64 " - %" PRId64 " cols: ",
pQueryNode->tableInfo.tableName, pQueryNode->tableInfo.uid, win->skey, win->ekey);
SQueryTableInfo* pInfo = (SQueryTableInfo*) pQueryNode->pExtInfo;
len1 = sprintf(buf + len, "%s #%" PRIu64 ") time_range: %" PRId64 " - %" PRId64,
pInfo->tableName, pInfo->uid, pInfo->window.skey, pInfo->window.ekey);
assert(len1 > 0);
len += len1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册