未验证 提交 969d0ae6 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #10843 from taosdata/feature/3.0_wxy

stream plan implement
...@@ -36,6 +36,7 @@ typedef enum EScanType { ...@@ -36,6 +36,7 @@ typedef enum EScanType {
SCAN_TYPE_TAG, SCAN_TYPE_TAG,
SCAN_TYPE_TABLE, SCAN_TYPE_TABLE,
SCAN_TYPE_STABLE, SCAN_TYPE_STABLE,
SCAN_TYPE_TOPIC,
SCAN_TYPE_STREAM SCAN_TYPE_STREAM
} EScanType; } EScanType;
......
...@@ -23,7 +23,8 @@ extern "C" { ...@@ -23,7 +23,8 @@ extern "C" {
#include "nodes.h" #include "nodes.h"
#include "tmsg.h" #include "tmsg.h"
#define TABLE_META_SIZE(pMeta) (NULL == (pMeta) ? 0 : (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfColumns + (pMeta)->tableInfo.numOfTags) * sizeof(SSchema))) #define TABLE_TOTAL_COL_NUM(pMeta) ((pMeta)->tableInfo.numOfColumns + (pMeta)->tableInfo.numOfTags)
#define TABLE_META_SIZE(pMeta) (NULL == (pMeta) ? 0 : (sizeof(STableMeta) + TABLE_TOTAL_COL_NUM((pMeta)) * sizeof(SSchema)))
#define VGROUPS_INFO_SIZE(pInfo) (NULL == (pInfo) ? 0 : (sizeof(SVgroupsInfo) + (pInfo)->numOfVgroups * sizeof(SVgroupInfo))) #define VGROUPS_INFO_SIZE(pInfo) (NULL == (pInfo) ? 0 : (sizeof(SVgroupsInfo) + (pInfo)->numOfVgroups * sizeof(SVgroupInfo)))
typedef struct SRawExprNode { typedef struct SRawExprNode {
......
...@@ -26,7 +26,7 @@ typedef struct SParseContext { ...@@ -26,7 +26,7 @@ typedef struct SParseContext {
uint64_t requestId; uint64_t requestId;
int32_t acctId; int32_t acctId;
const char *db; const char *db;
bool streamQuery; bool topicQuery;
void *pTransporter; void *pTransporter;
SEpSet mgmtEpSet; SEpSet mgmtEpSet;
const char *pSql; // sql string const char *pSql; // sql string
......
...@@ -26,6 +26,7 @@ typedef struct SPlanContext { ...@@ -26,6 +26,7 @@ typedef struct SPlanContext {
uint64_t queryId; uint64_t queryId;
int32_t acctId; int32_t acctId;
SNode* pAstRoot; SNode* pAstRoot;
bool topicQuery;
bool streamQuery; bool streamQuery;
} SPlanContext; } SPlanContext;
......
...@@ -22,6 +22,14 @@ ...@@ -22,6 +22,14 @@
extern "C" { extern "C" {
#endif #endif
#define tjsonGetNumberValue(pJson, pName, val) \
({ \
uint64_t _tmp = 0; \
int32_t _code = tjsonGetUBigIntValue(pJson, pName, &_tmp); \
val = _tmp; \
_code; \
})
typedef void SJson; typedef void SJson;
SJson* tjsonCreateObject(); SJson* tjsonCreateObject();
......
...@@ -230,7 +230,7 @@ void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t ...@@ -230,7 +230,7 @@ void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest); int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
int32_t parseSql(SRequestObj* pRequest, bool streamQuery, SQuery** pQuery); int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery);
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList); int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
// --- heartbeat // --- heartbeat
......
...@@ -137,14 +137,14 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj* ...@@ -137,14 +137,14 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t parseSql(SRequestObj* pRequest, bool streamQuery, SQuery** pQuery) { int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) {
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
SParseContext cxt = { SParseContext cxt = {
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.acctId = pTscObj->acctId, .acctId = pTscObj->acctId,
.db = pRequest->pDb, .db = pRequest->pDb,
.streamQuery = streamQuery, .topicQuery = topicQuery,
.pSql = pRequest->sqlstr, .pSql = pRequest->sqlstr,
.sqlLen = pRequest->sqlLen, .sqlLen = pRequest->sqlLen,
.pMsg = pRequest->msgBuf, .pMsg = pRequest->msgBuf,
......
...@@ -246,7 +246,7 @@ static int32_t mndGetPlanString(SCMCreateTopicReq *pCreate, char **pStr) { ...@@ -246,7 +246,7 @@ static int32_t mndGetPlanString(SCMCreateTopicReq *pCreate, char **pStr) {
SQueryPlan* pPlan = NULL; SQueryPlan* pPlan = NULL;
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SPlanContext cxt = { .pAstRoot = pAst, .streamQuery = true }; SPlanContext cxt = { .pAstRoot = pAst, .topicQuery = true };
code = qCreateQueryPlan(&cxt, &pPlan, NULL); code = qCreateQueryPlan(&cxt, &pPlan, NULL);
} }
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "cmdnodes.h"
#include "nodesUtil.h" #include "nodesUtil.h"
#include "plannodes.h" #include "plannodes.h"
#include "querynodes.h" #include "querynodes.h"
...@@ -85,6 +86,8 @@ const char* nodesNodeName(ENodeType type) { ...@@ -85,6 +86,8 @@ const char* nodesNodeName(ENodeType type) {
return "ShowDatabaseStmt"; return "ShowDatabaseStmt";
case QUERY_NODE_SHOW_TABLES_STMT: case QUERY_NODE_SHOW_TABLES_STMT:
return "ShowTablesStmt"; return "ShowTablesStmt";
case QUERY_NODE_CREATE_TOPIC_STMT:
return "CreateTopicStmt";
case QUERY_NODE_LOGIC_PLAN_SCAN: case QUERY_NODE_LOGIC_PLAN_SCAN:
return "LogicScan"; return "LogicScan";
case QUERY_NODE_LOGIC_PLAN_JOIN: case QUERY_NODE_LOGIC_PLAN_JOIN:
...@@ -179,16 +182,118 @@ static int32_t jsonToNodeList(const SJson* pJson, const char* pName, SNodeList** ...@@ -179,16 +182,118 @@ static int32_t jsonToNodeList(const SJson* pJson, const char* pName, SNodeList**
return jsonToNodeListImpl(tjsonGetObjectItem(pJson, pName), pList); return jsonToNodeListImpl(tjsonGetObjectItem(pJson, pName), pList);
} }
static const char* jkTableMetaUid = "TableMetaUid"; static const char* jkTableComInfoNumOfTags = "NumOfTags";
static const char* jkTableMetaSuid = "TableMetaSuid"; static const char* jkTableComInfoPrecision = "Precision";
static const char* jkTableComInfoNumOfColumns = "NumOfColumns";
static const char* jkTableComInfoRowSize = "RowSize";
static int32_t tableComInfoToJson(const void* pObj, SJson* pJson) {
const STableComInfo* pNode = (const STableComInfo*)pObj;
int32_t code = tjsonAddIntegerToObject(pJson, jkTableComInfoNumOfTags, pNode->numOfTags);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableComInfoPrecision, pNode->precision);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableComInfoNumOfColumns, pNode->numOfColumns);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableComInfoRowSize, pNode->rowSize);
}
return code;
}
static int32_t jsonToTableComInfo(const SJson* pJson, void* pObj) {
STableComInfo* pNode = (STableComInfo*)pObj;
int32_t code = tjsonGetNumberValue(pJson, jkTableComInfoNumOfTags, pNode->numOfTags);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkTableComInfoPrecision, pNode->precision);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkTableComInfoNumOfColumns, pNode->numOfColumns);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkTableComInfoRowSize, pNode->rowSize);
}
return code;
}
static const char* jkSchemaType = "Type";
static const char* jkSchemaColId = "ColId";
static const char* jkSchemaBytes = "bytes";
static const char* jkSchemaName = "Name";
static int32_t schemaToJson(const void* pObj, SJson* pJson) {
const SSchema* pNode = (const SSchema*)pObj;
int32_t code = tjsonAddIntegerToObject(pJson, jkSchemaType, pNode->type);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSchemaColId, pNode->colId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSchemaBytes, pNode->bytes);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkSchemaName, pNode->name);
}
return code;
}
static int32_t jsonToSchema(const SJson* pJson, void* pObj) {
SSchema* pNode = (SSchema*)pObj;
int32_t code = tjsonGetNumberValue(pJson, jkSchemaType, pNode->type);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkSchemaColId, pNode->colId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkSchemaBytes, pNode->bytes);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkSchemaName, pNode->name);
}
return code;
}
static const char* jkTableMetaVgId = "VgId";
static const char* jkTableMetaTableType = "TableType";
static const char* jkTableMetaUid = "Uid";
static const char* jkTableMetaSuid = "Suid";
static const char* jkTableMetaSversion = "Sversion";
static const char* jkTableMetaTversion = "Tversion";
static const char* jkTableMetaComInfo = "ComInfo";
static const char* jkTableMetaColSchemas = "ColSchemas";
static int32_t tableMetaToJson(const void* pObj, SJson* pJson) { static int32_t tableMetaToJson(const void* pObj, SJson* pJson) {
const STableMeta* pNode = (const STableMeta*)pObj; const STableMeta* pNode = (const STableMeta*)pObj;
int32_t code = tjsonAddIntegerToObject(pJson, jkTableMetaUid, pNode->uid); int32_t code = tjsonAddIntegerToObject(pJson, jkTableMetaVgId, pNode->vgId);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableMetaTableType, pNode->tableType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableMetaUid, pNode->uid);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableMetaSuid, pNode->suid); code = tjsonAddIntegerToObject(pJson, jkTableMetaSuid, pNode->suid);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableMetaSversion, pNode->sversion);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableMetaTversion, pNode->tversion);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkTableMetaComInfo, tableComInfoToJson, &pNode->tableInfo);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddArray(pJson, jkTableMetaColSchemas, schemaToJson, pNode->schema, sizeof(SSchema), TABLE_TOTAL_COL_NUM(pNode));
}
return code; return code;
} }
...@@ -196,9 +301,27 @@ static int32_t tableMetaToJson(const void* pObj, SJson* pJson) { ...@@ -196,9 +301,27 @@ static int32_t tableMetaToJson(const void* pObj, SJson* pJson) {
static int32_t jsonToTableMeta(const SJson* pJson, void* pObj) { static int32_t jsonToTableMeta(const SJson* pJson, void* pObj) {
STableMeta* pNode = (STableMeta*)pObj; STableMeta* pNode = (STableMeta*)pObj;
int32_t code = tjsonGetUBigIntValue(pJson, jkTableMetaUid, &pNode->uid); int32_t code = tjsonGetNumberValue(pJson, jkTableMetaVgId, pNode->vgId);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkTableMetaTableType, pNode->tableType);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUBigIntValue(pJson, jkTableMetaSuid, &pNode->suid); code = tjsonGetNumberValue(pJson, jkTableMetaUid, pNode->uid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkTableMetaSuid, pNode->suid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkTableMetaSversion, pNode->sversion);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkTableMetaTversion, pNode->tversion);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkTableMetaComInfo, jsonToTableComInfo, &pNode->tableInfo);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToArray(pJson, jkTableMetaColSchemas, jsonToSchema, pNode->schema, sizeof(SSchema));
} }
return code; return code;
...@@ -222,7 +345,22 @@ static int32_t logicPlanNodeToJson(const void* pObj, SJson* pJson) { ...@@ -222,7 +345,22 @@ static int32_t logicPlanNodeToJson(const void* pObj, SJson* pJson) {
return code; return code;
} }
static int32_t jsonToLogicPlanNode(const SJson* pJson, void* pObj) {
SLogicNode* pNode = (SLogicNode*)pObj;
int32_t code = jsonToNodeList(pJson, jkLogicPlanTargets, &pNode->pTargets);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkLogicPlanConditions, &pNode->pConditions);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkLogicPlanChildren, &pNode->pChildren);
}
return code;
}
static const char* jkScanLogicPlanScanCols = "ScanCols"; static const char* jkScanLogicPlanScanCols = "ScanCols";
static const char* jkScanLogicPlanTableMetaSize = "TableMetaSize";
static const char* jkScanLogicPlanTableMeta = "TableMeta"; static const char* jkScanLogicPlanTableMeta = "TableMeta";
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
...@@ -232,6 +370,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { ...@@ -232,6 +370,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkScanLogicPlanScanCols, pNode->pScanCols); code = nodeListToJson(pJson, jkScanLogicPlanScanCols, pNode->pScanCols);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanLogicPlanTableMetaSize, TABLE_META_SIZE(pNode->pMeta));
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkScanLogicPlanTableMeta, tableMetaToJson, pNode->pMeta); code = tjsonAddObject(pJson, jkScanLogicPlanTableMeta, tableMetaToJson, pNode->pMeta);
} }
...@@ -239,6 +380,24 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { ...@@ -239,6 +380,24 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
return code; return code;
} }
static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
SScanLogicNode* pNode = (SScanLogicNode*)pObj;
int32_t objSize = 0;
int32_t code = jsonToLogicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkScanLogicPlanScanCols, &pNode->pScanCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkScanLogicPlanTableMetaSize, &objSize);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonMakeObject(pJson, jkScanLogicPlanTableMeta, jsonToTableMeta, (void**)&pNode->pMeta, objSize);
}
return code;
}
static const char* jkProjectLogicPlanProjections = "Projections"; static const char* jkProjectLogicPlanProjections = "Projections";
static int32_t logicProjectNodeToJson(const void* pObj, SJson* pJson) { static int32_t logicProjectNodeToJson(const void* pObj, SJson* pJson) {
...@@ -252,6 +411,17 @@ static int32_t logicProjectNodeToJson(const void* pObj, SJson* pJson) { ...@@ -252,6 +411,17 @@ static int32_t logicProjectNodeToJson(const void* pObj, SJson* pJson) {
return code; return code;
} }
static int32_t jsonToLogicProjectNode(const SJson* pJson, void* pObj) {
SProjectLogicNode* pNode = (SProjectLogicNode*)pObj;
int32_t code = jsonToLogicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkProjectLogicPlanProjections, &pNode->pProjections);
}
return code;
}
static const char* jkJoinLogicPlanJoinType = "JoinType"; static const char* jkJoinLogicPlanJoinType = "JoinType";
static const char* jkJoinLogicPlanOnConditions = "OnConditions"; static const char* jkJoinLogicPlanOnConditions = "OnConditions";
...@@ -1739,6 +1909,45 @@ static int32_t jsonToSelectStmt(const SJson* pJson, void* pObj) { ...@@ -1739,6 +1909,45 @@ static int32_t jsonToSelectStmt(const SJson* pJson, void* pObj) {
return code; return code;
} }
static const char* jkCreateTopicStmtTopicName = "TopicName";
static const char* jkCreateTopicStmtSubscribeDbName = "SubscribeDbName";
static const char* jkCreateTopicStmtIgnoreExists = "IgnoreExists";
static const char* jkCreateTopicStmtQuery = "Query";
static int32_t createTopicStmtToJson(const void* pObj, SJson* pJson) {
const SCreateTopicStmt* pNode = (const SCreateTopicStmt*)pObj;
int32_t code = tjsonAddStringToObject(pJson, jkCreateTopicStmtTopicName, pNode->topicName);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkCreateTopicStmtSubscribeDbName, pNode->subscribeDbName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkCreateTopicStmtIgnoreExists, pNode->ignoreExists);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkCreateTopicStmtQuery, nodeToJson, pNode->pQuery);
}
return code;
}
static int32_t jsonToCreateTopicStmt(const SJson* pJson, void* pObj) {
SCreateTopicStmt* pNode = (SCreateTopicStmt*)pObj;
int32_t code = tjsonGetStringValue(pJson, jkCreateTopicStmtTopicName, pNode->topicName);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkCreateTopicStmtSubscribeDbName, pNode->subscribeDbName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkCreateTopicStmtIgnoreExists, &pNode->ignoreExists);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkCreateTopicStmtQuery, &pNode->pQuery);
}
return code;
}
static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
switch (nodeType(pObj)) { switch (nodeType(pObj)) {
case QUERY_NODE_COLUMN: case QUERY_NODE_COLUMN:
...@@ -1790,6 +1999,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { ...@@ -1790,6 +1999,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_SHOW_DATABASES_STMT: case QUERY_NODE_SHOW_DATABASES_STMT:
case QUERY_NODE_SHOW_TABLES_STMT: case QUERY_NODE_SHOW_TABLES_STMT:
break; break;
case QUERY_NODE_CREATE_TOPIC_STMT:
return createTopicStmtToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_SCAN: case QUERY_NODE_LOGIC_PLAN_SCAN:
return logicScanNodeToJson(pObj, pJson); return logicScanNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_JOIN: case QUERY_NODE_LOGIC_PLAN_JOIN:
...@@ -1877,14 +2088,16 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { ...@@ -1877,14 +2088,16 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
// break; // break;
case QUERY_NODE_SELECT_STMT: case QUERY_NODE_SELECT_STMT:
return jsonToSelectStmt(pJson, pObj); return jsonToSelectStmt(pJson, pObj);
// case QUERY_NODE_LOGIC_PLAN_SCAN: case QUERY_NODE_CREATE_TOPIC_STMT:
// return jsonToLogicScanNode(pJson, pObj); return jsonToCreateTopicStmt(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_SCAN:
return jsonToLogicScanNode(pJson, pObj);
// case QUERY_NODE_LOGIC_PLAN_JOIN: // case QUERY_NODE_LOGIC_PLAN_JOIN:
// return jsonToLogicJoinNode(pJson, pObj); // return jsonToLogicJoinNode(pJson, pObj);
// case QUERY_NODE_LOGIC_PLAN_AGG: // case QUERY_NODE_LOGIC_PLAN_AGG:
// return jsonToLogicAggNode(pJson, pObj); // return jsonToLogicAggNode(pJson, pObj);
// case QUERY_NODE_LOGIC_PLAN_PROJECT: case QUERY_NODE_LOGIC_PLAN_PROJECT:
// return jsonToLogicProjectNode(pJson, pObj); return jsonToLogicProjectNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
return jsonToPhysiTagScanNode(pJson, pObj); return jsonToPhysiTagScanNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
......
...@@ -499,7 +499,7 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) ...@@ -499,7 +499,7 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect)
} }
static int32_t setTableVgroupList(SParseContext* pCxt, SName* name, SRealTableNode* pRealTable) { static int32_t setTableVgroupList(SParseContext* pCxt, SName* name, SRealTableNode* pRealTable) {
if (pCxt->streamQuery) { if (pCxt->topicQuery) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1393,7 +1393,7 @@ static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* p ...@@ -1393,7 +1393,7 @@ static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* p
SCMCreateTopicReq createReq = {0}; SCMCreateTopicReq createReq = {0};
if (NULL != pStmt->pQuery) { if (NULL != pStmt->pQuery) {
pCxt->pParseCxt->streamQuery = true; pCxt->pParseCxt->topicQuery = true;
int32_t code = translateQuery(pCxt, pStmt->pQuery); int32_t code = translateQuery(pCxt, pStmt->pQuery);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString(pStmt->pQuery, false, &createReq.ast, NULL); code = nodesNodeToString(pStmt->pQuery, false, &createReq.ast, NULL);
......
...@@ -131,7 +131,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect ...@@ -131,7 +131,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
TSWAP(pScan->pMeta, pRealTable->pMeta, STableMeta*); TSWAP(pScan->pMeta, pRealTable->pMeta, STableMeta*);
TSWAP(pScan->pVgroupList, pRealTable->pVgroupList, SVgroupsInfo*); TSWAP(pScan->pVgroupList, pRealTable->pVgroupList, SVgroupsInfo*);
pScan->scanType = pCxt->pPlanCxt->streamQuery ? SCAN_TYPE_STREAM : SCAN_TYPE_TABLE; pScan->scanType = pCxt->pPlanCxt->topicQuery ? SCAN_TYPE_TOPIC : SCAN_TYPE_TABLE;
pScan->scanFlag = MAIN_SCAN; pScan->scanFlag = MAIN_SCAN;
pScan->scanRange = TSWINDOW_INITIALIZER; pScan->scanRange = TSWINDOW_INITIALIZER;
pScan->tableName.type = TSDB_TABLE_NAME_T; pScan->tableName.type = TSDB_TABLE_NAME_T;
......
...@@ -272,6 +272,7 @@ static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpl ...@@ -272,6 +272,7 @@ static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpl
return createTagScanPhysiNode(pCxt, pScanLogicNode); return createTagScanPhysiNode(pCxt, pScanLogicNode);
case SCAN_TYPE_TABLE: case SCAN_TYPE_TABLE:
return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode); return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode);
case SCAN_TYPE_TOPIC:
case SCAN_TYPE_STREAM: case SCAN_TYPE_STREAM:
return createStreamScanPhysiNode(pCxt, pSubplan, pScanLogicNode); return createStreamScanPhysiNode(pCxt, pSubplan, pScanLogicNode);
default: default:
...@@ -472,11 +473,20 @@ static SPhysiNode* createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC ...@@ -472,11 +473,20 @@ static SPhysiNode* createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC
} }
static SPhysiNode* createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode) { static SPhysiNode* createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode) {
if (pCxt->pPlanCxt->streamQuery) {
SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
CHECK_ALLOC(pScan, NULL);
pScan->pScanCols = nodesCloneList(pExchangeLogicNode->node.pTargets);
CHECK_ALLOC(pScan->pScanCols, (SPhysiNode*)pScan);
CHECK_CODE(addDataBlockDesc(pCxt, pExchangeLogicNode->node.pTargets, pScan->node.pOutputDataBlockDesc), (SPhysiNode*)pScan);
return (SPhysiNode*)pScan;
} else {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE); SExchangePhysiNode* pExchange = (SExchangePhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
CHECK_ALLOC(pExchange, NULL); CHECK_ALLOC(pExchange, NULL);
CHECK_CODE(addDataBlockDesc(pCxt, pExchangeLogicNode->node.pTargets, pExchange->node.pOutputDataBlockDesc), (SPhysiNode*)pExchange); CHECK_CODE(addDataBlockDesc(pCxt, pExchangeLogicNode->node.pTargets, pExchange->node.pOutputDataBlockDesc), (SPhysiNode*)pExchange);
pExchange->srcGroupId = pExchangeLogicNode->srcGroupId; pExchange->srcGroupId = pExchangeLogicNode->srcGroupId;
return (SPhysiNode*)pExchange; return (SPhysiNode*)pExchange;
}
} }
static SPhysiNode* createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode) { static SPhysiNode* createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode) {
...@@ -614,7 +624,9 @@ static SSubplan* createPhysiSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLog ...@@ -614,7 +624,9 @@ static SSubplan* createPhysiSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLog
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode); taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
} else { } else {
pSubplan->pNode = createPhysiNode(pCxt, pSubplan, pLogicSubplan->pNode); pSubplan->pNode = createPhysiNode(pCxt, pSubplan, pLogicSubplan->pNode);
if (!pCxt->pPlanCxt->streamQuery && !pCxt->pPlanCxt->topicQuery) {
pSubplan->pDataSink = createDataDispatcher(pCxt, pSubplan->pNode); pSubplan->pDataSink = createDataDispatcher(pCxt, pSubplan->pNode);
}
pSubplan->msgType = TDMT_VND_QUERY; pSubplan->msgType = TDMT_VND_QUERY;
} }
return pSubplan; return pSubplan;
......
...@@ -44,7 +44,8 @@ typedef struct SStsInfo { ...@@ -44,7 +44,8 @@ typedef struct SStsInfo {
} SStsInfo; } SStsInfo;
static SLogicNode* stsMatchByNode(SLogicNode* pNode) { static SLogicNode* stsMatchByNode(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && TSDB_SUPER_TABLE == ((SScanLogicNode*)pNode)->pMeta->tableType) { if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && TSDB_SUPER_TABLE == ((SScanLogicNode*)pNode)->pMeta->tableType &&
SCAN_TYPE_TOPIC != ((SScanLogicNode*)pNode)->scanType) {
return pNode; return pNode;
} }
SNode* pChild; SNode* pChild;
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include "cmdnodes.h"
#include "parser.h" #include "parser.h"
#include "planInt.h" #include "planInt.h"
...@@ -56,7 +57,8 @@ protected: ...@@ -56,7 +57,8 @@ protected:
const string syntaxTreeStr = toString(query_->pRoot, false); const string syntaxTreeStr = toString(query_->pRoot, false);
SLogicNode* pLogicPlan = nullptr; SLogicNode* pLogicPlan = nullptr;
SPlanContext cxt = { .queryId = 1, .acctId = 0, .pAstRoot = query_->pRoot }; SPlanContext cxt = { .queryId = 1, .acctId = 0 };
setPlanContext(query_, &cxt);
code = createLogicPlan(&cxt, &pLogicPlan); code = createLogicPlan(&cxt, &pLogicPlan);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] logic plan code:" << code << ", strerror:" << tstrerror(code) << endl; cout << "sql:[" << cxt_.pSql << "] logic plan code:" << code << ", strerror:" << tstrerror(code) << endl;
...@@ -94,6 +96,15 @@ protected: ...@@ -94,6 +96,15 @@ protected:
private: private:
static const int max_err_len = 1024; static const int max_err_len = 1024;
void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) {
if (QUERY_NODE_CREATE_TOPIC_STMT == nodeType(pQuery->pRoot)) {
pCxt->pAstRoot = ((SCreateTopicStmt*)pQuery->pRoot)->pQuery;
pCxt->topicQuery = true;
} else {
pCxt->pAstRoot = pQuery->pRoot;
}
}
void reset() { void reset() {
memset(&cxt_, 0, sizeof(cxt_)); memset(&cxt_, 0, sizeof(cxt_));
memset(errMagBuf_, 0, max_err_len); memset(errMagBuf_, 0, max_err_len);
...@@ -173,3 +184,10 @@ TEST_F(PlannerTest, interval) { ...@@ -173,3 +184,10 @@ TEST_F(PlannerTest, interval) {
bind("SELECT count(*) FROM t1 interval(10s)"); bind("SELECT count(*) FROM t1 interval(10s)");
ASSERT_TRUE(run()); ASSERT_TRUE(run());
} }
TEST_F(PlannerTest, createTopic) {
setDatabase("root", "test");
bind("create topic tp as SELECT * FROM st1");
ASSERT_TRUE(run());
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册