提交 e4dd627b 编写于 作者: X Xiaoyu Wang

stream plan implement

上级 0dd0f6f0
......@@ -36,6 +36,7 @@ typedef enum EScanType {
SCAN_TYPE_TAG,
SCAN_TYPE_TABLE,
SCAN_TYPE_STABLE,
SCAN_TYPE_TOPIC,
SCAN_TYPE_STREAM
} EScanType;
......@@ -154,7 +155,7 @@ typedef struct SPhysiNode {
} SPhysiNode;
typedef struct SScanPhysiNode {
SPhysiNode node;
SPhysiNode node;
SNodeList* pScanCols;
uint64_t uid; // unique id of the table
int8_t tableType;
......
......@@ -23,7 +23,8 @@ extern "C" {
#include "nodes.h"
#include "tmsg.h"
#define TABLE_META_SIZE(pMeta) (NULL == (pMeta) ? 0 : (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfColumns + (pMeta)->tableInfo.numOfTags) * sizeof(SSchema)))
#define TABLE_TOTAL_COL_NUM(pMeta) ((pMeta)->tableInfo.numOfColumns + (pMeta)->tableInfo.numOfTags)
#define TABLE_META_SIZE(pMeta) (NULL == (pMeta) ? 0 : (sizeof(STableMeta) + TABLE_TOTAL_COL_NUM((pMeta)) * sizeof(SSchema)))
#define VGROUPS_INFO_SIZE(pInfo) (NULL == (pInfo) ? 0 : (sizeof(SVgroupsInfo) + (pInfo)->numOfVgroups * sizeof(SVgroupInfo)))
typedef struct SRawExprNode {
......
......@@ -26,7 +26,7 @@ typedef struct SParseContext {
uint64_t requestId;
int32_t acctId;
const char *db;
bool streamQuery;
bool topicQuery;
void *pTransporter;
SEpSet mgmtEpSet;
const char *pSql; // sql string
......
......@@ -26,6 +26,7 @@ typedef struct SPlanContext {
uint64_t queryId;
int32_t acctId;
SNode* pAstRoot;
bool topicQuery;
bool streamQuery;
} SPlanContext;
......
......@@ -22,6 +22,14 @@
extern "C" {
#endif
#define tjsonGetNumberValue(pJson, pName, val) \
({ \
uint64_t _tmp = 0; \
int32_t _code = tjsonGetUBigIntValue(pJson, pName, &_tmp); \
val = _tmp; \
_code; \
})
typedef void SJson;
SJson* tjsonCreateObject();
......
......@@ -230,7 +230,7 @@ void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
int32_t parseSql(SRequestObj* pRequest, bool streamQuery, SQuery** pQuery);
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery);
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
// --- heartbeat
......
......@@ -137,14 +137,14 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj*
return TSDB_CODE_SUCCESS;
}
int32_t parseSql(SRequestObj* pRequest, bool streamQuery, SQuery** pQuery) {
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) {
STscObj* pTscObj = pRequest->pTscObj;
SParseContext cxt = {
.requestId = pRequest->requestId,
.acctId = pTscObj->acctId,
.db = pRequest->pDb,
.streamQuery = streamQuery,
.topicQuery = topicQuery,
.pSql = pRequest->sqlstr,
.sqlLen = pRequest->sqlLen,
.pMsg = pRequest->msgBuf,
......
......@@ -246,7 +246,7 @@ static int32_t mndGetPlanString(SCMCreateTopicReq *pCreate, char **pStr) {
SQueryPlan* pPlan = NULL;
if (TSDB_CODE_SUCCESS == code) {
SPlanContext cxt = { .pAstRoot = pAst, .streamQuery = true };
SPlanContext cxt = { .pAstRoot = pAst, .topicQuery = true };
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
}
......
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cmdnodes.h"
#include "nodesUtil.h"
#include "plannodes.h"
#include "querynodes.h"
......@@ -85,6 +86,8 @@ const char* nodesNodeName(ENodeType type) {
return "ShowDatabaseStmt";
case QUERY_NODE_SHOW_TABLES_STMT:
return "ShowTablesStmt";
case QUERY_NODE_CREATE_TOPIC_STMT:
return "CreateTopicStmt";
case QUERY_NODE_LOGIC_PLAN_SCAN:
return "LogicScan";
case QUERY_NODE_LOGIC_PLAN_JOIN:
......@@ -179,16 +182,118 @@ static int32_t jsonToNodeList(const SJson* pJson, const char* pName, SNodeList**
return jsonToNodeListImpl(tjsonGetObjectItem(pJson, pName), pList);
}
static const char* jkTableMetaUid = "TableMetaUid";
static const char* jkTableMetaSuid = "TableMetaSuid";
static const char* jkTableComInfoNumOfTags = "NumOfTags";
static const char* jkTableComInfoPrecision = "Precision";
static const char* jkTableComInfoNumOfColumns = "NumOfColumns";
static const char* jkTableComInfoRowSize = "RowSize";
static int32_t tableComInfoToJson(const void* pObj, SJson* pJson) {
const STableComInfo* pNode = (const STableComInfo*)pObj;
int32_t code = tjsonAddIntegerToObject(pJson, jkTableComInfoNumOfTags, pNode->numOfTags);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableComInfoPrecision, pNode->precision);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableComInfoNumOfColumns, pNode->numOfColumns);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableComInfoRowSize, pNode->rowSize);
}
return code;
}
static int32_t jsonToTableComInfo(const SJson* pJson, void* pObj) {
STableComInfo* pNode = (STableComInfo*)pObj;
int32_t code = tjsonGetNumberValue(pJson, jkTableComInfoNumOfTags, pNode->numOfTags);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkTableComInfoPrecision, pNode->precision);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkTableComInfoNumOfColumns, pNode->numOfColumns);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkTableComInfoRowSize, pNode->rowSize);
}
return code;
}
static const char* jkSchemaType = "Type";
static const char* jkSchemaColId = "ColId";
static const char* jkSchemaBytes = "bytes";
static const char* jkSchemaName = "Name";
static int32_t schemaToJson(const void* pObj, SJson* pJson) {
const SSchema* pNode = (const SSchema*)pObj;
int32_t code = tjsonAddIntegerToObject(pJson, jkSchemaType, pNode->type);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSchemaColId, pNode->colId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSchemaBytes, pNode->bytes);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkSchemaName, pNode->name);
}
return code;
}
static int32_t jsonToSchema(const SJson* pJson, void* pObj) {
SSchema* pNode = (SSchema*)pObj;
int32_t code = tjsonGetNumberValue(pJson, jkSchemaType, pNode->type);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkSchemaColId, pNode->colId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkSchemaBytes, pNode->bytes);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkSchemaName, pNode->name);
}
return code;
}
static const char* jkTableMetaVgId = "VgId";
static const char* jkTableMetaTableType = "TableType";
static const char* jkTableMetaUid = "Uid";
static const char* jkTableMetaSuid = "Suid";
static const char* jkTableMetaSversion = "Sversion";
static const char* jkTableMetaTversion = "Tversion";
static const char* jkTableMetaComInfo = "ComInfo";
static const char* jkTableMetaColSchemas = "ColSchemas";
static int32_t tableMetaToJson(const void* pObj, SJson* pJson) {
const STableMeta* pNode = (const STableMeta*)pObj;
int32_t code = tjsonAddIntegerToObject(pJson, jkTableMetaUid, pNode->uid);
int32_t code = tjsonAddIntegerToObject(pJson, jkTableMetaVgId, pNode->vgId);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableMetaTableType, pNode->tableType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableMetaUid, pNode->uid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableMetaSuid, pNode->suid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableMetaSversion, pNode->sversion);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableMetaTversion, pNode->tversion);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkTableMetaComInfo, tableComInfoToJson, &pNode->tableInfo);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddArray(pJson, jkTableMetaColSchemas, schemaToJson, pNode->schema, sizeof(SSchema), TABLE_TOTAL_COL_NUM(pNode));
}
return code;
}
......@@ -196,9 +301,27 @@ static int32_t tableMetaToJson(const void* pObj, SJson* pJson) {
static int32_t jsonToTableMeta(const SJson* pJson, void* pObj) {
STableMeta* pNode = (STableMeta*)pObj;
int32_t code = tjsonGetUBigIntValue(pJson, jkTableMetaUid, &pNode->uid);
int32_t code = tjsonGetNumberValue(pJson, jkTableMetaVgId, pNode->vgId);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUBigIntValue(pJson, jkTableMetaSuid, &pNode->suid);
code = tjsonGetNumberValue(pJson, jkTableMetaTableType, pNode->tableType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkTableMetaUid, pNode->uid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkTableMetaSuid, pNode->suid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkTableMetaSversion, pNode->sversion);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkTableMetaTversion, pNode->tversion);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkTableMetaComInfo, jsonToTableComInfo, &pNode->tableInfo);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToArray(pJson, jkTableMetaColSchemas, jsonToSchema, pNode->schema, sizeof(SSchema));
}
return code;
......@@ -222,7 +345,22 @@ static int32_t logicPlanNodeToJson(const void* pObj, SJson* pJson) {
return code;
}
static int32_t jsonToLogicPlanNode(const SJson* pJson, void* pObj) {
SLogicNode* pNode = (SLogicNode*)pObj;
int32_t code = jsonToNodeList(pJson, jkLogicPlanTargets, &pNode->pTargets);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkLogicPlanConditions, &pNode->pConditions);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkLogicPlanChildren, &pNode->pChildren);
}
return code;
}
static const char* jkScanLogicPlanScanCols = "ScanCols";
static const char* jkScanLogicPlanTableMetaSize = "TableMetaSize";
static const char* jkScanLogicPlanTableMeta = "TableMeta";
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
......@@ -232,6 +370,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkScanLogicPlanScanCols, pNode->pScanCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanLogicPlanTableMetaSize, TABLE_META_SIZE(pNode->pMeta));
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkScanLogicPlanTableMeta, tableMetaToJson, pNode->pMeta);
}
......@@ -239,6 +380,24 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
return code;
}
static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
SScanLogicNode* pNode = (SScanLogicNode*)pObj;
int32_t objSize = 0;
int32_t code = jsonToLogicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkScanLogicPlanScanCols, &pNode->pScanCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkScanLogicPlanTableMetaSize, &objSize);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonMakeObject(pJson, jkScanLogicPlanTableMeta, jsonToTableMeta, (void**)&pNode->pMeta, objSize);
}
return code;
}
static const char* jkProjectLogicPlanProjections = "Projections";
static int32_t logicProjectNodeToJson(const void* pObj, SJson* pJson) {
......@@ -252,6 +411,17 @@ static int32_t logicProjectNodeToJson(const void* pObj, SJson* pJson) {
return code;
}
static int32_t jsonToLogicProjectNode(const SJson* pJson, void* pObj) {
SProjectLogicNode* pNode = (SProjectLogicNode*)pObj;
int32_t code = jsonToLogicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkProjectLogicPlanProjections, &pNode->pProjections);
}
return code;
}
static const char* jkJoinLogicPlanJoinType = "JoinType";
static const char* jkJoinLogicPlanOnConditions = "OnConditions";
......@@ -1739,6 +1909,45 @@ static int32_t jsonToSelectStmt(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkCreateTopicStmtTopicName = "TopicName";
static const char* jkCreateTopicStmtSubscribeDbName = "SubscribeDbName";
static const char* jkCreateTopicStmtIgnoreExists = "IgnoreExists";
static const char* jkCreateTopicStmtQuery = "Query";
static int32_t createTopicStmtToJson(const void* pObj, SJson* pJson) {
const SCreateTopicStmt* pNode = (const SCreateTopicStmt*)pObj;
int32_t code = tjsonAddStringToObject(pJson, jkCreateTopicStmtTopicName, pNode->topicName);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkCreateTopicStmtSubscribeDbName, pNode->subscribeDbName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkCreateTopicStmtIgnoreExists, pNode->ignoreExists);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkCreateTopicStmtQuery, nodeToJson, pNode->pQuery);
}
return code;
}
static int32_t jsonToCreateTopicStmt(const SJson* pJson, void* pObj) {
SCreateTopicStmt* pNode = (SCreateTopicStmt*)pObj;
int32_t code = tjsonGetStringValue(pJson, jkCreateTopicStmtTopicName, pNode->topicName);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkCreateTopicStmtSubscribeDbName, pNode->subscribeDbName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkCreateTopicStmtIgnoreExists, &pNode->ignoreExists);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkCreateTopicStmtQuery, &pNode->pQuery);
}
return code;
}
static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
switch (nodeType(pObj)) {
case QUERY_NODE_COLUMN:
......@@ -1790,6 +1999,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_SHOW_DATABASES_STMT:
case QUERY_NODE_SHOW_TABLES_STMT:
break;
case QUERY_NODE_CREATE_TOPIC_STMT:
return createTopicStmtToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_SCAN:
return logicScanNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_JOIN:
......@@ -1877,14 +2088,16 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
// break;
case QUERY_NODE_SELECT_STMT:
return jsonToSelectStmt(pJson, pObj);
// case QUERY_NODE_LOGIC_PLAN_SCAN:
// return jsonToLogicScanNode(pJson, pObj);
case QUERY_NODE_CREATE_TOPIC_STMT:
return jsonToCreateTopicStmt(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_SCAN:
return jsonToLogicScanNode(pJson, pObj);
// case QUERY_NODE_LOGIC_PLAN_JOIN:
// return jsonToLogicJoinNode(pJson, pObj);
// case QUERY_NODE_LOGIC_PLAN_AGG:
// return jsonToLogicAggNode(pJson, pObj);
// case QUERY_NODE_LOGIC_PLAN_PROJECT:
// return jsonToLogicProjectNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_PROJECT:
return jsonToLogicProjectNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
return jsonToPhysiTagScanNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
......
......@@ -499,7 +499,7 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect)
}
static int32_t setTableVgroupList(SParseContext* pCxt, SName* name, SRealTableNode* pRealTable) {
if (pCxt->streamQuery) {
if (pCxt->topicQuery) {
return TSDB_CODE_SUCCESS;
}
......@@ -1393,7 +1393,7 @@ static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* p
SCMCreateTopicReq createReq = {0};
if (NULL != pStmt->pQuery) {
pCxt->pParseCxt->streamQuery = true;
pCxt->pParseCxt->topicQuery = true;
int32_t code = translateQuery(pCxt, pStmt->pQuery);
if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString(pStmt->pQuery, false, &createReq.ast, NULL);
......
......@@ -131,7 +131,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
TSWAP(pScan->pMeta, pRealTable->pMeta, STableMeta*);
TSWAP(pScan->pVgroupList, pRealTable->pVgroupList, SVgroupsInfo*);
pScan->scanType = pCxt->pPlanCxt->streamQuery ? SCAN_TYPE_STREAM : SCAN_TYPE_TABLE;
pScan->scanType = pCxt->pPlanCxt->topicQuery ? SCAN_TYPE_TOPIC : SCAN_TYPE_TABLE;
pScan->scanFlag = MAIN_SCAN;
pScan->scanRange = TSWINDOW_INITIALIZER;
pScan->tableName.type = TSDB_TABLE_NAME_T;
......
......@@ -272,6 +272,7 @@ static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpl
return createTagScanPhysiNode(pCxt, pScanLogicNode);
case SCAN_TYPE_TABLE:
return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode);
case SCAN_TYPE_TOPIC:
case SCAN_TYPE_STREAM:
return createStreamScanPhysiNode(pCxt, pSubplan, pScanLogicNode);
default:
......@@ -472,11 +473,20 @@ static SPhysiNode* createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC
}
static SPhysiNode* createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
CHECK_ALLOC(pExchange, NULL);
CHECK_CODE(addDataBlockDesc(pCxt, pExchangeLogicNode->node.pTargets, pExchange->node.pOutputDataBlockDesc), (SPhysiNode*)pExchange);
pExchange->srcGroupId = pExchangeLogicNode->srcGroupId;
return (SPhysiNode*)pExchange;
if (pCxt->pPlanCxt->streamQuery) {
SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
CHECK_ALLOC(pScan, NULL);
pScan->pScanCols = nodesCloneList(pExchangeLogicNode->node.pTargets);
CHECK_ALLOC(pScan->pScanCols, (SPhysiNode*)pScan);
CHECK_CODE(addDataBlockDesc(pCxt, pExchangeLogicNode->node.pTargets, pScan->node.pOutputDataBlockDesc), (SPhysiNode*)pScan);
return (SPhysiNode*)pScan;
} else {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
CHECK_ALLOC(pExchange, NULL);
CHECK_CODE(addDataBlockDesc(pCxt, pExchangeLogicNode->node.pTargets, pExchange->node.pOutputDataBlockDesc), (SPhysiNode*)pExchange);
pExchange->srcGroupId = pExchangeLogicNode->srcGroupId;
return (SPhysiNode*)pExchange;
}
}
static SPhysiNode* createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode) {
......@@ -614,7 +624,9 @@ static SSubplan* createPhysiSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLog
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
} else {
pSubplan->pNode = createPhysiNode(pCxt, pSubplan, pLogicSubplan->pNode);
pSubplan->pDataSink = createDataDispatcher(pCxt, pSubplan->pNode);
if (!pCxt->pPlanCxt->streamQuery && !pCxt->pPlanCxt->topicQuery) {
pSubplan->pDataSink = createDataDispatcher(pCxt, pSubplan->pNode);
}
pSubplan->msgType = TDMT_VND_QUERY;
}
return pSubplan;
......
......@@ -44,7 +44,8 @@ typedef struct SStsInfo {
} SStsInfo;
static SLogicNode* stsMatchByNode(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && TSDB_SUPER_TABLE == ((SScanLogicNode*)pNode)->pMeta->tableType) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && TSDB_SUPER_TABLE == ((SScanLogicNode*)pNode)->pMeta->tableType &&
SCAN_TYPE_TOPIC != ((SScanLogicNode*)pNode)->scanType) {
return pNode;
}
SNode* pChild;
......
......@@ -17,6 +17,7 @@
#include <gtest/gtest.h>
#include "cmdnodes.h"
#include "parser.h"
#include "planInt.h"
......@@ -56,7 +57,8 @@ protected:
const string syntaxTreeStr = toString(query_->pRoot, false);
SLogicNode* pLogicPlan = nullptr;
SPlanContext cxt = { .queryId = 1, .acctId = 0, .pAstRoot = query_->pRoot };
SPlanContext cxt = { .queryId = 1, .acctId = 0 };
setPlanContext(query_, &cxt);
code = createLogicPlan(&cxt, &pLogicPlan);
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] logic plan code:" << code << ", strerror:" << tstrerror(code) << endl;
......@@ -94,6 +96,15 @@ protected:
private:
static const int max_err_len = 1024;
void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) {
if (QUERY_NODE_CREATE_TOPIC_STMT == nodeType(pQuery->pRoot)) {
pCxt->pAstRoot = ((SCreateTopicStmt*)pQuery->pRoot)->pQuery;
pCxt->topicQuery = true;
} else {
pCxt->pAstRoot = pQuery->pRoot;
}
}
void reset() {
memset(&cxt_, 0, sizeof(cxt_));
memset(errMagBuf_, 0, max_err_len);
......@@ -173,3 +184,10 @@ TEST_F(PlannerTest, interval) {
bind("SELECT count(*) FROM t1 interval(10s)");
ASSERT_TRUE(run());
}
TEST_F(PlannerTest, createTopic) {
setDatabase("root", "test");
bind("create topic tp as SELECT * FROM st1");
ASSERT_TRUE(run());
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册