diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 6727dd328903d750f0abb0a197534c4e9e6e91ab..25002a9f929a6a6876e85341a221d3a98644ec6b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1063,6 +1063,27 @@ typedef struct STaskDropRsp { int32_t code; } STaskDropRsp; +typedef struct { + int8_t igExists; + char* name; + char* phyPlan; +} SCMCreateTopicReq; + +static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) { + int tlen = 0; + tlen += taosEncodeString(buf, pReq->name); + tlen += taosEncodeFixedI8(buf, pReq->igExists); + tlen += taosEncodeString(buf, pReq->phyPlan); + return tlen; +} + +static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SCMCreateTopicReq* pReq) { + buf = taosDecodeFixedI8(buf, &(pReq->igExists)); + buf = taosDecodeString(buf, &(pReq->name)); + buf = taosDecodeString(buf, &(pReq->phyPlan)); + return buf; +} + typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; int8_t igExists; diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index f2f3fcd49b581c97a01a081c063ffbfceddd69e7..c7d637ee8a7f3504ad4cf63abe37866e91954b34 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -46,7 +46,7 @@ int32_t qParseQuerySql(SParseContext* pContext, SQueryNode** pQuery); bool qIsDdlQuery(const SQueryNode* pQuery); -void qDestoryQuery(SQueryNode* pQuery); +void qDestroyQuery(SQueryNode* pQuery); /** * Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that @@ -86,4 +86,4 @@ void addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* p } #endif -#endif /*_TD_PARSER_H_*/ \ No newline at end of file +#endif /*_TD_PARSER_H_*/ diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 0ee99f77aa9219a8ea135058f35f63702abedd9c..6220d7eb50a5b0e792fb865cff25acdc517e8555 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -212,6 +212,60 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); } +TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) { + STscObj* pTscObj = (STscObj*)taos; + SRequestObj* pRequest = NULL; + SQueryNode* pQuery = NULL; + SQueryDag* pDag = NULL; + char *dagStr = NULL; + + //parse sql to logical plan and physical plan + //send topic name and plans to mnode + + terrno = TSDB_CODE_SUCCESS; + + CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); + CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return); + //TODO: check sql valid + + CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return); + + dagStr = qDagToString(pDag); + if(dagStr == NULL) { + //TODO + } + + SCMCreateTopicReq req = { + .name = (char*)name, + .igExists = 0, + .phyPlan = dagStr, + }; + + void* buf = NULL; + int tlen = tSerializeSCMCreateTopicReq(&buf, &req); + + pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; + + SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest); + SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet; + + int64_t transporterId = 0; + asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body); + + tsem_wait(&pRequest->body.rspSem); + + destroySendMsgInfo(body); + +_return: + qDestroyQuery(pQuery); + qDestroyQueryDag(pDag); + destroySendMsgInfo(body); + if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) { + pRequest->code = terrno; + } + return pRequest; +} + TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { STscObj *pTscObj = (STscObj *)taos; if (sqlLen > (size_t) tsMaxSQLStringLen) { @@ -239,7 +293,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { } _return: - qDestoryQuery(pQuery); + qDestroyQuery(pQuery); qDestroyQueryDag(pDag); if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) { pRequest->code = terrno;