Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d150bd1d
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d150bd1d
编写于
10月 17, 2022
作者:
wmmhello
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feature/TD-19221-3.0
上级
b7222bc4
34f576b6
变更
27
隐藏空白更改
内联
并排
Showing
27 changed file
with
401 addition
and
137 deletion
+401
-137
cmake/taostools_CMakeLists.txt.in
cmake/taostools_CMakeLists.txt.in
+1
-1
include/common/tmsg.h
include/common/tmsg.h
+1
-1
packaging/tools/make_install.sh
packaging/tools/make_install.sh
+42
-41
source/client/src/clientHb.c
source/client/src/clientHb.c
+1
-1
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+2
-2
source/client/src/clientRawBlockWrite.c
source/client/src/clientRawBlockWrite.c
+13
-7
source/client/src/clientSml.c
source/client/src/clientSml.c
+25
-11
source/client/src/clientStmt.c
source/client/src/clientStmt.c
+3
-0
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+27
-23
source/common/src/tdataformat.c
source/common/src/tdataformat.c
+4
-0
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+1
-1
source/dnode/vnode/src/meta/metaQuery.c
source/dnode/vnode/src/meta/metaQuery.c
+1
-1
source/dnode/vnode/src/meta/metaTable.c
source/dnode/vnode/src/meta/metaTable.c
+4
-3
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+6
-6
source/dnode/vnode/src/tq/tqSink.c
source/dnode/vnode/src/tq/tqSink.c
+206
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+2
-0
source/libs/tdb/src/db/tdbBtree.c
source/libs/tdb/src/db/tdbBtree.c
+1
-0
source/libs/tdb/src/db/tdbPCache.c
source/libs/tdb/src/db/tdbPCache.c
+4
-3
source/libs/tdb/src/db/tdbPager.c
source/libs/tdb/src/db/tdbPager.c
+15
-6
source/libs/tdb/src/db/tdbTable.c
source/libs/tdb/src/db/tdbTable.c
+2
-0
source/libs/tdb/src/inc/tdbOs.h
source/libs/tdb/src/inc/tdbOs.h
+3
-1
tests/system-test/7-tmq/tmqAutoCreateTbl.py
tests/system-test/7-tmq/tmqAutoCreateTbl.py
+1
-1
tools/shell/src/shellArguments.c
tools/shell/src/shellArguments.c
+1
-1
tools/taosws-rs
tools/taosws-rs
+1
-0
utils/test/c/sml_test.c
utils/test/c/sml_test.c
+4
-4
utils/test/c/tmqSim.c
utils/test/c/tmqSim.c
+29
-22
utils/test/c/tmq_taosx_ci.c
utils/test/c/tmq_taosx_ci.c
+1
-1
未找到文件。
cmake/taostools_CMakeLists.txt.in
浏览文件 @
d150bd1d
...
...
@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG
2849aa4
GIT_TAG
4d02980
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
...
...
include/common/tmsg.h
浏览文件 @
d150bd1d
...
...
@@ -1902,7 +1902,7 @@ static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) {
if
(
pRebInfo
==
NULL
)
{
return
NULL
;
}
strcpy
(
pRebInfo
->
key
,
key
);
tstrncpy
(
pRebInfo
->
key
,
key
,
TSDB_SUBSCRIBE_KEY_LEN
);
pRebInfo
->
lostConsumers
=
taosArrayInit
(
0
,
sizeof
(
int64_t
));
if
(
pRebInfo
->
lostConsumers
==
NULL
)
{
goto
_err
;
...
...
packaging/tools/make_install.sh
浏览文件 @
d150bd1d
...
...
@@ -168,7 +168,7 @@ function install_bin() {
${
csudo
}
cp
-r
${
binary_dir
}
/build/bin/
${
clientName
}
${
install_main_dir
}
/bin
||
:
[
-f
${
binary_dir
}
/build/bin/taosBenchmark
]
&&
${
csudo
}
cp
-r
${
binary_dir
}
/build/bin/taosBenchmark
${
install_main_dir
}
/bin
||
:
[
-f
${
install_main_dir
}
/bin/taosBenchmark
]
&&
${
csudo
}
ln
-sf
${
install_main_dir
}
/bin/taosBenchmark
${
install_main_dir
}
/bin/taosdemo
||
:
[
-f
${
install_main_dir
}
/bin/taosBenchmark
]
&&
${
csudo
}
ln
-sf
${
install_main_dir
}
/bin/taosBenchmark
${
install_main_dir
}
/bin/taosdemo
>
/dev/null 2>&1
||
:
[
-f
${
binary_dir
}
/build/bin/taosdump
]
&&
${
csudo
}
cp
-r
${
binary_dir
}
/build/bin/taosdump
${
install_main_dir
}
/bin
||
:
[
-f
${
binary_dir
}
/build/bin/taosadapter
]
&&
${
csudo
}
cp
-r
${
binary_dir
}
/build/bin/taosadapter
${
install_main_dir
}
/bin
||
:
[
-f
${
binary_dir
}
/build/bin/udfd
]
&&
${
csudo
}
cp
-r
${
binary_dir
}
/build/bin/udfd
${
install_main_dir
}
/bin
||
:
...
...
@@ -182,21 +182,21 @@ function install_bin() {
${
csudo
}
chmod
0555
${
install_main_dir
}
/bin/
*
#Make link
[
-x
${
install_main_dir
}
/bin/
${
clientName
}
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/
${
clientName
}
${
bin_link_dir
}
/
${
clientName
}
||
:
[
-x
${
install_main_dir
}
/bin/
${
serverName
}
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/
${
serverName
}
${
bin_link_dir
}
/
${
serverName
}
||
:
[
-x
${
install_main_dir
}
/bin/taosadapter
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/taosadapter
${
bin_link_dir
}
/taosadapter
||
:
[
-x
${
install_main_dir
}
/bin/udfd
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/udfd
${
bin_link_dir
}
/udfd
||
:
[
-x
${
install_main_dir
}
/bin/taosdump
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/taosdump
${
bin_link_dir
}
/taosdump
||
:
[
-x
${
install_main_dir
}
/bin/taosdemo
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/taosdemo
${
bin_link_dir
}
/taosdemo
||
:
[
-x
${
install_main_dir
}
/bin/taosx
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/taosx
${
bin_link_dir
}
/taosx
||
:
[
-x
${
install_main_dir
}
/bin/perfMonitor
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/perfMonitor
${
bin_link_dir
}
/perfMonitor
||
:
[
-x
${
install_main_dir
}
/set_core.sh
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/set_core.sh
${
bin_link_dir
}
/set_core
||
:
[
-x
${
install_main_dir
}
/bin/remove.sh
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/remove.sh
${
bin_link_dir
}
/
${
uninstallScript
}
||
:
[
-x
${
install_main_dir
}
/bin/
${
clientName
}
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/
${
clientName
}
${
bin_link_dir
}
/
${
clientName
}
>
/dev/null 2>&1
||
:
[
-x
${
install_main_dir
}
/bin/
${
serverName
}
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/
${
serverName
}
${
bin_link_dir
}
/
${
serverName
}
>
/dev/null 2>&1
||
:
[
-x
${
install_main_dir
}
/bin/taosadapter
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/taosadapter
${
bin_link_dir
}
/taosadapter
>
/dev/null 2>&1
||
:
[
-x
${
install_main_dir
}
/bin/udfd
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/udfd
${
bin_link_dir
}
/udfd
>
/dev/null 2>&1
||
:
[
-x
${
install_main_dir
}
/bin/taosdump
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/taosdump
${
bin_link_dir
}
/taosdump
>
/dev/null 2>&1
||
:
[
-x
${
install_main_dir
}
/bin/taosdemo
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/taosdemo
${
bin_link_dir
}
/taosdemo
>
/dev/null 2>&1
||
:
[
-x
${
install_main_dir
}
/bin/taosx
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/taosx
${
bin_link_dir
}
/taosx
>
/dev/null 2>&1
||
:
[
-x
${
install_main_dir
}
/bin/perfMonitor
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/perfMonitor
${
bin_link_dir
}
/perfMonitor
>
/dev/null 2>&1
||
:
[
-x
${
install_main_dir
}
/set_core.sh
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/set_core.sh
${
bin_link_dir
}
/set_core
>
/dev/null 2>&1
||
:
[
-x
${
install_main_dir
}
/bin/remove.sh
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/remove.sh
${
bin_link_dir
}
/
${
uninstallScript
}
>
/dev/null 2>&1
||
:
else
${
csudo
}
cp
-r
${
binary_dir
}
/build/bin/
${
clientName
}
${
install_main_dir
}
/bin
||
:
[
-f
${
binary_dir
}
/build/bin/taosBenchmark
]
&&
${
csudo
}
cp
-r
${
binary_dir
}
/build/bin/taosBenchmark
${
install_main_dir
}
/bin
||
:
[
-f
${
install_main_dir
}
/bin/taosBenchmark
]
&&
${
csudo
}
ln
-sf
${
install_main_dir
}
/bin/taosBenchmark
${
install_main_dir
}
/bin/taosdemo
||
:
[
-f
${
install_main_dir
}
/bin/taosBenchmark
]
&&
${
csudo
}
ln
-sf
${
install_main_dir
}
/bin/taosBenchmark
${
install_main_dir
}
/bin/taosdemo
>
/dev/null 2>&1
||
:
[
-f
${
binary_dir
}
/build/bin/taosdump
]
&&
${
csudo
}
cp
-r
${
binary_dir
}
/build/bin/taosdump
${
install_main_dir
}
/bin
||
:
[
-f
${
binary_dir
}
/build/bin/taosadapter
]
&&
${
csudo
}
cp
-r
${
binary_dir
}
/build/bin/taosadapter
${
install_main_dir
}
/bin
||
:
[
-f
${
binary_dir
}
/build/bin/udfd
]
&&
${
csudo
}
cp
-r
${
binary_dir
}
/build/bin/udfd
${
install_main_dir
}
/bin
||
:
...
...
@@ -206,14 +206,14 @@ function install_bin() {
${
csudo
}
cp
-r
${
script_dir
}
/remove.sh
${
install_main_dir
}
/bin
||
:
${
csudo
}
chmod
0555
${
install_main_dir
}
/bin/
*
#Make link
[
-x
${
install_main_dir
}
/bin/
${
clientName
}
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/
${
clientName
}
${
bin_link_dir
}
/
${
clientName
}
||
:
[
-x
${
install_main_dir
}
/bin/
${
serverName
}
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/
${
serverName
}
${
bin_link_dir
}
/
${
serverName
}
||
:
[
-x
${
install_main_dir
}
/bin/taosadapter
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/taosadapter
${
bin_link_dir
}
/taosadapter
||
:
[
-x
${
install_main_dir
}
/bin/udfd
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/udfd
${
bin_link_dir
}
/udfd
||
:
[
-x
${
install_main_dir
}
/bin/taosdump
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/taosdump
${
bin_link_dir
}
/taosdump
||
:
[
-f
${
install_main_dir
}
/bin/taosBenchmark
]
&&
${
csudo
}
ln
-sf
${
install_main_dir
}
/bin/taosBenchmark
${
install_main_dir
}
/bin/taosdemo
||
:
[
-x
${
install_main_dir
}
/bin/taosx
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/taosx
${
bin_link_dir
}
/taosx
||
:
[
-x
${
install_main_dir
}
/bin/remove.sh
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/remove.sh
${
bin_link_dir
}
/
${
uninstallScript
}
||
:
[
-x
${
install_main_dir
}
/bin/
${
clientName
}
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/
${
clientName
}
${
bin_link_dir
}
/
${
clientName
}
>
/dev/null 2>&1
||
:
[
-x
${
install_main_dir
}
/bin/
${
serverName
}
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/
${
serverName
}
${
bin_link_dir
}
/
${
serverName
}
>
/dev/null 2>&1
||
:
[
-x
${
install_main_dir
}
/bin/taosadapter
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/taosadapter
${
bin_link_dir
}
/taosadapter
>
/dev/null 2>&1
||
:
[
-x
${
install_main_dir
}
/bin/udfd
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/udfd
${
bin_link_dir
}
/udfd
>
/dev/null 2>&1
||
:
[
-x
${
install_main_dir
}
/bin/taosdump
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/taosdump
${
bin_link_dir
}
/taosdump
>
/dev/null 2>&1
||
:
[
-f
${
install_main_dir
}
/bin/taosBenchmark
]
&&
${
csudo
}
ln
-sf
${
install_main_dir
}
/bin/taosBenchmark
${
install_main_dir
}
/bin/taosdemo
>
/dev/null 2>&1
||
:
[
-x
${
install_main_dir
}
/bin/taosx
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/taosx
${
bin_link_dir
}
/taosx
>
/dev/null 2>&1
||
:
[
-x
${
install_main_dir
}
/bin/remove.sh
]
&&
${
csudo
}
ln
-s
${
install_main_dir
}
/bin/remove.sh
${
bin_link_dir
}
/
${
uninstallScript
}
>
/dev/null 2>&1
||
:
fi
}
...
...
@@ -238,7 +238,7 @@ function install_jemalloc() {
if
[
-f
"
${
binary_dir
}
/build/lib/libjemalloc.so.2"
]
;
then
${
csudo
}
/usr/bin/install
-c
-d
/usr/local/lib
${
csudo
}
/usr/bin/install
-c
-m
755
${
binary_dir
}
/build/lib/libjemalloc.so.2 /usr/local/lib
${
csudo
}
ln
-sf
libjemalloc.so.2 /usr/local/lib/libjemalloc.so
${
csudo
}
ln
-sf
libjemalloc.so.2 /usr/local/lib/libjemalloc.so
>
/dev/null 2>&1
${
csudo
}
/usr/bin/install
-c
-d
/usr/local/lib
[
-f
${
binary_dir
}
/build/lib/libjemalloc.a
]
&&
${
csudo
}
/usr/bin/install
-c
-m
755
${
binary_dir
}
/build/lib/libjemalloc.a /usr/local/lib
...
...
@@ -274,8 +274,8 @@ function install_avro() {
if
[
-f
"
${
binary_dir
}
/build/
$1
/libavro.so.23.0.0"
]
&&
[
-d
/usr/local/
$1
]
;
then
${
csudo
}
/usr/bin/install
-c
-d
/usr/local/
$1
${
csudo
}
/usr/bin/install
-c
-m
755
${
binary_dir
}
/build/
$1
/libavro.so.23.0.0 /usr/local/
$1
${
csudo
}
ln
-sf
libavro.so.23.0.0 /usr/local/
$1
/libavro.so.23
${
csudo
}
ln
-sf
libavro.so.23 /usr/local/
$1
/libavro.so
${
csudo
}
ln
-sf
libavro.so.23.0.0 /usr/local/
$1
/libavro.so.23
>
/dev/null 2>&1
${
csudo
}
ln
-sf
libavro.so.23 /usr/local/
$1
/libavro.so
>
/dev/null 2>&1
${
csudo
}
/usr/bin/install
-c
-d
/usr/local/
$1
[
-f
${
binary_dir
}
/build/
$1
/libavro.a
]
&&
${
csudo
}
/usr/bin/install
-c
-m
755
${
binary_dir
}
/build/
$1
/libavro.a /usr/local/
$1
...
...
@@ -304,11 +304,11 @@ function install_lib() {
${
install_main_dir
}
/driver
&&
${
csudo
}
chmod
777
${
install_main_dir
}
/driver/libtaos.so.
${
verNumber
}
${
csudo
}
ln
-sf
${
install_main_dir
}
/driver/libtaos.
*
${
lib_link_dir
}
/libtaos.so.1
${
csudo
}
ln
-sf
${
lib_link_dir
}
/libtaos.so.1
${
lib_link_dir
}
/libtaos.so
${
csudo
}
ln
-sf
${
install_main_dir
}
/driver/libtaos.
*
${
lib_link_dir
}
/libtaos.so.1
>
/dev/null 2>&1
${
csudo
}
ln
-sf
${
lib_link_dir
}
/libtaos.so.1
${
lib_link_dir
}
/libtaos.so
>
/dev/null 2>&1
if
[
-d
"
${
lib64_link_dir
}
"
]
;
then
${
csudo
}
ln
-sf
${
install_main_dir
}
/driver/libtaos.
*
${
lib64_link_dir
}
/libtaos.so.1
${
csudo
}
ln
-sf
${
lib64_link_dir
}
/libtaos.so.1
${
lib64_link_dir
}
/libtaos.so
${
csudo
}
ln
-sf
${
install_main_dir
}
/driver/libtaos.
*
${
lib64_link_dir
}
/libtaos.so.1
>
/dev/null 2>&1
${
csudo
}
ln
-sf
${
lib64_link_dir
}
/libtaos.so.1
${
lib64_link_dir
}
/libtaos.so
>
/dev/null 2>&1
fi
if
[
-f
${
binary_dir
}
/build/lib/libtaosws.so
]
;
then
...
...
@@ -316,23 +316,23 @@ function install_lib() {
${
install_main_dir
}
/driver
&&
${
csudo
}
chmod
777
${
install_main_dir
}
/driver/libtaosws.so
||
:
${
csudo
}
ln
-sf
${
install_main_dir
}
/driver/libtaosws.so
${
lib_link_dir
}
/libtaosws.so
||
:
${
csudo
}
ln
-sf
${
install_main_dir
}
/driver/libtaosws.so
${
lib_link_dir
}
/libtaosws.so
>
/dev/null 2>&1
||
:
fi
else
${
csudo
}
cp
-Rf
${
binary_dir
}
/build/lib/libtaos.
${
verNumber
}
.dylib
\
${
install_main_dir
}
/driver
&&
${
csudo
}
chmod
777
${
install_main_dir
}
/driver/
*
${
csudo
}
ln
-sf
${
install_main_dir
}
/driver/libtaos.
${
verNumber
}
.dylib
\
${
lib_link_dir
}
/libtaos.1.dylib
||
:
${
lib_link_dir
}
/libtaos.1.dylib
>
/dev/null 2>&1
||
:
${
csudo
}
ln
-sf
${
lib_link_dir
}
/libtaos.1.dylib
${
lib_link_dir
}
/libtaos.dylib
||
:
${
csudo
}
ln
-sf
${
lib_link_dir
}
/libtaos.1.dylib
${
lib_link_dir
}
/libtaos.dylib
>
/dev/null 2>&1
||
:
if
[
-f
${
binary_dir
}
/build/lib/libtaosws.dylib
]
;
then
${
csudo
}
cp
${
binary_dir
}
/build/lib/libtaosws.dylib
\
${
install_main_dir
}
/driver
&&
${
csudo
}
chmod
777
${
install_main_dir
}
/driver/libtaosws.dylib
||
:
${
csudo
}
ln
-sf
${
install_main_dir
}
/driver/libtaosws.dylib
${
lib_link_dir
}
/libtaosws.dylib
||
:
${
csudo
}
ln
-sf
${
install_main_dir
}
/driver/libtaosws.dylib
${
lib_link_dir
}
/libtaosws.dylib
>
/dev/null 2>&1
||
:
fi
fi
...
...
@@ -346,6 +346,7 @@ function install_lib() {
}
function
install_header
()
{
${
csudo
}
mkdir
-p
${
inc_link_dir
}
${
csudo
}
rm
-f
${
inc_link_dir
}
/taos.h
${
inc_link_dir
}
/taosdef.h
${
inc_link_dir
}
/taoserror.h
${
inc_link_dir
}
/taosudf.h
||
:
[
-f
${
inc_link_dir
}
/taosws.h
]
&&
${
csudo
}
rm
-f
${
inc_link_dir
}
/taosws.h
||
:
${
csudo
}
cp
-f
${
source_dir
}
/include/client/taos.h
${
source_dir
}
/include/common/taosdef.h
${
source_dir
}
/include/util/taoserror.h
${
source_dir
}
/include/libs/function/taosudf.h
\
...
...
@@ -353,13 +354,13 @@ function install_header() {
if
[
-f
${
binary_dir
}
/build/include/taosws.h
]
;
then
${
csudo
}
cp
-f
${
binary_dir
}
/build/include/taosws.h
${
install_main_dir
}
/include
&&
${
csudo
}
chmod
644
${
install_main_dir
}
/include/taosws.h
||
:
${
csudo
}
ln
-sf
${
install_main_dir
}
/include/taosws.h
${
inc_link_dir
}
/taosws.h
||
:
${
csudo
}
ln
-sf
${
install_main_dir
}
/include/taosws.h
${
inc_link_dir
}
/taosws.h
>
/dev/null 2>&1
||
:
fi
${
csudo
}
ln
-s
${
install_main_dir
}
/include/taos.h
${
inc_link_dir
}
/taos.h
${
csudo
}
ln
-s
${
install_main_dir
}
/include/taosdef.h
${
inc_link_dir
}
/taosdef.h
${
csudo
}
ln
-s
${
install_main_dir
}
/include/taoserror.h
${
inc_link_dir
}
/taoserror.h
${
csudo
}
ln
-s
${
install_main_dir
}
/include/taosudf.h
${
inc_link_dir
}
/taosudf.h
${
csudo
}
ln
-s
${
install_main_dir
}
/include/taos.h
${
inc_link_dir
}
/taos.h
>
/dev/null 2>&1
${
csudo
}
ln
-s
${
install_main_dir
}
/include/taosdef.h
${
inc_link_dir
}
/taosdef.h
>
/dev/null 2>&1
${
csudo
}
ln
-s
${
install_main_dir
}
/include/taoserror.h
${
inc_link_dir
}
/taoserror.h
>
/dev/null 2>&1
${
csudo
}
ln
-s
${
install_main_dir
}
/include/taosudf.h
${
inc_link_dir
}
/taosudf.h
>
/dev/null 2>&1
${
csudo
}
chmod
644
${
install_main_dir
}
/include/
*
}
...
...
@@ -374,7 +375,7 @@ function install_config() {
${
csudo
}
cp
-f
${
script_dir
}
/../cfg/
${
configFile
}
\
${
cfg_install_dir
}
/
${
configFile
}
.
${
verNumber
}
${
csudo
}
ln
-s
${
cfg_install_dir
}
/
${
configFile
}
\
${
install_main_dir
}
/cfg/
${
configFile
}
${
install_main_dir
}
/cfg/
${
configFile
}
>
/dev/null 2>&1
else
${
csudo
}
cp
-f
${
script_dir
}
/../cfg/
${
configFile
}
\
${
cfg_install_dir
}
/
${
configFile
}
.
${
verNumber
}
...
...
@@ -395,7 +396,7 @@ function install_taosadapter_config() {
${
cfg_install_dir
}
/taosadapter.toml.
${
verNumber
}
||
:
[
-f
${
cfg_install_dir
}
/taosadapter.toml
]
&&
${
csudo
}
ln
-s
${
cfg_install_dir
}
/taosadapter.toml
\
${
install_main_dir
}
/cfg/taosadapter.toml
||
:
${
install_main_dir
}
/cfg/taosadapter.toml
>
/dev/null 2>&1
||
:
else
if
[
-f
"
${
binary_dir
}
/test/cfg/taosadapter.toml"
]
;
then
${
csudo
}
cp
-f
${
binary_dir
}
/test/cfg/taosadapter.toml
\
...
...
@@ -408,12 +409,12 @@ function install_taosadapter_config() {
function
install_log
()
{
${
csudo
}
rm
-rf
${
log_dir
}
||
:
${
csudo
}
mkdir
-p
${
log_dir
}
&&
${
csudo
}
chmod
777
${
log_dir
}
${
csudo
}
ln
-s
${
log_dir
}
${
install_main_dir
}
/log
${
csudo
}
ln
-s
${
log_dir
}
${
install_main_dir
}
/log
>
/dev/null 2>&1
}
function
install_data
()
{
${
csudo
}
mkdir
-p
${
data_dir
}
&&
${
csudo
}
chmod
777
${
data_dir
}
${
csudo
}
ln
-s
${
data_dir
}
${
install_main_dir
}
/data
${
csudo
}
ln
-s
${
data_dir
}
${
install_main_dir
}
/data
>
/dev/null 2>&1
}
function
install_connector
()
{
...
...
@@ -533,7 +534,7 @@ function install_taosadapter_service() {
function
install_service_on_launchctl
()
{
${
csudouser
}
launchctl unload
-w
/Library/LaunchDaemons/com.taosdata.taosd.plist
>
/dev/null 2>&1
||
:
${
csudo
}
cp
${
script_dir
}
/com.taosdata.taosd.plist /Library/LaunchDaemons/com.taosdata.taosd.plist
${
csudouser
}
launchctl load
-w
/Library/LaunchDaemons/com.taosdata.taosd.plist
||
:
${
csudouser
}
launchctl load
-w
/Library/LaunchDaemons/com.taosdata.taosd.plist
>
/dev/null 2>&1
||
:
}
function
install_service
()
{
...
...
source/client/src/clientHb.c
浏览文件 @
d150bd1d
...
...
@@ -173,7 +173,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
pTscObj
->
pAppInfo
->
totalDnodes
=
pRsp
->
query
->
totalDnodes
;
pTscObj
->
pAppInfo
->
onlineDnodes
=
pRsp
->
query
->
onlineDnodes
;
pTscObj
->
connId
=
pRsp
->
query
->
connId
;
tscTrace
(
"conn %
p
hb rsp, dnodes %d/%d"
,
pTscObj
->
connId
,
pTscObj
->
pAppInfo
->
onlineDnodes
,
tscTrace
(
"conn %
u
hb rsp, dnodes %d/%d"
,
pTscObj
->
connId
,
pTscObj
->
pAppInfo
->
onlineDnodes
,
pTscObj
->
pAppInfo
->
totalDnodes
);
if
(
pRsp
->
query
->
killRid
)
{
...
...
source/client/src/clientImpl.c
浏览文件 @
d150bd1d
...
...
@@ -186,7 +186,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
STscObj
*
pTscObj
=
(
*
pRequest
)
->
pTscObj
;
if
(
taosHashPut
(
pTscObj
->
pRequests
,
&
(
*
pRequest
)
->
self
,
sizeof
((
*
pRequest
)
->
self
),
&
(
*
pRequest
)
->
self
,
sizeof
((
*
pRequest
)
->
self
)))
{
tscError
(
"%
d failed to add to request container, reqId:0x%"
PRIx64
", conn:%d
, %s"
,
(
*
pRequest
)
->
self
,
tscError
(
"%
"
PRIx64
" failed to add to request container, reqId:0x%"
PRIu64
", conn:%"
PRIx64
"
, %s"
,
(
*
pRequest
)
->
self
,
(
*
pRequest
)
->
requestId
,
pTscObj
->
id
,
sql
);
taosMemoryFree
(
param
);
...
...
@@ -371,7 +371,7 @@ int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
pInfo
->
pQnodeList
=
taosArrayDup
(
pNodeList
);
taosArraySort
(
pInfo
->
pQnodeList
,
compareQueryNodeLoad
);
tscDebug
(
"QnodeList updated in cluster 0x%"
PRIx64
", num:%d"
,
pInfo
->
clusterId
,
taosArrayGetSize
(
pInfo
->
pQnodeList
));
(
int
)
taosArrayGetSize
(
pInfo
->
pQnodeList
));
}
taosThreadMutexUnlock
(
&
pInfo
->
qnodeMutex
);
...
...
source/client/src/clientRawBlockWrite.c
浏览文件 @
d150bd1d
...
...
@@ -410,6 +410,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
SDecoder
decoder
=
{
0
};
SVAlterTbReq
vAlterTbReq
=
{
0
};
char
*
string
=
NULL
;
cJSON
*
json
=
NULL
;
// decode
void
*
data
=
POINTER_SHIFT
(
metaRsp
->
metaRsp
,
sizeof
(
SMsgHead
));
...
...
@@ -419,7 +420,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
goto
_exit
;
}
cJSON
*
json
=
cJSON_CreateObject
();
json
=
cJSON_CreateObject
();
if
(
json
==
NULL
)
{
goto
_exit
;
}
...
...
@@ -524,6 +525,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
SDecoder
decoder
=
{
0
};
SVDropStbReq
req
=
{
0
};
char
*
string
=
NULL
;
cJSON
*
json
=
NULL
;
// decode
void
*
data
=
POINTER_SHIFT
(
metaRsp
->
metaRsp
,
sizeof
(
SMsgHead
));
...
...
@@ -533,7 +535,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
goto
_exit
;
}
cJSON
*
json
=
cJSON_CreateObject
();
json
=
cJSON_CreateObject
();
if
(
json
==
NULL
)
{
goto
_exit
;
}
...
...
@@ -556,6 +558,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
SDecoder
decoder
=
{
0
};
SVDropTbBatchReq
req
=
{
0
};
char
*
string
=
NULL
;
cJSON
*
json
=
NULL
;
// decode
void
*
data
=
POINTER_SHIFT
(
metaRsp
->
metaRsp
,
sizeof
(
SMsgHead
));
...
...
@@ -565,7 +568,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
goto
_exit
;
}
cJSON
*
json
=
cJSON_CreateObject
();
json
=
cJSON_CreateObject
();
if
(
json
==
NULL
)
{
goto
_exit
;
}
...
...
@@ -684,7 +687,7 @@ end:
static
int32_t
taosDropStb
(
TAOS
*
taos
,
void
*
meta
,
int32_t
metaLen
)
{
SVDropStbReq
req
=
{
0
};
SDecoder
coder
;
SDecoder
coder
=
{
0
}
;
SMDropStbReq
pReq
=
{
0
};
int32_t
code
=
TSDB_CODE_SUCCESS
;
SRequestObj
*
pRequest
=
NULL
;
...
...
@@ -1212,6 +1215,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
int32_t
code
=
TSDB_CODE_SUCCESS
;
STableMeta
*
pTableMeta
=
NULL
;
SQuery
*
pQuery
=
NULL
;
SSubmitReq
*
subReq
=
NULL
;
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
createRequest
(
*
(
int64_t
*
)
taos
,
TSDB_SQL_INSERT
);
if
(
!
pRequest
)
{
...
...
@@ -1228,8 +1232,8 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
}
SName
pName
=
{
TSDB_TABLE_NAME_T
,
pRequest
->
pTscObj
->
acctId
,
{
0
},
{
0
}};
strcpy
(
pName
.
dbname
,
pRequest
->
pDb
);
strcpy
(
pName
.
tname
,
tbname
);
tstrncpy
(
pName
.
dbname
,
pRequest
->
pDb
,
sizeof
(
pName
.
dbname
)
);
tstrncpy
(
pName
.
tname
,
tbname
,
sizeof
(
pName
.
tname
)
);
struct
SCatalog
*
pCatalog
=
NULL
;
code
=
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
...
...
@@ -1278,7 +1282,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
int32_t
submitLen
=
sizeof
(
SSubmitBlk
)
+
schemaLen
+
rows
*
extendedRowSize
;
int32_t
totalLen
=
sizeof
(
SSubmitReq
)
+
submitLen
;
SSubmitReq
*
subReq
=
taosMemoryCalloc
(
1
,
totalLen
);
subReq
=
taosMemoryCalloc
(
1
,
totalLen
);
SSubmitBlk
*
blk
=
POINTER_SHIFT
(
subReq
,
sizeof
(
SSubmitReq
));
void
*
blkSchema
=
POINTER_SHIFT
(
blk
,
sizeof
(
SSubmitBlk
));
STSRow
*
rowData
=
POINTER_SHIFT
(
blkSchema
,
schemaLen
);
...
...
@@ -1352,6 +1356,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
if
(
NULL
==
pQuery
)
{
uError
(
"create SQuery error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
taosMemoryFree
(
subReq
);
goto
end
;
}
pQuery
->
execMode
=
QUERY_EXEC_MODE_SCHEDULE
;
...
...
@@ -1390,6 +1395,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
end:
taosMemoryFreeClear
(
pTableMeta
);
qDestroyQuery
(
pQuery
);
taosMemoryFree
(
subReq
);
return
code
;
}
...
...
source/client/src/clientSml.c
浏览文件 @
d150bd1d
...
...
@@ -299,6 +299,7 @@ static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool
for
(;
i
<
taosArrayGetSize
(
cols
);
i
++
)
{
SSmlKv
*
kv
=
(
SSmlKv
*
)
taosArrayGetP
(
cols
,
i
);
if
(
taosHashGet
(
hashTmp
,
kv
->
key
,
kv
->
keyLen
)
==
NULL
)
{
taosHashCleanup
(
hashTmp
);
return
-
1
;
}
}
...
...
@@ -430,7 +431,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
STableMeta
*
pTableMeta
=
NULL
;
SName
pName
=
{
TSDB_TABLE_NAME_T
,
info
->
taos
->
acctId
,
{
0
},
{
0
}};
strcpy
(
pName
.
dbname
,
info
->
pRequest
->
pDb
);
tstrncpy
(
pName
.
dbname
,
info
->
pRequest
->
pDb
,
sizeof
(
pName
.
dbname
)
);
SRequestConnInfo
conn
=
{
0
};
conn
.
pTrans
=
info
->
taos
->
pAppInfo
->
pTransporter
;
...
...
@@ -874,7 +875,8 @@ static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArra
kv
->
i
=
ts
;
kv
->
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
kv
->
length
=
(
int16_t
)
tDataTypes
[
kv
->
type
].
bytes
;
if
(
cols
)
taosArrayPush
(
cols
,
&
kv
);
taosArrayPush
(
cols
,
&
kv
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1010,6 +1012,7 @@ static void smlParseTelnetElement(const char **sql, const char *sqlEnd, const ch
static
int32_t
smlParseTelnetTags
(
const
char
*
data
,
const
char
*
sqlEnd
,
SArray
*
cols
,
char
*
childTableName
,
SHashObj
*
dumplicateKey
,
SSmlMsgBuf
*
msg
)
{
if
(
!
cols
)
return
TSDB_CODE_OUT_OF_MEMORY
;
const
char
*
sql
=
data
;
size_t
childTableNameLen
=
strlen
(
tsSmlChildTableName
);
while
(
sql
<
sqlEnd
)
{
...
...
@@ -1083,7 +1086,7 @@ static int32_t smlParseTelnetTags(const char *data, const char *sqlEnd, SArray *
kv
->
length
=
valueLen
;
kv
->
type
=
TSDB_DATA_TYPE_NCHAR
;
if
(
cols
)
taosArrayPush
(
cols
,
&
kv
);
taosArrayPush
(
cols
,
&
kv
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1371,8 +1374,14 @@ static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) {
SHashObj
*
s2
=
*
(
SHashObj
**
)
key2
;
SSmlKv
*
kv1
=
*
(
SSmlKv
**
)
taosHashGet
(
s1
,
TS
,
TS_LEN
);
SSmlKv
*
kv2
=
*
(
SSmlKv
**
)
taosHashGet
(
s2
,
TS
,
TS_LEN
);
ASSERT
(
kv1
->
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
ASSERT
(
kv2
->
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
if
(
!
kv1
||
kv1
->
type
!=
TSDB_DATA_TYPE_TIMESTAMP
){
uError
(
"smlKvTimeHashCompare kv1"
);
return
-
1
;
}
if
(
!
kv2
||
kv2
->
type
!=
TSDB_DATA_TYPE_TIMESTAMP
){
uError
(
"smlKvTimeHashCompare kv2"
);
return
-
1
;
}
if
(
kv1
->
i
<
kv2
->
i
)
{
return
-
1
;
}
else
if
(
kv1
->
i
>
kv2
->
i
)
{
...
...
@@ -1736,7 +1745,7 @@ static int32_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root, SArray *cols) {
kv
->
i
=
tsVal
;
kv
->
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
kv
->
length
=
(
int16_t
)
tDataTypes
[
kv
->
type
].
bytes
;
if
(
cols
)
taosArrayPush
(
cols
,
&
kv
);
taosArrayPush
(
cols
,
&
kv
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1933,6 +1942,7 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
}
static
int32_t
smlParseColsFromJSON
(
cJSON
*
root
,
SArray
*
cols
)
{
if
(
!
cols
)
return
TSDB_CODE_OUT_OF_MEMORY
;
cJSON
*
metricVal
=
cJSON_GetObjectItem
(
root
,
"value"
);
if
(
metricVal
==
NULL
)
{
return
TSDB_CODE_TSC_INVALID_JSON
;
...
...
@@ -1942,7 +1952,7 @@ static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
if
(
!
kv
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
if
(
cols
)
taosArrayPush
(
cols
,
&
kv
);
taosArrayPush
(
cols
,
&
kv
);
kv
->
key
=
VALUE
;
kv
->
keyLen
=
VALUE_LEN
;
...
...
@@ -1956,7 +1966,9 @@ static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
static
int32_t
smlParseTagsFromJSON
(
cJSON
*
root
,
SArray
*
pKVs
,
char
*
childTableName
,
SHashObj
*
dumplicateKey
,
SSmlMsgBuf
*
msg
)
{
int32_t
ret
=
TSDB_CODE_SUCCESS
;
if
(
!
pKVs
){
return
TSDB_CODE_OUT_OF_MEMORY
;
}
cJSON
*
tags
=
cJSON_GetObjectItem
(
root
,
"tags"
);
if
(
tags
==
NULL
||
tags
->
type
!=
cJSON_Object
)
{
return
TSDB_CODE_TSC_INVALID_JSON
;
...
...
@@ -1986,14 +1998,14 @@ static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableN
return
TSDB_CODE_TSC_INVALID_JSON
;
}
memset
(
childTableName
,
0
,
TSDB_TABLE_NAME_LEN
);
strncpy
(
childTableName
,
tag
->
valuestring
,
TSDB_TABLE_NAME_LEN
);
t
strncpy
(
childTableName
,
tag
->
valuestring
,
TSDB_TABLE_NAME_LEN
);
continue
;
}
// add kv to SSmlKv
SSmlKv
*
kv
=
(
SSmlKv
*
)
taosMemoryCalloc
(
sizeof
(
SSmlKv
),
1
);
if
(
!
kv
)
return
TSDB_CODE_OUT_OF_MEMORY
;
if
(
pKVs
)
taosArrayPush
(
pKVs
,
&
kv
);
taosArrayPush
(
pKVs
,
&
kv
);
// key
kv
->
keyLen
=
keyLen
;
...
...
@@ -2104,6 +2116,8 @@ static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int l
if
(
!
oneTable
)
{
tinfo
=
smlBuildTableInfo
();
if
(
!
tinfo
)
{
smlDestroyCols
(
cols
);
if
(
info
->
dataFormat
)
taosArrayDestroy
(
cols
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
taosHashPut
(
info
->
childTables
,
elements
.
measure
,
elements
.
measureTagsLen
,
&
tinfo
,
POINTER_BYTES
);
...
...
@@ -2296,7 +2310,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
SSmlTableInfo
*
tableData
=
*
oneTable
;
SName
pName
=
{
TSDB_TABLE_NAME_T
,
info
->
taos
->
acctId
,
{
0
},
{
0
}};
strcpy
(
pName
.
dbname
,
info
->
pRequest
->
pDb
);
tstrncpy
(
pName
.
dbname
,
info
->
pRequest
->
pDb
,
sizeof
(
pName
.
dbname
)
);
memcpy
(
pName
.
tname
,
tableData
->
childTableName
,
strlen
(
tableData
->
childTableName
));
SRequestConnInfo
conn
=
{
0
};
...
...
source/client/src/clientStmt.c
浏览文件 @
d150bd1d
...
...
@@ -201,6 +201,9 @@ int32_t stmtCacheBlock(STscStmt* pStmt) {
}
STableDataBlocks
**
pSrc
=
taosHashGet
(
pStmt
->
exec
.
pBlockHash
,
pStmt
->
bInfo
.
tbFName
,
strlen
(
pStmt
->
bInfo
.
tbFName
));
if
(
!
pSrc
){
return
TSDB_CODE_OUT_OF_MEMORY
;
}
STableDataBlocks
*
pDst
=
NULL
;
STMT_ERR_RET
(
qCloneStmtDataBlock
(
&
pDst
,
*
pSrc
));
...
...
source/client/src/clientTmq.c
浏览文件 @
d150bd1d
...
...
@@ -233,12 +233,12 @@ void tmq_conf_destroy(tmq_conf_t* conf) {
tmq_conf_res_t
tmq_conf_set
(
tmq_conf_t
*
conf
,
const
char
*
key
,
const
char
*
value
)
{
if
(
strcmp
(
key
,
"group.id"
)
==
0
)
{
strcpy
(
conf
->
groupId
,
value
);
tstrncpy
(
conf
->
groupId
,
value
,
TSDB_CGROUP_LEN
);
return
TMQ_CONF_OK
;
}
if
(
strcmp
(
key
,
"client.id"
)
==
0
)
{
strcpy
(
conf
->
clientId
,
value
);
tstrncpy
(
conf
->
clientId
,
value
,
256
);
return
TMQ_CONF_OK
;
}
...
...
@@ -452,7 +452,6 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
int32_t
code
;
tEncodeSize
(
tEncodeSTqOffset
,
pOffset
,
len
,
code
);
if
(
code
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
void
*
buf
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgHead
)
+
len
);
...
...
@@ -464,15 +463,22 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
SEncoder
encoder
;
tEncoderInit
(
&
encoder
,
abuf
,
len
);
tEncodeSTqOffset
(
&
encoder
,
pOffset
);
tEncoderClear
(
&
encoder
);
// build param
SMqCommitCbParam
*
pParam
=
taosMemoryCalloc
(
1
,
sizeof
(
SMqCommitCbParam
));
if
(
pParam
==
NULL
)
{
taosMemoryFree
(
buf
);
return
-
1
;
}
pParam
->
params
=
pParamSet
;
pParam
->
pOffset
=
pOffset
;
// build send info
SMsgSendInfo
*
pMsgSendInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgSendInfo
));
if
(
pMsgSendInfo
==
NULL
)
{
taosMemoryFree
(
buf
);
taosMemoryFree
(
pParam
);
return
-
1
;
}
pMsgSendInfo
->
msgInfo
=
(
SDataBuf
){
...
...
@@ -547,6 +553,8 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm
if
(
pVg
->
currentOffset
.
type
>
0
&&
!
tOffsetEqual
(
&
pVg
->
currentOffset
,
&
pVg
->
committedOffset
))
{
if
(
tmqSendCommitReq
(
tmq
,
pVg
,
pTopic
,
pParamSet
)
<
0
)
{
tsem_destroy
(
&
pParamSet
->
rspSem
);
taosMemoryFree
(
pParamSet
);
goto
FAIL
;
}
goto
HANDLE_RSP
;
...
...
@@ -565,6 +573,7 @@ HANDLE_RSP:
tsem_wait
(
&
pParamSet
->
rspSem
);
code
=
pParamSet
->
rspErr
;
tsem_destroy
(
&
pParamSet
->
rspSem
);
taosMemoryFree
(
pParamSet
);
return
code
;
}
else
{
code
=
0
;
...
...
@@ -587,7 +596,14 @@ int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t
SMqCommitCbParamSet
*
pParamSet
=
taosMemoryCalloc
(
1
,
sizeof
(
SMqCommitCbParamSet
));
if
(
pParamSet
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
TSDB_CODE_OUT_OF_MEMORY
;
if
(
async
)
{
if
(
automatic
)
{
tmq
->
commitCb
(
tmq
,
code
,
tmq
->
commitCbUserParam
);
}
else
{
userCb
(
tmq
,
code
,
userParam
);
}
}
return
-
1
;
}
...
...
@@ -642,16 +658,6 @@ int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t
code
=
pParamSet
->
rspErr
;
tsem_destroy
(
&
pParamSet
->
rspSem
);
taosMemoryFree
(
pParamSet
);
}
else
{
code
=
0
;
}
if
(
code
!=
0
&&
async
)
{
if
(
automatic
)
{
tmq
->
commitCb
(
tmq
,
code
,
tmq
->
commitCbUserParam
);
}
else
{
userCb
(
tmq
,
code
,
userParam
);
}
}
#if 0
...
...
@@ -709,6 +715,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
int64_t
refId
=
*
(
int64_t
*
)
param
;
tmq_t
*
tmq
=
taosAcquireRef
(
tmqMgmt
.
rsetId
,
refId
);
if
(
tmq
==
NULL
)
{
taosMemoryFree
(
param
);
return
;
}
int64_t
consumerId
=
tmq
->
consumerId
;
...
...
@@ -721,6 +728,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
SMsgSendInfo
*
sendInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgSendInfo
));
if
(
sendInfo
==
NULL
)
{
taosMemoryFree
(
pReq
);
goto
OVER
;
}
sendInfo
->
msgInfo
=
(
SDataBuf
){
.
pData
=
pReq
,
...
...
@@ -870,8 +878,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t
*
pTmq
=
taosMemoryCalloc
(
1
,
sizeof
(
tmq_t
));
if
(
pTmq
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
tscError
(
"consumer %"
PRId64
" setup failed since %s, consumer group %s"
,
pTmq
->
consumerId
,
terrstr
(),
pTmq
->
groupId
);
tscError
(
"setting up new consumer failed since %s, consumer group %s"
,
terrstr
(),
conf
->
groupId
);
return
NULL
;
}
...
...
@@ -939,10 +946,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
return
NULL
;
}
int64_t
*
pRefId
=
taosMemoryMalloc
(
sizeof
(
int64_t
));
*
pRefId
=
pTmq
->
refId
;
if
(
pTmq
->
hbBgEnable
)
{
int64_t
*
pRefId
=
taosMemoryMalloc
(
sizeof
(
int64_t
));
*
pRefId
=
pTmq
->
refId
;
pTmq
->
hbLiveTimer
=
taosTmrStart
(
tmqSendHbReq
,
1000
,
pRefId
,
tmqMgmt
.
timer
);
}
...
...
@@ -1061,9 +1067,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
code
=
0
;
FAIL:
if
(
req
.
topicNames
!=
NULL
)
taosArrayDestroyP
(
req
.
topicNames
,
taosMemoryFree
);
if
(
code
!=
0
&&
buf
)
{
taosMemoryFree
(
buf
);
}
taosMemoryFree
(
buf
);
return
code
;
}
...
...
@@ -1101,7 +1106,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
if
(
code
==
TSDB_CODE_TQ_NO_COMMITTED_OFFSET
)
{
SMqPollRspWrapper
*
pRspWrapper
=
taosAllocateQitem
(
sizeof
(
SMqPollRspWrapper
),
DEF_QITEM
);
if
(
pRspWrapper
==
NULL
)
{
taosMemoryFree
(
pMsg
->
pData
);
tscWarn
(
"msg discard from vgId:%d, epoch %d since out of memory"
,
vgId
,
epoch
);
goto
CREATE_MSG_FAIL
;
}
...
...
source/common/src/tdataformat.c
浏览文件 @
d150bd1d
...
...
@@ -916,6 +916,10 @@ char *tTagValToData(const STagVal *value, bool isJson) {
}
bool
tTagGet
(
const
STag
*
pTag
,
STagVal
*
pTagVal
)
{
if
(
!
pTag
||
!
pTagVal
){
return
false
;
}
int16_t
lidx
=
0
;
int16_t
ridx
=
pTag
->
nTag
-
1
;
int16_t
midx
;
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
d150bd1d
...
...
@@ -115,7 +115,6 @@ typedef struct {
typedef
struct
{
SMqDataRsp
dataRsp
;
SMqRspHead
rspHead
;
char
subKey
[
TSDB_SUBSCRIBE_KEY_LEN
];
SRpcHandleInfo
pInfo
;
}
STqPushEntry
;
...
...
@@ -183,6 +182,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore);
// tqSink
void
tqTableSink
(
SStreamTask
*
pTask
,
void
*
vnode
,
int64_t
ver
,
void
*
data
);
void
tqTableSink1
(
SStreamTask
*
pTask
,
void
*
vnode
,
int64_t
ver
,
void
*
data
);
// tqOffset
char
*
tqOffsetBuildFName
(
const
char
*
path
,
int32_t
ver
);
...
...
source/dnode/vnode/src/meta/metaQuery.c
浏览文件 @
d150bd1d
...
...
@@ -675,7 +675,7 @@ int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sv
SSchemaWrapper
*
pSchemaWrapper
=
&
schema
;
tDecoderInit
(
&
dc
,
pData
,
nData
);
tDecodeSSchemaWrapper
(
&
dc
,
pSchemaWrapper
);
(
void
)
tDecodeSSchemaWrapper
(
&
dc
,
pSchemaWrapper
);
tDecoderClear
(
&
dc
);
tdbFree
(
pData
);
...
...
source/dnode/vnode/src/meta/metaTable.c
浏览文件 @
d150bd1d
...
...
@@ -944,6 +944,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
metaUpdateTagIdx
(
pMeta
,
&
ctbEntry
);
}
ASSERT
(
ctbEntry
.
ctbEntry
.
pTags
);
SCtbIdxKey
ctbIdxKey
=
{.
suid
=
ctbEntry
.
ctbEntry
.
suid
,
.
uid
=
uid
};
tdbTbUpsert
(
pMeta
->
pCtbIdx
,
&
ctbIdxKey
,
sizeof
(
ctbIdxKey
),
ctbEntry
.
ctbEntry
.
pTags
,
((
STag
*
)(
ctbEntry
.
ctbEntry
.
pTags
))
->
len
,
&
pMeta
->
txn
);
...
...
@@ -952,7 +953,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
tDecoderClear
(
&
dc1
);
tDecoderClear
(
&
dc2
);
if
(
ctbEntry
.
ctbEntry
.
pTags
)
taosMemoryFree
((
void
*
)
ctbEntry
.
ctbEntry
.
pTags
);
taosMemoryFree
((
void
*
)
ctbEntry
.
ctbEntry
.
pTags
);
if
(
ctbEntry
.
pBuf
)
taosMemoryFree
(
ctbEntry
.
pBuf
);
if
(
stbEntry
.
pBuf
)
tdbFree
(
stbEntry
.
pBuf
);
tdbTbcClose
(
pTbDbc
);
...
...
@@ -1202,8 +1203,8 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
SMetaEntry
stbEntry
=
{
0
};
STagIdxKey
*
pTagIdxKey
=
NULL
;
int32_t
nTagIdxKey
;
const
SSchema
*
pTagColumn
;
// = &stbEntry.stbEntry.schema.pSchema[0];
const
void
*
pTagData
=
NULL
;
//
const
SSchema
*
pTagColumn
;
const
void
*
pTagData
=
NULL
;
int32_t
nTagData
=
0
;
SDecoder
dc
=
{
0
};
int32_t
ret
=
0
;
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
d150bd1d
...
...
@@ -192,7 +192,7 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
return
-
1
;
}
memcpy
(
buf
,
&
pPushEntry
->
rspH
ead
,
sizeof
(
SMqRspHead
));
memcpy
(
buf
,
&
pPushEntry
->
dataRsp
.
h
ead
,
sizeof
(
SMqRspHead
));
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMqRspHead
));
...
...
@@ -215,7 +215,7 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
tFormatOffset
(
buf1
,
80
,
&
pRsp
->
reqOffset
);
tFormatOffset
(
buf2
,
80
,
&
pRsp
->
rspOffset
);
tqDebug
(
"vgId:%d, from consumer:%"
PRId64
", (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s"
,
TD_VID
(
pTq
->
pVnode
),
p
PushEntry
->
rspH
ead
.
consumerId
,
pRsp
->
head
.
epoch
,
pRsp
->
blockNum
,
buf1
,
buf2
);
TD_VID
(
pTq
->
pVnode
),
p
Rsp
->
h
ead
.
consumerId
,
pRsp
->
head
.
epoch
,
pRsp
->
blockNum
,
buf1
,
buf2
);
return
0
;
}
...
...
@@ -560,9 +560,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
memcpy
(
pPushEntry
->
subKey
,
pHandle
->
subKey
,
TSDB_SUBSCRIBE_KEY_LEN
);
dataRsp
.
withTbName
=
0
;
memcpy
(
&
pPushEntry
->
dataRsp
,
&
dataRsp
,
sizeof
(
SMqDataRsp
));
pPushEntry
->
rspH
ead
.
consumerId
=
consumerId
;
pPushEntry
->
rspH
ead
.
epoch
=
reqEpoch
;
pPushEntry
->
rspH
ead
.
mqMsgType
=
TMQ_MSG_TYPE__POLL_RSP
;
pPushEntry
->
dataRsp
.
h
ead
.
consumerId
=
consumerId
;
pPushEntry
->
dataRsp
.
h
ead
.
epoch
=
reqEpoch
;
pPushEntry
->
dataRsp
.
h
ead
.
mqMsgType
=
TMQ_MSG_TYPE__POLL_RSP
;
taosHashPut
(
pTq
->
pPushMgr
,
pHandle
->
subKey
,
strlen
(
pHandle
->
subKey
)
+
1
,
&
pPushEntry
,
sizeof
(
void
*
));
tqDebug
(
"tmq poll: consumer %ld, subkey %s, vg %d save handle to push mgr"
,
consumerId
,
pHandle
->
subKey
,
TD_VID
(
pTq
->
pVnode
));
...
...
@@ -924,7 +924,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
pTask
->
smaSink
.
smaSink
=
smaHandleRes
;
}
else
if
(
pTask
->
outputType
==
TASK_OUTPUT__TABLE
)
{
pTask
->
tbSink
.
vnode
=
pTq
->
pVnode
;
pTask
->
tbSink
.
tbSinkFunc
=
tqTableSink
;
pTask
->
tbSink
.
tbSinkFunc
=
tqTableSink
1
;
ASSERT
(
pTask
->
tbSink
.
pSchemaWrapper
);
ASSERT
(
pTask
->
tbSink
.
pSchemaWrapper
->
pSchema
);
...
...
source/dnode/vnode/src/tq/tqSink.c
浏览文件 @
d150bd1d
...
...
@@ -284,6 +284,212 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
return
ret
;
}
void
tqTableSink1
(
SStreamTask
*
pTask
,
void
*
vnode
,
int64_t
ver
,
void
*
data
)
{
const
SArray
*
pBlocks
=
(
const
SArray
*
)
data
;
SVnode
*
pVnode
=
(
SVnode
*
)
vnode
;
int64_t
suid
=
pTask
->
tbSink
.
stbUid
;
char
*
stbFullName
=
pTask
->
tbSink
.
stbFullName
;
STSchema
*
pTSchema
=
pTask
->
tbSink
.
pTSchema
;
SSchemaWrapper
*
pSchemaWrapper
=
pTask
->
tbSink
.
pSchemaWrapper
;
int32_t
blockSz
=
taosArrayGetSize
(
pBlocks
);
bool
createTb
=
true
;
SArray
*
tagArray
=
taosArrayInit
(
1
,
sizeof
(
STagVal
));
if
(
!
tagArray
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
;
}
tqDebug
(
"vgId:%d, task %d write into table, block num: %d"
,
TD_VID
(
pVnode
),
pTask
->
taskId
,
blockSz
);
for
(
int32_t
i
=
0
;
i
<
blockSz
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
pBlocks
,
i
);
if
(
pDataBlock
->
info
.
type
==
STREAM_DELETE_RESULT
)
{
SBatchDeleteReq
deleteReq
=
{
0
};
deleteReq
.
deleteReqs
=
taosArrayInit
(
0
,
sizeof
(
SSingleDeleteReq
));
deleteReq
.
suid
=
suid
;
tqBuildDeleteReq
(
pVnode
,
stbFullName
,
pDataBlock
,
&
deleteReq
);
int32_t
len
;
int32_t
code
;
tEncodeSize
(
tEncodeSBatchDeleteReq
,
&
deleteReq
,
len
,
code
);
if
(
code
<
0
)
{
//
ASSERT
(
0
);
}
SEncoder
encoder
;
void
*
serializedDeleteReq
=
rpcMallocCont
(
len
+
sizeof
(
SMsgHead
));
void
*
abuf
=
POINTER_SHIFT
(
serializedDeleteReq
,
sizeof
(
SMsgHead
));
tEncoderInit
(
&
encoder
,
abuf
,
len
);
tEncodeSBatchDeleteReq
(
&
encoder
,
&
deleteReq
);
tEncoderClear
(
&
encoder
);
taosArrayDestroy
(
deleteReq
.
deleteReqs
);
((
SMsgHead
*
)
serializedDeleteReq
)
->
vgId
=
pVnode
->
config
.
vgId
;
SRpcMsg
msg
=
{
.
msgType
=
TDMT_VND_BATCH_DEL
,
.
pCont
=
serializedDeleteReq
,
.
contLen
=
len
+
sizeof
(
SMsgHead
),
};
if
(
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
WRITE_QUEUE
,
&
msg
)
!=
0
)
{
rpcFreeCont
(
serializedDeleteReq
);
tqDebug
(
"failed to put delete req into write-queue since %s"
,
terrstr
());
}
}
else
{
SVCreateTbReq
createTbReq
=
{
0
};
// set const
createTbReq
.
flags
=
0
;
createTbReq
.
type
=
TSDB_CHILD_TABLE
;
createTbReq
.
ctb
.
suid
=
suid
;
// set super table name
SName
name
=
{
0
};
tNameFromString
(
&
name
,
stbFullName
,
T_NAME_ACCT
|
T_NAME_DB
|
T_NAME_TABLE
);
createTbReq
.
ctb
.
stbName
=
strdup
((
char
*
)
tNameGetTableName
(
&
name
));
// strdup(stbFullName);
// set tag content
taosArrayClear
(
tagArray
);
STagVal
tagVal
=
{
.
cid
=
taosArrayGetSize
(
pDataBlock
->
pDataBlock
)
+
1
,
.
type
=
TSDB_DATA_TYPE_UBIGINT
,
.
i64
=
(
int64_t
)
pDataBlock
->
info
.
groupId
,
};
taosArrayPush
(
tagArray
,
&
tagVal
);
createTbReq
.
ctb
.
tagNum
=
taosArrayGetSize
(
tagArray
);
STag
*
pTag
=
NULL
;
tTagNew
(
tagArray
,
1
,
false
,
&
pTag
);
if
(
pTag
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
taosArrayDestroy
(
tagArray
);
tdDestroySVCreateTbReq
(
&
createTbReq
);
return
;
}
createTbReq
.
ctb
.
pTag
=
(
uint8_t
*
)
pTag
;
// set tag name
SArray
*
tagName
=
taosArrayInit
(
1
,
TSDB_COL_NAME_LEN
);
char
tagNameStr
[
TSDB_COL_NAME_LEN
]
=
{
0
};
strcpy
(
tagNameStr
,
"group_id"
);
taosArrayPush
(
tagName
,
tagNameStr
);
createTbReq
.
ctb
.
tagName
=
tagName
;
// set table name
if
(
pDataBlock
->
info
.
parTbName
[
0
])
{
createTbReq
.
name
=
strdup
(
pDataBlock
->
info
.
parTbName
);
}
else
{
createTbReq
.
name
=
buildCtbNameByGroupId
(
stbFullName
,
pDataBlock
->
info
.
groupId
);
}
int32_t
schemaLen
;
int32_t
code
;
tEncodeSize
(
tEncodeSVCreateTbReq
,
&
createTbReq
,
schemaLen
,
code
);
if
(
code
<
0
)
{
tdDestroySVCreateTbReq
(
&
createTbReq
);
taosArrayDestroy
(
tagArray
);
return
;
}
// save schema str
void
*
schemaStr
=
taosMemoryMalloc
(
schemaLen
);
if
(
schemaStr
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
tdDestroySVCreateTbReq
(
&
createTbReq
);
taosArrayDestroy
(
tagArray
);
return
;
}
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
schemaStr
,
schemaLen
);
code
=
tEncodeSVCreateTbReq
(
&
encoder
,
&
createTbReq
);
if
(
code
<
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
tdDestroySVCreateTbReq
(
&
createTbReq
);
taosArrayDestroy
(
tagArray
);
tEncoderClear
(
&
encoder
);
taosMemoryFree
(
schemaStr
);
return
;
}
tEncoderClear
(
&
encoder
);
tdDestroySVCreateTbReq
(
&
createTbReq
);
int32_t
cap
=
sizeof
(
SSubmitReq
);
int32_t
rows
=
pDataBlock
->
info
.
rows
;
int32_t
maxLen
=
TD_ROW_MAX_BYTES_FROM_SCHEMA
(
pTSchema
);
cap
+=
sizeof
(
SSubmitBlk
)
+
schemaLen
+
rows
*
maxLen
;
SSubmitReq
*
ret
=
rpcMallocCont
(
cap
);
ret
->
header
.
vgId
=
pVnode
->
config
.
vgId
;
ret
->
length
=
sizeof
(
SSubmitReq
);
ret
->
numOfBlocks
=
htonl
(
1
);
SSubmitBlk
*
blkHead
=
POINTER_SHIFT
(
ret
,
sizeof
(
SSubmitReq
));
blkHead
->
numOfRows
=
htonl
(
pDataBlock
->
info
.
rows
);
blkHead
->
sversion
=
htonl
(
pTSchema
->
version
);
blkHead
->
suid
=
htobe64
(
suid
);
// uid is assigned by vnode
blkHead
->
uid
=
0
;
blkHead
->
schemaLen
=
0
;
tqDebug
(
"tq sink, convert block %d, rows: %d"
,
i
,
rows
);
int32_t
dataLen
=
0
;
void
*
blkSchema
=
POINTER_SHIFT
(
blkHead
,
sizeof
(
SSubmitBlk
));
STSRow
*
rowData
=
blkSchema
;
if
(
createTb
)
{
memcpy
(
blkSchema
,
schemaStr
,
schemaLen
);
blkHead
->
schemaLen
=
htonl
(
schemaLen
);
rowData
=
POINTER_SHIFT
(
blkSchema
,
schemaLen
);
}
taosMemoryFree
(
schemaStr
);
for
(
int32_t
j
=
0
;
j
<
rows
;
j
++
)
{
SRowBuilder
rb
=
{
0
};
tdSRowInit
(
&
rb
,
pTSchema
->
version
);
tdSRowSetTpInfo
(
&
rb
,
pTSchema
->
numOfCols
,
pTSchema
->
flen
);
tdSRowResetBuf
(
&
rb
,
rowData
);
for
(
int32_t
k
=
0
;
k
<
pTSchema
->
numOfCols
;
k
++
)
{
const
STColumn
*
pColumn
=
&
pTSchema
->
columns
[
k
];
SColumnInfoData
*
pColData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
k
);
if
(
colDataIsNull_s
(
pColData
,
j
))
{
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NULL
,
NULL
,
false
,
pColumn
->
offset
,
k
);
}
else
{
void
*
colData
=
colDataGetData
(
pColData
,
j
);
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NORM
,
colData
,
true
,
pColumn
->
offset
,
k
);
}
}
tdSRowEnd
(
&
rb
);
int32_t
rowLen
=
TD_ROW_LEN
(
rowData
);
rowData
=
POINTER_SHIFT
(
rowData
,
rowLen
);
dataLen
+=
rowLen
;
}
blkHead
->
dataLen
=
htonl
(
dataLen
);
ret
->
length
+=
sizeof
(
SSubmitBlk
)
+
schemaLen
+
dataLen
;
ret
->
length
=
htonl
(
ret
->
length
);
SRpcMsg
msg
=
{
.
msgType
=
TDMT_VND_SUBMIT
,
.
pCont
=
ret
,
.
contLen
=
ntohl
(
ret
->
length
),
};
if
(
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
WRITE_QUEUE
,
&
msg
)
!=
0
)
{
rpcFreeCont
(
ret
);
tqDebug
(
"failed to put into write-queue since %s"
,
terrstr
());
}
}
}
taosArrayDestroy
(
tagArray
);
}
void
tqTableSink
(
SStreamTask
*
pTask
,
void
*
vnode
,
int64_t
ver
,
void
*
data
)
{
const
SArray
*
pRes
=
(
const
SArray
*
)
data
;
SVnode
*
pVnode
=
(
SVnode
*
)
vnode
;
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
d150bd1d
...
...
@@ -2789,6 +2789,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
primarySrcSlotId
);
blockDataCleanup
(
pInfo
->
pRes
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pBlock
->
info
.
rows
);
blockDataEnsureCapacity
(
pInfo
->
pFinalRes
,
pBlock
->
info
.
rows
);
doApplyScalarCalculation
(
pOperator
,
pBlock
,
order
,
scanFlag
);
if
(
pInfo
->
curGroupId
==
0
||
pInfo
->
curGroupId
==
pInfo
->
pRes
->
info
.
groupId
)
{
...
...
source/libs/tdb/src/db/tdbBtree.c
浏览文件 @
d150bd1d
...
...
@@ -1006,6 +1006,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
nLeft
-=
kLen
;
// pack partial val to local if any space left
if
(
nLocal
>
nHeader
+
kLen
+
sizeof
(
SPgno
))
{
ASSERT
(
pVal
!=
NULL
&&
vLen
!=
0
);
memcpy
(
pCell
+
nHeader
+
kLen
,
pVal
,
nLocal
-
nHeader
-
kLen
-
sizeof
(
SPgno
));
nLeft
-=
nLocal
-
nHeader
-
kLen
-
sizeof
(
SPgno
);
}
...
...
source/libs/tdb/src/db/tdbPCache.c
浏览文件 @
d150bd1d
...
...
@@ -105,6 +105,7 @@ static int tdbPCacheAlterImpl(SPCache *pCache, int32_t nPage) {
for
(
int32_t
iPage
=
pCache
->
nPages
;
iPage
<
nPage
;
iPage
++
)
{
if
(
tdbPageCreate
(
pCache
->
szPage
,
&
aPage
[
iPage
],
tdbDefaultMalloc
,
NULL
)
<
0
)
{
// TODO: handle error
tdbOsFree
(
pCache
->
aPage
);
return
-
1
;
}
...
...
@@ -267,7 +268,7 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
// 4. Try a create new page
if
(
!
pPage
)
{
ret
=
tdbPageCreate
(
pCache
->
szPage
,
&
pPage
,
pTxn
->
xMalloc
,
pTxn
->
xArg
);
if
(
ret
<
0
)
{
if
(
ret
<
0
&&
pPage
!=
NULL
)
{
// TODO
ASSERT
(
0
);
return
NULL
;
...
...
@@ -300,8 +301,8 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
pPage
->
pPager
=
pPageH
->
pPager
;
memcpy
(
pPage
->
pData
,
pPageH
->
pData
,
pPage
->
pageSize
);
tdbDebug
(
"pcache/pPageH: %p %d %p %p %d"
,
pPageH
,
pPageH
->
pPageHdr
-
pPageH
->
pData
,
pPageH
->
xCellSize
,
pPage
,
TDB_PAGE_PGNO
(
pPageH
));
//
tdbDebug("pcache/pPageH: %p %d %p %p %d", pPageH, pPageH->pPageHdr - pPageH->pData, pPageH->xCellSize, pPage,
//
TDB_PAGE_PGNO(pPageH));
tdbPageInit
(
pPage
,
pPageH
->
pPageHdr
-
pPageH
->
pData
,
pPageH
->
xCellSize
);
pPage
->
kLen
=
pPageH
->
kLen
;
pPage
->
vLen
=
pPageH
->
vLen
;
...
...
source/libs/tdb/src/db/tdbPager.c
浏览文件 @
d150bd1d
...
...
@@ -84,7 +84,8 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
pPager
->
pCache
=
pCache
;
pPager
->
fd
=
tdbOsOpen
(
pPager
->
dbFileName
,
TDB_O_CREAT
|
TDB_O_RDWR
,
0755
);
if
(
pPager
->
fd
<
0
)
{
if
(
TDB_FD_INVALID
(
pPager
->
fd
))
{
// if (pPager->fd < 0) {
return
-
1
;
}
...
...
@@ -226,7 +227,7 @@ int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
// Open the journal
pPager
->
jfd
=
tdbOsOpen
(
pPager
->
jFileName
,
TDB_O_CREAT
|
TDB_O_RDWR
,
0755
);
if
(
pPager
->
jfd
<
0
)
{
if
(
TDB_FD_INVALID
(
pPager
->
jfd
)
)
{
tdbError
(
"failed to open file due to %s. jFileName:%s"
,
strerror
(
errno
),
pPager
->
jFileName
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
...
...
@@ -365,7 +366,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
// 4, remove the journal file
tdbOsClose
(
pPager
->
jfd
);
tdbOsRemove
(
pPager
->
jFileName
);
(
void
)
tdbOsRemove
(
pPager
->
jFileName
);
pPager
->
inTran
=
0
;
return
0
;
...
...
@@ -540,7 +541,8 @@ static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) {
ret
=
tdbOsWrite
(
pPager
->
jfd
,
pPage
->
pData
,
pPage
->
pageSize
);
if
(
ret
<
0
)
{
tdbError
(
"failed to write page data due to %s. file:%s, pageSize:%ld"
,
strerror
(
errno
),
pPager
->
jFileName
,
pPage
->
pageSize
);
tdbError
(
"failed to write page data due to %s. file:%s, pageSize:%ld"
,
strerror
(
errno
),
pPager
->
jFileName
,
pPage
->
pageSize
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
...
...
@@ -568,7 +570,8 @@ static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
ret
=
tdbOsWrite
(
pPager
->
fd
,
pPage
->
pData
,
pPage
->
pageSize
);
if
(
ret
<
0
)
{
tdbError
(
"failed to write page data due to %s. file:%s, pageSize:%ld"
,
strerror
(
errno
),
pPager
->
dbFileName
,
pPage
->
pageSize
);
tdbError
(
"failed to write page data due to %s. file:%s, pageSize:%ld"
,
strerror
(
errno
),
pPager
->
dbFileName
,
pPage
->
pageSize
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
...
...
@@ -603,11 +606,13 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
int
ret
=
tdbOsRead
(
jfd
,
&
pgno
,
sizeof
(
pgno
));
if
(
ret
<
0
)
{
tdbOsFree
(
pageBuf
);
return
-
1
;
}
ret
=
tdbOsRead
(
jfd
,
pageBuf
,
pPager
->
pageSize
);
if
(
ret
<
0
)
{
tdbOsFree
(
pageBuf
);
return
-
1
;
}
...
...
@@ -615,13 +620,16 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
if
(
tdbOsLSeek
(
pPager
->
fd
,
offset
,
SEEK_SET
)
<
0
)
{
tdbError
(
"failed to lseek fd due to %s. file:%s, offset:%ld"
,
strerror
(
errno
),
pPager
->
dbFileName
,
offset
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tdbOsFree
(
pageBuf
);
return
-
1
;
}
ret
=
tdbOsWrite
(
pPager
->
fd
,
pageBuf
,
pPager
->
pageSize
);
if
(
ret
<
0
)
{
tdbError
(
"failed to write buf due to %s. file: %s, bufsize:%d"
,
strerror
(
errno
),
pPager
->
dbFileName
,
pPager
->
pageSize
);
tdbError
(
"failed to write buf due to %s. file: %s, bufsize:%d"
,
strerror
(
errno
),
pPager
->
dbFileName
,
pPager
->
pageSize
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tdbOsFree
(
pageBuf
);
return
-
1
;
}
}
...
...
@@ -629,6 +637,7 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
if
(
tdbOsFSync
(
pPager
->
fd
)
<
0
)
{
tdbError
(
"failed to fsync fd due to %s. dbfile:%s"
,
strerror
(
errno
),
pPager
->
dbFileName
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tdbOsFree
(
pageBuf
);
return
-
1
;
}
...
...
source/libs/tdb/src/db/tdbTable.c
浏览文件 @
d150bd1d
...
...
@@ -106,11 +106,13 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
// pTb->pBt
ret
=
tdbBtreeOpen
(
keyLen
,
valLen
,
pPager
,
tbname
,
pgno
,
keyCmprFn
,
&
(
pTb
->
pBt
));
if
(
ret
<
0
)
{
tdbOsFree
(
pTb
);
return
-
1
;
}
ret
=
tdbPagerRestore
(
pPager
,
pTb
->
pBt
);
if
(
ret
<
0
)
{
tdbOsFree
(
pTb
);
return
-
1
;
}
...
...
source/libs/tdb/src/inc/tdbOs.h
浏览文件 @
d150bd1d
...
...
@@ -37,6 +37,8 @@ extern "C" {
/* file */
typedef
TdFilePtr
tdb_fd_t
;
#define TDB_FD_INVALID(fd) (fd == NULL)
#define TDB_O_CREAT TD_FILE_CREATE
#define TDB_O_WRITE TD_FILE_WRITE
#define TDB_O_READ TD_FILE_READ
...
...
@@ -141,4 +143,4 @@ typedef pthread_mutex_t tdb_mutex_t;
}
#endif
#endif
/*_TDB_OS_H_*/
\ No newline at end of file
#endif
/*_TDB_OS_H_*/
tests/system-test/7-tmq/tmqAutoCreateTbl.py
浏览文件 @
d150bd1d
...
...
@@ -52,7 +52,7 @@ class TDTestCase:
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
,
wal_retention_size
=-
1
,
wal_retention_period
=-
1
)
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
# tdLog.info("create ctb")
...
...
tools/shell/src/shellArguments.c
浏览文件 @
d150bd1d
...
...
@@ -22,7 +22,7 @@
#define TAOS_CONSOLE_PROMPT_HEADER "taos> "
#define TAOS_CONSOLE_PROMPT_CONTINUE " -> "
#define SHELL_HOST "T
he auth string to use when connecting to the server
."
#define SHELL_HOST "T
Dengine server FQDN to connect. The default host is localhost
."
#define SHELL_PORT "The TCP/IP port number to use for the connection."
#define SHELL_USER "The user name to use when connecting to the server."
#define SHELL_PASSWORD "The password to use when connecting to the server."
...
...
taosws-rs
@
7a94ffab
Subproject commit 7a94ffab45f08e16f09b3f430fe75d717054adb6
utils/test/c/sml_test.c
浏览文件 @
d150bd1d
...
...
@@ -119,7 +119,7 @@ int smlProcess_json1_Test() {
"
\"
dc
\"
:
\"
lga
\"
"
" }"
" }"
"]"
};
"]"
,
};
pRes
=
taos_schemaless_insert
(
taos
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]),
TSDB_SML_JSON_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
...
...
@@ -159,7 +159,7 @@ int smlProcess_json2_Test() {
" },"
"
\"
id
\"
:
\"
d1001
\"
"
" }"
"}"
};
"}"
,
};
pRes
=
taos_schemaless_insert
(
taos
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]),
TSDB_SML_JSON_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
...
...
@@ -227,7 +227,7 @@ int smlProcess_json3_Test() {
" },"
"
\"
id
\"
:
\"
d1001
\"
"
" }"
"}"
};
"}"
,
};
pRes
=
taos_schemaless_insert
(
taos
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]),
TSDB_SML_JSON_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
...
...
@@ -286,7 +286,7 @@ int smlProcess_json4_Test() {
"
\"
t9
\"
: false,"
"
\"
id
\"
:
\"
d1001
\"
"
" }"
"}"
};
"}"
,
};
pRes
=
taos_schemaless_insert
(
taos
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]),
TSDB_SML_JSON_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
...
...
utils/test/c/tmqSim.c
浏览文件 @
d150bd1d
...
...
@@ -155,7 +155,7 @@ static void printHelp() {
printf
(
"%s%s
\n
"
,
indent
,
"-l"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"run duration unit is minutes, default is "
,
g_stConfInfo
.
runDurationMinutes
);
printf
(
"%s%s%s
%d
\n
"
,
indent
,
indent
,
"run duration unit is minutes, default is "
,
g_stConfInfo
.
runDurationMinutes
);
printf
(
"%s%s
\n
"
,
indent
,
"-p"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"producer thread number, default is 0"
);
printf
(
"%s%s
\n
"
,
indent
,
"-b"
);
...
...
@@ -238,7 +238,7 @@ void saveConfigToLogFile() {
taosFprintfFile
(
g_fp
,
"%s:%s, "
,
g_stConfInfo
.
stThreads
[
i
].
key
[
k
],
g_stConfInfo
.
stThreads
[
i
].
value
[
k
]);
}
taosFprintfFile
(
g_fp
,
"
\n
"
);
taosFprintfFile
(
g_fp
,
" expect rows: %
d
\n
"
,
g_stConfInfo
.
stThreads
[
i
].
expectMsgCnt
);
taosFprintfFile
(
g_fp
,
" expect rows: %
"
PRIx64
"
\n
"
,
g_stConfInfo
.
stThreads
[
i
].
expectMsgCnt
);
}
char
tmpString
[
128
];
...
...
@@ -263,11 +263,11 @@ void parseArgument(int32_t argc, char* argv[]) {
printHelp
();
exit
(
0
);
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
)
{
strcpy
(
g_stConfInfo
.
dbName
,
argv
[
++
i
]
);
tstrncpy
(
g_stConfInfo
.
dbName
,
argv
[
++
i
],
sizeof
(
g_stConfInfo
.
dbName
)
);
}
else
if
(
strcmp
(
argv
[
i
],
"-w"
)
==
0
)
{
strcpy
(
g_stConfInfo
.
cdbName
,
argv
[
++
i
]
);
tstrncpy
(
g_stConfInfo
.
cdbName
,
argv
[
++
i
],
sizeof
(
g_stConfInfo
.
cdbName
)
);
}
else
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
)
{
strcpy
(
configDir
,
argv
[
++
i
]
);
tstrncpy
(
configDir
,
argv
[
++
i
],
PATH_MAX
);
}
else
if
(
strcmp
(
argv
[
i
],
"-g"
)
==
0
)
{
g_stConfInfo
.
showMsgFlag
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-r"
)
==
0
)
{
...
...
@@ -279,9 +279,9 @@ void parseArgument(int32_t argc, char* argv[]) {
}
else
if
(
strcmp
(
argv
[
i
],
"-e"
)
==
0
)
{
g_stConfInfo
.
useSnapshot
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-t"
)
==
0
)
{
char
tmpBuf
[
56
]
;
strcpy
(
tmpBuf
,
argv
[
++
i
]
);
sprintf
(
g_stConfInfo
.
topic
,
"`%s`"
,
tmpBuf
);
char
tmpBuf
[
56
]
=
{
0
}
;
tstrncpy
(
tmpBuf
,
argv
[
++
i
],
sizeof
(
tmpBuf
)
);
sprintf
(
g_stConfInfo
.
topic
,
"`%s`"
,
tmpBuf
);
}
else
if
(
strcmp
(
argv
[
i
],
"-x"
)
==
0
)
{
g_stConfInfo
.
numOfThread
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-l"
)
==
0
)
{
...
...
@@ -294,6 +294,10 @@ void parseArgument(int32_t argc, char* argv[]) {
g_stConfInfo
.
producerRate
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-n"
)
==
0
)
{
g_stConfInfo
.
payloadLen
=
atol
(
argv
[
++
i
]);
if
(
g_stConfInfo
.
payloadLen
<=
0
||
g_stConfInfo
.
payloadLen
>
1024
*
1024
*
1024
){
pError
(
"%s calloc size is too large: %s %s"
,
GREEN
,
argv
[
++
i
],
NC
);
exit
(
-
1
);
}
}
else
{
pError
(
"%s unknow para: %s %s"
,
GREEN
,
argv
[
++
i
],
NC
);
exit
(
-
1
);
...
...
@@ -354,8 +358,8 @@ void ltrim(char* str) {
int
queryDB
(
TAOS
*
taos
,
char
*
command
)
{
int
retryCnt
=
10
;
int
code
;
TAOS_RES
*
pRes
;
int
code
=
0
;
TAOS_RES
*
pRes
=
NULL
;
while
(
retryCnt
--
)
{
pRes
=
taos_query
(
taos
,
command
);
...
...
@@ -363,10 +367,11 @@ int queryDB(TAOS* taos, char* command) {
if
(
code
!=
0
)
{
taosSsleep
(
1
);
taos_free_result
(
pRes
);
pRes
=
NULL
;
continue
;
}
taos_free_result
(
pRes
);
return
0
;
return
0
;
}
pError
(
"failed to reason:%s, sql: %s"
,
tstrerror
(
code
),
command
);
...
...
@@ -418,7 +423,7 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
char
sqlStr
[
1100
]
=
{
0
};
if
(
strlen
(
buf
)
>
1024
)
{
taosFprintfFile
(
g_fp
,
"The length of one row[%d] is overflow 1024
\n
"
,
strlen
(
buf
));
taosFprintfFile
(
g_fp
,
"The length of one row[%d] is overflow 1024
\n
"
,
(
int
)
strlen
(
buf
));
taosCloseFile
(
&
g_fp
);
return
-
1
;
}
...
...
@@ -592,7 +597,7 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
int32_t
vgroupId
=
tmq_get_vgroup_id
(
msg
);
const
char
*
dbName
=
tmq_get_db_name
(
msg
);
taosFprintfFile
(
g_fp
,
"consumerId: %d, msg index:%
"
PRId64
"
\n
"
,
pInfo
->
consumerId
,
msgIndex
);
taosFprintfFile
(
g_fp
,
"consumerId: %d, msg index:%
d
\n
"
,
pInfo
->
consumerId
,
msgIndex
);
taosFprintfFile
(
g_fp
,
"dbName: %s, topic: %s, vgroupId: %d
\n
"
,
dbName
!=
NULL
?
dbName
:
"invalid table"
,
tmq_get_topic_name
(
msg
),
vgroupId
);
...
...
@@ -644,7 +649,7 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
int32_t
vgroupId
=
tmq_get_vgroup_id
(
msg
);
const
char
*
dbName
=
tmq_get_db_name
(
msg
);
taosFprintfFile
(
g_fp
,
"consumerId: %d, msg index:%
"
PRId64
"
\n
"
,
pInfo
->
consumerId
,
msgIndex
);
taosFprintfFile
(
g_fp
,
"consumerId: %d, msg index:%
d
\n
"
,
pInfo
->
consumerId
,
msgIndex
);
taosFprintfFile
(
g_fp
,
"dbName: %s, topic: %s, vgroupId: %d
\n
"
,
dbName
!=
NULL
?
dbName
:
"invalid table"
,
tmq_get_topic_name
(
msg
),
vgroupId
);
...
...
@@ -960,7 +965,7 @@ void parseConsumeInfo() {
ltrim
(
pstr
);
char
*
ret
=
strchr
(
pstr
,
ch
);
memcpy
(
g_stConfInfo
.
stThreads
[
i
].
key
[
g_stConfInfo
.
stThreads
[
i
].
numOfKey
],
pstr
,
ret
-
pstr
);
strcpy
(
g_stConfInfo
.
stThreads
[
i
].
value
[
g_stConfInfo
.
stThreads
[
i
].
numOfKey
],
ret
+
1
);
tstrncpy
(
g_stConfInfo
.
stThreads
[
i
].
value
[
g_stConfInfo
.
stThreads
[
i
].
numOfKey
],
ret
+
1
,
sizeof
(
g_stConfInfo
.
stThreads
[
i
].
value
[
g_stConfInfo
.
stThreads
[
i
].
numOfKey
])
);
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo
.
stThreads
[
i
].
numOfKey
++
;
...
...
@@ -1268,25 +1273,26 @@ void* ombProduceThreadFunc(void* param) {
for
(
int
i
=
0
;
i
<
batchPerTblTimes
;
++
i
)
{
uint32_t
msgsOfSql
=
g_stConfInfo
.
batchSize
;
if
((
i
==
batchPerTblTimes
-
1
)
&&
(
0
!=
remainder
))
{
msgsOfSql
=
remainder
;
msgsOfSql
=
remainder
;
}
int
len
=
0
;
len
+=
snprintf
(
sqlBuf
+
len
,
MAX_SQL_LEN
-
len
,
"insert into %s values "
,
ctbName
);
for
(
int
j
=
0
;
j
<
msgsOfSql
;
j
++
)
{
int64_t
timeStamp
=
taosGetTimestampNs
();
len
+=
snprintf
(
sqlBuf
+
len
,
MAX_SQL_LEN
-
len
,
"(%"
PRId64
",
\"
%s
\"
)"
,
timeStamp
,
g_payload
);
int64_t
timeStamp
=
taosGetTimestampNs
();
len
+=
snprintf
(
sqlBuf
+
len
,
MAX_SQL_LEN
-
len
,
"(%"
PRId64
",
\"
%s
\"
)"
,
timeStamp
,
g_payload
);
sendMsgs
++
;
pInfo
->
totalProduceMsgs
++
;
}
totalMsgLen
+=
len
;
totalMsgLen
+=
len
;
pInfo
->
totalMsgsLen
+=
len
;
int64_t
affectedRows
=
queryDbExec
(
pInfo
->
taos
,
sqlBuf
,
INSERT_TYPE
);
int64_t
affectedRows
=
queryDbExec
(
pInfo
->
taos
,
sqlBuf
,
INSERT_TYPE
);
if
(
affectedRows
<
0
)
{
taos_close
(
pInfo
->
taos
);
pInfo
->
taos
=
NULL
;
return
NULL
;
pInfo
->
taos
=
NULL
;
taosMemoryFree
(
sqlBuf
);
return
NULL
;
}
affectedRowsTotal
+=
affectedRows
;
...
...
@@ -1322,6 +1328,7 @@ void* ombProduceThreadFunc(void* param) {
printf
(
"affectedRowsTotal: %"
PRId64
"
\n
"
,
affectedRowsTotal
);
taos_close
(
pInfo
->
taos
);
pInfo
->
taos
=
NULL
;
taosMemoryFree
(
sqlBuf
);
return
NULL
;
}
...
...
utils/test/c/tmq_taosx_ci.c
浏览文件 @
d150bd1d
...
...
@@ -663,7 +663,7 @@ void initLogFile() {
int
main
(
int
argc
,
char
*
argv
[])
{
for
(
int32_t
i
=
1
;
i
<
argc
;
i
++
)
{
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
){
strcpy
(
g_conf
.
dir
,
argv
[
++
i
]
);
tstrncpy
(
g_conf
.
dir
,
argv
[
++
i
],
sizeof
(
g_conf
.
dir
)
);
}
else
if
(
strcmp
(
argv
[
i
],
"-s"
)
==
0
){
g_conf
.
snapShot
=
true
;
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
){
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录