Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9d2305f2
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9d2305f2
编写于
1月 29, 2021
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into feature/query
上级
4f959dbf
9242cef8
变更
44
隐藏空白更改
内联
并排
Showing
44 changed file
with
523 addition
and
245 deletion
+523
-245
packaging/cfg/taos.cfg
packaging/cfg/taos.cfg
+6
-0
packaging/deb/makedeb.sh
packaging/deb/makedeb.sh
+3
-0
packaging/rpm/tdengine.spec
packaging/rpm/tdengine.spec
+3
-0
packaging/tools/install.sh
packaging/tools/install.sh
+1
-3
packaging/tools/install_power.sh
packaging/tools/install_power.sh
+1
-0
packaging/tools/make_install.sh
packaging/tools/make_install.sh
+4
-1
packaging/tools/makeclient.sh
packaging/tools/makeclient.sh
+2
-1
packaging/tools/makeclient_power.sh
packaging/tools/makeclient_power.sh
+1
-0
packaging/tools/makepkg.sh
packaging/tools/makepkg.sh
+2
-1
packaging/tools/makepkg_power.sh
packaging/tools/makepkg_power.sh
+4
-1
packaging/tools/post.sh
packaging/tools/post.sh
+1
-0
packaging/tools/startPre.sh
packaging/tools/startPre.sh
+50
-0
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+11
-4
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+1
-0
src/client/src/tscServer.c
src/client/src/tscServer.c
+21
-15
src/client/src/tscSql.c
src/client/src/tscSql.c
+18
-21
src/client/src/tscSystem.c
src/client/src/tscSystem.c
+71
-28
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+15
-10
src/common/inc/tglobal.h
src/common/inc/tglobal.h
+2
-1
src/common/src/tglobal.c
src/common/src/tglobal.c
+6
-1
src/common/src/ttypes.c
src/common/src/ttypes.c
+1
-1
src/cq/src/cqMain.c
src/cq/src/cqMain.c
+155
-45
src/dnode/src/dnodeCfg.c
src/dnode/src/dnodeCfg.c
+1
-1
src/dnode/src/dnodeEps.c
src/dnode/src/dnodeEps.c
+1
-1
src/dnode/src/dnodeMInfos.c
src/dnode/src/dnodeMInfos.c
+1
-1
src/mnode/src/mnodeUser.c
src/mnode/src/mnodeUser.c
+1
-1
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+6
-6
src/rpc/src/rpcTcp.c
src/rpc/src/rpcTcp.c
+82
-38
src/vnode/src/vnodeCfg.c
src/vnode/src/vnodeCfg.c
+1
-1
src/vnode/src/vnodeVersion.c
src/vnode/src/vnodeVersion.c
+1
-1
tests/examples/JDBC/SpringJdbcTemplate/pom.xml
tests/examples/JDBC/SpringJdbcTemplate/pom.xml
+2
-2
tests/examples/JDBC/SpringJdbcTemplate/readme.md
tests/examples/JDBC/SpringJdbcTemplate/readme.md
+10
-12
tests/examples/JDBC/SpringJdbcTemplate/src/main/java/com/taosdata/example/jdbcTemplate/App.java
.../src/main/java/com/taosdata/example/jdbcTemplate/App.java
+4
-4
tests/examples/JDBC/SpringJdbcTemplate/src/main/java/com/taosdata/example/jdbcTemplate/dao/ExecuteAsStatement.java
...taosdata/example/jdbcTemplate/dao/ExecuteAsStatement.java
+1
-1
tests/examples/JDBC/SpringJdbcTemplate/src/main/java/com/taosdata/example/jdbcTemplate/dao/ExecuteAsStatementImpl.java
...data/example/jdbcTemplate/dao/ExecuteAsStatementImpl.java
+1
-2
tests/examples/JDBC/SpringJdbcTemplate/src/main/java/com/taosdata/example/jdbcTemplate/dao/WeatherDao.java
...ava/com/taosdata/example/jdbcTemplate/dao/WeatherDao.java
+2
-2
tests/examples/JDBC/SpringJdbcTemplate/src/main/java/com/taosdata/example/jdbcTemplate/dao/WeatherDaoImpl.java
...com/taosdata/example/jdbcTemplate/dao/WeatherDaoImpl.java
+3
-7
tests/examples/JDBC/SpringJdbcTemplate/src/main/java/com/taosdata/example/jdbcTemplate/domain/Weather.java
...ava/com/taosdata/example/jdbcTemplate/domain/Weather.java
+1
-1
tests/examples/JDBC/SpringJdbcTemplate/src/main/resources/applicationContext.xml
...ingJdbcTemplate/src/main/resources/applicationContext.xml
+2
-2
tests/examples/JDBC/SpringJdbcTemplate/src/test/java/com/taosdata/example/jdbcTemplate/BatcherInsertTest.java
.../com/taosdata/example/jdbcTemplate/BatcherInsertTest.java
+4
-4
tests/examples/JDBC/SpringJdbcTemplate/src/test/java/com/taosdata/jdbc/AppTest.java
...JdbcTemplate/src/test/java/com/taosdata/jdbc/AppTest.java
+0
-18
tests/examples/JDBC/readme.md
tests/examples/JDBC/readme.md
+13
-0
tests/script/general/parser/dbtbnameValidate.sim
tests/script/general/parser/dbtbnameValidate.sim
+6
-6
tests/test-all.sh
tests/test-all.sh
+1
-1
未找到文件。
packaging/cfg/taos.cfg
浏览文件 @
9d2305f2
...
...
@@ -270,3 +270,9 @@
# in retrieve blocking model, only in 50% query threads will be used in query processing in dnode
# retrieveBlockingModel 0
# the maximum allowed query buffer size in MB during query processing for each data node
# -1 no limit (default)
# 0 no query allowed, queries are disabled
# queryBufferSize -1
packaging/deb/makedeb.sh
浏览文件 @
9d2305f2
...
...
@@ -47,6 +47,9 @@ cp ${compile_dir}/../packaging/cfg/taos.cfg ${pkg_dir}${install_home_pat
cp
${
compile_dir
}
/../packaging/deb/taosd
${
pkg_dir
}${
install_home_path
}
/init.d
cp
${
compile_dir
}
/../packaging/tools/post.sh
${
pkg_dir
}${
install_home_path
}
/script
cp
${
compile_dir
}
/../packaging/tools/preun.sh
${
pkg_dir
}${
install_home_path
}
/script
cp
${
compile_dir
}
/../packaging/tools/startPre.sh
${
pkg_dir
}${
install_home_path
}
/bin
cp
${
compile_dir
}
/../packaging/tools/set_core.sh
${
pkg_dir
}${
install_home_path
}
/bin
cp
${
compile_dir
}
/../packaging/tools/taosd-dump-cfg.gdb
${
pkg_dir
}${
install_home_path
}
/bin
cp
${
compile_dir
}
/build/bin/taosdemo
${
pkg_dir
}${
install_home_path
}
/bin
cp
${
compile_dir
}
/build/bin/taosdemox
${
pkg_dir
}${
install_home_path
}
/bin
cp
${
compile_dir
}
/build/bin/taosdump
${
pkg_dir
}${
install_home_path
}
/bin
...
...
packaging/rpm/tdengine.spec
浏览文件 @
9d2305f2
...
...
@@ -55,6 +55,9 @@ cp %{_compiledir}/../packaging/cfg/taos.cfg %{buildroot}%{homepath}/cfg
cp %{_compiledir}/../packaging/rpm/taosd %{buildroot}%{homepath}/init.d
cp %{_compiledir}/../packaging/tools/post.sh %{buildroot}%{homepath}/script
cp %{_compiledir}/../packaging/tools/preun.sh %{buildroot}%{homepath}/script
cp %{_compiledir}/../packaging/tools/startPre.sh %{buildroot}%{homepath}/bin
cp %{_compiledir}/../packaging/tools/set_core.sh %{buildroot}%{homepath}/bin
cp %{_compiledir}/../packaging/tools/taosd-dump-cfg.gdb %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taos %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taosd %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taosdemo %{buildroot}%{homepath}/bin
...
...
packaging/tools/install.sh
浏览文件 @
9d2305f2
...
...
@@ -604,9 +604,7 @@ function install_service_on_systemd() {
${
csudo
}
bash
-c
"echo '[Service]' >>
${
taosd_service_config
}
"
${
csudo
}
bash
-c
"echo 'Type=simple' >>
${
taosd_service_config
}
"
${
csudo
}
bash
-c
"echo 'ExecStart=/usr/bin/taosd' >>
${
taosd_service_config
}
"
#${csudo} bash -c "echo 'ExecStartPre=/usr/local/taos/bin/setDelay.sh' >> ${taosd_service_config}"
#${csudo} bash -c "echo 'ExecStartPost=/usr/local/taos/bin/resetDelay.sh' >> ${taosd_service_config}"
#${csudo} bash -c "echo 'ExecStopPost=/usr/local/taos/bin/resetDelay.sh' >> ${taosd_service_config}"
${
csudo
}
bash
-c
"echo 'ExecStartPre=/usr/local/taos/bin/startPre.sh' >>
${
taosd_service_config
}
"
${
csudo
}
bash
-c
"echo 'LimitNOFILE=infinity' >>
${
taosd_service_config
}
"
${
csudo
}
bash
-c
"echo 'LimitNPROC=infinity' >>
${
taosd_service_config
}
"
${
csudo
}
bash
-c
"echo 'LimitCORE=infinity' >>
${
taosd_service_config
}
"
...
...
packaging/tools/install_power.sh
浏览文件 @
9d2305f2
...
...
@@ -578,6 +578,7 @@ function install_service_on_systemd() {
${
csudo
}
bash
-c
"echo '[Service]' >>
${
powerd_service_config
}
"
${
csudo
}
bash
-c
"echo 'Type=simple' >>
${
powerd_service_config
}
"
${
csudo
}
bash
-c
"echo 'ExecStart=/usr/bin/powerd' >>
${
powerd_service_config
}
"
${
csudo
}
bash
-c
"echo 'ExecStartPre=/usr/local/power/bin/startPre.sh' >>
${
powerd_service_config
}
"
${
csudo
}
bash
-c
"echo 'LimitNOFILE=infinity' >>
${
powerd_service_config
}
"
${
csudo
}
bash
-c
"echo 'LimitNPROC=infinity' >>
${
powerd_service_config
}
"
${
csudo
}
bash
-c
"echo 'LimitCORE=infinity' >>
${
powerd_service_config
}
"
...
...
packaging/tools/make_install.sh
浏览文件 @
9d2305f2
...
...
@@ -149,10 +149,12 @@ function install_bin() {
${
csudo
}
rm
-f
${
bin_link_dir
}
/rmtaos
||
:
${
csudo
}
cp
-r
${
binary_dir
}
/build/bin/
*
${
install_main_dir
}
/bin
${
csudo
}
cp
-r
${
script_dir
}
/taosd-dump-cfg.gdb
${
install_main_dir
}
/bin
if
[
"
$osType
"
!=
"Darwin"
]
;
then
${
csudo
}
cp
-r
${
script_dir
}
/remove.sh
${
install_main_dir
}
/bin
${
csudo
}
cp
-r
${
script_dir
}
/remove.sh
${
install_main_dir
}
/bin
${
csudo
}
cp
-r
${
script_dir
}
/set_core.sh
${
install_main_dir
}
/bin
${
csudo
}
cp
-r
${
script_dir
}
/startPre.sh
${
install_main_dir
}
/bin
else
${
csudo
}
cp
-r
${
script_dir
}
/remove_client.sh
${
install_main_dir
}
/bin
fi
...
...
@@ -330,6 +332,7 @@ function install_service_on_systemd() {
${
csudo
}
bash
-c
"echo '[Service]' >>
${
taosd_service_config
}
"
${
csudo
}
bash
-c
"echo 'Type=simple' >>
${
taosd_service_config
}
"
${
csudo
}
bash
-c
"echo 'ExecStart=/usr/bin/taosd' >>
${
taosd_service_config
}
"
${
csudo
}
bash
-c
"echo 'ExecStartPre=/usr/local/taos/bin/startPre.sh' >>
${
taosd_service_config
}
"
${
csudo
}
bash
-c
"echo 'LimitNOFILE=infinity' >>
${
taosd_service_config
}
"
${
csudo
}
bash
-c
"echo 'LimitNPROC=infinity' >>
${
taosd_service_config
}
"
${
csudo
}
bash
-c
"echo 'LimitCORE=infinity' >>
${
taosd_service_config
}
"
...
...
packaging/tools/makeclient.sh
浏览文件 @
9d2305f2
...
...
@@ -45,7 +45,8 @@ if [ "$osType" != "Darwin" ]; then
strip
${
build_dir
}
/bin/taos
bin_files
=
"
${
build_dir
}
/bin/taos
${
script_dir
}
/remove_client.sh"
else
bin_files
=
"
${
build_dir
}
/bin/taos
${
build_dir
}
/bin/taosdump
${
build_dir
}
/bin/taosdemo
${
build_dir
}
/bin/taosdemox
${
script_dir
}
/remove_client.sh
${
script_dir
}
/set_core.sh
${
script_dir
}
/get_client.sh"
bin_files
=
"
${
build_dir
}
/bin/taos
${
build_dir
}
/bin/taosdump
${
build_dir
}
/bin/taosdemo
${
build_dir
}
/bin/taosdemox
\
${
script_dir
}
/remove_client.sh
${
script_dir
}
/set_core.sh
${
script_dir
}
/get_client.sh
${
script_dir
}
/taosd-dump-cfg.gdb"
fi
lib_files
=
"
${
build_dir
}
/lib/libtaos.so.
${
version
}
"
else
...
...
packaging/tools/makeclient_power.sh
浏览文件 @
9d2305f2
...
...
@@ -81,6 +81,7 @@ if [ "$osType" != "Darwin" ]; then
cp
${
build_dir
}
/bin/taosdump
${
install_dir
}
/bin/powerdump
cp
${
script_dir
}
/set_core.sh
${
install_dir
}
/bin
cp
${
script_dir
}
/get_client.sh
${
install_dir
}
/bin
cp
${
script_dir
}
/taosd-dump-cfg.gdb
${
install_dir
}
/bin
fi
else
cp
${
bin_files
}
${
install_dir
}
/bin
...
...
packaging/tools/makepkg.sh
浏览文件 @
9d2305f2
...
...
@@ -36,7 +36,8 @@ if [ "$pagMode" == "lite" ]; then
strip
${
build_dir
}
/bin/taos
bin_files
=
"
${
build_dir
}
/bin/taosd
${
build_dir
}
/bin/taos
${
script_dir
}
/remove.sh"
else
bin_files
=
"
${
build_dir
}
/bin/taosd
${
build_dir
}
/bin/taos
${
build_dir
}
/bin/taosdump
${
build_dir
}
/bin/taosdemo
${
build_dir
}
/bin/taosdemox
${
build_dir
}
/bin/tarbitrator
${
script_dir
}
/remove.sh
${
script_dir
}
/set_core.sh
${
script_dir
}
/get_client.sh"
bin_files
=
"
${
build_dir
}
/bin/taosd
${
build_dir
}
/bin/taos
${
build_dir
}
/bin/taosdump
${
build_dir
}
/bin/taosdemo
${
build_dir
}
/bin/taosdemox
${
build_dir
}
/bin/tarbitrator
\
${
script_dir
}
/remove.sh
${
script_dir
}
/set_core.sh
${
script_dir
}
/startPre.sh
${
script_dir
}
/taosd-dump-cfg.gdb"
fi
lib_files
=
"
${
build_dir
}
/lib/libtaos.so.
${
version
}
"
...
...
packaging/tools/makepkg_power.sh
浏览文件 @
9d2305f2
...
...
@@ -36,7 +36,8 @@ fi
# strip ${build_dir}/bin/taos
# bin_files="${build_dir}/bin/powerd ${build_dir}/bin/power ${script_dir}/remove_power.sh"
#else
# bin_files="${build_dir}/bin/powerd ${build_dir}/bin/power ${build_dir}/bin/powerdemo ${build_dir}/bin/tarbitrator ${script_dir}/remove_power.sh ${script_dir}/set_core.sh"
# bin_files="${build_dir}/bin/powerd ${build_dir}/bin/power ${build_dir}/bin/powerdemo ${build_dir}/bin/tarbitrator ${script_dir}/remove_power.sh\
# ${script_dir}/set_core.sh ${script_dir}/startPre.sh ${script_dir}/taosd-dump-cfg.gdb"
#fi
lib_files
=
"
${
build_dir
}
/lib/libtaos.so.
${
version
}
"
...
...
@@ -82,6 +83,8 @@ else
cp
${
build_dir
}
/bin/tarbitrator
${
install_dir
}
/bin
cp
${
script_dir
}
/set_core.sh
${
install_dir
}
/bin
cp
${
script_dir
}
/get_client.sh
${
install_dir
}
/bin
cp
${
script_dir
}
/startPre.sh
${
install_dir
}
/bin
cp
${
script_dir
}
/taosd-dump-cfg.gdb
${
install_dir
}
/bin
fi
chmod
a+x
${
install_dir
}
/bin/
*
||
:
...
...
packaging/tools/post.sh
浏览文件 @
9d2305f2
...
...
@@ -406,6 +406,7 @@ function install_service_on_systemd() {
${
csudo
}
bash
-c
"echo '[Service]' >>
${
taosd_service_config
}
"
${
csudo
}
bash
-c
"echo 'Type=simple' >>
${
taosd_service_config
}
"
${
csudo
}
bash
-c
"echo 'ExecStart=/usr/bin/taosd' >>
${
taosd_service_config
}
"
${
csudo
}
bash
-c
"echo 'ExecStartPre=/usr/local/taos/bin/startPre.sh' >>
${
taosd_service_config
}
"
${
csudo
}
bash
-c
"echo 'LimitNOFILE=infinity' >>
${
taosd_service_config
}
"
${
csudo
}
bash
-c
"echo 'LimitNPROC=infinity' >>
${
taosd_service_config
}
"
${
csudo
}
bash
-c
"echo 'LimitCORE=infinity' >>
${
taosd_service_config
}
"
...
...
packaging/tools/startPre.sh
0 → 100644
浏览文件 @
9d2305f2
#!/bin/bash
#
# if enable core dump, set start count to 3, disable core dump, set start count to 20.
# set -e
# set -x
taosd
=
/etc/systemd/system/taosd.service
line
=
`
grep
StartLimitBurst
${
taosd
}
`
num
=
${
line
##*=
}
#echo "burst num: ${num}"
startSeqFile
=
/usr/local/taos/.startSeq
recordFile
=
/usr/local/taos/.startRecord
startSeq
=
0
if
[[
!
-e
${
startSeqFile
}
]]
;
then
startSeq
=
0
else
startSeq
=
$(
cat
${
startSeqFile
}
)
fi
nextSeq
=
`
expr
$startSeq
+ 1
`
echo
"
${
nextSeq
}
"
>
${
startSeqFile
}
curTime
=
$(
date
"+%Y-%m-%d %H:%M:%S"
)
echo
"startSeq:
${
startSeq
}
startPre.sh exec
${
curTime
}
, burstCnt:
${
num
}
"
>>
${
recordFile
}
coreFlag
=
`
ulimit
-c
`
echo
"coreFlag:
${
coreFlag
}
"
>>
${
recordFile
}
if
[
${
coreFlag
}
=
"0"
]
;
then
#echo "core is 0"
if
[
${
num
}
!=
"20"
]
;
then
sed
-i
"s/^.*StartLimitBurst.*
$/
StartLimitBurst=20/"
${
taosd
}
systemctl daemon-reload
echo
"modify burst count from
${
num
}
to 20"
>>
${
recordFile
}
fi
fi
if
[
${
coreFlag
}
=
"unlimited"
]
;
then
#echo "core is unlimited"
if
[
${
num
}
!=
"3"
]
;
then
sed
-i
"s/^.*StartLimitBurst.*
$/
StartLimitBurst=3/"
${
taosd
}
systemctl daemon-reload
echo
"modify burst count from
${
num
}
to 3"
>>
${
recordFile
}
fi
fi
src/client/inc/tsclient.h
浏览文件 @
9d2305f2
...
...
@@ -299,6 +299,11 @@ typedef struct {
struct
SLocalMerger
*
pLocalMerger
;
}
SSqlRes
;
typedef
struct
{
char
key
[
512
];
void
*
pDnodeConn
;
}
SRpcObj
;
typedef
struct
STscObj
{
void
*
signature
;
void
*
pTimer
;
...
...
@@ -314,8 +319,8 @@ typedef struct STscObj {
int64_t
hbrid
;
struct
SSqlObj
*
sqlList
;
struct
SSqlStream
*
streamList
;
SRpc
CorEpSet
*
tscCorMgmtEpSet
;
void
*
pDnodeConn
;
SRpc
Obj
*
pRpcObj
;
SRpcCorEpSet
*
tscCorMgmtEpSet
;
pthread_mutex_t
mutex
;
int32_t
numOfObj
;
// number of sqlObj from this tscObj
}
STscObj
;
...
...
@@ -392,8 +397,10 @@ typedef struct SSqlStream {
void
tscSetStreamDestTable
(
SSqlStream
*
pStream
,
const
char
*
dstTable
);
int32_t
tscInitRpc
(
const
char
*
user
,
const
char
*
secret
,
void
**
pDnodeConn
);
void
tscInitMsgsFp
();
int
tscAcquireRpc
(
const
char
*
key
,
const
char
*
user
,
const
char
*
secret
,
void
**
pRpcObj
);
void
tscReleaseRpc
(
void
*
param
);
void
tscInitMsgsFp
();
int
tsParseSql
(
SSqlObj
*
pSql
,
bool
initial
);
...
...
src/client/src/tscSQLParser.c
浏览文件 @
9d2305f2
...
...
@@ -5397,6 +5397,7 @@ int32_t validateColumnName(char* name) {
if
(
token
.
type
==
TK_STRING
)
{
strdequote
(
token
.
z
);
strntolower
(
token
.
z
,
token
.
z
,
token
.
n
);
token
.
n
=
(
uint32_t
)
strtrim
(
token
.
z
);
int32_t
k
=
tSQLGetToken
(
token
.
z
,
&
token
.
type
);
...
...
src/client/src/tscServer.c
浏览文件 @
9d2305f2
...
...
@@ -157,13 +157,16 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
SRpcEpSet
*
epSet
=
&
pRsp
->
epSet
;
if
(
epSet
->
numOfEps
>
0
)
{
tscEpSetHtons
(
epSet
);
if
(
!
tscEpSetIsEqual
(
&
pSql
->
pTscObj
->
tscCorMgmtEpSet
->
epSet
,
epSet
))
{
tscTrace
(
"%p updating epset: numOfEps: %d, inUse: %d"
,
pSql
,
epSet
->
numOfEps
,
epSet
->
inUse
);
for
(
int8_t
i
=
0
;
i
<
epSet
->
numOfEps
;
i
++
)
{
tscTrace
(
"endpoint %d: fqdn=%s, port=%d"
,
i
,
epSet
->
fqdn
[
i
],
epSet
->
port
[
i
]);
}
tscUpdateMgmtEpSet
(
pSql
,
epSet
);
}
//SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet;
//if (!tscEpSetIsEqual(&pCorEpSet->epSet, epSet)) {
// tscTrace("%p updating epset: numOfEps: %d, inUse: %d", pSql, epSet->numOfEps, epSet->inUse);
// for (int8_t i = 0; i < epSet->numOfEps; i++) {
// tscTrace("endpoint %d: fqdn=%s, port=%d", i, epSet->fqdn[i], epSet->port[i]);
// }
//}
//concurrency problem, update mgmt epset anyway
tscUpdateMgmtEpSet
(
pSql
,
epSet
);
}
pSql
->
pTscObj
->
connId
=
htonl
(
pRsp
->
connId
);
...
...
@@ -270,7 +273,8 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.
code
=
0
};
rpcSendRequest
(
pObj
->
pDnodeConn
,
&
pSql
->
epSet
,
&
rpcMsg
,
&
pSql
->
rpcRid
);
rpcSendRequest
(
pObj
->
pRpcObj
->
pDnodeConn
,
&
pSql
->
epSet
,
&
rpcMsg
,
&
pSql
->
rpcRid
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -292,8 +296,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
if
(
pObj
->
signature
!=
pObj
)
{
tscDebug
(
"%p DB connection is closed, cmd:%d pObj:%p signature:%p"
,
pSql
,
pCmd
->
command
,
pObj
,
pObj
->
signature
);
taosRemoveRef
(
tscObjRef
,
pSql
->
self
);
taosReleaseRef
(
tscObjRef
,
pSql
->
self
);
taosRemoveRef
(
tscObjRef
,
handle
);
taosReleaseRef
(
tscObjRef
,
handle
);
rpcFreeCont
(
rpcMsg
->
pCont
);
return
;
}
...
...
@@ -303,8 +307,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
tscDebug
(
"%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p"
,
pSql
,
pCmd
->
command
,
pQueryInfo
->
type
,
pObj
,
pObj
->
signature
);
taosRemoveRef
(
tscObjRef
,
pSql
->
self
);
taosReleaseRef
(
tscObjRef
,
pSql
->
self
);
taosRemoveRef
(
tscObjRef
,
handle
);
taosReleaseRef
(
tscObjRef
,
handle
);
rpcFreeCont
(
rpcMsg
->
pCont
);
return
;
}
...
...
@@ -350,7 +354,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
// if there is an error occurring, proceed to the following error handling procedure.
if
(
rpcMsg
->
code
==
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
taosReleaseRef
(
tscObjRef
,
pSql
->
self
);
taosReleaseRef
(
tscObjRef
,
handle
);
rpcFreeCont
(
rpcMsg
->
pCont
);
return
;
}
...
...
@@ -418,13 +422,15 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
(
*
pSql
->
fp
)(
pSql
->
param
,
pSql
,
rpcMsg
->
code
);
}
taosReleaseRef
(
tscObjRef
,
pSql
->
self
);
if
(
shouldFree
)
{
// in case of table-meta/vgrouplist query, automatically free it
taosRemoveRef
(
tscObjRef
,
pSql
->
self
);
taosRemoveRef
(
tscObjRef
,
handle
);
tscDebug
(
"%p sqlObj is automatically freed"
,
pSql
);
}
taosReleaseRef
(
tscObjRef
,
handle
);
rpcFreeCont
(
rpcMsg
->
pCont
);
}
...
...
src/client/src/tscSql.c
浏览文件 @
9d2305f2
...
...
@@ -90,9 +90,11 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
}
else
{
if
(
tscSetMgmtEpSetFromCfg
(
tsFirst
,
tsSecond
,
&
corMgmtEpSet
)
<
0
)
return
NULL
;
}
char
rpcKey
[
512
]
=
{
0
};
snprintf
(
rpcKey
,
sizeof
(
rpcKey
),
"%s:%s:%s:%d"
,
user
,
pass
,
ip
,
port
);
void
*
p
DnodeConn
=
NULL
;
if
(
tsc
InitRpc
(
user
,
secretEncrypt
,
&
pDnodeConn
)
!=
0
)
{
void
*
p
RpcObj
=
NULL
;
if
(
tsc
AcquireRpc
(
rpcKey
,
user
,
secretEncrypt
,
&
pRpcObj
)
!=
0
)
{
terrno
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
return
NULL
;
}
...
...
@@ -100,23 +102,21 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
STscObj
*
pObj
=
(
STscObj
*
)
calloc
(
1
,
sizeof
(
STscObj
));
if
(
NULL
==
pObj
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
rpcClose
(
pDnodeConn
);
tscReleaseRpc
(
pRpcObj
);
return
NULL
;
}
// set up tscObj's mgmtEpSet
pObj
->
tscCorMgmtEpSet
=
(
SRpcCorEpSet
*
)
malloc
(
sizeof
(
SRpcCorEpSet
));
if
(
NULL
==
pObj
->
tscCorMgmtEpSet
)
{
pObj
->
tscCorMgmtEpSet
=
malloc
(
sizeof
(
SRpcCorEpSet
));
if
(
pObj
->
tscCorMgmtEpSet
==
NULL
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
rpcClose
(
pDnodeConn
);
free
(
pObj
->
tscCorMgmtEpSet
);
tscReleaseRpc
(
pRpcObj
);
free
(
pObj
);
return
NULL
;
}
memcpy
(
pObj
->
tscCorMgmtEpSet
,
&
corMgmtEpSet
,
sizeof
(
SRpcCorEpSet
));
pObj
->
signature
=
pObj
;
pObj
->
pDnodeConn
=
pDnodeConn
;
memcpy
(
pObj
->
tscCorMgmtEpSet
,
&
corMgmtEpSet
,
sizeof
(
corMgmtEpSet
));
pObj
->
signature
=
pObj
;
pObj
->
pRpcObj
=
(
SRpcObj
*
)
pRpcObj
;
tstrncpy
(
pObj
->
user
,
user
,
sizeof
(
pObj
->
user
));
secretEncryptLen
=
MIN
(
secretEncryptLen
,
sizeof
(
pObj
->
pass
));
memcpy
(
pObj
->
pass
,
secretEncrypt
,
secretEncryptLen
);
...
...
@@ -126,8 +126,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
/* db name is too long */
if
(
len
>=
TSDB_DB_NAME_LEN
)
{
terrno
=
TSDB_CODE_TSC_INVALID_DB_LENGTH
;
rpcClose
(
pDnodeConn
);
free
(
pObj
->
tscCorMgmtEpSet
);
tscReleaseRpc
(
pRpcObj
);
free
(
pObj
);
return
NULL
;
}
...
...
@@ -144,8 +143,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
SSqlObj
*
pSql
=
(
SSqlObj
*
)
calloc
(
1
,
sizeof
(
SSqlObj
));
if
(
NULL
==
pSql
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
rpcClose
(
pDnodeConn
);
free
(
pObj
->
tscCorMgmtEpSet
);
tscReleaseRpc
(
pRpcObj
);
free
(
pObj
);
return
NULL
;
}
...
...
@@ -161,9 +159,8 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
&
pSql
->
cmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
))
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
rpcClose
(
pDnodeConn
);
tscReleaseRpc
(
pRpcObj
);
free
(
pSql
);
free
(
pObj
->
tscCorMgmtEpSet
);
free
(
pObj
);
return
NULL
;
}
...
...
@@ -202,7 +199,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
return
NULL
;
}
tscDebug
(
"%p DB connection is opening,
dnodeConn:%p"
,
pObj
,
p
Obj
->
pDnodeConn
);
tscDebug
(
"%p DB connection is opening,
rpcObj: %p, dnodeConn:%p"
,
pObj
,
pObj
->
pRpcObj
,
pObj
->
pRpc
Obj
->
pDnodeConn
);
taos_free_result
(
pSql
);
// version compare only requires the first 3 segments of the version string
...
...
@@ -279,7 +276,7 @@ void taos_close(TAOS *taos) {
return
;
}
tscDebug
(
"%p try to free tscObj
and close dnodeConn:%p"
,
pObj
,
pObj
->
pDnodeConn
);
tscDebug
(
"%p try to free tscObj
"
,
pObj
);
if
(
pObj
->
signature
!=
pObj
)
{
tscDebug
(
"%p already closed or invalid tscObj"
,
pObj
);
return
;
...
...
@@ -303,7 +300,7 @@ void taos_close(TAOS *taos) {
}
}
tscDebug
(
"%p all sqlObj are freed, free tscObj
and close dnodeConn:%p"
,
pObj
,
pObj
->
pDnodeConn
);
tscDebug
(
"%p all sqlObj are freed, free tscObj
"
,
pObj
);
taosRemoveRef
(
tscRefId
,
pObj
->
rid
);
}
...
...
src/client/src/tscSystem.c
浏览文件 @
9d2305f2
...
...
@@ -43,41 +43,74 @@ void *tscTmr;
void
*
tscQhandle
;
int32_t
tscRefId
=
-
1
;
int32_t
tscNumOfObj
=
0
;
// number of sqlObj in current process.
static
void
*
tscCheckDiskUsageTmr
;
void
*
tscRpcCache
;
// cache to keep rpc obj
int32_t
tscNumOfThreads
=
1
;
// num of rpc threads
static
pthread_mutex_t
rpcObjMutex
;
// mutex to protect open the rpc obj concurrently
static
pthread_once_t
tscinit
=
PTHREAD_ONCE_INIT
;
void
tscCheckDiskUsage
(
void
*
UNUSED_PARAM
(
para
),
void
*
UNUSED_PARAM
(
param
))
{
taosGetDisk
();
taosTmrReset
(
tscCheckDiskUsage
,
1000
,
NULL
,
tscTmr
,
&
tscCheckDiskUsageTmr
);
}
void
tscFreeRpcObj
(
void
*
param
)
{
assert
(
param
);
SRpcObj
*
pRpcObj
=
(
SRpcObj
*
)(
param
);
tscDebug
(
"free rpcObj:%p and free pDnodeConn: %p"
,
pRpcObj
,
pRpcObj
->
pDnodeConn
);
rpcClose
(
pRpcObj
->
pDnodeConn
);
}
int32_t
tscInitRpc
(
const
char
*
user
,
const
char
*
secretEncrypt
,
void
**
pDnodeConn
)
{
SRpcInit
rpcInit
;
if
(
*
pDnodeConn
==
NULL
)
{
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localPort
=
0
;
rpcInit
.
label
=
"TSC"
;
rpcInit
.
numOfThreads
=
1
;
// every DB connection has only one thread
rpcInit
.
cfp
=
tscProcessMsgFromServer
;
rpcInit
.
sessions
=
tsMaxConnections
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
user
=
(
char
*
)
user
;
rpcInit
.
idleTime
=
2000
;
rpcInit
.
ckey
=
"key"
;
rpcInit
.
spi
=
1
;
rpcInit
.
secret
=
(
char
*
)
secretEncrypt
;
*
pDnodeConn
=
rpcOpen
(
&
rpcInit
);
if
(
*
pDnodeConn
==
NULL
)
{
tscError
(
"failed to init connection to TDengine"
);
return
-
1
;
}
else
{
tscDebug
(
"dnodeConn:%p is created, user:%s"
,
*
pDnodeConn
,
user
);
}
void
tscReleaseRpc
(
void
*
param
)
{
if
(
param
==
NULL
)
{
return
;
}
pthread_mutex_lock
(
&
rpcObjMutex
);
taosCacheRelease
(
tscRpcCache
,
(
void
*
)
&
param
,
false
);
pthread_mutex_unlock
(
&
rpcObjMutex
);
}
int32_t
tscAcquireRpc
(
const
char
*
key
,
const
char
*
user
,
const
char
*
secretEncrypt
,
void
**
ppRpcObj
)
{
pthread_mutex_lock
(
&
rpcObjMutex
);
SRpcObj
*
pRpcObj
=
(
SRpcObj
*
)
taosCacheAcquireByKey
(
tscRpcCache
,
key
,
strlen
(
key
));
if
(
pRpcObj
!=
NULL
)
{
*
ppRpcObj
=
pRpcObj
;
pthread_mutex_unlock
(
&
rpcObjMutex
);
return
0
;
}
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localPort
=
0
;
rpcInit
.
label
=
"TSC"
;
rpcInit
.
numOfThreads
=
tscNumOfThreads
*
2
;
rpcInit
.
cfp
=
tscProcessMsgFromServer
;
rpcInit
.
sessions
=
tsMaxConnections
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
user
=
(
char
*
)
user
;
rpcInit
.
idleTime
=
2000
;
rpcInit
.
ckey
=
"key"
;
rpcInit
.
spi
=
1
;
rpcInit
.
secret
=
(
char
*
)
secretEncrypt
;
SRpcObj
rpcObj
;
memset
(
&
rpcObj
,
0
,
sizeof
(
rpcObj
));
strncpy
(
rpcObj
.
key
,
key
,
strlen
(
key
));
rpcObj
.
pDnodeConn
=
rpcOpen
(
&
rpcInit
);
if
(
rpcObj
.
pDnodeConn
==
NULL
)
{
pthread_mutex_unlock
(
&
rpcObjMutex
);
tscError
(
"failed to init connection to TDengine"
);
return
-
1
;
}
pRpcObj
=
taosCachePut
(
tscRpcCache
,
rpcObj
.
key
,
strlen
(
rpcObj
.
key
),
&
rpcObj
,
sizeof
(
rpcObj
),
1000
*
10
);
if
(
pRpcObj
==
NULL
)
{
rpcClose
(
rpcObj
.
pDnodeConn
);
pthread_mutex_unlock
(
&
rpcObjMutex
);
return
-
1
;
}
*
ppRpcObj
=
pRpcObj
;
pthread_mutex_unlock
(
&
rpcObjMutex
);
return
0
;
}
...
...
@@ -118,7 +151,7 @@ void taos_init_imp(void) {
int
queueSize
=
tsMaxConnections
*
2
;
double
factor
=
(
tscEmbedded
==
0
)
?
2
.
0
:
4
.
0
;
int32_t
tscNumOfThreads
=
(
int
)(
tsNumOfCores
*
tsNumOfThreadsPerCore
/
factor
);
tscNumOfThreads
=
(
int
)(
tsNumOfCores
*
tsNumOfThreadsPerCore
/
factor
);
if
(
tscNumOfThreads
<
2
)
{
tscNumOfThreads
=
2
;
}
...
...
@@ -140,6 +173,10 @@ void taos_init_imp(void) {
tscTableMetaInfo
=
taosHashInit
(
1024
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
tscDebug
(
"TableMeta:%p"
,
tscTableMetaInfo
);
}
int
refreshTime
=
5
;
tscRpcCache
=
taosCacheInit
(
TSDB_DATA_TYPE_BINARY
,
refreshTime
,
true
,
tscFreeRpcObj
,
"rpcObj"
);
pthread_mutex_init
(
&
rpcObjMutex
,
NULL
);
tscRefId
=
taosOpenRef
(
200
,
tscCloseTscObj
);
...
...
@@ -181,10 +218,16 @@ void taos_cleanup(void) {
taosCleanupKeywordsTable
();
taosCloseLog
();
if
(
tscEmbedded
==
0
)
{
rpcCleanup
();
p
=
tscRpcCache
;
tscRpcCache
=
NULL
;
if
(
p
!=
NULL
)
{
taosCacheCleanup
(
p
);
pthread_mutex_destroy
(
&
rpcObjMutex
);
}
if
(
tscEmbedded
==
0
)
rpcCleanup
();
p
=
tscTmr
;
tscTmr
=
NULL
;
taosTmrCleanUp
(
p
);
...
...
src/client/src/tscUtil.c
浏览文件 @
9d2305f2
...
...
@@ -32,6 +32,14 @@
static
void
freeQueryInfoImpl
(
SQueryInfo
*
pQueryInfo
);
static
void
clearAllTableMetaInfo
(
SQueryInfo
*
pQueryInfo
);
static
void
tscStrToLower
(
char
*
str
,
int32_t
n
)
{
if
(
str
==
NULL
||
n
<=
0
)
{
return
;}
for
(
int32_t
i
=
0
;
i
<
n
;
i
++
)
{
if
(
str
[
i
]
>=
'A'
&&
str
[
i
]
<=
'Z'
)
{
str
[
i
]
-=
(
'A'
-
'a'
);
}
}
}
SCond
*
tsGetSTableQueryCond
(
STagCond
*
pTagCond
,
uint64_t
uid
)
{
if
(
pTagCond
->
pCond
==
NULL
)
{
return
NULL
;
...
...
@@ -442,7 +450,6 @@ void tscFreeRegisteredSqlObj(void *pSql) {
SSqlObj
*
p
=
*
(
SSqlObj
**
)
pSql
;
STscObj
*
pTscObj
=
p
->
pTscObj
;
assert
(
RID_VALID
(
p
->
self
));
int32_t
num
=
atomic_sub_fetch_32
(
&
pTscObj
->
numOfObj
,
1
);
...
...
@@ -893,16 +900,10 @@ void tscCloseTscObj(void *param) {
pObj
->
signature
=
NULL
;
taosTmrStopA
(
&
(
pObj
->
pTimer
));
void
*
p
=
pObj
->
pDnodeConn
;
if
(
pObj
->
pDnodeConn
!=
NULL
)
{
rpcClose
(
pObj
->
pDnodeConn
);
pObj
->
pDnodeConn
=
NULL
;
}
tfree
(
pObj
->
tscCorMgmtEpSet
);
tscReleaseRpc
(
pObj
->
pRpcObj
);
pthread_mutex_destroy
(
&
pObj
->
mutex
);
tscDebug
(
"%p DB connection is closed, dnodeConn:%p"
,
pObj
,
p
);
tfree
(
pObj
);
}
...
...
@@ -1431,9 +1432,11 @@ int32_t tscValidateName(SStrToken* pToken) {
char
*
sep
=
strnchr
(
pToken
->
z
,
TS_PATH_DELIMITER
[
0
],
pToken
->
n
,
true
);
if
(
sep
==
NULL
)
{
// single part
if
(
pToken
->
type
==
TK_STRING
)
{
strdequote
(
pToken
->
z
);
tscStrToLower
(
pToken
->
z
,
pToken
->
n
);
pToken
->
n
=
(
uint32_t
)
strtrim
(
pToken
->
z
);
int
len
=
tSQLGetToken
(
pToken
->
z
,
&
pToken
->
type
);
// single token, validate it
...
...
@@ -1485,7 +1488,7 @@ int32_t tscValidateName(SStrToken* pToken) {
if
(
pToken
->
type
==
TK_STRING
&&
validateQuoteToken
(
pToken
)
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_TSC_INVALID_SQL
;
}
// re-build the whole name string
if
(
pStr
[
firstPartLen
]
==
TS_PATH_DELIMITER
[
0
])
{
// first part do not have quote do nothing
...
...
@@ -1497,6 +1500,8 @@ int32_t tscValidateName(SStrToken* pToken) {
}
pToken
->
n
+=
(
firstPartLen
+
sizeof
(
TS_PATH_DELIMITER
[
0
]));
pToken
->
z
=
pStr
;
tscStrToLower
(
pToken
->
z
,
pToken
->
n
);
}
return
TSDB_CODE_SUCCESS
;
...
...
src/common/inc/tglobal.h
浏览文件 @
9d2305f2
...
...
@@ -57,7 +57,8 @@ extern int32_t tsCompressMsgSize;
extern
char
tsTempDir
[];
//query buffer management
extern
int32_t
tsQueryBufferSize
;
// maximum allowed usage buffer for each data node during query processing
extern
int32_t
tsQueryBufferSize
;
// maximum allowed usage buffer size in MB for each data node during query processing
extern
int64_t
tsQueryBufferSizeBytes
;
// maximum allowed usage buffer size in byte for each data node during query processing
extern
int32_t
tsRetrieveBlockingModel
;
// retrieve threads will be blocked
extern
int8_t
tsKeepOriginalColumnName
;
...
...
src/common/src/tglobal.c
浏览文件 @
9d2305f2
...
...
@@ -105,6 +105,7 @@ int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
// 0 no query allowed, queries are disabled
// positive value (in MB)
int32_t
tsQueryBufferSize
=
-
1
;
int64_t
tsQueryBufferSizeBytes
=
-
1
;
// in retrieve blocking model, the retrieve threads will wait for the completion of the query processing.
int32_t
tsRetrieveBlockingModel
=
0
;
...
...
@@ -283,7 +284,7 @@ bool taosCfgDynamicOptions(char *msg) {
int32_t
cfgLen
=
(
int32_t
)
strlen
(
cfg
->
option
);
if
(
cfgLen
!=
olen
)
continue
;
if
(
strncasecmp
(
option
,
cfg
->
option
,
olen
)
!=
0
)
continue
;
if
(
cfg
->
valType
!
=
TAOS_CFG_VTYPE_INT32
)
{
if
(
cfg
->
valType
=
=
TAOS_CFG_VTYPE_INT32
)
{
*
((
int32_t
*
)
cfg
->
ptr
)
=
vint
;
}
else
{
*
((
int8_t
*
)
cfg
->
ptr
)
=
(
int8_t
)
vint
;
...
...
@@ -1488,6 +1489,10 @@ int32_t taosCheckGlobalCfg() {
tsSyncPort
=
tsServerPort
+
TSDB_PORT_SYNC
;
tsHttpPort
=
tsServerPort
+
TSDB_PORT_HTTP
;
if
(
tsQueryBufferSize
>=
0
)
{
tsQueryBufferSizeBytes
=
tsQueryBufferSize
*
1048576UL
;
}
taosPrintGlobalCfg
();
return
0
;
...
...
src/common/src/ttypes.c
浏览文件 @
9d2305f2
...
...
@@ -632,7 +632,7 @@ int32_t tStrToInteger(const char* z, int16_t type, int32_t n, int64_t* value, bo
}
// the string may be overflow according to errno
*
value
=
issigned
?
strtoll
(
z
,
&
endPtr
,
radix
)
:
strtoul
(
z
,
&
endPtr
,
radix
);
*
value
=
issigned
?
strtoll
(
z
,
&
endPtr
,
radix
)
:
strtoul
l
(
z
,
&
endPtr
,
radix
);
// not a valid integer number, return error
if
(
endPtr
-
z
!=
n
||
errno
==
ERANGE
)
{
...
...
src/cq/src/cqMain.c
浏览文件 @
9d2305f2
...
...
@@ -50,10 +50,13 @@ typedef struct {
void
*
dbConn
;
void
*
tmrCtrl
;
pthread_mutex_t
mutex
;
int32_t
delete
;
int32_t
cqObjNum
;
}
SCqContext
;
typedef
struct
SCqObj
{
tmr_h
tmrId
;
int64_t
rid
;
uint64_t
uid
;
int32_t
tid
;
// table ID
int32_t
rowSize
;
// bytes of a row
...
...
@@ -69,6 +72,84 @@ typedef struct SCqObj {
static
void
cqProcessStreamRes
(
void
*
param
,
TAOS_RES
*
tres
,
TAOS_ROW
row
);
static
void
cqCreateStream
(
SCqContext
*
pContext
,
SCqObj
*
pObj
);
int32_t
cqObjRef
=
-
1
;
void
cqRmFromList
(
SCqObj
*
pObj
)
{
//LOCK in caller
SCqContext
*
pContext
=
pObj
->
pContext
;
if
(
pObj
->
prev
)
{
pObj
->
prev
->
next
=
pObj
->
next
;
}
else
{
pContext
->
pHead
=
pObj
->
next
;
}
if
(
pObj
->
next
)
{
pObj
->
next
->
prev
=
pObj
->
prev
;
}
}
void
cqFree
(
void
*
handle
)
{
if
(
tsEnableStream
==
0
)
{
return
;
}
SCqObj
*
pObj
=
handle
;
SCqContext
*
pContext
=
pObj
->
pContext
;
int32_t
delete
=
0
;
pthread_mutex_lock
(
&
pContext
->
mutex
);
// free the resources associated
if
(
pObj
->
pStream
)
{
taos_close_stream
(
pObj
->
pStream
);
pObj
->
pStream
=
NULL
;
}
else
{
taosTmrStop
(
pObj
->
tmrId
);
pObj
->
tmrId
=
0
;
}
cInfo
(
"vgId:%d, id:%d CQ:%s is dropped"
,
pContext
->
vgId
,
pObj
->
tid
,
pObj
->
sqlStr
);
tdFreeSchema
(
pObj
->
pSchema
);
free
(
pObj
->
dstTable
);
free
(
pObj
->
sqlStr
);
free
(
pObj
);
pContext
->
cqObjNum
--
;
if
(
pContext
->
cqObjNum
<=
0
&&
pContext
->
delete
)
{
delete
=
1
;
}
pthread_mutex_unlock
(
&
pContext
->
mutex
);
if
(
delete
)
{
pthread_mutex_unlock
(
&
pContext
->
mutex
);
pthread_mutex_destroy
(
&
pContext
->
mutex
);
taosTmrCleanUp
(
pContext
->
tmrCtrl
);
pContext
->
tmrCtrl
=
NULL
;
cDebug
(
"vgId:%d, CQ is closed"
,
pContext
->
vgId
);
free
(
pContext
);
}
}
void
cqCreateRef
()
{
int32_t
ref
=
atomic_load_32
(
&
cqObjRef
);
if
(
ref
==
-
1
)
{
ref
=
taosOpenRef
(
4096
,
cqFree
);
if
(
atomic_val_compare_exchange_32
(
&
cqObjRef
,
-
1
,
ref
)
!=
-
1
)
{
taosCloseRef
(
ref
);
}
}
}
void
*
cqOpen
(
void
*
ahandle
,
const
SCqCfg
*
pCfg
)
{
if
(
tsEnableStream
==
0
)
{
return
NULL
;
...
...
@@ -79,6 +160,8 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
return
NULL
;
}
cqCreateRef
();
pContext
->
tmrCtrl
=
taosTmrInit
(
0
,
0
,
0
,
"CQ"
);
tstrncpy
(
pContext
->
user
,
pCfg
->
user
,
sizeof
(
pContext
->
user
));
...
...
@@ -97,6 +180,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
pthread_mutex_init
(
&
pContext
->
mutex
,
NULL
);
cDebug
(
"vgId:%d, CQ is opened"
,
pContext
->
vgId
);
return
pContext
;
...
...
@@ -109,30 +193,30 @@ void cqClose(void *handle) {
SCqContext
*
pContext
=
handle
;
if
(
handle
==
NULL
)
return
;
pContext
->
delete
=
1
;
// stop all CQs
cqStop
(
pContext
);
// free all resources
pthread_mutex_lock
(
&
pContext
->
mutex
);
SCqObj
*
pObj
=
pContext
->
pHead
;
while
(
pObj
)
{
SCqObj
*
pTemp
=
pObj
;
pObj
=
pObj
->
next
;
tdFreeSchema
(
pTemp
->
pSchema
);
tfree
(
pTemp
->
sqlStr
);
free
(
pTemp
);
}
pthread_mutex_unlock
(
&
pContext
->
mutex
);
int64_t
rid
=
0
;
pthread_mutex_destroy
(
&
pContext
->
mutex
);
while
(
1
)
{
pthread_mutex_lock
(
&
pContext
->
mutex
);
taosTmrCleanUp
(
pContext
->
tmrCtrl
);
pContext
->
tmrCtrl
=
NULL
;
SCqObj
*
pObj
=
pContext
->
pHead
;
if
(
pObj
)
{
cqRmFromList
(
pObj
);
cDebug
(
"vgId:%d, CQ is closed"
,
pContext
->
vgId
);
free
(
pContext
);
rid
=
pObj
->
rid
;
}
else
{
pthread_mutex_unlock
(
&
pContext
->
mutex
);
break
;
}
pthread_mutex_unlock
(
&
pContext
->
mutex
);
taosRemoveRef
(
cqObjRef
,
rid
);
}
}
void
cqStart
(
void
*
handle
)
{
...
...
@@ -191,7 +275,8 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch
return
NULL
;
}
SCqContext
*
pContext
=
handle
;
int64_t
rid
=
0
;
SCqObj
*
pObj
=
calloc
(
sizeof
(
SCqObj
),
1
);
if
(
pObj
==
NULL
)
return
NULL
;
...
...
@@ -213,32 +298,36 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch
if
(
pContext
->
pHead
)
pContext
->
pHead
->
prev
=
pObj
;
pContext
->
pHead
=
pObj
;
pContext
->
cqObjNum
++
;
pObj
->
rid
=
taosAddRef
(
cqObjRef
,
pObj
);
cqCreateStream
(
pContext
,
pObj
);
rid
=
pObj
->
rid
;
pthread_mutex_unlock
(
&
pContext
->
mutex
);
return
pObj
;
return
(
void
*
)
rid
;
}
void
cqDrop
(
void
*
handle
)
{
if
(
tsEnableStream
==
0
)
{
return
;
}
SCqObj
*
pObj
=
handle
;
SCqContext
*
pContext
=
pObj
->
pContext
;
pthread_mutex_lock
(
&
pContext
->
mutex
);
if
(
pObj
->
prev
)
{
pObj
->
prev
->
next
=
pObj
->
next
;
}
else
{
pContext
->
pHead
=
pObj
->
next
;
}
if
(
pObj
->
next
)
{
pObj
->
next
->
prev
=
pObj
->
prev
;
SCqObj
*
pObj
=
(
SCqObj
*
)
taosAcquireRef
(
cqObjRef
,
(
int64_t
)
handle
);
if
(
pObj
==
NULL
)
{
return
;
}
SCqContext
*
pContext
=
pObj
->
pContext
;
pthread_mutex_lock
(
&
pContext
->
mutex
);
cqRmFromList
(
pObj
);
// free the resources associated
if
(
pObj
->
pStream
)
{
taos_close_stream
(
pObj
->
pStream
);
...
...
@@ -248,17 +337,18 @@ void cqDrop(void *handle) {
pObj
->
tmrId
=
0
;
}
cInfo
(
"vgId:%d, id:%d CQ:%s is dropped"
,
pContext
->
vgId
,
pObj
->
tid
,
pObj
->
sqlStr
);
tdFreeSchema
(
pObj
->
pSchema
);
free
(
pObj
->
dstTable
);
free
(
pObj
->
sqlStr
);
free
(
pObj
);
pthread_mutex_unlock
(
&
pContext
->
mutex
);
taosRemoveRef
(
cqObjRef
,
(
int64_t
)
handle
);
taosReleaseRef
(
cqObjRef
,
(
int64_t
)
handle
);
}
static
void
doCreateStream
(
void
*
param
,
TAOS_RES
*
result
,
int32_t
code
)
{
SCqObj
*
pObj
=
(
SCqObj
*
)
param
;
SCqObj
*
pObj
=
(
SCqObj
*
)
taosAcquireRef
(
cqObjRef
,
(
int64_t
)
param
);
if
(
pObj
==
NULL
)
{
return
;
}
SCqContext
*
pContext
=
pObj
->
pContext
;
SSqlObj
*
pSql
=
(
SSqlObj
*
)
result
;
if
(
atomic_val_compare_exchange_ptr
(
&
(
pContext
->
dbConn
),
NULL
,
pSql
->
pTscObj
)
!=
NULL
)
{
...
...
@@ -267,10 +357,16 @@ static void doCreateStream(void *param, TAOS_RES *result, int32_t code) {
pthread_mutex_lock
(
&
pContext
->
mutex
);
cqCreateStream
(
pContext
,
pObj
);
pthread_mutex_unlock
(
&
pContext
->
mutex
);
taosReleaseRef
(
cqObjRef
,
(
int64_t
)
param
);
}
static
void
cqProcessCreateTimer
(
void
*
param
,
void
*
tmrId
)
{
SCqObj
*
pObj
=
(
SCqObj
*
)
param
;
SCqObj
*
pObj
=
(
SCqObj
*
)
taosAcquireRef
(
cqObjRef
,
(
int64_t
)
param
);
if
(
pObj
==
NULL
)
{
return
;
}
SCqContext
*
pContext
=
pObj
->
pContext
;
if
(
pContext
->
dbConn
==
NULL
)
{
...
...
@@ -281,6 +377,8 @@ static void cqProcessCreateTimer(void *param, void *tmrId) {
cqCreateStream
(
pContext
,
pObj
);
pthread_mutex_unlock
(
&
pContext
->
mutex
);
}
taosReleaseRef
(
cqObjRef
,
(
int64_t
)
param
);
}
static
void
cqCreateStream
(
SCqContext
*
pContext
,
SCqObj
*
pObj
)
{
...
...
@@ -288,13 +386,13 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
if
(
pContext
->
dbConn
==
NULL
)
{
cDebug
(
"vgId:%d, create dbConn after 1000 ms"
,
pContext
->
vgId
);
pObj
->
tmrId
=
taosTmrStart
(
cqProcessCreateTimer
,
1000
,
pObj
,
pContext
->
tmrCtrl
);
pObj
->
tmrId
=
taosTmrStart
(
cqProcessCreateTimer
,
1000
,
(
void
*
)
pObj
->
rid
,
pContext
->
tmrCtrl
);
return
;
}
pObj
->
tmrId
=
0
;
if
(
pObj
->
pStream
==
NULL
)
{
pObj
->
pStream
=
taos_open_stream
(
pContext
->
dbConn
,
pObj
->
sqlStr
,
cqProcessStreamRes
,
0
,
pObj
,
NULL
);
pObj
->
pStream
=
taos_open_stream
(
pContext
->
dbConn
,
pObj
->
sqlStr
,
cqProcessStreamRes
,
0
,
(
void
*
)
pObj
->
rid
,
NULL
);
// TODO the pObj->pStream may be released if error happens
if
(
pObj
->
pStream
)
{
...
...
@@ -308,18 +406,28 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
}
static
void
cqProcessStreamRes
(
void
*
param
,
TAOS_RES
*
tres
,
TAOS_ROW
row
)
{
SCqObj
*
pObj
=
(
SCqObj
*
)
param
;
SCqObj
*
pObj
=
(
SCqObj
*
)
taosAcquireRef
(
cqObjRef
,
(
int64_t
)
param
);
if
(
pObj
==
NULL
)
{
return
;
}
if
(
tres
==
NULL
&&
row
==
NULL
)
{
taos_close_stream
(
pObj
->
pStream
);
pObj
->
pStream
=
NULL
;
taosReleaseRef
(
cqObjRef
,
(
int64_t
)
param
);
return
;
}
SCqContext
*
pContext
=
pObj
->
pContext
;
STSchema
*
pSchema
=
pObj
->
pSchema
;
if
(
pObj
->
pStream
==
NULL
)
return
;
if
(
pObj
->
pStream
==
NULL
)
{
taosReleaseRef
(
cqObjRef
,
(
int64_t
)
param
);
return
;
}
cDebug
(
"vgId:%d, id:%d CQ:%s stream result is ready"
,
pContext
->
vgId
,
pObj
->
tid
,
pObj
->
sqlStr
);
int32_t
size
=
sizeof
(
SWalHead
)
+
sizeof
(
SSubmitMsg
)
+
sizeof
(
SSubmitBlk
)
+
TD_DATA_ROW_HEAD_SIZE
+
pObj
->
rowSize
;
...
...
@@ -370,5 +478,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
// write into vnode write queue
pContext
->
cqWrite
(
pContext
->
vgId
,
pHead
,
TAOS_QTYPE_CQ
,
NULL
);
free
(
buffer
);
taosReleaseRef
(
cqObjRef
,
(
int64_t
)
param
);
}
src/dnode/src/dnodeCfg.c
浏览文件 @
9d2305f2
...
...
@@ -156,7 +156,7 @@ static int32_t dnodeWriteCfg() {
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
fwrite
(
content
,
1
,
len
,
fp
);
f
flush
(
fp
);
f
sync
(
fileno
(
fp
)
);
fclose
(
fp
);
free
(
content
);
terrno
=
0
;
...
...
src/dnode/src/dnodeEps.c
浏览文件 @
9d2305f2
...
...
@@ -277,7 +277,7 @@ static int32_t dnodeWriteEps() {
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
fwrite
(
content
,
1
,
len
,
fp
);
f
flush
(
fp
);
f
sync
(
fileno
(
fp
)
);
fclose
(
fp
);
free
(
content
);
terrno
=
0
;
...
...
src/dnode/src/dnodeMInfos.c
浏览文件 @
9d2305f2
...
...
@@ -286,7 +286,7 @@ static int32_t dnodeWriteMInfos() {
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
fwrite
(
content
,
1
,
len
,
fp
);
f
flush
(
fp
);
f
sync
(
fileno
(
fp
)
);
fclose
(
fp
);
free
(
content
);
terrno
=
0
;
...
...
src/mnode/src/mnodeUser.c
浏览文件 @
9d2305f2
...
...
@@ -123,7 +123,7 @@ static void mnodePrintUserAuth() {
mnodeDecUserRef
(
pUser
);
}
f
flush
(
fp
);
f
sync
(
fileno
(
fp
)
);
fclose
(
fp
);
}
...
...
src/query/src/qExecutor.c
浏览文件 @
9d2305f2
...
...
@@ -7832,15 +7832,15 @@ static int64_t getQuerySupportBufSize(size_t numOfTables) {
int32_t
checkForQueryBuf
(
size_t
numOfTables
)
{
int64_t
t
=
getQuerySupportBufSize
(
numOfTables
);
if
(
tsQueryBufferSize
<
0
)
{
if
(
tsQueryBufferSize
Bytes
<
0
)
{
return
TSDB_CODE_SUCCESS
;
}
else
if
(
tsQueryBufferSize
>
0
)
{
}
else
if
(
tsQueryBufferSize
Bytes
>
0
)
{
while
(
1
)
{
int64_t
s
=
tsQueryBufferSize
;
int64_t
s
=
tsQueryBufferSize
Bytes
;
int64_t
remain
=
s
-
t
;
if
(
remain
>=
0
)
{
if
(
atomic_val_compare_exchange_64
(
&
tsQueryBufferSize
,
s
,
remain
)
==
s
)
{
if
(
atomic_val_compare_exchange_64
(
&
tsQueryBufferSize
Bytes
,
s
,
remain
)
==
s
)
{
return
TSDB_CODE_SUCCESS
;
}
}
else
{
...
...
@@ -7854,14 +7854,14 @@ int32_t checkForQueryBuf(size_t numOfTables) {
}
void
releaseQueryBuf
(
size_t
numOfTables
)
{
if
(
tsQueryBufferSize
<=
0
)
{
if
(
tsQueryBufferSize
Bytes
<
0
)
{
return
;
}
int64_t
t
=
getQuerySupportBufSize
(
numOfTables
);
// restore value is not enough buffer available
atomic_add_fetch_64
(
&
tsQueryBufferSize
,
t
);
atomic_add_fetch_64
(
&
tsQueryBufferSize
Bytes
,
t
);
}
void
*
qGetResultRetrieveMsg
(
qinfo_t
qinfo
)
{
...
...
src/rpc/src/rpcTcp.c
浏览文件 @
9d2305f2
...
...
@@ -48,6 +48,13 @@ typedef struct SThreadObj {
void
*
(
*
processData
)(
SRecvInfo
*
pPacket
);
}
SThreadObj
;
typedef
struct
{
char
label
[
TSDB_LABEL_LEN
];
int32_t
index
;
int
numOfThreads
;
SThreadObj
**
pThreadObj
;
}
SClientObj
;
typedef
struct
{
SOCKET
fd
;
uint32_t
ip
;
...
...
@@ -116,6 +123,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pThreadObj
->
processData
=
fp
;
tstrncpy
(
pThreadObj
->
label
,
label
,
sizeof
(
pThreadObj
->
label
));
pThreadObj
->
shandle
=
shandle
;
pThreadObj
->
stop
=
false
;
}
// initialize mutex, thread, fd which may fail
...
...
@@ -166,6 +174,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
}
static
void
taosStopTcpThread
(
SThreadObj
*
pThreadObj
)
{
if
(
pThreadObj
==
NULL
)
{
return
;}
// save thread into local variable and signal thread to stop
pthread_t
thread
=
pThreadObj
->
thread
;
if
(
!
taosCheckPthreadValid
(
thread
))
{
...
...
@@ -282,49 +291,76 @@ static void *taosAcceptTcpConnection(void *arg) {
return
NULL
;
}
void
*
taosInitTcpClient
(
uint32_t
ip
,
uint16_t
port
,
char
*
label
,
int
num
,
void
*
fp
,
void
*
shandle
)
{
SThreadObj
*
pThreadObj
;
pthread_attr_t
thattr
;
pThreadObj
=
(
SThreadObj
*
)
malloc
(
sizeof
(
SThreadObj
));
memset
(
pThreadObj
,
0
,
sizeof
(
SThreadObj
));
tstrncpy
(
pThreadObj
->
label
,
label
,
sizeof
(
pThreadObj
->
label
));
pThreadObj
->
ip
=
ip
;
pThreadObj
->
shandle
=
shandle
;
if
(
pthread_mutex_init
(
&
(
pThreadObj
->
mutex
),
NULL
)
<
0
)
{
tError
(
"%s failed to init TCP client mutex(%s)"
,
label
,
strerror
(
errno
));
free
(
pThreadObj
);
void
*
taosInitTcpClient
(
uint32_t
ip
,
uint16_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
{
SClientObj
*
pClientObj
=
(
SClientObj
*
)
calloc
(
1
,
sizeof
(
SClientObj
));
if
(
pClientObj
==
NULL
)
{
tError
(
"TCP:%s no enough memory"
,
label
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
NULL
;
}
}
pThreadObj
->
pollFd
=
(
EpollFd
)
epoll_create
(
10
);
// size does not matter
if
(
pThreadObj
->
pollFd
<
0
)
{
tError
(
"%s failed to create TCP client epoll"
,
label
);
free
(
pThreadObj
);
tstrncpy
(
pClientObj
->
label
,
label
,
sizeof
(
pClientObj
->
label
));
pClientObj
->
numOfThreads
=
numOfThreads
;
pClientObj
->
pThreadObj
=
(
SThreadObj
**
)
calloc
(
numOfThreads
,
sizeof
(
SThreadObj
*
));
if
(
pClientObj
->
pThreadObj
==
NULL
)
{
tError
(
"TCP:%s no enough memory"
,
label
);
tfree
(
pClientObj
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
NULL
;
}
pThreadObj
->
processData
=
fp
;
int
code
=
0
;
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
int
code
=
pthread_create
(
&
(
pThreadObj
->
thread
),
&
thattr
,
taosProcessTcpData
,
(
void
*
)(
pThreadObj
));
pthread_attr_destroy
(
&
thattr
);
if
(
code
!=
0
)
{
EpollClose
(
pThreadObj
->
pollFd
);
pThreadObj
->
pollFd
=
-
1
;
free
(
pThreadObj
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tError
(
"%s failed to create TCP read data thread(%s)"
,
label
,
strerror
(
errno
));
return
NULL
;
for
(
int
i
=
0
;
i
<
numOfThreads
;
++
i
)
{
SThreadObj
*
pThreadObj
=
(
SThreadObj
*
)
calloc
(
1
,
sizeof
(
SThreadObj
));
if
(
pThreadObj
==
NULL
)
{
tError
(
"TCP:%s no enough memory"
,
label
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
for
(
int
j
=
0
;
j
<
i
;
++
j
)
free
(
pClientObj
->
pThreadObj
[
j
]);
free
(
pClientObj
);
pthread_attr_destroy
(
&
thattr
);
return
NULL
;
}
pClientObj
->
pThreadObj
[
i
]
=
pThreadObj
;
taosResetPthread
(
&
pThreadObj
->
thread
);
pThreadObj
->
ip
=
ip
;
pThreadObj
->
stop
=
false
;
tstrncpy
(
pThreadObj
->
label
,
label
,
sizeof
(
pThreadObj
->
label
));
pThreadObj
->
shandle
=
shandle
;
pThreadObj
->
processData
=
fp
;
}
tDebug
(
"%s TCP client is initialized, ip:%u:%hu"
,
label
,
ip
,
port
);
// initialize mutex, thread, fd which may fail
for
(
int
i
=
0
;
i
<
numOfThreads
;
++
i
)
{
SThreadObj
*
pThreadObj
=
pClientObj
->
pThreadObj
[
i
];
code
=
pthread_mutex_init
(
&
(
pThreadObj
->
mutex
),
NULL
);
if
(
code
<
0
)
{
tError
(
"%s failed to init TCP process data mutex(%s)"
,
label
,
strerror
(
errno
));
break
;
}
return
pThreadObj
;
pThreadObj
->
pollFd
=
(
int64_t
)
epoll_create
(
10
);
// size does not matter
if
(
pThreadObj
->
pollFd
<
0
)
{
tError
(
"%s failed to create TCP epoll"
,
label
);
code
=
-
1
;
break
;
}
code
=
pthread_create
(
&
(
pThreadObj
->
thread
),
&
thattr
,
taosProcessTcpData
,
(
void
*
)(
pThreadObj
));
if
(
code
!=
0
)
{
tError
(
"%s failed to create TCP process data thread(%s)"
,
label
,
strerror
(
errno
));
break
;
}
pThreadObj
->
threadId
=
i
;
}
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
taosCleanUpTcpClient
(
pClientObj
);
pClientObj
=
NULL
;
}
return
pClientObj
;
}
void
taosStopTcpClient
(
void
*
chandle
)
{
...
...
@@ -335,15 +371,23 @@ void taosStopTcpClient(void *chandle) {
}
void
taosCleanUpTcpClient
(
void
*
chandle
)
{
SThreadObj
*
pThreadObj
=
chandle
;
if
(
pThreadObj
==
NULL
)
return
;
tDebug
(
"%s TCP client will be cleaned up"
,
pThreadObj
->
label
);
taosStopTcpThread
(
pThreadObj
);
SClientObj
*
pClientObj
=
chandle
;
if
(
pClientObj
==
NULL
)
return
;
for
(
int
i
=
0
;
i
<
pClientObj
->
numOfThreads
;
++
i
)
{
SThreadObj
*
pThreadObj
=
pClientObj
->
pThreadObj
[
i
];
taosStopTcpThread
(
pThreadObj
);
}
tDebug
(
"%s TCP client is cleaned up"
,
pClientObj
->
label
);
tfree
(
pClientObj
->
pThreadObj
);
tfree
(
pClientObj
);
}
void
*
taosOpenTcpClientConnection
(
void
*
shandle
,
void
*
thandle
,
uint32_t
ip
,
uint16_t
port
)
{
SThreadObj
*
pThreadObj
=
shandle
;
SClientObj
*
pClientObj
=
shandle
;
int32_t
index
=
atomic_load_32
(
&
pClientObj
->
index
)
%
pClientObj
->
numOfThreads
;
atomic_store_32
(
&
pClientObj
->
index
,
index
+
1
);
SThreadObj
*
pThreadObj
=
pClientObj
->
pThreadObj
[
index
];
SOCKET
fd
=
taosOpenTcpClientSocket
(
ip
,
port
,
pThreadObj
->
ip
);
if
(
fd
<=
0
)
return
NULL
;
...
...
src/vnode/src/vnodeCfg.c
浏览文件 @
9d2305f2
...
...
@@ -341,7 +341,7 @@ int32_t vnodeWriteCfg(SCreateVnodeMsg *pMsg) {
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
fwrite
(
content
,
1
,
len
,
fp
);
f
flush
(
fp
);
f
sync
(
fileno
(
fp
)
);
fclose
(
fp
);
free
(
content
);
terrno
=
0
;
...
...
src/vnode/src/vnodeVersion.c
浏览文件 @
9d2305f2
...
...
@@ -90,7 +90,7 @@ int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
fwrite
(
content
,
1
,
len
,
fp
);
f
flush
(
fp
);
f
sync
(
fileno
(
fp
)
);
fclose
(
fp
);
free
(
content
);
terrno
=
0
;
...
...
tests/examples/JDBC/SpringJdbcTemplate/pom.xml
浏览文件 @
9d2305f2
...
...
@@ -47,7 +47,7 @@
<dependency>
<groupId>
com.taosdata.jdbc
</groupId>
<artifactId>
taos-jdbcdriver
</artifactId>
<version>
2.0.
4
</version>
<version>
2.0.
18
</version>
</dependency>
</dependencies>
...
...
@@ -69,7 +69,7 @@
<configuration>
<archive>
<manifest>
<mainClass>
com.taosdata.
jdbc.
example.jdbcTemplate.App
</mainClass>
<mainClass>
com.taosdata.example.jdbcTemplate.App
</mainClass>
</manifest>
</archive>
<descriptorRefs>
...
...
tests/examples/JDBC/SpringJdbcTemplate/readme.md
浏览文件 @
9d2305f2
...
...
@@ -8,18 +8,16 @@
修改
`src/main/resources/applicationContext.xml`
文件中 TDengine 的配置信息:
```
xml
<bean
id=
"dataSource"
class=
"org.springframework.jdbc.datasource.DriverManagerDataSource"
>
<property
name=
"driverClassName"
value=
"com.taosdata.jdbc.TSDBDriver"
></property>
<property
name=
"url"
value=
"jdbc:TAOS://127.0.0.1:6030/log"
></property>
<property
name=
"username"
value=
"root"
></property>
<property
name=
"password"
value=
"taosdata"
></property>
</bean>
<bean
id =
"jdbcTemplate"
class=
"org.springframework.jdbc.core.JdbcTemplate"
>
<property
name=
"dataSource"
ref =
"dataSource"
></property>
</bean>
<bean
id=
"dataSource"
class=
"org.springframework.jdbc.datasource.DriverManagerDataSource"
>
<property
name=
"driverClassName"
value=
"com.taosdata.jdbc.TSDBDriver"
></property>
<property
name=
"url"
value=
"jdbc:TAOS://127.0.0.1:6030/log"
></property>
<property
name=
"username"
value=
"root"
></property>
<property
name=
"password"
value=
"taosdata"
></property>
</bean>
<bean
id =
"jdbcTemplate"
class=
"org.springframework.jdbc.core.JdbcTemplate"
>
<property
name=
"dataSource"
ref =
"dataSource"
></property>
</bean>
```
### 打包运行
...
...
tests/examples/JDBC/SpringJdbcTemplate/src/main/java/com/taosdata/
jdbc/
example/jdbcTemplate/App.java
→
tests/examples/JDBC/SpringJdbcTemplate/src/main/java/com/taosdata/example/jdbcTemplate/App.java
浏览文件 @
9d2305f2
package
com.taosdata.
jdbc.
example.jdbcTemplate
;
package
com.taosdata.example.jdbcTemplate
;
import
com.taosdata.
jdbc.
example.jdbcTemplate.dao.ExecuteAsStatement
;
import
com.taosdata.
jdbc.
example.jdbcTemplate.dao.WeatherDao
;
import
com.taosdata.
jdbc.
example.jdbcTemplate.domain.Weather
;
import
com.taosdata.example.jdbcTemplate.dao.ExecuteAsStatement
;
import
com.taosdata.example.jdbcTemplate.dao.WeatherDao
;
import
com.taosdata.example.jdbcTemplate.domain.Weather
;
import
org.springframework.context.ApplicationContext
;
import
org.springframework.context.support.ClassPathXmlApplicationContext
;
...
...
tests/examples/JDBC/SpringJdbcTemplate/src/main/java/com/taosdata/
jdbc/
example/jdbcTemplate/dao/ExecuteAsStatement.java
→
tests/examples/JDBC/SpringJdbcTemplate/src/main/java/com/taosdata/example/jdbcTemplate/dao/ExecuteAsStatement.java
浏览文件 @
9d2305f2
package
com.taosdata.
jdbc.
example.jdbcTemplate.dao
;
package
com.taosdata.example.jdbcTemplate.dao
;
public
interface
ExecuteAsStatement
{
...
...
tests/examples/JDBC/SpringJdbcTemplate/src/main/java/com/taosdata/
jdbc/example/jdbcTemplate/dao/impl
/ExecuteAsStatementImpl.java
→
tests/examples/JDBC/SpringJdbcTemplate/src/main/java/com/taosdata/
example/jdbcTemplate/dao
/ExecuteAsStatementImpl.java
浏览文件 @
9d2305f2
package
com.taosdata.
jdbc.example.jdbcTemplate.dao.impl
;
package
com.taosdata.
example.jdbcTemplate.dao
;
import
com.taosdata.jdbc.example.jdbcTemplate.dao.ExecuteAsStatement
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.jdbc.core.JdbcTemplate
;
import
org.springframework.stereotype.Repository
;
...
...
tests/examples/JDBC/SpringJdbcTemplate/src/main/java/com/taosdata/
jdbc/
example/jdbcTemplate/dao/WeatherDao.java
→
tests/examples/JDBC/SpringJdbcTemplate/src/main/java/com/taosdata/example/jdbcTemplate/dao/WeatherDao.java
浏览文件 @
9d2305f2
package
com.taosdata.
jdbc.
example.jdbcTemplate.dao
;
package
com.taosdata.example.jdbcTemplate.dao
;
import
com.taosdata.
jdbc.
example.jdbcTemplate.domain.Weather
;
import
com.taosdata.example.jdbcTemplate.domain.Weather
;
import
java.util.List
;
...
...
tests/examples/JDBC/SpringJdbcTemplate/src/main/java/com/taosdata/
jdbc/example/jdbcTemplate/dao/impl
/WeatherDaoImpl.java
→
tests/examples/JDBC/SpringJdbcTemplate/src/main/java/com/taosdata/
example/jdbcTemplate/dao
/WeatherDaoImpl.java
浏览文件 @
9d2305f2
package
com.taosdata.
jdbc.example.jdbcTemplate.dao.impl
;
package
com.taosdata.
example.jdbcTemplate.dao
;
import
com.taosdata.
jdbc.example.jdbcTemplate.dao.WeatherDao
;
import
com.taosdata.
jdbc.example.jdbcTemplate.domain.Weather
;
import
com.taosdata.
example.jdbcTemplate.domain.Weather
;
import
com.taosdata.
example.jdbcTemplate.dao.WeatherDao
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.jdbc.core.BatchPreparedStatementSetter
;
import
org.springframework.jdbc.core.JdbcTemplate
;
import
org.springframework.jdbc.core.namedparam.SqlParameterSourceUtils
;
import
org.springframework.jdbc.core.simple.SimpleJdbcInsert
;
import
org.springframework.stereotype.Repository
;
import
java.sql.PreparedStatement
;
import
java.sql.SQLException
;
import
java.sql.Timestamp
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
@Repository
public
class
WeatherDaoImpl
implements
WeatherDao
{
...
...
tests/examples/JDBC/SpringJdbcTemplate/src/main/java/com/taosdata/
jdbc/
example/jdbcTemplate/domain/Weather.java
→
tests/examples/JDBC/SpringJdbcTemplate/src/main/java/com/taosdata/example/jdbcTemplate/domain/Weather.java
浏览文件 @
9d2305f2
package
com.taosdata.
jdbc.
example.jdbcTemplate.domain
;
package
com.taosdata.example.jdbcTemplate.domain
;
import
java.sql.Timestamp
;
...
...
tests/examples/JDBC/SpringJdbcTemplate/src/main/resources/applicationContext.xml
浏览文件 @
9d2305f2
...
...
@@ -10,7 +10,7 @@
<bean
id=
"dataSource"
class=
"org.springframework.jdbc.datasource.DriverManagerDataSource"
>
<property
name=
"driverClassName"
value=
"com.taosdata.jdbc.TSDBDriver"
></property>
<property
name=
"url"
value=
"jdbc:TAOS://1
92.168.236.137
:6030/"
></property>
<property
name=
"url"
value=
"jdbc:TAOS://1
27.0.0.1
:6030/"
></property>
<property
name=
"username"
value=
"root"
></property>
<property
name=
"password"
value=
"taosdata"
></property>
</bean>
...
...
@@ -20,6 +20,6 @@
<property
name=
"dataSource"
ref=
"dataSource"
></property>
</bean>
<context:component-scan
base-package=
"com.taosdata.
jdbc.
example.jdbcTemplate"
/>
<context:component-scan
base-package=
"com.taosdata.example.jdbcTemplate"
/>
</beans>
tests/examples/JDBC/SpringJdbcTemplate/src/test/java/com/taosdata/
jdbc/
example/jdbcTemplate/BatcherInsertTest.java
→
tests/examples/JDBC/SpringJdbcTemplate/src/test/java/com/taosdata/example/jdbcTemplate/BatcherInsertTest.java
浏览文件 @
9d2305f2
package
com.taosdata.
jdbc.
example.jdbcTemplate
;
package
com.taosdata.example.jdbcTemplate
;
import
com.taosdata.
jdbc.
example.jdbcTemplate.dao.ExecuteAsStatement
;
import
com.taosdata.
jdbc.
example.jdbcTemplate.dao.WeatherDao
;
import
com.taosdata.
jdbc.
example.jdbcTemplate.domain.Weather
;
import
com.taosdata.example.jdbcTemplate.dao.ExecuteAsStatement
;
import
com.taosdata.example.jdbcTemplate.dao.WeatherDao
;
import
com.taosdata.example.jdbcTemplate.domain.Weather
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
...
...
tests/examples/JDBC/SpringJdbcTemplate/src/test/java/com/taosdata/jdbc/AppTest.java
已删除
100644 → 0
浏览文件 @
4f959dbf
package
com.taosdata.jdbc
;
import
static
org
.
junit
.
Assert
.
assertTrue
;
import
org.junit.Test
;
/**
* Unit test for simple App.
*/
public
class
AppTest
{
/**
* Rigorous Test :-)
*/
@Test
public
void
shouldAnswerWithTrue
()
{
assertTrue
(
true
);
}
}
tests/examples/JDBC/readme.md
0 → 100644
浏览文件 @
9d2305f2
# TDengine examples
| No. | Name | Describe |
| :--: | :----------------: | ------------------------------------------------------------ |
| 1 | JDBCDemo | Example codes for JDBC-JNI, JDBC-RESTful, Subscribe |
| 2 | connectionPools | Example codes for HikariCP, Druid, dbcp, c3p0 connection pools |
| 3 | SpringJdbcTemplate | Example codes for spring jdbcTemplate |
| 4 | mybatisplus-demo | Example codes for mybatis |
| 5 | springbootdemo | Example codes for springboot |
| 6 | taosdemo | This is an internal tool for testing Our JDBC-JNI, JDBC-RESTful, RESTful interfaces |
more detail: https://www.taosdata.com/cn//documentation20/connector-java/
\ No newline at end of file
tests/script/general/parser/dbtbnameValidate.sim
浏览文件 @
9d2305f2
...
...
@@ -26,11 +26,11 @@ sql use ' XYZ '
sql drop database 'abc123'
sql drop database '_ab1234'
sql drop database 'ABC123'
sql
_error
drop database 'ABC123'
sql drop database '_ABC123'
sql drop database 'aABb123'
sql drop database ' xyz '
sql drop database ' XYZ '
sql
_error
drop database ' XYZ '
sql use abc
...
...
@@ -67,9 +67,9 @@ sql describe mt
sql describe sub_001
sql describe sub_dy_tbl
sql
_error
describe Dd
sql
_error
describe FF
sql
_error
describe gG
sql describe Dd
sql describe FF
sql describe gG
sql drop table abc.cc
sql drop table 'abc.Dd'
...
...
@@ -119,4 +119,4 @@ if $rows != 4 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/test-all.sh
浏览文件 @
9d2305f2
...
...
@@ -155,7 +155,7 @@ if [ "$2" != "python" ]; then
elif
[
"
$1
"
==
"b1"
]
;
then
echo
"### run TSIM b1 test ###"
runSimCaseOneByOne jenkins/basic_1.txt
#
runSimCaseOneByOne jenkins/basic_4.txt
runSimCaseOneByOne jenkins/basic_4.txt
elif
[
"
$1
"
==
"b2"
]
;
then
echo
"### run TSIM b2 test ###"
runSimCaseOneByOne jenkins/basic_2.txt
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录