Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2f3a58f8
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2f3a58f8
编写于
1月 14, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-11818]support taos_create_topic
上级
205aae66
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
204 addition
and
167 deletion
+204
-167
include/client/taos.h
include/client/taos.h
+1
-2
include/common/tmsg.h
include/common/tmsg.h
+4
-1
include/common/tname.h
include/common/tname.h
+0
-2
include/util/tcoding.h
include/util/tcoding.h
+1
-0
include/util/tdef.h
include/util/tdef.h
+1
-1
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+64
-33
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+77
-87
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+2
-2
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+28
-21
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+14
-14
source/libs/planner/src/physicalPlanJson.c
source/libs/planner/src/physicalPlanJson.c
+9
-3
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+3
-1
未找到文件。
include/client/taos.h
浏览文件 @
2f3a58f8
...
...
@@ -193,8 +193,7 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
DLL_EXPORT
int
taos_load_table_info
(
TAOS
*
taos
,
const
char
*
tableNameList
);
DLL_EXPORT
TAOS_RES
*
taos_schemaless_insert
(
TAOS
*
taos
,
char
*
lines
[],
int
numLines
,
int
protocol
,
int
precision
);
DLL_EXPORT
TAOS_RES
*
tmq_create_topic
(
TAOS
*
taos
,
const
char
*
name
,
const
char
*
sql
,
int
sqlLen
);
DLL_EXPORT
TAOS_RES
*
taos_create_topic
(
TAOS
*
taos
,
const
char
*
name
,
const
char
*
sql
,
int
sqlLen
);
#ifdef __cplusplus
}
...
...
include/common/tmsg.h
浏览文件 @
2f3a58f8
...
...
@@ -1053,6 +1053,7 @@ typedef struct {
typedef
struct
{
int8_t
igExists
;
char
*
name
;
char
*
sql
;
char
*
physicalPlan
;
char
*
logicalPlan
;
}
SCMCreateTopicReq
;
...
...
@@ -1061,6 +1062,7 @@ static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateT
int
tlen
=
0
;
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
igExists
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
name
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
sql
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
physicalPlan
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
logicalPlan
);
return
tlen
;
...
...
@@ -1069,6 +1071,7 @@ static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateT
static
FORCE_INLINE
void
*
tDeserializeSCMCreateTopicReq
(
void
*
buf
,
SCMCreateTopicReq
*
pReq
)
{
buf
=
taosDecodeFixedI8
(
buf
,
&
(
pReq
->
igExists
));
buf
=
taosDecodeString
(
buf
,
&
(
pReq
->
name
));
buf
=
taosDecodeString
(
buf
,
&
(
pReq
->
sql
));
buf
=
taosDecodeString
(
buf
,
&
(
pReq
->
physicalPlan
));
buf
=
taosDecodeString
(
buf
,
&
(
pReq
->
logicalPlan
));
return
buf
;
...
...
@@ -1191,7 +1194,7 @@ typedef struct {
}
SMVSubscribeRsp
;
typedef
struct
{
char
name
[
TSDB_TOPIC_
F
NAME_LEN
];
char
name
[
TSDB_TOPIC_NAME_LEN
];
int8_t
igExists
;
int32_t
execLen
;
void
*
executor
;
...
...
include/common/tname.h
浏览文件 @
2f3a58f8
...
...
@@ -25,14 +25,12 @@
#define T_NAME_ACCT 0x1u
#define T_NAME_DB 0x2u
#define T_NAME_TABLE 0x4u
#define T_NAME_TOPIC 0x8u
typedef
struct
SName
{
uint8_t
type
;
//db_name_t, table_name_t
int32_t
acctId
;
char
dbname
[
TSDB_DB_NAME_LEN
];
char
tname
[
TSDB_TABLE_NAME_LEN
];
char
topicName
[
TSDB_TOPIC_NAME_LEN
];
}
SName
;
int32_t
tNameExtractFullName
(
const
SName
*
name
,
char
*
dst
);
...
...
include/util/tcoding.h
浏览文件 @
2f3a58f8
...
...
@@ -351,6 +351,7 @@ static FORCE_INLINE void *taosDecodeString(void *buf, char **value) {
buf
=
taosDecodeVariantU64
(
buf
,
&
size
);
*
value
=
(
char
*
)
malloc
((
size_t
)
size
+
1
);
if
(
*
value
==
NULL
)
return
NULL
;
memcpy
(
*
value
,
buf
,
(
size_t
)
size
);
...
...
include/util/tdef.h
浏览文件 @
2f3a58f8
...
...
@@ -181,7 +181,7 @@ do { \
#define TSDB_COL_NAME_LEN 65
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
#define TSDB_MAX_SQL_SHOW_LEN
512
#define TSDB_MAX_SQL_SHOW_LEN
1024
#define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024u) // sql length should be less than 1mb
#define TSDB_APP_NAME_LEN TSDB_UNI_LEN
...
...
source/client/src/clientImpl.c
浏览文件 @
2f3a58f8
...
...
@@ -13,12 +13,12 @@
#include "tpagedfile.h"
#include "tref.h"
#define CHECK_CODE_GOTO(expr, lab
le
) \
#define CHECK_CODE_GOTO(expr, lab
el
) \
do { \
int32_t code = expr; \
if (TSDB_CODE_SUCCESS != code) { \
terrno = code; \
goto lab
le
; \
goto lab
el
; \
} \
} while (0)
...
...
@@ -258,36 +258,62 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
return
scheduleAsyncExecJob
(
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
,
NULL
,
pDag
,
&
pRequest
->
body
.
pQueryJob
);
}
TAOS_RES
*
tmq_create_topic
(
TAOS
*
taos
,
const
char
*
name
,
const
char
*
sql
,
int
sqlLen
)
{
STscObj
*
pTscObj
=
(
STscObj
*
)
taos
;
SRequestObj
*
pRequest
=
NULL
;
SQueryNode
*
pQuery
=
NULL
;
SQueryDag
*
pDag
=
NULL
;
char
*
dagStr
=
NULL
;
TAOS_RES
*
taos_create_topic
(
TAOS
*
taos
,
const
char
*
topicName
,
const
char
*
sql
,
int
sqlLen
)
{
STscObj
*
pTscObj
=
(
STscObj
*
)
taos
;
SRequestObj
*
pRequest
=
NULL
;
SQueryNode
*
pQueryNode
=
NULL
;
char
*
pStr
=
NULL
;
terrno
=
TSDB_CODE_SUCCESS
;
if
(
taos
==
NULL
||
topicName
==
NULL
||
sql
==
NULL
)
{
tscError
(
"invalid parameters for creating topic, connObj:%p, topic name:%s, sql:%s"
,
taos
,
topicName
,
sql
);
terrno
=
TSDB_CODE_TSC_INVALID_INPUT
;
goto
_return
;
}
if
(
strlen
(
topicName
)
>=
TSDB_TOPIC_NAME_LEN
)
{
tscError
(
"topic name too long, max length:%d"
,
TSDB_TOPIC_NAME_LEN
-
1
);
terrno
=
TSDB_CODE_TSC_INVALID_INPUT
;
goto
_return
;
}
if
(
sqlLen
>
tsMaxSQLStringLen
)
{
tscError
(
"sql string exceeds max length:%d"
,
tsMaxSQLStringLen
);
terrno
=
TSDB_CODE_TSC_EXCEED_SQL_LIMIT
;
goto
_return
;
}
tscDebug
(
"start to create topic, %s"
,
topicName
);
CHECK_CODE_GOTO
(
buildRequest
(
pTscObj
,
sql
,
sqlLen
,
&
pRequest
),
_return
);
CHECK_CODE_GOTO
(
parseSql
(
pRequest
,
&
pQueryNode
),
_return
);
//temporary disabled until planner ready
#if 0
CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);
//TODO: check sql valid
// todo check for invalid sql statement and return with error code
CHECK_CODE_GOTO(qCreateQueryDag(pQuery
, &pDag
), _return);
CHECK_CODE_GOTO
(
qCreateQueryDag
(
pQuery
Node
,
&
pRequest
->
body
.
pDag
,
pRequest
->
requestId
),
_return
);
dagStr = qDagToString(
pDag);
if(
dag
Str == NULL) {
//TODO
pStr
=
qDagToString
(
pRequest
->
body
.
pDag
);
if
(
p
Str
==
NULL
)
{
goto
_return
;
}
#endif
// The topic should be related to a database that the queried table is belonged to.
SName
name
=
{
0
};
char
dbName
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
tNameGetFullDbName
(
&
((
SQueryStmtInfo
*
)
pQueryNode
)
->
pTableMetaInfo
[
0
]
->
name
,
dbName
);
tNameFromString
(
&
name
,
dbName
,
T_NAME_ACCT
|
T_NAME_DB
);
tNameFromString
(
&
name
,
topicName
,
T_NAME_TABLE
);
char
topicFname
[
TSDB_TOPIC_FNAME_LEN
]
=
{
0
};
tNameExtractFullName
(
&
name
,
topicFname
);
SCMCreateTopicReq
req
=
{
.
name
=
(
char
*
)
name
,
.
igExists
=
0
,
/*.physicalPlan = dagStr,*/
.
physicalPlan
=
(
char
*
)
sql
,
.
logicalPlan
=
"
"
,
.
name
=
(
char
*
)
topicF
name
,
.
igExists
=
0
,
.
physicalPlan
=
(
char
*
)
pStr
,
.
sql
=
(
char
*
)
sql
,
.
logicalPlan
=
"no logic plan
"
,
};
int
tlen
=
tSerializeSCMCreateTopicReq
(
NULL
,
&
req
);
...
...
@@ -295,27 +321,32 @@ TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sq
if
(
buf
==
NULL
)
{
goto
_return
;
}
void
*
abuf
=
buf
;
tSerializeSCMCreateTopicReq
(
&
abuf
,
&
req
);
/*printf("formatted: %s\n", dagStr);*/
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){
.
pData
=
buf
,
.
len
=
tlen
};
pRequest
->
type
=
TDMT_MND_CREATE_TOPIC
;
SMsgSendInfo
*
body
=
buildMsgInfoImpl
(
pRequest
);
SEpSet
*
pEpSet
=
&
pTscObj
->
pAppInfo
->
mgmtEp
.
epSet
;
SEpSet
epSet
=
getEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
)
;
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
pTscObj
->
pAppInfo
->
pTransporter
,
pE
pSet
,
&
transporterId
,
body
);
asyncSendMsgToServer
(
pTscObj
->
pAppInfo
->
pTransporter
,
&
e
pSet
,
&
transporterId
,
body
);
tsem_wait
(
&
pRequest
->
body
.
rspSem
);
_return:
qDestroyQuery
(
pQuery
);
qDestroyQueryDag
(
pDag
);
destroySendMsgInfo
(
body
);
qDestroyQuery
(
pQueryNode
);
if
(
body
!=
NULL
)
{
destroySendMsgInfo
(
body
);
}
if
(
pRequest
!=
NULL
&&
terrno
!=
TSDB_CODE_SUCCESS
)
{
pRequest
->
code
=
terrno
;
}
return
pRequest
;
}
...
...
@@ -330,22 +361,22 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
nPrintTsc
(
"%s"
,
sql
)
SRequestObj
*
pRequest
=
NULL
;
SQueryNode
*
pQuery
=
NULL
;
SQueryNode
*
pQuery
Node
=
NULL
;
terrno
=
TSDB_CODE_SUCCESS
;
CHECK_CODE_GOTO
(
buildRequest
(
pTscObj
,
sql
,
sqlLen
,
&
pRequest
),
_return
);
CHECK_CODE_GOTO
(
parseSql
(
pRequest
,
&
pQuery
),
_return
);
CHECK_CODE_GOTO
(
parseSql
(
pRequest
,
&
pQuery
Node
),
_return
);
if
(
qIsDdlQuery
(
pQuery
))
{
CHECK_CODE_GOTO
(
execDdlQuery
(
pRequest
,
pQuery
),
_return
);
if
(
qIsDdlQuery
(
pQuery
Node
))
{
CHECK_CODE_GOTO
(
execDdlQuery
(
pRequest
,
pQuery
Node
),
_return
);
}
else
{
CHECK_CODE_GOTO
(
getPlan
(
pRequest
,
pQuery
,
&
pRequest
->
body
.
pDag
),
_return
);
CHECK_CODE_GOTO
(
getPlan
(
pRequest
,
pQuery
Node
,
&
pRequest
->
body
.
pDag
),
_return
);
CHECK_CODE_GOTO
(
scheduleQuery
(
pRequest
,
pRequest
->
body
.
pDag
),
_return
);
pRequest
->
code
=
terrno
;
}
_return:
qDestroyQuery
(
pQuery
);
qDestroyQuery
(
pQuery
Node
);
if
(
NULL
!=
pRequest
&&
TSDB_CODE_SUCCESS
!=
terrno
)
{
pRequest
->
code
=
terrno
;
}
...
...
source/client/test/clientTests.cpp
浏览文件 @
2f3a58f8
...
...
@@ -48,14 +48,14 @@ int main(int argc, char** argv) {
TEST
(
testCase
,
driverInit_Test
)
{
taos_init
();
}
//
TEST(testCase, connect_Test) {
//
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
//
if (pConn == NULL) {
//
printf("failed to connect to server, reason:%s\n", taos_errstr(NULL));
//
}
//
taos_close(pConn);
//
}
//
TEST
(
testCase
,
connect_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
printf
(
"failed to connect to server, reason:%s
\n
"
,
taos_errstr
(
NULL
));
}
taos_close
(
pConn
);
}
//TEST(testCase, create_user_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
...
...
@@ -514,39 +514,29 @@ TEST(testCase, driverInit_Test) { taos_init(); }
// taosHashCleanup(phash);
//}
//
//// TEST(testCase, create_topic_Test) {
//// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
//// assert(pConn != NULL);
////
//// TAOS_RES* pRes = taos_query(pConn, "create database abc1");
//// if (taos_errno(pRes) != 0) {
//// printf("error in create db, reason:%s\n", taos_errstr(pRes));
//// }
//// taos_free_result(pRes);
////
//// pRes = taos_query(pConn, "use abc1");
//// if (taos_errno(pRes) != 0) {
//// printf("error in use db, reason:%s\n", taos_errstr(pRes));
//// }
//// taos_free_result(pRes);
////
//// pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)");
//// if (taos_errno(pRes) != 0) {
//// printf("error in create stable, reason:%s\n", taos_errstr(pRes));
//// }
////
//// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
//// ASSERT_TRUE(pFields == NULL);
////
//// int32_t numOfFields = taos_num_fields(pRes);
//// ASSERT_EQ(numOfFields, 0);
////
//// taos_free_result(pRes);
////
//// char* sql = "select * from st1";
//// tmq_create_topic(pConn, "test_topic_1", sql, strlen(sql));
//// taos_close(pConn);
////}
TEST
(
testCase
,
create_topic_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
ASSERT_TRUE
(
pFields
==
nullptr
);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
ASSERT_EQ
(
numOfFields
,
0
);
taos_free_result
(
pRes
);
char
*
sql
=
"select * from tu"
;
pRes
=
taos_create_topic
(
pConn
,
"test_topic_1"
,
sql
,
strlen
(
sql
));
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
//
//TEST(testCase, insert_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
...
...
@@ -565,7 +555,7 @@ TEST(testCase, driverInit_Test) { taos_init(); }
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
//TEST(testCase, projection_query_tables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr);
...
...
@@ -573,30 +563,58 @@ TEST(testCase, driverInit_Test) { taos_init(); }
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//// pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
//// if (taos_errno(pRes) != 0) {
//// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
//// }
//// taos_free_result(pRes);
////
//// pRes = taos_query(pConn, "create table tu using st1 tags(1)");
//// if (taos_errno(pRes) != 0) {
//// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
//// }
//// taos_free_result(pRes);
////
//// for(int32_t i = 0; i < 100; ++i) {
//// char sql[512] = {0};
//// sprintf(sql, "insert into tu values(now+%da, %d)", i, i);
//// TAOS_RES* p = taos_query(pConn, sql);
//// if (taos_errno(p) != 0) {
//// printf("failed to insert data, reason:%s\n", taos_errstr(p));
//// }
////
//// taos_free_result(p);
//// }
//
// pRes = taos_query(pConn, "
create table tu using st1 tags(1)
");
// pRes = taos_query(pConn, "
select * from tu
");
// if (taos_errno(pRes) != 0) {
// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
// taos_free_result(pRes);
//
// for(int32_t i = 0; i < 100; ++i) {
// char sql[512] = {0};
// sprintf(sql, "insert into tu values(now+%da, %d)", i, i);
// TAOS_RES* p = taos_query(pConn, sql);
// if (taos_errno(p) != 0) {
// printf("failed to insert data, reason:%s\n", taos_errstr(p));
// }
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// taos_free_result(p);
// char str[512] = {0};
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// pRes = taos_query(pConn, "select * from tu");
// taos_free_result(pRes);
// taos_close(pConn);
//}
//TEST(testCase, projection_query_stables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "select ts,k from m1");
// if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
...
...
@@ -616,31 +634,3 @@ TEST(testCase, driverInit_Test) { taos_init(); }
// taos_free_result(pRes);
// taos_close(pConn);
//}
TEST
(
testCase
,
projection_query_stables
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
pConn
,
nullptr
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use abc1"
);
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"select ts,k from m1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to select from table, reason:%s
\n
"
,
taos_errstr
(
pRes
));
taos_free_result
(
pRes
);
ASSERT_TRUE
(
false
);
}
TAOS_ROW
pRow
=
NULL
;
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
char
str
[
512
]
=
{
0
};
while
((
pRow
=
taos_fetch_row
(
pRes
))
!=
NULL
)
{
int32_t
code
=
taos_print_row
(
str
,
pRow
,
pFields
,
numOfFields
);
printf
(
"%s
\n
"
,
str
);
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
2f3a58f8
...
...
@@ -350,7 +350,7 @@ typedef struct SMqTopicObj {
// TODO: add cache and change name to id
typedef
struct
SMqConsumerTopic
{
char
name
[
TSDB_TOPIC_
F
NAME_LEN
];
char
name
[
TSDB_TOPIC_NAME_LEN
];
SList
*
vgroups
;
// SList<int32_t>
}
SMqConsumerTopic
;
...
...
@@ -409,7 +409,7 @@ typedef struct SMqVGroupHbObj {
#if 0
typedef struct SCGroupObj {
char name[TSDB_TOPIC_
F
NAME_LEN];
char name[TSDB_TOPIC_NAME_LEN];
int64_t createTime;
int64_t updateTime;
uint64_t uid;
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
2f3a58f8
...
...
@@ -118,11 +118,13 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pTopic
->
dbUid
,
TOPIC_DECODE_OVER
);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pTopic
->
version
,
TOPIC_DECODE_OVER
);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pTopic
->
sqlLen
,
TOPIC_DECODE_OVER
);
pTopic
->
sql
=
calloc
(
pTopic
->
sqlLen
+
1
,
sizeof
(
char
));
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
sql
,
pTopic
->
sqlLen
,
TOPIC_DECODE_OVER
);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
len
,
TOPIC_DECODE_OVER
);
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
logicalPlan
,
len
,
TOPIC_DECODE_OVER
);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
len
,
TOPIC_DECODE_OVER
);
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
physicalPlan
,
len
,
TOPIC_DECODE_OVER
);
//
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
//
SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER);
//
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
//
SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
MND_TOPIC_RESERVE_SIZE
,
TOPIC_DECODE_OVER
)
...
...
@@ -178,7 +180,7 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
static
SDbObj
*
mndAcquireDbByTopic
(
SMnode
*
pMnode
,
char
*
topicName
)
{
SName
name
=
{
0
};
tNameFromString
(
&
name
,
topicName
,
T_NAME_ACCT
|
T_NAME_DB
|
T_NAME_T
OPIC
);
tNameFromString
(
&
name
,
topicName
,
T_NAME_ACCT
|
T_NAME_DB
|
T_NAME_T
ABLE
);
char
db
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
tNameGetFullDbName
(
&
name
,
db
);
...
...
@@ -203,20 +205,24 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq
return
pDrop
;
}
static
int32_t
mndCheckCreateTopicMsg
(
SCMCreateTopicReq
*
pCreate
)
{
static
int32_t
mndCheckCreateTopicMsg
(
SCMCreateTopicReq
*
creattopReq
)
{
// deserialize and other stuff
return
0
;
}
static
int32_t
mndCreateTopic
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SCMCreateTopicReq
*
pCreate
,
SDbObj
*
pDb
)
{
SMqTopicObj
topicObj
=
{
0
};
tstrncpy
(
topicObj
.
name
,
pCreate
->
name
,
TSDB_T
ABLE
_FNAME_LEN
);
tstrncpy
(
topicObj
.
name
,
pCreate
->
name
,
TSDB_T
OPIC
_FNAME_LEN
);
tstrncpy
(
topicObj
.
db
,
pDb
->
name
,
TSDB_DB_FNAME_LEN
);
topicObj
.
createTime
=
taosGetTimestampMs
();
topicObj
.
updateTime
=
topicObj
.
createTime
;
topicObj
.
uid
=
mndGenerateUid
(
pCreate
->
name
,
TSDB_TABLE_FNAME_LEN
);
topicObj
.
dbUid
=
pDb
->
uid
;
topicObj
.
version
=
1
;
topicObj
.
sql
=
strdup
(
pCreate
->
sql
);
topicObj
.
physicalPlan
=
strdup
(
pCreate
->
physicalPlan
);
topicObj
.
logicalPlan
=
strdup
(
pCreate
->
logicalPlan
);
topicObj
.
sqlLen
=
strlen
(
pCreate
->
sql
);
SSdbRaw
*
pTopicRaw
=
mndTopicActionEncode
(
&
topicObj
);
if
(
pTopicRaw
==
NULL
)
return
-
1
;
...
...
@@ -228,46 +234,47 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq
static
int32_t
mndProcessCreateTopicMsg
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
char
*
msgStr
=
pMsg
->
rpcMsg
.
pCont
;
SCMCreateTopicReq
*
pCreate
;
tDeserializeSCMCreateTopicReq
(
msgStr
,
pCreate
);
mDebug
(
"topic:%s, start to create"
,
pCreate
->
name
);
SCMCreateTopicReq
createTopicReq
=
{
0
};
tDeserializeSCMCreateTopicReq
(
msgStr
,
&
createTopicReq
);
mDebug
(
"topic:%s, start to create, sql:%s"
,
createTopicReq
.
name
,
createTopicReq
.
sql
);
if
(
mndCheckCreateTopicMsg
(
pCreate
)
!=
0
)
{
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
if
(
mndCheckCreateTopicMsg
(
&
createTopicReq
)
!=
0
)
{
mError
(
"topic:%s, failed to create since %s"
,
createTopicReq
.
name
,
terrstr
());
return
-
1
;
}
SMqTopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
pCreate
->
name
);
SMqTopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
createTopicReq
.
name
);
if
(
pTopic
!=
NULL
)
{
sdbRelease
(
pMnode
->
pSdb
,
pTopic
);
if
(
pCreate
->
igExists
)
{
mDebug
(
"topic:%s, already exist, ignore exist is set"
,
pCreate
->
name
);
if
(
createTopicReq
.
igExists
)
{
mDebug
(
"topic:%s, already exist, ignore exist is set"
,
createTopicReq
.
name
);
return
0
;
}
else
{
terrno
=
TSDB_CODE_MND_TOPIC_ALREADY_EXIST
;
mError
(
"db:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
mError
(
"db:%s, failed to create since %s"
,
createTopicReq
.
name
,
terrstr
());
return
-
1
;
}
}
SDbObj
*
pDb
=
mndAcquireDbByTopic
(
pMnode
,
pCreate
->
name
);
SDbObj
*
pDb
=
mndAcquireDbByTopic
(
pMnode
,
createTopicReq
.
name
);
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_SELECTED
;
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
mError
(
"topic:%s, failed to create since %s"
,
createTopicReq
.
name
,
terrstr
());
return
-
1
;
}
int32_t
code
=
mndCreateTopic
(
pMnode
,
pMsg
,
pCreate
,
pDb
);
int32_t
code
=
mndCreateTopic
(
pMnode
,
pMsg
,
&
createTopicReq
,
pDb
);
mndReleaseDb
(
pMnode
,
pDb
);
if
(
code
!=
0
)
{
terrno
=
code
;
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
mError
(
"topic:%s, failed to create since %s"
,
createTopicReq
.
name
,
terrstr
());
return
-
1
;
}
return
TSDB_CODE_
MND_ACTION_IN_PROGR
ESS
;
return
TSDB_CODE_
SUCC
ESS
;
}
static
int32_t
mndDropTopic
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SMqTopicObj
*
pTopic
)
{
return
0
;
}
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
2f3a58f8
...
...
@@ -285,16 +285,16 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter,
char
dbFullName
[
TSDB_DB_FNAME_LEN
];
tNameGetFullDbName
(
pTableName
,
dbFullName
);
ctgDebug
(
"try to get table meta from vnode, db:%s, tbName:%s"
,
dbFullName
,
pTableName
->
tname
);
ctgDebug
(
"try to get table meta from vnode, db:%s, tbName:%s"
,
dbFullName
,
tNameGetTableName
(
pTableName
)
);
SBuildTableMetaInput
bInput
=
{.
vgId
=
vgroupInfo
->
vgId
,
.
dbName
=
dbFullName
,
.
tableFullName
=
(
char
*
)
pTableName
->
tname
};
SBuildTableMetaInput
bInput
=
{.
vgId
=
vgroupInfo
->
vgId
,
.
dbName
=
dbFullName
,
.
tableFullName
=
(
char
*
)
tNameGetTableName
(
pTableName
)
};
char
*
msg
=
NULL
;
SEpSet
*
pVnodeEpSet
=
NULL
;
int32_t
msgLen
=
0
;
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
TDMT_VND_TABLE_META
)](
&
bInput
,
&
msg
,
0
,
&
msgLen
);
if
(
code
)
{
ctgError
(
"Build vnode tablemeta msg failed, code:%x, tbName:%s"
,
code
,
pTableName
->
tname
);
ctgError
(
"Build vnode tablemeta msg failed, code:%x, tbName:%s"
,
code
,
tNameGetTableName
(
pTableName
)
);
CTG_ERR_RET
(
code
);
}
...
...
@@ -313,21 +313,21 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter,
if
(
TSDB_CODE_SUCCESS
!=
rpcRsp
.
code
)
{
if
(
CTG_TABLE_NOT_EXIST
(
rpcRsp
.
code
))
{
SET_META_TYPE_NONE
(
output
->
metaType
);
ctgDebug
(
"tablemeta not exist in vnode, tbName:%s"
,
pTableName
->
tname
);
ctgDebug
(
"tablemeta not exist in vnode, tbName:%s"
,
tNameGetTableName
(
pTableName
)
);
return
TSDB_CODE_SUCCESS
;
}
ctgError
(
"error rsp for table meta from vnode, code:%x, tbName:%s"
,
rpcRsp
.
code
,
pTableName
->
tname
);
ctgError
(
"error rsp for table meta from vnode, code:%x, tbName:%s"
,
rpcRsp
.
code
,
tNameGetTableName
(
pTableName
)
);
CTG_ERR_RET
(
rpcRsp
.
code
);
}
code
=
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_VND_TABLE_META
)](
output
,
rpcRsp
.
pCont
,
rpcRsp
.
contLen
);
if
(
code
)
{
ctgError
(
"Process vnode tablemeta rsp failed, code:%x, tbName:%s"
,
code
,
pTableName
->
tname
);
ctgError
(
"Process vnode tablemeta rsp failed, code:%x, tbName:%s"
,
code
,
tNameGetTableName
(
pTableName
)
);
CTG_ERR_RET
(
code
);
}
ctgDebug
(
"Got table meta from vnode, db:%s, tbName:%s"
,
dbFullName
,
pTableName
->
tname
);
ctgDebug
(
"Got table meta from vnode, db:%s, tbName:%s"
,
dbFullName
,
tNameGetTableName
(
pTableName
)
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -776,7 +776,7 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con
STableMetaOutput
*
output
=
&
voutput
;
if
(
CTG_IS_STABLE
(
isSTable
))
{
ctgDebug
(
"will renew table meta, supposed to be stable, tbName:%s"
,
pTableName
->
tname
);
ctgDebug
(
"will renew table meta, supposed to be stable, tbName:%s"
,
tNameGetTableName
(
pTableName
)
);
// if get from mnode failed, will not try vnode
CTG_ERR_JRET
(
ctgGetTableMetaFromMnode
(
pCatalog
,
pTransporter
,
pMgmtEps
,
pTableName
,
&
moutput
));
...
...
@@ -787,13 +787,13 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con
output
=
&
moutput
;
}
}
else
{
ctgDebug
(
"will renew table meta, not supposed to be stable, tbName:%s, isStable:%d"
,
pTableName
->
tname
,
isSTable
);
ctgDebug
(
"will renew table meta, not supposed to be stable, tbName:%s, isStable:%d"
,
tNameGetTableName
(
pTableName
)
,
isSTable
);
// if get from vnode failed or no table meta, will not try mnode
CTG_ERR_JRET
(
ctgGetTableMetaFromVnode
(
pCatalog
,
pTransporter
,
pMgmtEps
,
pTableName
,
&
vgroupInfo
,
&
voutput
));
if
(
CTG_IS_META_TABLE
(
voutput
.
metaType
)
&&
TSDB_SUPER_TABLE
==
voutput
.
tbMeta
->
tableType
)
{
ctgDebug
(
"will continue to renew table meta since got stable, tbName:%s, metaType:%d"
,
pTableName
->
tname
,
voutput
.
metaType
);
ctgDebug
(
"will continue to renew table meta since got stable, tbName:%s, metaType:%d"
,
tNameGetTableName
(
pTableName
)
,
voutput
.
metaType
);
CTG_ERR_JRET
(
ctgGetTableMetaFromMnodeImpl
(
pCatalog
,
pTransporter
,
pMgmtEps
,
voutput
.
tbFname
,
&
moutput
));
...
...
@@ -820,7 +820,7 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con
}
if
(
CTG_IS_META_NONE
(
output
->
metaType
))
{
ctgError
(
"no tablemeta got, tbNmae:%s"
,
pTableName
->
tname
);
ctgError
(
"no tablemeta got, tbNmae:%s"
,
tNameGetTableName
(
pTableName
)
);
CTG_ERR_JRET
(
CTG_ERR_CODE_TABLE_NOT_EXIST
);
}
...
...
@@ -860,7 +860,7 @@ int32_t ctgGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMg
CTG_ERR_RET
(
ctgGetTableMetaFromCache
(
pCatalog
,
pTableName
,
pTableMeta
,
&
exist
));
if
(
0
==
exist
)
{
ctgError
(
"renew tablemeta succeed but get from cache failed, may be deleted, tbName:%s"
,
pTableName
->
tname
);
ctgError
(
"renew tablemeta succeed but get from cache failed, may be deleted, tbName:%s"
,
tNameGetTableName
(
pTableName
)
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
...
...
@@ -1241,7 +1241,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
}
else
{
int32_t
vgId
=
tbMeta
->
vgId
;
if
(
NULL
==
taosHashGetClone
(
dbVgroup
->
vgInfo
,
&
vgId
,
sizeof
(
vgId
),
&
vgroupInfo
))
{
ctgError
(
"table's vgId not found in vgroup list, vgId:%d, tbName:%s"
,
vgId
,
pTableName
->
tname
);
ctgError
(
"table's vgId not found in vgroup list, vgId:%d, tbName:%s"
,
vgId
,
tNameGetTableName
(
pTableName
)
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
...
...
@@ -1252,7 +1252,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
}
if
(
NULL
==
taosArrayPush
(
vgList
,
&
vgroupInfo
))
{
ctgError
(
"taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s"
,
vgId
,
pTableName
->
tname
);
ctgError
(
"taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s"
,
vgId
,
tNameGetTableName
(
pTableName
)
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
...
...
source/libs/planner/src/physicalPlanJson.c
浏览文件 @
2f3a58f8
...
...
@@ -1057,14 +1057,18 @@ cJSON* qDagToJson(const SQueryDag* pDag) {
if
(
pRoot
==
NULL
)
{
return
NULL
;
}
cJSON_AddNumberToObject
(
pRoot
,
"numOfSubplans"
,
pDag
->
numOfSubplans
);
cJSON_AddNumberToObject
(
pRoot
,
"queryId"
,
pDag
->
queryId
);
cJSON_AddNumberToObject
(
pRoot
,
"Number"
,
pDag
->
numOfSubplans
);
cJSON_AddNumberToObject
(
pRoot
,
"QueryId"
,
pDag
->
queryId
);
cJSON
*
pLevels
=
cJSON_CreateArray
();
if
(
pLevels
==
NULL
)
{
cJSON_Delete
(
pRoot
);
return
NULL
;
}
cJSON_AddItemToObject
(
pRoot
,
"pSubplans"
,
pLevels
);
cJSON_AddItemToObject
(
pRoot
,
"Subplans"
,
pLevels
);
size_t
level
=
taosArrayGetSize
(
pDag
->
pSubplans
);
for
(
size_t
i
=
0
;
i
<
level
;
i
++
)
{
const
SArray
*
pSubplans
=
(
const
SArray
*
)
taosArrayGetP
(
pDag
->
pSubplans
,
i
);
...
...
@@ -1074,6 +1078,7 @@ cJSON* qDagToJson(const SQueryDag* pDag) {
cJSON_Delete
(
pRoot
);
return
NULL
;
}
cJSON_AddItemToArray
(
pLevels
,
plansOneLevel
);
for
(
size_t
j
=
0
;
j
<
num
;
j
++
)
{
cJSON
*
pSubplan
=
subplanToJson
((
const
SSubplan
*
)
taosArrayGetP
(
pSubplans
,
j
));
...
...
@@ -1081,6 +1086,7 @@ cJSON* qDagToJson(const SQueryDag* pDag) {
cJSON_Delete
(
pRoot
);
return
NULL
;
}
cJSON_AddItemToArray
(
plansOneLevel
,
pSubplan
);
}
}
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
2f3a58f8
...
...
@@ -1029,6 +1029,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
msg
=
pTask
->
msg
;
break
;
}
case
TDMT_VND_QUERY
:
{
msgSize
=
sizeof
(
SSubQueryMsg
)
+
pTask
->
msgLen
;
msg
=
calloc
(
1
,
msgSize
);
...
...
@@ -1047,7 +1048,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
pMsg
->
contentLen
=
htonl
(
pTask
->
msgLen
);
memcpy
(
pMsg
->
msg
,
pTask
->
msg
,
pTask
->
msgLen
);
break
;
}
}
case
TDMT_VND_RES_READY
:
{
msgSize
=
sizeof
(
SResReadyReq
);
msg
=
calloc
(
1
,
msgSize
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录