Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
cf665cdb
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
cf665cdb
编写于
7月 23, 2022
作者:
wmmhello
提交者:
GitHub
7月 23, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #15346 from taosdata/feature/TD-14761
feat: add interface for tmq writing raw data
上级
80a88605
4ee32f4d
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
740 addition
and
30 deletion
+740
-30
examples/c/CMakeLists.txt
examples/c/CMakeLists.txt
+15
-0
examples/c/tmq.c
examples/c/tmq.c
+1
-1
examples/c/tmq_taosx.c
examples/c/tmq_taosx.c
+459
-0
include/client/taos.h
include/client/taos.h
+1
-0
include/common/tmsg.h
include/common/tmsg.h
+1
-0
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+1
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+1
-1
source/client/src/clientSml.c
source/client/src/clientSml.c
+1
-1
source/client/src/tmq.c
source/client/src/tmq.c
+241
-5
source/common/src/tmsg.c
source/common/src/tmsg.c
+2
-0
source/dnode/vnode/src/tq/tqMeta.c
source/dnode/vnode/src/tq/tqMeta.c
+4
-4
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+10
-16
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+3
-2
未找到文件。
examples/c/CMakeLists.txt
浏览文件 @
cf665cdb
...
@@ -13,9 +13,15 @@ IF (TD_LINUX)
...
@@ -13,9 +13,15 @@ IF (TD_LINUX)
#TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua)
#TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua)
add_executable
(
tmq
""
)
add_executable
(
tmq
""
)
add_executable
(
tmq_taosx
""
)
add_executable
(
stream_demo
""
)
add_executable
(
stream_demo
""
)
add_executable
(
demoapi
""
)
add_executable
(
demoapi
""
)
target_sources
(
tmq_taosx
PRIVATE
"tmq_taosx.c"
)
target_sources
(
tmq
target_sources
(
tmq
PRIVATE
PRIVATE
"tmq.c"
"tmq.c"
...
@@ -35,6 +41,10 @@ IF (TD_LINUX)
...
@@ -35,6 +41,10 @@ IF (TD_LINUX)
taos_static
taos_static
)
)
target_link_libraries
(
tmq_taosx
taos_static
)
target_link_libraries
(
stream_demo
target_link_libraries
(
stream_demo
taos_static
taos_static
)
)
...
@@ -47,6 +57,10 @@ IF (TD_LINUX)
...
@@ -47,6 +57,10 @@ IF (TD_LINUX)
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/os"
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/os"
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
)
target_include_directories
(
tmq_taosx
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/os"
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
target_include_directories
(
stream_demo
target_include_directories
(
stream_demo
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
...
@@ -59,6 +73,7 @@ IF (TD_LINUX)
...
@@ -59,6 +73,7 @@ IF (TD_LINUX)
)
)
SET_TARGET_PROPERTIES
(
tmq PROPERTIES OUTPUT_NAME tmq
)
SET_TARGET_PROPERTIES
(
tmq PROPERTIES OUTPUT_NAME tmq
)
SET_TARGET_PROPERTIES
(
tmq_taosx PROPERTIES OUTPUT_NAME tmq_taosx
)
SET_TARGET_PROPERTIES
(
stream_demo PROPERTIES OUTPUT_NAME stream_demo
)
SET_TARGET_PROPERTIES
(
stream_demo PROPERTIES OUTPUT_NAME stream_demo
)
SET_TARGET_PROPERTIES
(
demoapi PROPERTIES OUTPUT_NAME demoapi
)
SET_TARGET_PROPERTIES
(
demoapi PROPERTIES OUTPUT_NAME demoapi
)
ENDIF
()
ENDIF
()
...
...
examples/c/tmq.c
浏览文件 @
cf665cdb
...
@@ -302,7 +302,7 @@ int32_t create_topic() {
...
@@ -302,7 +302,7 @@ int32_t create_topic() {
}
}
taos_free_result
(
pRes
);
taos_free_result
(
pRes
);
/*pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");*/
// pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column as select ts, c1, c2, c3 from st1"
);
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column as select ts, c1, c2, c3 from st1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic topic_ctb_column, reason:%s
\n
"
,
taos_errstr
(
pRes
));
printf
(
"failed to create topic topic_ctb_column, reason:%s
\n
"
,
taos_errstr
(
pRes
));
...
...
examples/c/tmq_taosx.c
0 → 100644
浏览文件 @
cf665cdb
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
static
int
running
=
1
;
static
TAOS
*
use_db
(){
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
NULL
;
}
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use db_taosx"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db_taosx, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
NULL
;
}
taos_free_result
(
pRes
);
return
pConn
;
}
static
void
msg_process
(
TAOS_RES
*
msg
)
{
/*memset(buf, 0, 1024);*/
printf
(
"-----------topic-------------: %s
\n
"
,
tmq_get_topic_name
(
msg
));
printf
(
"db: %s
\n
"
,
tmq_get_db_name
(
msg
));
printf
(
"vg: %d
\n
"
,
tmq_get_vgroup_id
(
msg
));
TAOS
*
pConn
=
use_db
();
if
(
tmq_get_res_type
(
msg
)
==
TMQ_RES_TABLE_META
)
{
char
*
result
=
tmq_get_json_meta
(
msg
);
if
(
result
)
{
printf
(
"meta result: %s
\n
"
,
result
);
}
tmq_free_json_meta
(
result
);
tmq_raw_data
raw
=
{
0
};
tmq_get_raw_meta
(
msg
,
&
raw
);
int32_t
ret
=
taos_write_raw_meta
(
pConn
,
raw
);
printf
(
"write raw meta: %s
\n
"
,
tmq_err2str
(
ret
));
}
if
(
tmq_get_res_type
(
msg
)
==
TMQ_RES_DATA
){
int32_t
ret
=
taos_write_raw_data
(
pConn
,
msg
);
printf
(
"write raw data: %s
\n
"
,
tmq_err2str
(
ret
));
}
taos_close
(
pConn
);
}
int32_t
init_env
()
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
}
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"drop database if exists db_taosx"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in drop db_taosx, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create database if not exists db_taosx vgroups 4"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db_taosx, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop database if exists abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in drop db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create database if not exists abc1 vgroups 3"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct0 using st1 tags(1000,
\"
ttt
\"
, true)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table tu1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct0 values(now, 1, 2, 'a')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct0, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct1 using st1(t1) tags(2000)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct2 using st1(t1) tags(NULL)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct2, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct1 values(now, 3, 4, 'b')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct3 using st1(t1) tags(3000)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 values(now, 5, 6, 'c') ct1 values(now+1s, 2, 3, 'sds') (now+2s, 4, 5, 'ddd') ct0 values(now+1s, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table st1 add column c4 bigint"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table st1 modify column c3 binary(64)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 values(now+7s, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) (now+9s, 51, 62, 'c333', 940)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table st1 add tag t2 binary(64)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table ct3 set tag t1=5000"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to slter child table ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
// pRes = taos_query(pConn, "drop table ct3 ct1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "drop table st1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
pRes
=
taos_query
(
pConn
,
"create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 add column c3 bigint"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 modify column c2 nchar(8)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 rename column c3 cc3"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 comment 'hello'"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 drop column c1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into n1 values(now, 'eeee', 8989898899999) (now+9s, 'c333', 940)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
// pRes = taos_query(pConn, "drop table n1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
pRes
=
taos_query
(
pConn
,
"create table jt(ts timestamp, i int) tags(t json)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table jt1 using jt tags('{
\"
k1
\"
:1,
\"
k2
\"
:
\"
hello
\"
}')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table jt2 using jt tags('')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt2, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
// pRes = taos_query(pConn,
// "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
// "nchar(8), t4 bool)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "drop table st1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
taos_close
(
pConn
);
return
0
;
}
int32_t
create_topic
()
{
printf
(
"create topic
\n
"
);
TAOS_RES
*
pRes
;
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
}
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column with meta as database abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic topic_ctb_column, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
return
0
;
}
void
tmq_commit_cb_print
(
tmq_t
*
tmq
,
int32_t
code
,
void
*
param
)
{
printf
(
"commit %d tmq %p param %p
\n
"
,
code
,
tmq
,
param
);
}
tmq_t
*
build_consumer
()
{
#if 0
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
#endif
tmq_conf_t
*
conf
=
tmq_conf_new
();
tmq_conf_set
(
conf
,
"group.id"
,
"tg2"
);
tmq_conf_set
(
conf
,
"client.id"
,
"my app 1"
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"true"
);
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"true"
);
/*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/
tmq_conf_set_auto_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
assert
(
tmq
);
tmq_conf_destroy
(
conf
);
return
tmq
;
}
tmq_list_t
*
build_topic_list
()
{
tmq_list_t
*
topic_list
=
tmq_list_new
();
tmq_list_append
(
topic_list
,
"topic_ctb_column"
);
/*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/
return
topic_list
;
}
void
basic_consume_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
int32_t
code
;
if
((
code
=
tmq_subscribe
(
tmq
,
topics
)))
{
fprintf
(
stderr
,
"%% Failed to start consuming topics: %s
\n
"
,
tmq_err2str
(
code
));
printf
(
"subscribe err
\n
"
);
return
;
}
int32_t
cnt
=
0
;
while
(
running
)
{
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
-
1
);
if
(
tmqmessage
)
{
cnt
++
;
msg_process
(
tmqmessage
);
/*if (cnt >= 2) break;*/
/*printf("get data\n");*/
taos_free_result
(
tmqmessage
);
/*} else {*/
/*break;*/
/*tmq_commit_sync(tmq, NULL);*/
}
}
code
=
tmq_consumer_close
(
tmq
);
if
(
code
)
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
code
));
else
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
void
sync_consume_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
static
const
int
MIN_COMMIT_COUNT
=
1
;
int
msg_count
=
0
;
int32_t
code
;
if
((
code
=
tmq_subscribe
(
tmq
,
topics
)))
{
fprintf
(
stderr
,
"%% Failed to start consuming topics: %s
\n
"
,
tmq_err2str
(
code
));
return
;
}
tmq_list_t
*
subList
=
NULL
;
tmq_subscription
(
tmq
,
&
subList
);
char
**
subTopics
=
tmq_list_to_c_array
(
subList
);
int32_t
sz
=
tmq_list_get_size
(
subList
);
printf
(
"subscribed topics: "
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
printf
(
"%s, "
,
subTopics
[
i
]);
}
printf
(
"
\n
"
);
tmq_list_destroy
(
subList
);
while
(
running
)
{
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
1000
);
if
(
tmqmessage
)
{
msg_process
(
tmqmessage
);
taos_free_result
(
tmqmessage
);
/*tmq_commit_sync(tmq, NULL);*/
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
}
}
code
=
tmq_consumer_close
(
tmq
);
if
(
code
)
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
code
));
else
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
int
main
(
int
argc
,
char
*
argv
[])
{
printf
(
"env init
\n
"
);
if
(
init_env
()
<
0
)
{
return
-
1
;
}
create_topic
();
tmq_t
*
tmq
=
build_consumer
();
tmq_list_t
*
topic_list
=
build_topic_list
();
basic_consume_loop
(
tmq
,
topic_list
);
/*sync_consume_loop(tmq, topic_list);*/
}
include/client/taos.h
浏览文件 @
cf665cdb
...
@@ -270,6 +270,7 @@ typedef enum tmq_res_t tmq_res_t;
...
@@ -270,6 +270,7 @@ typedef enum tmq_res_t tmq_res_t;
DLL_EXPORT
tmq_res_t
tmq_get_res_type
(
TAOS_RES
*
res
);
DLL_EXPORT
tmq_res_t
tmq_get_res_type
(
TAOS_RES
*
res
);
DLL_EXPORT
int32_t
tmq_get_raw_meta
(
TAOS_RES
*
res
,
tmq_raw_data
*
raw_meta
);
DLL_EXPORT
int32_t
tmq_get_raw_meta
(
TAOS_RES
*
res
,
tmq_raw_data
*
raw_meta
);
DLL_EXPORT
int32_t
taos_write_raw_meta
(
TAOS
*
taos
,
tmq_raw_data
raw_meta
);
DLL_EXPORT
int32_t
taos_write_raw_meta
(
TAOS
*
taos
,
tmq_raw_data
raw_meta
);
DLL_EXPORT
int32_t
taos_write_raw_data
(
TAOS
*
taos
,
TAOS_RES
*
res
);
DLL_EXPORT
char
*
tmq_get_json_meta
(
TAOS_RES
*
res
);
// Returning null means error. Returned result need to be freed by tmq_free_json_meta
DLL_EXPORT
char
*
tmq_get_json_meta
(
TAOS_RES
*
res
);
// Returning null means error. Returned result need to be freed by tmq_free_json_meta
DLL_EXPORT
void
tmq_free_json_meta
(
char
*
jsonMeta
);
DLL_EXPORT
void
tmq_free_json_meta
(
char
*
jsonMeta
);
DLL_EXPORT
const
char
*
tmq_get_topic_name
(
TAOS_RES
*
res
);
DLL_EXPORT
const
char
*
tmq_get_topic_name
(
TAOS_RES
*
res
);
...
...
include/common/tmsg.h
浏览文件 @
cf665cdb
...
@@ -1977,6 +1977,7 @@ typedef struct SVCreateTbReq {
...
@@ -1977,6 +1977,7 @@ typedef struct SVCreateTbReq {
union
{
union
{
struct
{
struct
{
char
*
name
;
// super table name
char
*
name
;
// super table name
uint8_t
tagNum
;
tb_uid_t
suid
;
tb_uid_t
suid
;
SArray
*
tagName
;
SArray
*
tagName
;
uint8_t
*
pTag
;
uint8_t
*
pTag
;
...
...
source/client/src/clientEnv.c
浏览文件 @
cf665cdb
...
@@ -244,6 +244,7 @@ void *createRequest(uint64_t connId, int32_t type) {
...
@@ -244,6 +244,7 @@ void *createRequest(uint64_t connId, int32_t type) {
STscObj
*
pTscObj
=
acquireTscObj
(
connId
);
STscObj
*
pTscObj
=
acquireTscObj
(
connId
);
if
(
pTscObj
==
NULL
)
{
if
(
pTscObj
==
NULL
)
{
taosMemoryFree
(
pRequest
);
terrno
=
TSDB_CODE_TSC_DISCONNECTED
;
terrno
=
TSDB_CODE_TSC_DISCONNECTED
;
return
NULL
;
return
NULL
;
}
}
...
...
source/client/src/clientImpl.c
浏览文件 @
cf665cdb
...
@@ -834,7 +834,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
...
@@ -834,7 +834,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
tstrerror
(
code
),
pRequest
->
requestId
);
tstrerror
(
code
),
pRequest
->
requestId
);
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
if
(
code
!=
TSDB_CODE_SUCCESS
&&
NEED_CLIENT_HANDLE_ERROR
(
code
))
{
if
(
code
!=
TSDB_CODE_SUCCESS
&&
NEED_CLIENT_HANDLE_ERROR
(
code
)
&&
pRequest
->
sqlstr
!=
NULL
)
{
tscDebug
(
"0x%"
PRIx64
" client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%"
PRIx64
,
tscDebug
(
"0x%"
PRIx64
" client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%"
PRIx64
,
pRequest
->
self
,
code
,
tstrerror
(
code
),
pRequest
->
retry
,
pRequest
->
requestId
);
pRequest
->
self
,
code
,
tstrerror
(
code
),
pRequest
->
retry
,
pRequest
->
requestId
);
pRequest
->
prevCode
=
code
;
pRequest
->
prevCode
=
code
;
...
...
source/client/src/clientSml.c
浏览文件 @
cf665cdb
...
@@ -1489,7 +1489,7 @@ static SSmlHandle* smlBuildSmlInfo(STscObj* pTscObj, SRequestObj* request, SMLPr
...
@@ -1489,7 +1489,7 @@ static SSmlHandle* smlBuildSmlInfo(STscObj* pTscObj, SRequestObj* request, SMLPr
}
}
info
->
id
=
smlGenId
();
info
->
id
=
smlGenId
();
info
->
pQuery
=
(
SQuery
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SQuery
)
);
info
->
pQuery
=
(
SQuery
*
)
nodesMakeNode
(
QUERY_NODE_QUERY
);
if
(
NULL
==
info
->
pQuery
)
{
if
(
NULL
==
info
->
pQuery
)
{
uError
(
"SML:0x%"
PRIx64
" create info->pQuery error"
,
info
->
id
);
uError
(
"SML:0x%"
PRIx64
" create info->pQuery error"
,
info
->
id
);
goto
cleanup
;
goto
cleanup
;
...
...
source/client/src/tmq.c
浏览文件 @
cf665cdb
...
@@ -2128,7 +2128,7 @@ _err:
...
@@ -2128,7 +2128,7 @@ _err:
return
string
;
return
string
;
}
}
static
char
*
buildCreateCTableJson
(
STag
*
pTag
,
char
*
sname
,
char
*
name
,
SArray
*
tagName
,
int64_t
id
)
{
static
char
*
buildCreateCTableJson
(
STag
*
pTag
,
char
*
sname
,
char
*
name
,
SArray
*
tagName
,
int64_t
id
,
uint8_t
tagNum
)
{
char
*
string
=
NULL
;
char
*
string
=
NULL
;
SArray
*
pTagVals
=
NULL
;
SArray
*
pTagVals
=
NULL
;
cJSON
*
json
=
cJSON_CreateObject
();
cJSON
*
json
=
cJSON_CreateObject
();
...
@@ -2148,6 +2148,8 @@ static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray*
...
@@ -2148,6 +2148,8 @@ static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray*
cJSON_AddItemToObject
(
json
,
"tableType"
,
tableType
);
cJSON_AddItemToObject
(
json
,
"tableType"
,
tableType
);
cJSON
*
using
=
cJSON_CreateString
(
sname
);
cJSON
*
using
=
cJSON_CreateString
(
sname
);
cJSON_AddItemToObject
(
json
,
"using"
,
using
);
cJSON_AddItemToObject
(
json
,
"using"
,
using
);
cJSON
*
tagNumJson
=
cJSON_CreateNumber
(
tagNum
);
cJSON_AddItemToObject
(
json
,
"tagNum"
,
tagNumJson
);
// cJSON* version = cJSON_CreateNumber(1);
// cJSON* version = cJSON_CreateNumber(1);
// cJSON_AddItemToObject(json, "version", version);
// cJSON_AddItemToObject(json, "version", version);
...
@@ -2236,7 +2238,7 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
...
@@ -2236,7 +2238,7 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
pCreateReq
=
req
.
pReqs
+
iReq
;
pCreateReq
=
req
.
pReqs
+
iReq
;
if
(
pCreateReq
->
type
==
TSDB_CHILD_TABLE
)
{
if
(
pCreateReq
->
type
==
TSDB_CHILD_TABLE
)
{
string
=
buildCreateCTableJson
((
STag
*
)
pCreateReq
->
ctb
.
pTag
,
pCreateReq
->
ctb
.
name
,
pCreateReq
->
name
,
string
=
buildCreateCTableJson
((
STag
*
)
pCreateReq
->
ctb
.
pTag
,
pCreateReq
->
ctb
.
name
,
pCreateReq
->
name
,
pCreateReq
->
ctb
.
tagName
,
pCreateReq
->
uid
);
pCreateReq
->
ctb
.
tagName
,
pCreateReq
->
uid
,
pCreateReq
->
ctb
.
tagNum
);
}
else
if
(
pCreateReq
->
type
==
TSDB_NORMAL_TABLE
)
{
}
else
if
(
pCreateReq
->
type
==
TSDB_NORMAL_TABLE
)
{
string
=
string
=
buildCreateTableJson
(
&
pCreateReq
->
ntb
.
schemaRow
,
NULL
,
pCreateReq
->
name
,
pCreateReq
->
uid
,
TSDB_NORMAL_TABLE
);
buildCreateTableJson
(
&
pCreateReq
->
ntb
.
schemaRow
,
NULL
,
pCreateReq
->
name
,
pCreateReq
->
uid
,
TSDB_NORMAL_TABLE
);
...
@@ -2952,9 +2954,243 @@ int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta){
...
@@ -2952,9 +2954,243 @@ int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta){
return
TSDB_CODE_INVALID_PARA
;
return
TSDB_CODE_INVALID_PARA
;
}
}
void
tmq_free_raw_meta
(
tmq_raw_data
*
rawMeta
)
{
typedef
struct
{
//
SVgroupInfo
vg
;
taosMemoryFreeClear
(
rawMeta
);
void
*
data
;
}
VgData
;
static
void
destroyVgHash
(
void
*
data
)
{
VgData
*
vgData
=
(
VgData
*
)
data
;
taosMemoryFreeClear
(
vgData
->
data
);
}
int32_t
taos_write_raw_data
(
TAOS
*
taos
,
TAOS_RES
*
msg
){
if
(
!
TD_RES_TMQ
(
msg
))
{
uError
(
"WriteRaw:msg is not tmq : %d"
,
*
(
int8_t
*
)
msg
);
return
TSDB_CODE_TMQ_INVALID_MSG
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
SHashObj
*
pVgHash
=
NULL
;
SQuery
*
pQuery
=
NULL
;
terrno
=
TSDB_CODE_SUCCESS
;
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
createRequest
(
*
(
int64_t
*
)
taos
,
TSDB_SQL_INSERT
);
if
(
!
pRequest
){
uError
(
"WriteRaw:createRequest error request is null"
);
return
terrno
;
}
if
(
!
pRequest
->
pDb
)
{
uError
(
"WriteRaw:not use db"
);
code
=
TSDB_CODE_PAR_DB_NOT_SPECIFIED
;
goto
end
;
}
pVgHash
=
taosHashInit
(
16
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
taosHashSetFreeFp
(
pVgHash
,
destroyVgHash
);
struct
SCatalog
*
pCatalog
=
NULL
;
code
=
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
if
(
code
!=
TSDB_CODE_SUCCESS
){
uError
(
"WriteRaw: get gatlog error"
);
goto
end
;
}
SRequestConnInfo
conn
=
{
0
};
conn
.
pTrans
=
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
;
conn
.
requestId
=
pRequest
->
requestId
;
conn
.
requestObjRefId
=
pRequest
->
self
;
conn
.
mgmtEps
=
getEpSet_s
(
&
pRequest
->
pTscObj
->
pAppInfo
->
mgmtEp
);
SMqRspObj
*
rspObj
=
((
SMqRspObj
*
)
msg
);
printf
(
"raw data block num:%d
\n
"
,
rspObj
->
rsp
.
blockNum
);
while
(
++
rspObj
->
resIter
<
rspObj
->
rsp
.
blockNum
)
{
SRetrieveTableRsp
*
pRetrieve
=
(
SRetrieveTableRsp
*
)
taosArrayGetP
(
rspObj
->
rsp
.
blockData
,
rspObj
->
resIter
);
if
(
!
rspObj
->
rsp
.
withSchema
)
{
uError
(
"WriteRaw:no schema, iter:%d"
,
rspObj
->
resIter
);
goto
end
;
}
SSchemaWrapper
*
pSW
=
(
SSchemaWrapper
*
)
taosArrayGetP
(
rspObj
->
rsp
.
blockSchema
,
rspObj
->
resIter
);
setResSchemaInfo
(
&
rspObj
->
resInfo
,
pSW
->
pSchema
,
pSW
->
nCols
);
code
=
setQueryResultFromRsp
(
&
rspObj
->
resInfo
,
pRetrieve
,
false
,
false
);
if
(
code
!=
TSDB_CODE_SUCCESS
){
uError
(
"WriteRaw: setQueryResultFromRsp error"
);
goto
end
;
}
uint16_t
fLen
=
0
;
int32_t
rowSize
=
0
;
int16_t
nVar
=
0
;
for
(
int
i
=
0
;
i
<
pSW
->
nCols
;
i
++
)
{
SSchema
*
schema
=
pSW
->
pSchema
+
i
;
fLen
+=
TYPE_BYTES
[
schema
->
type
];
rowSize
+=
schema
->
bytes
;
if
(
IS_VAR_DATA_TYPE
(
schema
->
type
)){
nVar
++
;
}
}
int32_t
rows
=
rspObj
->
resInfo
.
numOfRows
;
int32_t
extendedRowSize
=
rowSize
+
TD_ROW_HEAD_LEN
-
sizeof
(
TSKEY
)
+
nVar
*
sizeof
(
VarDataOffsetT
)
+
(
int32_t
)
TD_BITMAP_BYTES
(
pSW
->
nCols
-
1
);
int32_t
schemaLen
=
0
;
int32_t
submitLen
=
sizeof
(
SSubmitBlk
)
+
schemaLen
+
rows
*
extendedRowSize
;
const
char
*
tbName
=
tmq_get_table_name
(
msg
);
if
(
!
tbName
){
uError
(
"WriteRaw: tbname is null"
);
code
=
TSDB_CODE_TMQ_INVALID_MSG
;
goto
end
;
}
printf
(
"raw data tbname:%s
\n
"
,
tbName
);
SName
pName
=
{
TSDB_TABLE_NAME_T
,
pRequest
->
pTscObj
->
acctId
,
{
0
},
{
0
}};
strcpy
(
pName
.
dbname
,
pRequest
->
pDb
);
strcpy
(
pName
.
tname
,
tbName
);
VgData
vgData
=
{
0
};
code
=
catalogGetTableHashVgroup
(
pCatalog
,
&
conn
,
&
pName
,
&
(
vgData
.
vg
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"WriteRaw:catalogGetTableHashVgroup failed. table name: %s"
,
tbName
);
goto
end
;
}
SSubmitReq
*
subReq
=
NULL
;
SSubmitBlk
*
blk
=
NULL
;
void
*
hData
=
taosHashGet
(
pVgHash
,
&
vgData
.
vg
.
vgId
,
sizeof
(
vgData
.
vg
.
vgId
));
if
(
hData
){
vgData
=
*
(
VgData
*
)
hData
;
int32_t
totalLen
=
((
SSubmitReq
*
)(
vgData
.
data
))
->
length
+
submitLen
;
void
*
tmp
=
taosMemoryRealloc
(
vgData
.
data
,
totalLen
);
if
(
tmp
==
NULL
)
{
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
end
;
}
vgData
.
data
=
tmp
;
((
VgData
*
)
hData
)
->
data
=
tmp
;
subReq
=
(
SSubmitReq
*
)(
vgData
.
data
);
blk
=
POINTER_SHIFT
(
vgData
.
data
,
subReq
->
length
);
}
else
{
int32_t
totalLen
=
sizeof
(
SSubmitReq
)
+
submitLen
;
void
*
tmp
=
taosMemoryCalloc
(
1
,
totalLen
);
if
(
tmp
==
NULL
)
{
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
end
;
}
vgData
.
data
=
tmp
;
taosHashPut
(
pVgHash
,
(
const
char
*
)
&
vgData
.
vg
.
vgId
,
sizeof
(
vgData
.
vg
.
vgId
),
(
char
*
)
&
vgData
,
sizeof
(
vgData
));
subReq
=
(
SSubmitReq
*
)(
vgData
.
data
);
subReq
->
length
=
sizeof
(
SSubmitReq
);
subReq
->
numOfBlocks
=
0
;
blk
=
POINTER_SHIFT
(
vgData
.
data
,
sizeof
(
SSubmitReq
));
}
STableMeta
*
pTableMeta
=
NULL
;
code
=
catalogGetTableMeta
(
pCatalog
,
&
conn
,
&
pName
,
&
pTableMeta
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"WriteRaw:catalogGetTableMeta failed. table name: %s"
,
tbName
);
goto
end
;
}
uint64_t
suid
=
(
TSDB_NORMAL_TABLE
==
pTableMeta
->
tableType
?
0
:
pTableMeta
->
suid
);
uint64_t
uid
=
pTableMeta
->
uid
;
taosMemoryFreeClear
(
pTableMeta
);
void
*
blkSchema
=
POINTER_SHIFT
(
blk
,
sizeof
(
SSubmitBlk
));
STSRow
*
rowData
=
POINTER_SHIFT
(
blkSchema
,
schemaLen
);
SRowBuilder
rb
=
{
0
};
tdSRowInit
(
&
rb
,
pSW
->
version
);
tdSRowSetTpInfo
(
&
rb
,
pSW
->
nCols
,
fLen
);
int32_t
dataLen
=
0
;
for
(
int32_t
j
=
0
;
j
<
rows
;
j
++
)
{
tdSRowResetBuf
(
&
rb
,
rowData
);
doSetOneRowPtr
(
&
rspObj
->
resInfo
);
rspObj
->
resInfo
.
current
+=
1
;
int32_t
offset
=
0
;
for
(
int32_t
k
=
0
;
k
<
pSW
->
nCols
;
k
++
)
{
const
SSchema
*
pColumn
=
&
pSW
->
pSchema
[
k
];
char
*
data
=
rspObj
->
resInfo
.
row
[
k
];
if
(
!
data
)
{
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NULL
,
NULL
,
false
,
offset
,
k
);
}
else
{
if
(
IS_VAR_DATA_TYPE
(
pColumn
->
type
)){
data
-=
VARSTR_HEADER_SIZE
;
}
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NORM
,
data
,
true
,
offset
,
k
);
}
offset
+=
TYPE_BYTES
[
pColumn
->
type
];
}
int32_t
rowLen
=
TD_ROW_LEN
(
rowData
);
rowData
=
POINTER_SHIFT
(
rowData
,
rowLen
);
dataLen
+=
rowLen
;
}
blk
->
uid
=
htobe64
(
uid
);
blk
->
suid
=
htobe64
(
suid
);
blk
->
padding
=
htonl
(
blk
->
padding
);
blk
->
sversion
=
htonl
(
pSW
->
version
);
blk
->
schemaLen
=
htonl
(
schemaLen
);
blk
->
numOfRows
=
htons
(
rows
);
blk
->
dataLen
=
htonl
(
dataLen
);
subReq
->
length
+=
sizeof
(
SSubmitBlk
)
+
schemaLen
+
dataLen
;
subReq
->
numOfBlocks
++
;
}
pQuery
=
(
SQuery
*
)
nodesMakeNode
(
QUERY_NODE_QUERY
);
if
(
NULL
==
pQuery
)
{
uError
(
"create SQuery error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
end
;
}
pQuery
->
execMode
=
QUERY_EXEC_MODE_SCHEDULE
;
pQuery
->
haveResultSet
=
false
;
pQuery
->
msgType
=
TDMT_VND_SUBMIT
;
pQuery
->
pRoot
=
(
SNode
*
)
nodesMakeNode
(
QUERY_NODE_VNODE_MODIF_STMT
);
if
(
NULL
==
pQuery
->
pRoot
)
{
uError
(
"create pQuery->pRoot error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
end
;
}
SVnodeModifOpStmt
*
nodeStmt
=
(
SVnodeModifOpStmt
*
)(
pQuery
->
pRoot
);
nodeStmt
->
payloadType
=
PAYLOAD_TYPE_KV
;
int32_t
numOfVg
=
taosHashGetSize
(
pVgHash
);
nodeStmt
->
pDataBlocks
=
taosArrayInit
(
numOfVg
,
POINTER_BYTES
);
VgData
*
vData
=
(
VgData
*
)
taosHashIterate
(
pVgHash
,
NULL
);
while
(
vData
)
{
SVgDataBlocks
*
dst
=
taosMemoryCalloc
(
1
,
sizeof
(
SVgDataBlocks
));
if
(
NULL
==
dst
)
{
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
end
;
}
dst
->
vg
=
vData
->
vg
;
SSubmitReq
*
subReq
=
(
SSubmitReq
*
)(
vData
->
data
);
dst
->
numOfTables
=
subReq
->
numOfBlocks
;
dst
->
size
=
subReq
->
length
;
dst
->
pData
=
(
char
*
)
subReq
;
vData
->
data
=
NULL
;
// no need free
subReq
->
header
.
vgId
=
htonl
(
dst
->
vg
.
vgId
);
subReq
->
version
=
htonl
(
1
);
subReq
->
header
.
contLen
=
htonl
(
subReq
->
length
);
subReq
->
length
=
htonl
(
subReq
->
length
);
subReq
->
numOfBlocks
=
htonl
(
subReq
->
numOfBlocks
);
taosArrayPush
(
nodeStmt
->
pDataBlocks
,
&
dst
);
vData
=
(
VgData
*
)
taosHashIterate
(
pVgHash
,
vData
);
}
launchQueryImpl
(
pRequest
,
pQuery
,
true
,
NULL
);
code
=
pRequest
->
code
;
end:
qDestroyQuery
(
pQuery
);
destroyRequest
(
pRequest
);
taosHashCleanup
(
pVgHash
);
return
code
;
}
}
void
tmq_commit_async
(
tmq_t
*
tmq
,
const
TAOS_RES
*
msg
,
tmq_commit_cb
*
cb
,
void
*
param
)
{
void
tmq_commit_async
(
tmq_t
*
tmq
,
const
TAOS_RES
*
msg
,
tmq_commit_cb
*
cb
,
void
*
param
)
{
...
...
source/common/src/tmsg.c
浏览文件 @
cf665cdb
...
@@ -4981,6 +4981,7 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
...
@@ -4981,6 +4981,7 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
if
(
pReq
->
type
==
TSDB_CHILD_TABLE
)
{
if
(
pReq
->
type
==
TSDB_CHILD_TABLE
)
{
if
(
tEncodeCStr
(
pCoder
,
pReq
->
ctb
.
name
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pCoder
,
pReq
->
ctb
.
name
)
<
0
)
return
-
1
;
if
(
tEncodeU8
(
pCoder
,
pReq
->
ctb
.
tagNum
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pCoder
,
pReq
->
ctb
.
suid
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pCoder
,
pReq
->
ctb
.
suid
)
<
0
)
return
-
1
;
if
(
tEncodeTag
(
pCoder
,
(
const
STag
*
)
pReq
->
ctb
.
pTag
)
<
0
)
return
-
1
;
if
(
tEncodeTag
(
pCoder
,
(
const
STag
*
)
pReq
->
ctb
.
pTag
)
<
0
)
return
-
1
;
int32_t
len
=
taosArrayGetSize
(
pReq
->
ctb
.
tagName
);
int32_t
len
=
taosArrayGetSize
(
pReq
->
ctb
.
tagName
);
...
@@ -5017,6 +5018,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
...
@@ -5017,6 +5018,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
if
(
pReq
->
type
==
TSDB_CHILD_TABLE
)
{
if
(
pReq
->
type
==
TSDB_CHILD_TABLE
)
{
if
(
tDecodeCStr
(
pCoder
,
&
pReq
->
ctb
.
name
)
<
0
)
return
-
1
;
if
(
tDecodeCStr
(
pCoder
,
&
pReq
->
ctb
.
name
)
<
0
)
return
-
1
;
if
(
tDecodeU8
(
pCoder
,
&
pReq
->
ctb
.
tagNum
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pReq
->
ctb
.
suid
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pReq
->
ctb
.
suid
)
<
0
)
return
-
1
;
if
(
tDecodeTag
(
pCoder
,
(
STag
**
)
&
pReq
->
ctb
.
pTag
)
<
0
)
return
-
1
;
if
(
tDecodeTag
(
pCoder
,
(
STag
**
)
&
pReq
->
ctb
.
pTag
)
<
0
)
return
-
1
;
int32_t
len
=
0
;
int32_t
len
=
0
;
...
...
source/dnode/vnode/src/tq/tqMeta.c
浏览文件 @
cf665cdb
...
@@ -67,10 +67,10 @@ int32_t tqMetaOpen(STQ* pTq) {
...
@@ -67,10 +67,10 @@ int32_t tqMetaOpen(STQ* pTq) {
ASSERT
(
0
);
ASSERT
(
0
);
}
}
void
*
pKey
;
void
*
pKey
=
NULL
;
int
kLen
;
int
kLen
=
0
;
void
*
pVal
;
void
*
pVal
=
NULL
;
int
vLen
;
int
vLen
=
0
;
tdbTbcMoveToFirst
(
pCur
);
tdbTbcMoveToFirst
(
pCur
);
SDecoder
decoder
;
SDecoder
decoder
;
...
...
source/libs/parser/src/parInsert.c
浏览文件 @
cf665cdb
...
@@ -739,12 +739,12 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
...
@@ -739,12 +739,12 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
void
buildCreateTbReq
(
SVCreateTbReq
*
pTbReq
,
const
char
*
tname
,
STag
*
pTag
,
int64_t
suid
,
const
char
*
sname
,
static
void
buildCreateTbReq
(
SVCreateTbReq
*
pTbReq
,
const
char
*
tname
,
STag
*
pTag
,
int64_t
suid
,
const
char
*
sname
,
SArray
*
tagName
,
uint8_t
tagNum
)
{
SArray
*
tagName
)
{
pTbReq
->
type
=
TD_CHILD_TABLE
;
pTbReq
->
type
=
TD_CHILD_TABLE
;
pTbReq
->
name
=
strdup
(
tname
);
pTbReq
->
name
=
strdup
(
tname
);
pTbReq
->
ctb
.
suid
=
suid
;
pTbReq
->
ctb
.
suid
=
suid
;
if
(
sname
)
pTbReq
->
ctb
.
name
=
strdup
(
sname
);
pTbReq
->
ctb
.
tagNum
=
tagNum
;
if
(
sname
)
pTbReq
->
ctb
.
name
=
strdup
(
sname
);
pTbReq
->
ctb
.
pTag
=
(
uint8_t
*
)
pTag
;
pTbReq
->
ctb
.
pTag
=
(
uint8_t
*
)
pTag
;
pTbReq
->
ctb
.
tagName
=
taosArrayDup
(
tagName
);
pTbReq
->
ctb
.
tagName
=
taosArrayDup
(
tagName
);
pTbReq
->
commentLen
=
-
1
;
pTbReq
->
commentLen
=
-
1
;
...
@@ -903,7 +903,7 @@ static int32_t parseTagToken(char** end, SToken* pToken, SSchema* pSchema, int16
...
@@ -903,7 +903,7 @@ static int32_t parseTagToken(char** end, SToken* pToken, SSchema* pSchema, int16
if
(
pToken
->
n
+
VARSTR_HEADER_SIZE
>
pSchema
->
bytes
)
{
if
(
pToken
->
n
+
VARSTR_HEADER_SIZE
>
pSchema
->
bytes
)
{
return
generateSyntaxErrMsg
(
pMsgBuf
,
TSDB_CODE_PAR_VALUE_TOO_LONG
,
pSchema
->
name
);
return
generateSyntaxErrMsg
(
pMsgBuf
,
TSDB_CODE_PAR_VALUE_TOO_LONG
,
pSchema
->
name
);
}
}
val
->
pData
=
pToken
->
z
;
val
->
pData
=
strdup
(
pToken
->
z
)
;
val
->
nData
=
pToken
->
n
;
val
->
nData
=
pToken
->
n
;
break
;
break
;
}
}
...
@@ -969,10 +969,9 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
...
@@ -969,10 +969,9 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
}
}
SSchema
*
pTagSchema
=
&
pSchema
[
pCxt
->
tags
.
boundColumns
[
i
]];
SSchema
*
pTagSchema
=
&
pSchema
[
pCxt
->
tags
.
boundColumns
[
i
]];
char
*
tmpTokenBuf
=
taosMemoryCalloc
(
1
,
sToken
.
n
);
// todo this can be optimize with parse column
char
tmpTokenBuf
[
TSDB_MAX_BYTES_PER_ROW
]
=
{
0
};
// todo this can be optimize with parse column
code
=
checkAndTrimValue
(
&
sToken
,
tmpTokenBuf
,
&
pCxt
->
msg
);
code
=
checkAndTrimValue
(
&
sToken
,
tmpTokenBuf
,
&
pCxt
->
msg
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
taosMemoryFree
(
tmpTokenBuf
);
goto
end
;
goto
end
;
}
}
...
@@ -982,7 +981,6 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
...
@@ -982,7 +981,6 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
if
(
pTagSchema
->
type
==
TSDB_DATA_TYPE_JSON
)
{
if
(
pTagSchema
->
type
==
TSDB_DATA_TYPE_JSON
)
{
if
(
sToken
.
n
>
(
TSDB_MAX_JSON_TAG_LEN
-
VARSTR_HEADER_SIZE
)
/
TSDB_NCHAR_SIZE
)
{
if
(
sToken
.
n
>
(
TSDB_MAX_JSON_TAG_LEN
-
VARSTR_HEADER_SIZE
)
/
TSDB_NCHAR_SIZE
)
{
code
=
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
"json string too long than 4095"
,
sToken
.
z
);
code
=
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
"json string too long than 4095"
,
sToken
.
z
);
taosMemoryFree
(
tmpTokenBuf
);
goto
end
;
goto
end
;
}
}
if
(
isNullValue
(
pTagSchema
->
type
,
&
sToken
))
{
if
(
isNullValue
(
pTagSchema
->
type
,
&
sToken
))
{
...
@@ -990,7 +988,6 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
...
@@ -990,7 +988,6 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
}
else
{
}
else
{
code
=
parseJsontoTagData
(
sToken
.
z
,
pTagVals
,
&
pTag
,
&
pCxt
->
msg
);
code
=
parseJsontoTagData
(
sToken
.
z
,
pTagVals
,
&
pTag
,
&
pCxt
->
msg
);
}
}
taosMemoryFree
(
tmpTokenBuf
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
goto
end
;
}
}
...
@@ -999,12 +996,9 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
...
@@ -999,12 +996,9 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
STagVal
val
=
{
0
};
STagVal
val
=
{
0
};
code
=
parseTagToken
(
&
pCxt
->
pSql
,
&
sToken
,
pTagSchema
,
precision
,
&
val
,
&
pCxt
->
msg
);
code
=
parseTagToken
(
&
pCxt
->
pSql
,
&
sToken
,
pTagSchema
,
precision
,
&
val
,
&
pCxt
->
msg
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
taosMemoryFree
(
tmpTokenBuf
);
goto
end
;
goto
end
;
}
}
if
(
pTagSchema
->
type
!=
TSDB_DATA_TYPE_BINARY
)
{
taosMemoryFree
(
tmpTokenBuf
);
}
taosArrayPush
(
pTagVals
,
&
val
);
taosArrayPush
(
pTagVals
,
&
val
);
}
}
}
}
...
@@ -1018,12 +1012,12 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
...
@@ -1018,12 +1012,12 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
goto
end
;
goto
end
;
}
}
buildCreateTbReq
(
&
pCxt
->
createTblReq
,
tName
,
pTag
,
pCxt
->
pTableMeta
->
suid
,
pCxt
->
sTableName
,
tagName
);
buildCreateTbReq
(
&
pCxt
->
createTblReq
,
tName
,
pTag
,
pCxt
->
pTableMeta
->
suid
,
pCxt
->
sTableName
,
tagName
,
pCxt
->
pTableMeta
->
tableInfo
.
numOfTags
);
end:
end:
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pTagVals
);
++
i
)
{
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pTagVals
);
++
i
)
{
STagVal
*
p
=
(
STagVal
*
)
taosArrayGet
(
pTagVals
,
i
);
STagVal
*
p
=
(
STagVal
*
)
taosArrayGet
(
pTagVals
,
i
);
if
(
p
->
type
==
TSDB_DATA_TYPE_NCHAR
)
{
if
(
IS_VAR_DATA_TYPE
(
p
->
type
)
)
{
taosMemoryFree
(
p
->
pData
);
taosMemoryFree
(
p
->
pData
);
}
}
}
}
...
@@ -1902,7 +1896,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
...
@@ -1902,7 +1896,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
}
}
SVCreateTbReq
tbReq
=
{
0
};
SVCreateTbReq
tbReq
=
{
0
};
buildCreateTbReq
(
&
tbReq
,
tName
,
pTag
,
suid
,
sTableName
,
tagName
);
buildCreateTbReq
(
&
tbReq
,
tName
,
pTag
,
suid
,
sTableName
,
tagName
,
pDataBlock
->
pTableMeta
->
tableInfo
.
numOfTags
);
code
=
buildCreateTbMsg
(
pDataBlock
,
&
tbReq
);
code
=
buildCreateTbMsg
(
pDataBlock
,
&
tbReq
);
tdDestroySVCreateTbReq
(
&
tbReq
);
tdDestroySVCreateTbReq
(
&
tbReq
);
...
@@ -2338,7 +2332,7 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
...
@@ -2338,7 +2332,7 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
return
ret
;
return
ret
;
}
}
buildCreateTbReq
(
&
smlHandle
->
tableExecHandle
.
createTblReq
,
tableName
,
pTag
,
pTableMeta
->
suid
,
NULL
,
tagName
);
buildCreateTbReq
(
&
smlHandle
->
tableExecHandle
.
createTblReq
,
tableName
,
pTag
,
pTableMeta
->
suid
,
NULL
,
tagName
,
pTableMeta
->
tableInfo
.
numOfTags
);
taosArrayDestroy
(
tagName
);
taosArrayDestroy
(
tagName
);
smlHandle
->
tableExecHandle
.
createTblReq
.
ctb
.
name
=
taosMemoryMalloc
(
sTableNameLen
+
1
);
smlHandle
->
tableExecHandle
.
createTblReq
.
ctb
.
name
=
taosMemoryMalloc
(
sTableNameLen
+
1
);
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
cf665cdb
...
@@ -5559,7 +5559,7 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
...
@@ -5559,7 +5559,7 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
static
void
addCreateTbReqIntoVgroup
(
int32_t
acctId
,
SHashObj
*
pVgroupHashmap
,
SCreateSubTableClause
*
pStmt
,
static
void
addCreateTbReqIntoVgroup
(
int32_t
acctId
,
SHashObj
*
pVgroupHashmap
,
SCreateSubTableClause
*
pStmt
,
const
STag
*
pTag
,
uint64_t
suid
,
const
char
*
sTableNmae
,
SVgroupInfo
*
pVgInfo
,
const
STag
*
pTag
,
uint64_t
suid
,
const
char
*
sTableNmae
,
SVgroupInfo
*
pVgInfo
,
SArray
*
tagName
)
{
SArray
*
tagName
,
uint8_t
tagNum
)
{
// char dbFName[TSDB_DB_FNAME_LEN] = {0};
// char dbFName[TSDB_DB_FNAME_LEN] = {0};
// SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId};
// SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId};
// strcpy(name.dbname, pStmt->dbName);
// strcpy(name.dbname, pStmt->dbName);
...
@@ -5576,6 +5576,7 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S
...
@@ -5576,6 +5576,7 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S
req
.
commentLen
=
-
1
;
req
.
commentLen
=
-
1
;
}
}
req
.
ctb
.
suid
=
suid
;
req
.
ctb
.
suid
=
suid
;
req
.
ctb
.
tagNum
=
tagNum
;
req
.
ctb
.
name
=
strdup
(
sTableNmae
);
req
.
ctb
.
name
=
strdup
(
sTableNmae
);
req
.
ctb
.
pTag
=
(
uint8_t
*
)
pTag
;
req
.
ctb
.
pTag
=
(
uint8_t
*
)
pTag
;
req
.
ctb
.
tagName
=
taosArrayDup
(
tagName
);
req
.
ctb
.
tagName
=
taosArrayDup
(
tagName
);
...
@@ -5827,7 +5828,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
...
@@ -5827,7 +5828,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
addCreateTbReqIntoVgroup
(
pCxt
->
pParseCxt
->
acctId
,
pVgroupHashmap
,
pStmt
,
pTag
,
pSuperTableMeta
->
uid
,
addCreateTbReqIntoVgroup
(
pCxt
->
pParseCxt
->
acctId
,
pVgroupHashmap
,
pStmt
,
pTag
,
pSuperTableMeta
->
uid
,
pStmt
->
useTableName
,
&
info
,
tagName
);
pStmt
->
useTableName
,
&
info
,
tagName
,
pSuperTableMeta
->
tableInfo
.
numOfTags
);
}
}
taosArrayDestroy
(
tagName
);
taosArrayDestroy
(
tagName
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录