diff --git a/include/client/taos.h b/include/client/taos.h index 4669ca51f7d80dd40a9d37047cf758faa680c9c8..84f625571095e519b3268d42324719461d5ac650 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -193,8 +193,7 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList); DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision); - -DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen); +DLL_EXPORT TAOS_RES *taos_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen); #ifdef __cplusplus } diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 2aaa2168cc004fc88eb3fccbca5b248964461107..3fb45794513f0f362cfcf26bb809dd5d059b1d58 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1053,6 +1053,7 @@ typedef struct { typedef struct { int8_t igExists; char* name; + char* sql; char* physicalPlan; char* logicalPlan; } SCMCreateTopicReq; @@ -1061,6 +1062,7 @@ static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateT int tlen = 0; tlen += taosEncodeFixedI8(buf, pReq->igExists); tlen += taosEncodeString(buf, pReq->name); + tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->physicalPlan); tlen += taosEncodeString(buf, pReq->logicalPlan); return tlen; @@ -1069,6 +1071,7 @@ static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateT static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SCMCreateTopicReq* pReq) { buf = taosDecodeFixedI8(buf, &(pReq->igExists)); buf = taosDecodeString(buf, &(pReq->name)); + buf = taosDecodeString(buf, &(pReq->sql)); buf = taosDecodeString(buf, &(pReq->physicalPlan)); buf = taosDecodeString(buf, &(pReq->logicalPlan)); return buf; @@ -1191,7 +1194,7 @@ typedef struct { } SMVSubscribeRsp; typedef struct { - char name[TSDB_TOPIC_FNAME_LEN]; + char name[TSDB_TOPIC_NAME_LEN]; int8_t igExists; int32_t execLen; void* executor; diff --git a/include/common/tname.h b/include/common/tname.h index 11d97dac06d7240664f48626cd9d58ad6c555afa..12a0d34cb4fef92d0f65de535808f1150aa2c33d 100644 --- a/include/common/tname.h +++ b/include/common/tname.h @@ -25,14 +25,12 @@ #define T_NAME_ACCT 0x1u #define T_NAME_DB 0x2u #define T_NAME_TABLE 0x4u -#define T_NAME_TOPIC 0x8u typedef struct SName { uint8_t type; //db_name_t, table_name_t int32_t acctId; char dbname[TSDB_DB_NAME_LEN]; char tname[TSDB_TABLE_NAME_LEN]; - char topicName[TSDB_TOPIC_NAME_LEN]; } SName; int32_t tNameExtractFullName(const SName* name, char* dst); diff --git a/include/util/tcoding.h b/include/util/tcoding.h index 226856901f2d20ae6ce50f81989aba032ca38d6c..819878704833db6db9994e36f04fc3741b2461bb 100644 --- a/include/util/tcoding.h +++ b/include/util/tcoding.h @@ -351,6 +351,7 @@ static FORCE_INLINE void *taosDecodeString(void *buf, char **value) { buf = taosDecodeVariantU64(buf, &size); *value = (char *)malloc((size_t)size + 1); + if (*value == NULL) return NULL; memcpy(*value, buf, (size_t)size); diff --git a/include/util/tdef.h b/include/util/tdef.h index 9f16b58e0db30e8357da720f5254605f0d5bec08..428de5d17138a9f869b4ab0b3fc987759c256bba 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -181,7 +181,7 @@ do { \ #define TSDB_COL_NAME_LEN 65 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE -#define TSDB_MAX_SQL_SHOW_LEN 512 +#define TSDB_MAX_SQL_SHOW_LEN 1024 #define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024u) // sql length should be less than 1mb #define TSDB_APP_NAME_LEN TSDB_UNI_LEN diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 3090955f692b124e5b0ea5caa3723e79c2e78915..df7e10af7d24b196979e194367cb3bb18bb422a9 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -13,12 +13,12 @@ #include "tpagedfile.h" #include "tref.h" -#define CHECK_CODE_GOTO(expr, lable) \ +#define CHECK_CODE_GOTO(expr, label) \ do { \ int32_t code = expr; \ if (TSDB_CODE_SUCCESS != code) { \ terrno = code; \ - goto lable; \ + goto label; \ } \ } while (0) @@ -258,36 +258,62 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) { return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob); } -TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) { - STscObj* pTscObj = (STscObj*)taos; - SRequestObj* pRequest = NULL; - SQueryNode* pQuery = NULL; - SQueryDag* pDag = NULL; - char *dagStr = NULL; +TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) { + STscObj *pTscObj = (STscObj*)taos; + SRequestObj *pRequest = NULL; + SQueryNode *pQueryNode = NULL; + char *pStr = NULL; terrno = TSDB_CODE_SUCCESS; + if (taos == NULL || topicName == NULL || sql == NULL) { + tscError("invalid parameters for creating topic, connObj:%p, topic name:%s, sql:%s", taos, topicName, sql); + terrno = TSDB_CODE_TSC_INVALID_INPUT; + goto _return; + } + + if (strlen(topicName) >= TSDB_TOPIC_NAME_LEN) { + tscError("topic name too long, max length:%d", TSDB_TOPIC_NAME_LEN - 1); + terrno = TSDB_CODE_TSC_INVALID_INPUT; + goto _return; + } + + if (sqlLen > tsMaxSQLStringLen) { + tscError("sql string exceeds max length:%d", tsMaxSQLStringLen); + terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; + goto _return; + } + + tscDebug("start to create topic, %s", topicName); CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); + CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return); -//temporary disabled until planner ready -#if 0 - CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return); - //TODO: check sql valid + // todo check for invalid sql statement and return with error code - CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return); + CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, pRequest->requestId), _return); - dagStr = qDagToString(pDag); - if(dagStr == NULL) { - //TODO + pStr = qDagToString(pRequest->body.pDag); + if(pStr == NULL) { + goto _return; } -#endif + + // The topic should be related to a database that the queried table is belonged to. + SName name = {0}; + char dbName[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(&((SQueryStmtInfo*) pQueryNode)->pTableMetaInfo[0]->name, dbName); + + tNameFromString(&name, dbName, T_NAME_ACCT|T_NAME_DB); + tNameFromString(&name, topicName, T_NAME_TABLE); + + char topicFname[TSDB_TOPIC_FNAME_LEN] = {0}; + tNameExtractFullName(&name, topicFname); SCMCreateTopicReq req = { - .name = (char*)name, - .igExists = 0, - /*.physicalPlan = dagStr,*/ - .physicalPlan = (char*)sql, - .logicalPlan = "", + .name = (char*) topicFname, + .igExists = 0, + .physicalPlan = (char*) pStr, + .sql = (char*) sql, + .logicalPlan = "no logic plan", }; int tlen = tSerializeSCMCreateTopicReq(NULL, &req); @@ -295,27 +321,32 @@ TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sq if(buf == NULL) { goto _return; } + void* abuf = buf; tSerializeSCMCreateTopicReq(&abuf, &req); /*printf("formatted: %s\n", dagStr);*/ pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; + pRequest->type = TDMT_MND_CREATE_TOPIC; SMsgSendInfo* body = buildMsgInfoImpl(pRequest); - SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet; + SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; - asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, pEpSet, &transporterId, body); + asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); _return: - qDestroyQuery(pQuery); - qDestroyQueryDag(pDag); - destroySendMsgInfo(body); + qDestroyQuery(pQueryNode); + if (body != NULL) { + destroySendMsgInfo(body); + } + if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) { pRequest->code = terrno; } + return pRequest; } @@ -330,22 +361,22 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { nPrintTsc("%s", sql) SRequestObj *pRequest = NULL; - SQueryNode *pQuery = NULL; + SQueryNode *pQueryNode = NULL; terrno = TSDB_CODE_SUCCESS; CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); - CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return); + CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return); - if (qIsDdlQuery(pQuery)) { - CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return); + if (qIsDdlQuery(pQueryNode)) { + CHECK_CODE_GOTO(execDdlQuery(pRequest, pQueryNode), _return); } else { - CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pRequest->body.pDag), _return); + CHECK_CODE_GOTO(getPlan(pRequest, pQueryNode, &pRequest->body.pDag), _return); CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag), _return); pRequest->code = terrno; } _return: - qDestroyQuery(pQuery); + qDestroyQuery(pQueryNode); if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) { pRequest->code = terrno; } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 7086da0d550885ae09ffecbdc88bd12c7e763ffb..429657b61774ab24f06e160e69f8a7d83b474da4 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -48,14 +48,14 @@ int main(int argc, char** argv) { TEST(testCase, driverInit_Test) { taos_init(); } -//TEST(testCase, connect_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// if (pConn == NULL) { -// printf("failed to connect to server, reason:%s\n", taos_errstr(NULL)); -// } -// taos_close(pConn); -//} -// +TEST(testCase, connect_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + printf("failed to connect to server, reason:%s\n", taos_errstr(NULL)); + } + taos_close(pConn); +} + //TEST(testCase, create_user_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); @@ -514,39 +514,29 @@ TEST(testCase, driverInit_Test) { taos_init(); } // taosHashCleanup(phash); //} // -//// TEST(testCase, create_topic_Test) { -//// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -//// assert(pConn != NULL); -//// -//// TAOS_RES* pRes = taos_query(pConn, "create database abc1"); -//// if (taos_errno(pRes) != 0) { -//// printf("error in create db, reason:%s\n", taos_errstr(pRes)); -//// } -//// taos_free_result(pRes); -//// -//// pRes = taos_query(pConn, "use abc1"); -//// if (taos_errno(pRes) != 0) { -//// printf("error in use db, reason:%s\n", taos_errstr(pRes)); -//// } -//// taos_free_result(pRes); -//// -//// pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)"); -//// if (taos_errno(pRes) != 0) { -//// printf("error in create stable, reason:%s\n", taos_errstr(pRes)); -//// } -//// -//// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -//// ASSERT_TRUE(pFields == NULL); -//// -//// int32_t numOfFields = taos_num_fields(pRes); -//// ASSERT_EQ(numOfFields, 0); -//// -//// taos_free_result(pRes); -//// -//// char* sql = "select * from st1"; -//// tmq_create_topic(pConn, "test_topic_1", sql, strlen(sql)); -//// taos_close(pConn); -////} +TEST(testCase, create_topic_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == nullptr); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + + taos_free_result(pRes); + + char* sql = "select * from tu"; + pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); + taos_free_result(pRes); + taos_close(pConn); +} // //TEST(testCase, insert_test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -565,7 +555,7 @@ TEST(testCase, driverInit_Test) { taos_init(); } // taos_free_result(pRes); // taos_close(pConn); //} -// + //TEST(testCase, projection_query_tables) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // ASSERT_NE(pConn, nullptr); @@ -573,30 +563,58 @@ TEST(testCase, driverInit_Test) { taos_init(); } // TAOS_RES* pRes = taos_query(pConn, "use abc1"); // taos_free_result(pRes); // -// pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); +//// pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); +//// if (taos_errno(pRes) != 0) { +//// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); +//// } +//// taos_free_result(pRes); +//// +//// pRes = taos_query(pConn, "create table tu using st1 tags(1)"); +//// if (taos_errno(pRes) != 0) { +//// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); +//// } +//// taos_free_result(pRes); +//// +//// for(int32_t i = 0; i < 100; ++i) { +//// char sql[512] = {0}; +//// sprintf(sql, "insert into tu values(now+%da, %d)", i, i); +//// TAOS_RES* p = taos_query(pConn, sql); +//// if (taos_errno(p) != 0) { +//// printf("failed to insert data, reason:%s\n", taos_errstr(p)); +//// } +//// +//// taos_free_result(p); +//// } // -// pRes = taos_query(pConn, "create table tu using st1 tags(1)"); +// pRes = taos_query(pConn, "select * from tu"); // if (taos_errno(pRes) != 0) { -// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); +// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); // } -// taos_free_result(pRes); // -// for(int32_t i = 0; i < 100; ++i) { -// char sql[512] = {0}; -// sprintf(sql, "insert into tu values(now+%da, %d)", i, i); -// TAOS_RES* p = taos_query(pConn, sql); -// if (taos_errno(p) != 0) { -// printf("failed to insert data, reason:%s\n", taos_errstr(p)); -// } +// TAOS_ROW pRow = NULL; +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// int32_t numOfFields = taos_num_fields(pRes); // -// taos_free_result(p); +// char str[512] = {0}; +// while ((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); // } // -// pRes = taos_query(pConn, "select * from tu"); +// taos_free_result(pRes); +// taos_close(pConn); +//} + +//TEST(testCase, projection_query_stables) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// ASSERT_NE(pConn, nullptr); +// +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "select ts,k from m1"); // if (taos_errno(pRes) != 0) { // printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); // taos_free_result(pRes); @@ -616,31 +634,3 @@ TEST(testCase, driverInit_Test) { taos_init(); } // taos_free_result(pRes); // taos_close(pConn); //} - -TEST(testCase, projection_query_stables) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - ASSERT_NE(pConn, nullptr); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - taos_free_result(pRes); - - pRes = taos_query(pConn, "select ts,k from m1"); - if (taos_errno(pRes) != 0) { - printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_free_result(pRes); - taos_close(pConn); -} diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index a55e0dd2b252c328221d7b1e6d644eadf8e5db68..0c2524f48c375b3432d1f7931fefe0e87b302ef0 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -350,7 +350,7 @@ typedef struct SMqTopicObj { // TODO: add cache and change name to id typedef struct SMqConsumerTopic { - char name[TSDB_TOPIC_FNAME_LEN]; + char name[TSDB_TOPIC_NAME_LEN]; SList *vgroups; // SList } SMqConsumerTopic; @@ -409,7 +409,7 @@ typedef struct SMqVGroupHbObj { #if 0 typedef struct SCGroupObj { - char name[TSDB_TOPIC_FNAME_LEN]; + char name[TSDB_TOPIC_NAME_LEN]; int64_t createTime; int64_t updateTime; uint64_t uid; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index acdc718f20c3176313bbed172daffc55f7892bba..67aeae0d4c3f35b1637b1d1ec51f2809e28bc4dc 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -118,11 +118,13 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER); + + pTopic->sql = calloc(pTopic->sqlLen + 1, sizeof(char)); SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER); - SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); - SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER); - SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); - SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER); +// SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); +// SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER); +// SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); +// SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER) @@ -178,7 +180,7 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) { static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) { SName name = {0}; - tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TOPIC); + tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); char db[TSDB_TABLE_FNAME_LEN] = {0}; tNameGetFullDbName(&name, db); @@ -203,20 +205,24 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq return pDrop; } -static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *pCreate) { +static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *creattopReq) { // deserialize and other stuff return 0; } static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq *pCreate, SDbObj *pDb) { SMqTopicObj topicObj = {0}; - tstrncpy(topicObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); + tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN); tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN); topicObj.createTime = taosGetTimestampMs(); topicObj.updateTime = topicObj.createTime; topicObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); topicObj.dbUid = pDb->uid; topicObj.version = 1; + topicObj.sql = strdup(pCreate->sql); + topicObj.physicalPlan = strdup(pCreate->physicalPlan); + topicObj.logicalPlan = strdup(pCreate->logicalPlan); + topicObj.sqlLen = strlen(pCreate->sql); SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj); if (pTopicRaw == NULL) return -1; @@ -228,46 +234,47 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; char *msgStr = pMsg->rpcMsg.pCont; - SCMCreateTopicReq *pCreate; - tDeserializeSCMCreateTopicReq(msgStr, pCreate); - mDebug("topic:%s, start to create", pCreate->name); + SCMCreateTopicReq createTopicReq = {0}; + tDeserializeSCMCreateTopicReq(msgStr, &createTopicReq); + + mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql); - if (mndCheckCreateTopicMsg(pCreate) != 0) { - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + if (mndCheckCreateTopicMsg(&createTopicReq) != 0) { + mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr()); return -1; } - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pCreate->name); + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, createTopicReq.name); if (pTopic != NULL) { sdbRelease(pMnode->pSdb, pTopic); - if (pCreate->igExists) { - mDebug("topic:%s, already exist, ignore exist is set", pCreate->name); + if (createTopicReq.igExists) { + mDebug("topic:%s, already exist, ignore exist is set", createTopicReq.name); return 0; } else { terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST; - mError("db:%s, failed to create since %s", pCreate->name, terrstr()); + mError("db:%s, failed to create since %s", createTopicReq.name, terrstr()); return -1; } } - SDbObj *pDb = mndAcquireDbByTopic(pMnode, pCreate->name); + SDbObj *pDb = mndAcquireDbByTopic(pMnode, createTopicReq.name); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr()); return -1; } - int32_t code = mndCreateTopic(pMnode, pMsg, pCreate, pDb); + int32_t code = mndCreateTopic(pMnode, pMsg, &createTopicReq, pDb); mndReleaseDb(pMnode, pDb); if (code != 0) { terrno = code; - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr()); return -1; } - return TSDB_CODE_MND_ACTION_IN_PROGRESS; + return TSDB_CODE_SUCCESS; } static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, SMqTopicObj *pTopic) { return 0; } diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 94f34b8e1726dcd234c6e08eb426536f1f669edf..f6b752bc6cbbd89bb3a897c41ac8e9f981aa2ed5 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -285,16 +285,16 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter, char dbFullName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pTableName, dbFullName); - ctgDebug("try to get table meta from vnode, db:%s, tbName:%s", dbFullName, pTableName->tname); + ctgDebug("try to get table meta from vnode, db:%s, tbName:%s", dbFullName, tNameGetTableName(pTableName)); - SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = (char *)pTableName->tname}; + SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = (char *)tNameGetTableName(pTableName)}; char *msg = NULL; SEpSet *pVnodeEpSet = NULL; int32_t msgLen = 0; int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen); if (code) { - ctgError("Build vnode tablemeta msg failed, code:%x, tbName:%s", code, pTableName->tname); + ctgError("Build vnode tablemeta msg failed, code:%x, tbName:%s", code, tNameGetTableName(pTableName)); CTG_ERR_RET(code); } @@ -313,21 +313,21 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter, if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) { SET_META_TYPE_NONE(output->metaType); - ctgDebug("tablemeta not exist in vnode, tbName:%s", pTableName->tname); + ctgDebug("tablemeta not exist in vnode, tbName:%s", tNameGetTableName(pTableName)); return TSDB_CODE_SUCCESS; } - ctgError("error rsp for table meta from vnode, code:%x, tbName:%s", rpcRsp.code, pTableName->tname); + ctgError("error rsp for table meta from vnode, code:%x, tbName:%s", rpcRsp.code, tNameGetTableName(pTableName)); CTG_ERR_RET(rpcRsp.code); } code = queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen); if (code) { - ctgError("Process vnode tablemeta rsp failed, code:%x, tbName:%s", code, pTableName->tname); + ctgError("Process vnode tablemeta rsp failed, code:%x, tbName:%s", code, tNameGetTableName(pTableName)); CTG_ERR_RET(code); } - ctgDebug("Got table meta from vnode, db:%s, tbName:%s", dbFullName, pTableName->tname); + ctgDebug("Got table meta from vnode, db:%s, tbName:%s", dbFullName, tNameGetTableName(pTableName)); return TSDB_CODE_SUCCESS; } @@ -776,7 +776,7 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con STableMetaOutput *output = &voutput; if (CTG_IS_STABLE(isSTable)) { - ctgDebug("will renew table meta, supposed to be stable, tbName:%s", pTableName->tname); + ctgDebug("will renew table meta, supposed to be stable, tbName:%s", tNameGetTableName(pTableName)); // if get from mnode failed, will not try vnode CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput)); @@ -787,13 +787,13 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con output = &moutput; } } else { - ctgDebug("will renew table meta, not supposed to be stable, tbName:%s, isStable:%d", pTableName->tname, isSTable); + ctgDebug("will renew table meta, not supposed to be stable, tbName:%s, isStable:%d", tNameGetTableName(pTableName), isSTable); // if get from vnode failed or no table meta, will not try mnode CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput)); if (CTG_IS_META_TABLE(voutput.metaType) && TSDB_SUPER_TABLE == voutput.tbMeta->tableType) { - ctgDebug("will continue to renew table meta since got stable, tbName:%s, metaType:%d", pTableName->tname, voutput.metaType); + ctgDebug("will continue to renew table meta since got stable, tbName:%s, metaType:%d", tNameGetTableName(pTableName), voutput.metaType); CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput)); @@ -820,7 +820,7 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con } if (CTG_IS_META_NONE(output->metaType)) { - ctgError("no tablemeta got, tbNmae:%s", pTableName->tname); + ctgError("no tablemeta got, tbNmae:%s", tNameGetTableName(pTableName)); CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); } @@ -860,7 +860,7 @@ int32_t ctgGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMg CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist)); if (0 == exist) { - ctgError("renew tablemeta succeed but get from cache failed, may be deleted, tbName:%s", pTableName->tname); + ctgError("renew tablemeta succeed but get from cache failed, may be deleted, tbName:%s", tNameGetTableName(pTableName)); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } @@ -1241,7 +1241,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S } else { int32_t vgId = tbMeta->vgId; if (NULL == taosHashGetClone(dbVgroup->vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) { - ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, pTableName->tname); + ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName)); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } @@ -1252,7 +1252,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S } if (NULL == taosArrayPush(vgList, &vgroupInfo)) { - ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, pTableName->tname); + ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName)); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index 2e553fe4e67dba5377c1485faac858a09d6605a2..97eb63ac3129bc8454b6263b7f8aca31f2dde397 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -1057,14 +1057,18 @@ cJSON* qDagToJson(const SQueryDag* pDag) { if(pRoot == NULL) { return NULL; } - cJSON_AddNumberToObject(pRoot, "numOfSubplans", pDag->numOfSubplans); - cJSON_AddNumberToObject(pRoot, "queryId", pDag->queryId); + + cJSON_AddNumberToObject(pRoot, "Number", pDag->numOfSubplans); + cJSON_AddNumberToObject(pRoot, "QueryId", pDag->queryId); + cJSON *pLevels = cJSON_CreateArray(); if(pLevels == NULL) { cJSON_Delete(pRoot); return NULL; } - cJSON_AddItemToObject(pRoot, "pSubplans", pLevels); + + cJSON_AddItemToObject(pRoot, "Subplans", pLevels); + size_t level = taosArrayGetSize(pDag->pSubplans); for(size_t i = 0; i < level; i++) { const SArray* pSubplans = (const SArray*)taosArrayGetP(pDag->pSubplans, i); @@ -1074,6 +1078,7 @@ cJSON* qDagToJson(const SQueryDag* pDag) { cJSON_Delete(pRoot); return NULL; } + cJSON_AddItemToArray(pLevels, plansOneLevel); for(size_t j = 0; j < num; j++) { cJSON* pSubplan = subplanToJson((const SSubplan*)taosArrayGetP(pSubplans, j)); @@ -1081,6 +1086,7 @@ cJSON* qDagToJson(const SQueryDag* pDag) { cJSON_Delete(pRoot); return NULL; } + cJSON_AddItemToArray(plansOneLevel, pSubplan); } } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 09e5291c85fd80b7492af974a34506c16adf3407..94574762175cdf0c27718adfbf982e1bfdc163da 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1029,6 +1029,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, msg = pTask->msg; break; } + case TDMT_VND_QUERY: { msgSize = sizeof(SSubQueryMsg) + pTask->msgLen; msg = calloc(1, msgSize); @@ -1047,7 +1048,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, pMsg->contentLen = htonl(pTask->msgLen); memcpy(pMsg->msg, pTask->msg, pTask->msgLen); break; - } + } + case TDMT_VND_RES_READY: { msgSize = sizeof(SResReadyReq); msg = calloc(1, msgSize);