Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
王长生-AnH
TDengine
提交
4d83bb3e
T
TDengine
项目概览
王长生-AnH
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
5
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
4d83bb3e
编写于
5月 18, 2020
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into feature/2.0tsdb
上级
055aed43
f9056ac3
变更
34
隐藏空白更改
内联
并排
Showing
34 changed file
with
702 addition
and
449 deletion
+702
-449
.gitignore
.gitignore
+3
-0
.travis.yml
.travis.yml
+1
-1
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+1
-1
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+1
-1
src/client/src/tscServer.c
src/client/src/tscServer.c
+1
-1
src/dnode/inc/dnodeMain.h
src/dnode/inc/dnodeMain.h
+30
-0
src/dnode/src/dnodeMain.c
src/dnode/src/dnodeMain.c
+2
-102
src/dnode/src/dnodeSystem.c
src/dnode/src/dnodeSystem.c
+117
-0
src/dnode/src/dnodeVRead.c
src/dnode/src/dnodeVRead.c
+1
-2
src/dnode/src/dnodeVWrite.c
src/dnode/src/dnodeVWrite.c
+1
-0
src/inc/taosdef.h
src/inc/taosdef.h
+6
-6
src/inc/trpc.h
src/inc/trpc.h
+1
-0
src/mnode/inc/mgmtDef.h
src/mnode/inc/mgmtDef.h
+2
-1
src/mnode/inc/mgmtSdb.h
src/mnode/inc/mgmtSdb.h
+2
-1
src/mnode/src/mgmtSdb.c
src/mnode/src/mgmtSdb.c
+80
-57
src/mnode/src/mgmtShell.c
src/mnode/src/mgmtShell.c
+2
-0
src/mnode/src/mgmtTable.c
src/mnode/src/mgmtTable.c
+83
-51
src/rpc/inc/rpcHead.h
src/rpc/inc/rpcHead.h
+1
-0
src/rpc/src/rpcCache.c
src/rpc/src/rpcCache.c
+5
-5
src/rpc/src/rpcMain.c
src/rpc/src/rpcMain.c
+68
-64
src/rpc/src/rpcUdp.c
src/rpc/src/rpcUdp.c
+1
-1
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+1
-1
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+1
-1
tests/pytest/fulltest.sh
tests/pytest/fulltest.sh
+1
-0
tests/pytest/insert/bool.py
tests/pytest/insert/bool.py
+8
-0
tests/pytest/smoketest.sh
tests/pytest/smoketest.sh
+2
-0
tests/pytest/table/boundary.py
tests/pytest/table/boundary.py
+161
-0
tests/pytest/table/column_name.py
tests/pytest/table/column_name.py
+19
-72
tests/pytest/table/column_num.py
tests/pytest/table/column_num.py
+1
-1
tests/pytest/table/tablename-boundary.py
tests/pytest/table/tablename-boundary.py
+1
-1
tests/pytest/util/cases.py
tests/pytest/util/cases.py
+1
-1
tests/pytest/util/dnodes.py
tests/pytest/util/dnodes.py
+21
-24
tests/pytest/util/sql.py
tests/pytest/util/sql.py
+44
-24
tests/pytest/valgrind-test.sh
tests/pytest/valgrind-test.sh
+32
-30
未找到文件。
.gitignore
浏览文件 @
4d83bb3e
...
...
@@ -33,5 +33,8 @@ Target/
*.failed
*.sql
sim/
psim/
pysim/
*.out
*DS_Store
.travis.yml
浏览文件 @
4d83bb3e
...
...
@@ -50,7 +50,7 @@ matrix:
./test-all.sh $TRAVIS_EVENT_TYPE || travis_terminate $?
cd ${TRAVIS_BUILD_DIR}/tests/pytest
./valgrind-test.sh
-g
2>&1 > mem-error-out.txt
./valgrind-test.sh 2>&1 > mem-error-out.txt
sleep 1
# Color setting
...
...
src/client/inc/tscUtil.h
浏览文件 @
4d83bb3e
...
...
@@ -43,7 +43,7 @@ extern "C" {
typedef
struct
SParsedColElem
{
int16_t
colIndex
;
int16_t
offset
;
u
int16_t
offset
;
}
SParsedColElem
;
typedef
struct
SParsedDataColInfo
{
...
...
src/client/inc/tsclient.h
浏览文件 @
4d83bb3e
...
...
@@ -49,7 +49,7 @@ typedef struct STableComInfo {
uint8_t
numOfTags
;
uint8_t
precision
;
int16_t
numOfColumns
;
int
16
_t
rowSize
;
int
32
_t
rowSize
;
}
STableComInfo
;
typedef
struct
STableMeta
{
...
...
src/client/src/tscServer.c
浏览文件 @
4d83bb3e
...
...
@@ -235,7 +235,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
SSqlRes
*
pRes
=
&
pSql
->
res
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
STscObj
*
pObj
=
pSql
->
pTscObj
;
tscTrace
(
"%p msg:%
p is received from server"
,
pSql
,
rpcMsg
->
pCont
);
tscTrace
(
"%p msg:%
s is received from server"
,
pSql
,
taosMsg
[
rpcMsg
->
msgType
]
);
if
(
pSql
->
freed
||
pObj
->
signature
!=
pObj
)
{
tscTrace
(
"%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p"
,
pSql
,
pSql
->
freed
,
...
...
src/dnode/inc/dnodeMain.h
0 → 100644
浏览文件 @
4d83bb3e
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_MAIN_H
#define TDENGINE_DNODE_MAIN_H
#ifdef __cplusplus
extern
"C"
{
#endif
int32_t
dnodeInitSystem
();
void
dnodeCleanUpSystem
();
#ifdef __cplusplus
}
#endif
#endif
src/dnode/src/dnodeMain.c
浏览文件 @
4d83bb3e
...
...
@@ -16,8 +16,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taos.h"
#include "tglobal.h"
#include "trpc.h"
#include "tutil.h"
#include "tconfig.h"
#include "tglobal.h"
...
...
@@ -29,112 +27,14 @@
#include "dnodeVRead.h"
#include "dnodeShell.h"
#include "dnodeVWrite.h"
#include "tgrant.h"
static
int32_t
dnodeInitSystem
();
static
int32_t
dnodeInitStorage
();
extern
void
grantParseParameter
();
static
void
dnodeCleanupStorage
();
static
void
dnodeCleanUpSystem
();
static
void
dnodeSetRunStatus
(
SDnodeRunStatus
status
);
static
void
signal_handler
(
int32_t
signum
,
siginfo_t
*
sigInfo
,
void
*
context
);
static
void
dnodeCheckDataDirOpenned
(
char
*
dir
);
static
SDnodeRunStatus
tsDnodeRunStatus
=
TSDB_DNODE_RUN_STATUS_STOPPED
;
int32_t
main
(
int32_t
argc
,
char
*
argv
[])
{
// Set global configuration file
for
(
int32_t
i
=
1
;
i
<
argc
;
++
i
)
{
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
)
{
if
(
i
<
argc
-
1
)
{
strcpy
(
configDir
,
argv
[
++
i
]);
}
else
{
printf
(
"'-c' requires a parameter, default:%s
\n
"
,
configDir
);
exit
(
EXIT_FAILURE
);
}
}
else
if
(
strcmp
(
argv
[
i
],
"-V"
)
==
0
)
{
#ifdef _SYNC
char
*
versionStr
=
"enterprise"
;
#else
char
*
versionStr
=
"community"
;
#endif
printf
(
"%s version: %s compatible_version: %s
\n
"
,
versionStr
,
version
,
compatible_version
);
printf
(
"gitinfo: %s
\n
"
,
gitinfo
);
printf
(
"gitinfoI: %s
\n
"
,
gitinfoOfInternal
);
printf
(
"buildinfo: %s
\n
"
,
buildinfo
);
exit
(
EXIT_SUCCESS
);
}
else
if
(
strcmp
(
argv
[
i
],
"-k"
)
==
0
)
{
grantParseParameter
();
exit
(
EXIT_SUCCESS
);
}
#ifdef TAOS_MEM_CHECK
else
if
(
strcmp
(
argv
[
i
],
"--alloc-random-fail"
)
==
0
)
{
if
((
i
<
argc
-
1
)
&&
(
argv
[
i
+
1
][
0
]
!=
'-'
))
{
taosSetAllocMode
(
TAOS_ALLOC_MODE_RANDOM_FAIL
,
argv
[
++
i
],
true
);
}
else
{
taosSetAllocMode
(
TAOS_ALLOC_MODE_RANDOM_FAIL
,
NULL
,
true
);
}
}
else
if
(
strcmp
(
argv
[
i
],
"--detect-mem-leak"
)
==
0
)
{
if
((
i
<
argc
-
1
)
&&
(
argv
[
i
+
1
][
0
]
!=
'-'
))
{
taosSetAllocMode
(
TAOS_ALLOC_MODE_DETECT_LEAK
,
argv
[
++
i
],
true
);
}
else
{
taosSetAllocMode
(
TAOS_ALLOC_MODE_DETECT_LEAK
,
NULL
,
true
);
}
}
#endif
}
/* Set termination handler. */
struct
sigaction
act
=
{{
0
}};
act
.
sa_flags
=
SA_SIGINFO
;
act
.
sa_sigaction
=
signal_handler
;
sigaction
(
SIGTERM
,
&
act
,
NULL
);
sigaction
(
SIGHUP
,
&
act
,
NULL
);
sigaction
(
SIGINT
,
&
act
,
NULL
);
sigaction
(
SIGUSR1
,
&
act
,
NULL
);
sigaction
(
SIGUSR2
,
&
act
,
NULL
);
// Open /var/log/syslog file to record information.
openlog
(
"TDengine:"
,
LOG_PID
|
LOG_CONS
|
LOG_NDELAY
,
LOG_LOCAL1
);
syslog
(
LOG_INFO
,
"Starting TDengine service..."
);
// Initialize the system
if
(
dnodeInitSystem
()
<
0
)
{
syslog
(
LOG_ERR
,
"Error initialize TDengine system"
);
closelog
();
dnodeCleanUpSystem
();
exit
(
EXIT_FAILURE
);
}
syslog
(
LOG_INFO
,
"Started TDengine service successfully."
);
while
(
1
)
{
sleep
(
1000
);
}
}
static
void
signal_handler
(
int32_t
signum
,
siginfo_t
*
sigInfo
,
void
*
context
)
{
if
(
signum
==
SIGUSR1
)
{
taosCfgDynamicOptions
(
"debugFlag 135"
);
return
;
}
if
(
signum
==
SIGUSR2
)
{
taosCfgDynamicOptions
(
"resetlog"
);
return
;
}
syslog
(
LOG_INFO
,
"Shut down signal is %d"
,
signum
);
syslog
(
LOG_INFO
,
"Shutting down TDengine service..."
);
// clean the system.
dPrint
(
"shut down signal is %d, sender PID:%d"
,
signum
,
sigInfo
->
si_pid
);
dnodeCleanUpSystem
();
// close the syslog
syslog
(
LOG_INFO
,
"Shut down TDengine service successfully"
);
dPrint
(
"TDengine is shut down!"
);
closelog
();
exit
(
EXIT_SUCCESS
);
}
static
int32_t
dnodeInitSystem
()
{
int32_t
dnodeInitSystem
()
{
dnodeSetRunStatus
(
TSDB_DNODE_RUN_STATUS_INITIALIZE
);
tscEmbedded
=
1
;
taosResolveCRC
();
...
...
@@ -180,7 +80,7 @@ static int32_t dnodeInitSystem() {
return
0
;
}
static
void
dnodeCleanUpSystem
()
{
void
dnodeCleanUpSystem
()
{
if
(
dnodeGetRunStatus
()
!=
TSDB_DNODE_RUN_STATUS_STOPPED
)
{
dnodeSetRunStatus
(
TSDB_DNODE_RUN_STATUS_STOPPED
);
dnodeCleanupShell
();
...
...
src/dnode/src/dnodeSystem.c
0 → 100644
浏览文件 @
4d83bb3e
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tgrant.h"
#include "tutil.h"
#include "tglobal.h"
#include "dnodeInt.h"
#include "dnodeMain.h"
static
void
signal_handler
(
int32_t
signum
,
siginfo_t
*
sigInfo
,
void
*
context
);
int32_t
main
(
int32_t
argc
,
char
*
argv
[])
{
// Set global configuration file
for
(
int32_t
i
=
1
;
i
<
argc
;
++
i
)
{
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
)
{
if
(
i
<
argc
-
1
)
{
strcpy
(
configDir
,
argv
[
++
i
]);
}
else
{
printf
(
"'-c' requires a parameter, default:%s
\n
"
,
configDir
);
exit
(
EXIT_FAILURE
);
}
}
else
if
(
strcmp
(
argv
[
i
],
"-V"
)
==
0
)
{
#ifdef _SYNC
char
*
versionStr
=
"enterprise"
;
#else
char
*
versionStr
=
"community"
;
#endif
printf
(
"%s version: %s compatible_version: %s
\n
"
,
versionStr
,
version
,
compatible_version
);
printf
(
"gitinfo: %s
\n
"
,
gitinfo
);
printf
(
"gitinfoI: %s
\n
"
,
gitinfoOfInternal
);
printf
(
"buildinfo: %s
\n
"
,
buildinfo
);
exit
(
EXIT_SUCCESS
);
}
else
if
(
strcmp
(
argv
[
i
],
"-k"
)
==
0
)
{
grantParseParameter
();
exit
(
EXIT_SUCCESS
);
}
#ifdef TAOS_MEM_CHECK
else
if
(
strcmp
(
argv
[
i
],
"--alloc-random-fail"
)
==
0
)
{
if
((
i
<
argc
-
1
)
&&
(
argv
[
i
+
1
][
0
]
!=
'-'
))
{
taosSetAllocMode
(
TAOS_ALLOC_MODE_RANDOM_FAIL
,
argv
[
++
i
],
true
);
}
else
{
taosSetAllocMode
(
TAOS_ALLOC_MODE_RANDOM_FAIL
,
NULL
,
true
);
}
}
else
if
(
strcmp
(
argv
[
i
],
"--detect-mem-leak"
)
==
0
)
{
if
((
i
<
argc
-
1
)
&&
(
argv
[
i
+
1
][
0
]
!=
'-'
))
{
taosSetAllocMode
(
TAOS_ALLOC_MODE_DETECT_LEAK
,
argv
[
++
i
],
true
);
}
else
{
taosSetAllocMode
(
TAOS_ALLOC_MODE_DETECT_LEAK
,
NULL
,
true
);
}
}
#endif
}
/* Set termination handler. */
struct
sigaction
act
=
{{
0
}};
act
.
sa_flags
=
SA_SIGINFO
;
act
.
sa_sigaction
=
signal_handler
;
sigaction
(
SIGTERM
,
&
act
,
NULL
);
sigaction
(
SIGHUP
,
&
act
,
NULL
);
sigaction
(
SIGINT
,
&
act
,
NULL
);
sigaction
(
SIGUSR1
,
&
act
,
NULL
);
sigaction
(
SIGUSR2
,
&
act
,
NULL
);
// Open /var/log/syslog file to record information.
openlog
(
"TDengine:"
,
LOG_PID
|
LOG_CONS
|
LOG_NDELAY
,
LOG_LOCAL1
);
syslog
(
LOG_INFO
,
"Starting TDengine service..."
);
// Initialize the system
if
(
dnodeInitSystem
()
<
0
)
{
syslog
(
LOG_ERR
,
"Error initialize TDengine system"
);
closelog
();
dnodeCleanUpSystem
();
exit
(
EXIT_FAILURE
);
}
syslog
(
LOG_INFO
,
"Started TDengine service successfully."
);
while
(
1
)
{
sleep
(
1000
);
}
}
static
void
signal_handler
(
int32_t
signum
,
siginfo_t
*
sigInfo
,
void
*
context
)
{
if
(
signum
==
SIGUSR1
)
{
taosCfgDynamicOptions
(
"debugFlag 135"
);
return
;
}
if
(
signum
==
SIGUSR2
)
{
taosCfgDynamicOptions
(
"resetlog"
);
return
;
}
syslog
(
LOG_INFO
,
"Shut down signal is %d"
,
signum
);
syslog
(
LOG_INFO
,
"Shutting down TDengine service..."
);
// clean the system.
dPrint
(
"shut down signal is %d, sender PID:%d"
,
signum
,
sigInfo
->
si_pid
);
dnodeCleanUpSystem
();
// close the syslog
syslog
(
LOG_INFO
,
"Shut down TDengine service successfully"
);
dPrint
(
"TDengine is shut down!"
);
closelog
();
exit
(
EXIT_SUCCESS
);
}
src/dnode/src/dnodeVRead.c
浏览文件 @
4d83bb3e
...
...
@@ -92,8 +92,6 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
char
*
pCont
=
(
char
*
)
pMsg
->
pCont
;
void
*
pVnode
;
dTrace
(
"dnode %s msg incoming, thandle:%p"
,
taosMsg
[
pMsg
->
msgType
],
pMsg
->
handle
);
while
(
leftLen
>
0
)
{
SMsgHead
*
pHead
=
(
SMsgHead
*
)
pCont
;
pHead
->
vgId
=
htonl
(
pHead
->
vgId
);
...
...
@@ -214,6 +212,7 @@ static void *dnodeProcessReadQueue(void *param) {
continue
;
}
dTrace
(
"%p, msg:%s will be processed"
,
pReadMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pReadMsg
->
rpcMsg
.
msgType
]);
int32_t
code
=
vnodeProcessRead
(
pVnode
,
pReadMsg
->
rpcMsg
.
msgType
,
pReadMsg
->
pCont
,
pReadMsg
->
contLen
,
&
pReadMsg
->
rspRet
);
dnodeSendRpcReadRsp
(
pVnode
,
pReadMsg
,
code
);
taosFreeQitem
(
pReadMsg
);
...
...
src/dnode/src/dnodeVWrite.c
浏览文件 @
4d83bb3e
...
...
@@ -200,6 +200,7 @@ static void *dnodeProcessWriteQueue(void *param) {
pHead
->
msgType
=
pWrite
->
rpcMsg
.
msgType
;
pHead
->
version
=
0
;
pHead
->
len
=
pWrite
->
contLen
;
dTrace
(
"%p, msg:%s will be processed"
,
pWrite
->
rpcMsg
.
ahandle
,
taosMsg
[
pWrite
->
rpcMsg
.
msgType
]);
}
else
{
pHead
=
(
SWalHead
*
)
item
;
}
...
...
src/inc/taosdef.h
浏览文件 @
4d83bb3e
...
...
@@ -193,20 +193,20 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_ACCT_LEN TSDB_UNI_LEN
#define TSDB_PASSWORD_LEN TSDB_UNI_LEN
#define TSDB_MAX_COLUMNS
256
#define TSDB_MAX_COLUMNS
1024
#define TSDB_MIN_COLUMNS 2 //PRIMARY COLUMN(timestamp) + other columns
#define TSDB_NODE_NAME_LEN 64
#define TSDB_TABLE_NAME_LEN 192
#define TSDB_DB_NAME_LEN 32
#define TSDB_COL_NAME_LEN 64
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS *
16
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS *
64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
#define TSDB_MAX_ALLOWED_SQL_LEN (8*1024*1024U) // sql length should be less than 6mb
#define TSDB_MAX_BYTES_PER_ROW TSDB_MAX_COLUMNS *
16
#define TSDB_MAX_TAGS_LEN
512
#define TSDB_MAX_TAGS
32
#define TSDB_MAX_BYTES_PER_ROW TSDB_MAX_COLUMNS *
64
#define TSDB_MAX_TAGS_LEN
65536
#define TSDB_MAX_TAGS
128
#define TSDB_AUTH_LEN 16
#define TSDB_KEY_LEN 16
...
...
@@ -236,7 +236,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_PAYLOAD_SIZE (TSDB_DEFAULT_PKT_SIZE - 100)
#define TSDB_DEFAULT_PAYLOAD_SIZE 1024 // default payload size
#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth
#define TSDB_
SQLCMD
_SIZE 1024
#define TSDB_
CQ_SQL
_SIZE 1024
#define TSDB_MAX_VNODES 256
#define TSDB_MIN_VNODES 50
#define TSDB_INVALID_VNODE_NUM 0
...
...
src/inc/trpc.h
浏览文件 @
4d83bb3e
...
...
@@ -48,6 +48,7 @@ typedef struct {
int
contLen
;
int32_t
code
;
void
*
handle
;
void
*
ahandle
;
//app handle set by client, for debug purpose
}
SRpcMsg
;
typedef
struct
{
...
...
src/mnode/inc/mgmtDef.h
浏览文件 @
4d83bb3e
...
...
@@ -68,7 +68,7 @@ typedef struct SMnodeObj {
// todo use dynamic length string
typedef
struct
{
char
tableId
[
TSDB_TABLE_ID_LEN
+
1
]
;
char
*
tableId
;
int8_t
type
;
}
STableObj
;
...
...
@@ -77,6 +77,7 @@ typedef struct SSuperTableObj {
uint64_t
uid
;
int64_t
createdTime
;
int32_t
sversion
;
int32_t
tversion
;
int32_t
numOfColumns
;
int32_t
numOfTags
;
int8_t
reserved
[
15
];
...
...
src/mnode/inc/mgmtSdb.h
浏览文件 @
4d83bb3e
...
...
@@ -35,7 +35,8 @@ typedef enum {
typedef
enum
{
SDB_KEY_STRING
,
SDB_KEY_INT
,
SDB_KEY_AUTO
SDB_KEY_AUTO
,
SDB_KEY_VAR_STRING
,
}
ESdbKey
;
typedef
enum
{
...
...
src/mnode/src/mgmtSdb.c
浏览文件 @
4d83bb3e
...
...
@@ -104,6 +104,14 @@ bool sdbIsServing() {
return
tsSdbObj
.
status
==
SDB_STATUS_SERVING
;
}
static
void
*
sdbGetObjKey
(
SSdbTable
*
pTable
,
void
*
key
)
{
if
(
pTable
->
keyType
==
SDB_KEY_VAR_STRING
)
{
return
*
(
char
**
)
key
;
}
return
key
;
}
static
char
*
sdbGetActionStr
(
int32_t
action
)
{
switch
(
action
)
{
case
SDB_ACTION_INSERT
:
...
...
@@ -116,20 +124,25 @@ static char *sdbGetActionStr(int32_t action) {
return
"invalid"
;
}
static
char
*
sdbGet
keyStr
(
SSdbTable
*
pTable
,
void
*
row
)
{
static
char
*
sdbGet
KeyStr
(
SSdbTable
*
pTable
,
void
*
key
)
{
static
char
str
[
16
];
switch
(
pTable
->
keyType
)
{
case
SDB_KEY_STRING
:
return
(
char
*
)
row
;
case
SDB_KEY_VAR_STRING
:
return
(
char
*
)
key
;
case
SDB_KEY_INT
:
case
SDB_KEY_AUTO
:
sprintf
(
str
,
"%d"
,
*
(
int32_t
*
)
row
);
sprintf
(
str
,
"%d"
,
*
(
int32_t
*
)
key
);
return
str
;
default:
return
"invalid"
;
}
}
static
char
*
sdbGetKeyStrFromObj
(
SSdbTable
*
pTable
,
void
*
key
)
{
return
sdbGetKeyStr
(
pTable
,
sdbGetObjKey
(
pTable
,
key
));
}
static
void
*
sdbGetTableFromId
(
int32_t
tableId
)
{
return
tsSdbObj
.
tableList
[
tableId
];
}
...
...
@@ -332,50 +345,48 @@ void sdbCleanUp() {
pthread_mutex_destroy
(
&
tsSdbObj
.
mutex
);
}
void
sdbIncRef
(
void
*
handle
,
void
*
pRow
)
{
if
(
pRow
)
{
SSdbTable
*
pTable
=
handle
;
int32_t
*
pRefCount
=
(
int32_t
*
)(
pRow
+
pTable
->
refCountPos
);
atomic_add_fetch_32
(
pRefCount
,
1
);
if
(
0
&&
(
pTable
->
tableId
==
SDB_TABLE_MNODE
||
pTable
->
tableId
==
SDB_TABLE_DNODE
))
{
sdbTrace
(
"table:%s, add ref to record:%s:%s:%d"
,
pTable
->
tableName
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pRow
),
*
pRefCount
);
}
void
sdbIncRef
(
void
*
handle
,
void
*
pObj
)
{
if
(
pObj
==
NULL
)
return
;
SSdbTable
*
pTable
=
handle
;
int32_t
*
pRefCount
=
(
int32_t
*
)(
pObj
+
pTable
->
refCountPos
);
atomic_add_fetch_32
(
pRefCount
,
1
);
if
(
0
&&
(
pTable
->
tableId
==
SDB_TABLE_MNODE
||
pTable
->
tableId
==
SDB_TABLE_DNODE
))
{
sdbTrace
(
"table:%s, add ref to record:%s:%d"
,
pTable
->
tableName
,
sdbGetKeyStrFromObj
(
pTable
,
pObj
),
*
pRefCount
);
}
}
void
sdbDecRef
(
void
*
handle
,
void
*
pRow
)
{
if
(
pRow
)
{
SSdbTable
*
pTable
=
handle
;
int32_t
*
pRefCount
=
(
int32_t
*
)(
pRow
+
pTable
->
refCountPos
);
int32_t
refCount
=
atomic_sub_fetch_32
(
pRefCount
,
1
);
if
(
0
&&
(
pTable
->
tableId
==
SDB_TABLE_MNODE
||
pTable
->
tableId
==
SDB_TABLE_DNODE
))
{
sdbTrace
(
"table:%s, def ref of record:%s:%s:%d"
,
pTable
->
tableName
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pRow
),
*
pRefCount
);
}
int8_t
*
updateEnd
=
pRow
+
pTable
->
refCountPos
-
1
;
if
(
refCount
<=
0
&&
*
updateEnd
)
{
sdbTrace
(
"table:%s, record:%s:%s:%d is destroyed"
,
pTable
->
tableName
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pRow
),
*
pRefCount
);
SSdbOper
oper
=
{.
pObj
=
pRow
};
(
*
pTable
->
destroyFp
)(
&
oper
);
}
void
sdbDecRef
(
void
*
handle
,
void
*
pObj
)
{
if
(
pObj
==
NULL
)
return
;
SSdbTable
*
pTable
=
handle
;
int32_t
*
pRefCount
=
(
int32_t
*
)(
pObj
+
pTable
->
refCountPos
);
int32_t
refCount
=
atomic_sub_fetch_32
(
pRefCount
,
1
);
if
(
0
&&
(
pTable
->
tableId
==
SDB_TABLE_MNODE
||
pTable
->
tableId
==
SDB_TABLE_DNODE
))
{
sdbTrace
(
"table:%s, def ref of record:%s:%d"
,
pTable
->
tableName
,
sdbGetKeyStrFromObj
(
pTable
,
pObj
),
*
pRefCount
);
}
}
static
SSdbRow
*
sdbGetRowMeta
(
void
*
handle
,
void
*
key
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
SSdbRow
*
pMeta
;
int8_t
*
updateEnd
=
pObj
+
pTable
->
refCountPos
-
1
;
if
(
refCount
<=
0
&&
*
updateEnd
)
{
sdbTrace
(
"table:%s, record:%s:%d is destroyed"
,
pTable
->
tableName
,
sdbGetKeyStrFromObj
(
pTable
,
pObj
),
*
pRefCount
);
SSdbOper
oper
=
{.
pObj
=
pObj
};
(
*
pTable
->
destroyFp
)(
&
oper
);
}
}
if
(
handle
==
NULL
)
return
NULL
;
static
SSdbRow
*
sdbGetRowMeta
(
SSdbTable
*
pTable
,
void
*
key
)
{
if
(
pTable
==
NULL
)
return
NULL
;
int32_t
keySize
=
sizeof
(
int32_t
);
if
(
pTable
->
keyType
==
SDB_KEY_STRING
)
{
if
(
pTable
->
keyType
==
SDB_KEY_STRING
||
pTable
->
keyType
==
SDB_KEY_VAR_STRING
)
{
keySize
=
strlen
((
char
*
)
key
);
}
pMeta
=
taosHashGet
(
pTable
->
iHandle
,
key
,
keySize
);
return
taosHashGet
(
pTable
->
iHandle
,
key
,
keySize
);
}
return
pMeta
;
static
SSdbRow
*
sdbGetRowMetaFromObj
(
SSdbTable
*
pTable
,
void
*
key
)
{
return
sdbGetRowMeta
(
pTable
,
sdbGetObjKey
(
pTable
,
key
));
}
void
*
sdbGetRow
(
void
*
handle
,
void
*
key
)
{
...
...
@@ -387,7 +398,7 @@ void *sdbGetRow(void *handle, void *key) {
pthread_mutex_lock
(
&
pTable
->
mutex
);
int32_t
keySize
=
sizeof
(
int32_t
);
if
(
pTable
->
keyType
==
SDB_KEY_STRING
)
{
if
(
pTable
->
keyType
==
SDB_KEY_STRING
||
pTable
->
keyType
==
SDB_KEY_VAR_STRING
)
{
keySize
=
strlen
((
char
*
)
key
);
}
pMeta
=
taosHashGet
(
pTable
->
iHandle
,
key
,
keySize
);
...
...
@@ -400,6 +411,10 @@ void *sdbGetRow(void *handle, void *key) {
return
pMeta
->
row
;
}
static
void
*
sdbGetRowFromObj
(
SSdbTable
*
pTable
,
void
*
key
)
{
return
sdbGetRow
(
pTable
,
sdbGetObjKey
(
pTable
,
key
));
}
static
int32_t
sdbInsertHash
(
SSdbTable
*
pTable
,
SSdbOper
*
pOper
)
{
SSdbRow
rowMeta
;
rowMeta
.
rowSize
=
pOper
->
rowSize
;
...
...
@@ -407,11 +422,14 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
pthread_mutex_lock
(
&
pTable
->
mutex
);
void
*
key
=
sdbGetObjKey
(
pTable
,
pOper
->
pObj
);
int32_t
keySize
=
sizeof
(
int32_t
);
if
(
pTable
->
keyType
==
SDB_KEY_STRING
)
{
keySize
=
strlen
((
char
*
)
pOper
->
pObj
);
if
(
pTable
->
keyType
==
SDB_KEY_STRING
||
pTable
->
keyType
==
SDB_KEY_VAR_STRING
)
{
keySize
=
strlen
((
char
*
)
key
);
}
taosHashPut
(
pTable
->
iHandle
,
pOper
->
pObj
,
keySize
,
&
rowMeta
,
sizeof
(
SSdbRow
));
taosHashPut
(
pTable
->
iHandle
,
key
,
keySize
,
&
rowMeta
,
sizeof
(
SSdbRow
));
sdbIncRef
(
pTable
,
pOper
->
pObj
);
pTable
->
numOfRows
++
;
...
...
@@ -425,7 +443,7 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
pthread_mutex_unlock
(
&
pTable
->
mutex
);
sdbTrace
(
"table:%s, insert record:%s to hash, numOfRows:%d version:%"
PRIu64
,
pTable
->
tableName
,
sdbGet
keyStr
(
pTable
,
pOper
->
pObj
),
pTable
->
numOfRows
,
sdbGetVersion
());
sdbGet
KeyStrFromObj
(
pTable
,
pOper
->
pObj
),
pTable
->
numOfRows
,
sdbGetVersion
());
(
*
pTable
->
insertFp
)(
pOper
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -436,17 +454,20 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
pthread_mutex_lock
(
&
pTable
->
mutex
);
void
*
key
=
sdbGetObjKey
(
pTable
,
pOper
->
pObj
);
int32_t
keySize
=
sizeof
(
int32_t
);
if
(
pTable
->
keyType
==
SDB_KEY_STRING
)
{
keySize
=
strlen
((
char
*
)
pOper
->
pObj
);
if
(
pTable
->
keyType
==
SDB_KEY_STRING
||
pTable
->
keyType
==
SDB_KEY_VAR_STRING
)
{
keySize
=
strlen
((
char
*
)
key
);
}
taosHashRemove
(
pTable
->
iHandle
,
pOper
->
pObj
,
keySize
);
taosHashRemove
(
pTable
->
iHandle
,
key
,
keySize
);
pTable
->
numOfRows
--
;
pthread_mutex_unlock
(
&
pTable
->
mutex
);
sdbTrace
(
"table:%s, delete record:%s from hash, numOfRows:%d version:%"
PRIu64
,
pTable
->
tableName
,
sdbGet
keyStr
(
pTable
,
pOper
->
pObj
),
pTable
->
numOfRows
,
sdbGetVersion
());
sdbGet
KeyStrFromObj
(
pTable
,
pOper
->
pObj
),
pTable
->
numOfRows
,
sdbGetVersion
());
int8_t
*
updateEnd
=
pOper
->
pObj
+
pTable
->
refCountPos
-
1
;
*
updateEnd
=
1
;
...
...
@@ -457,7 +478,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
static
int32_t
sdbUpdateHash
(
SSdbTable
*
pTable
,
SSdbOper
*
pOper
)
{
sdbTrace
(
"table:%s, update record:%s in hash, numOfRows:%d version:%"
PRIu64
,
pTable
->
tableName
,
sdbGet
keyStr
(
pTable
,
pOper
->
pObj
),
pTable
->
numOfRows
,
sdbGetVersion
());
sdbGet
KeyStrFromObj
(
pTable
,
pOper
->
pObj
),
pTable
->
numOfRows
,
sdbGetVersion
());
(
*
pTable
->
updateFp
)(
pOper
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -488,7 +509,7 @@ static int sdbWrite(void *param, void *data, int type) {
}
else
if
(
pHead
->
version
!=
tsSdbObj
.
version
+
1
)
{
pthread_mutex_unlock
(
&
tsSdbObj
.
mutex
);
sdbError
(
"table:%s, failed to restore %s record:%s from wal, version:%"
PRId64
" too large, sdb version:%"
PRId64
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGet
k
eyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGet
K
eyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
,
tsSdbObj
.
version
);
return
TSDB_CODE_OTHERS
;
}
else
{
...
...
@@ -540,8 +561,8 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
SSdbTable
*
pTable
=
(
SSdbTable
*
)
pOper
->
table
;
if
(
pTable
==
NULL
)
return
-
1
;
if
(
sdbGetRow
(
pTable
,
pOper
->
pObj
))
{
sdbError
(
"table:%s, failed to insert record:%s, already exist"
,
pTable
->
tableName
,
sdbGet
keyStr
(
pTable
,
pOper
->
pObj
));
if
(
sdbGetRow
FromObj
(
pTable
,
pOper
->
pObj
))
{
sdbError
(
"table:%s, failed to insert record:%s, already exist"
,
pTable
->
tableName
,
sdbGet
KeyStrFromObj
(
pTable
,
pOper
->
pObj
));
sdbDecRef
(
pTable
,
pOper
->
pObj
);
return
TSDB_CODE_ALREADY_THERE
;
}
...
...
@@ -580,7 +601,7 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
SSdbTable
*
pTable
=
(
SSdbTable
*
)
pOper
->
table
;
if
(
pTable
==
NULL
)
return
-
1
;
SSdbRow
*
pMeta
=
sdbGetRowMeta
(
pTable
,
pOper
->
pObj
);
SSdbRow
*
pMeta
=
sdbGetRowMeta
FromObj
(
pTable
,
pOper
->
pObj
);
if
(
pMeta
==
NULL
)
{
sdbTrace
(
"table:%s, record is not there, delete failed"
,
pTable
->
tableName
);
return
-
1
;
...
...
@@ -590,25 +611,27 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
assert
(
pMetaRow
!=
NULL
);
if
(
pOper
->
type
==
SDB_OPER_GLOBAL
)
{
int32_t
rowSize
=
0
;
void
*
key
=
sdbGetObjKey
(
pTable
,
pOper
->
pObj
);
int32_t
keySize
=
0
;
switch
(
pTable
->
keyType
)
{
case
SDB_KEY_STRING
:
rowSize
=
strlen
((
char
*
)
pOper
->
pObj
)
+
1
;
case
SDB_KEY_VAR_STRING
:
keySize
=
strlen
((
char
*
)
key
)
+
1
;
break
;
case
SDB_KEY_INT
:
case
SDB_KEY_AUTO
:
rowSize
=
sizeof
(
uint64
_t
);
keySize
=
sizeof
(
uint32
_t
);
break
;
default:
return
-
1
;
}
int32_t
size
=
sizeof
(
SWalHead
)
+
row
Size
;
int32_t
size
=
sizeof
(
SWalHead
)
+
key
Size
;
SWalHead
*
pHead
=
taosAllocateQitem
(
size
);
pHead
->
version
=
0
;
pHead
->
len
=
row
Size
;
pHead
->
len
=
key
Size
;
pHead
->
msgType
=
pTable
->
tableId
*
10
+
SDB_ACTION_DELETE
;
memcpy
(
pHead
->
cont
,
pOper
->
pObj
,
row
Size
);
memcpy
(
pHead
->
cont
,
key
,
key
Size
);
int32_t
code
=
sdbWrite
(
pOper
,
pHead
,
pHead
->
msgType
);
taosFreeQitem
(
pHead
);
...
...
@@ -622,7 +645,7 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
SSdbTable
*
pTable
=
(
SSdbTable
*
)
pOper
->
table
;
if
(
pTable
==
NULL
)
return
-
1
;
SSdbRow
*
pMeta
=
sdbGetRowMeta
(
pTable
,
pOper
->
pObj
);
SSdbRow
*
pMeta
=
sdbGetRowMeta
FromObj
(
pTable
,
pOper
->
pObj
);
if
(
pMeta
==
NULL
)
{
sdbTrace
(
"table:%s, record is not there, delete failed"
,
pTable
->
tableName
);
return
-
1
;
...
...
src/mnode/src/mgmtShell.c
浏览文件 @
4d83bb3e
...
...
@@ -124,6 +124,8 @@ void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) {
void
mgmtProcessMsgFromShell
(
SRpcMsg
*
rpcMsg
)
{
mTrace
(
"%p, msg:%s will be processed"
,
rpcMsg
->
ahandle
,
taosMsg
[
rpcMsg
->
msgType
]);
if
(
rpcMsg
->
pCont
==
NULL
)
{
mgmtSendSimpleResp
(
rpcMsg
->
handle
,
TSDB_CODE_INVALID_MSG_LEN
);
return
;
...
...
src/mnode/src/mgmtTable.c
浏览文件 @
4d83bb3e
...
...
@@ -84,6 +84,7 @@ static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg);
static
int32_t
mgmtFindSuperTableColumnIndex
(
SSuperTableObj
*
pStable
,
char
*
colName
);
static
void
mgmtDestroyChildTable
(
SChildTableObj
*
pTable
)
{
tfree
(
pTable
->
info
.
tableId
);
tfree
(
pTable
->
schema
);
tfree
(
pTable
->
sql
);
tfree
(
pTable
);
...
...
@@ -180,6 +181,7 @@ static int32_t mgmtChildTableActionUpdate(SSdbOper *pOper) {
SChildTableObj
*
pNew
=
pOper
->
pObj
;
SChildTableObj
*
pTable
=
mgmtGetChildTable
(
pNew
->
info
.
tableId
);
if
(
pTable
!=
pNew
)
{
void
*
oldTableId
=
pTable
->
info
.
tableId
;
void
*
oldSql
=
pTable
->
sql
;
void
*
oldSchema
=
pTable
->
schema
;
memcpy
(
pTable
,
pNew
,
pOper
->
rowSize
);
...
...
@@ -188,6 +190,7 @@ static int32_t mgmtChildTableActionUpdate(SSdbOper *pOper) {
free
(
pNew
);
free
(
oldSql
);
free
(
oldSchema
);
free
(
oldTableId
);
}
mgmtDecTableRef
(
pTable
);
...
...
@@ -195,51 +198,66 @@ static int32_t mgmtChildTableActionUpdate(SSdbOper *pOper) {
}
static
int32_t
mgmtChildTableActionEncode
(
SSdbOper
*
pOper
)
{
const
int32_t
maxRowSize
=
sizeof
(
SChildTableObj
)
+
sizeof
(
SSchema
)
*
(
TSDB_MAX_TAGS
+
TSDB_MAX_COLUMNS
+
16
);
SChildTableObj
*
pTable
=
pOper
->
pObj
;
assert
(
pTable
!=
NULL
&&
pOper
->
rowData
!=
NULL
);
if
(
pTable
->
info
.
type
==
TSDB_CHILD_TABLE
)
{
memcpy
(
pOper
->
rowData
,
pTable
,
tsChildTableUpdateSize
);
pOper
->
rowSize
=
tsChildTableUpdateSize
;
}
else
{
int32_t
len
=
strlen
(
pTable
->
info
.
tableId
);
if
(
len
>
TSDB_TABLE_ID_LEN
)
return
TSDB_CODE_INVALID_TABLE_ID
;
memcpy
(
pOper
->
rowData
,
pTable
->
info
.
tableId
,
len
);
memset
(
pOper
->
rowData
+
len
,
0
,
1
);
len
++
;
memcpy
(
pOper
->
rowData
+
len
,
(
char
*
)
pTable
+
sizeof
(
char
*
),
tsChildTableUpdateSize
);
len
+=
tsChildTableUpdateSize
;
if
(
pTable
->
info
.
type
!=
TSDB_CHILD_TABLE
)
{
int32_t
schemaSize
=
pTable
->
numOfColumns
*
sizeof
(
SSchema
);
if
(
maxRowSize
<
tsChildTableUpdateSize
+
schemaSize
)
{
return
TSDB_CODE_INVALID_MSG_LEN
;
memcpy
(
pOper
->
rowData
+
len
,
pTable
->
schema
,
schemaSize
);
len
+=
schemaSize
;
if
(
pTable
->
sqlLen
!=
0
)
{
memcpy
(
pOper
->
rowData
+
len
,
pTable
->
sql
,
pTable
->
sqlLen
);
len
+=
pTable
->
sqlLen
;
}
memcpy
(
pOper
->
rowData
,
pTable
,
tsChildTableUpdateSize
);
memcpy
(
pOper
->
rowData
+
tsChildTableUpdateSize
,
pTable
->
schema
,
schemaSize
);
memcpy
(
pOper
->
rowData
+
tsChildTableUpdateSize
+
schemaSize
,
pTable
->
sql
,
pTable
->
sqlLen
);
pOper
->
rowSize
=
tsChildTableUpdateSize
+
schemaSize
+
pTable
->
sqlLen
;
}
pOper
->
rowSize
=
len
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtChildTableActionDecode
(
SSdbOper
*
pOper
)
{
assert
(
pOper
->
rowData
!=
NULL
);
SChildTableObj
*
pTable
=
calloc
(
1
,
sizeof
(
SChildTableObj
));
if
(
pTable
==
NULL
)
{
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
}
if
(
pTable
==
NULL
)
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
int32_t
len
=
strlen
(
pOper
->
rowData
);
if
(
len
>
TSDB_TABLE_ID_LEN
)
return
TSDB_CODE_INVALID_TABLE_ID
;
pTable
->
info
.
tableId
=
strdup
(
pOper
->
rowData
);
len
++
;
memcpy
(
pTable
,
pOper
->
rowData
,
tsChildTableUpdateSize
);
memcpy
((
char
*
)
pTable
+
sizeof
(
char
*
),
pOper
->
rowData
+
len
,
tsChildTableUpdateSize
);
len
+=
tsChildTableUpdateSize
;
if
(
pTable
->
info
.
type
!=
TSDB_CHILD_TABLE
)
{
int32_t
schemaSize
=
pTable
->
numOfColumns
*
sizeof
(
SSchema
);
pTable
->
schema
=
(
SSchema
*
)
malloc
(
schemaSize
);
if
(
pTable
->
schema
==
NULL
)
{
mgmtDestroyChildTable
(
pTable
);
return
TSDB_CODE_
SERV_OUT_OF_MEMORY
;
return
TSDB_CODE_
INVALID_TABLE_TYPE
;
}
memcpy
(
pTable
->
schema
,
pOper
->
rowData
+
tsChildTableUpdateSize
,
schemaSize
);
memcpy
(
pTable
->
schema
,
pOper
->
rowData
+
len
,
schemaSize
);
len
+=
schemaSize
;
pTable
->
sql
=
(
char
*
)
malloc
(
pTable
->
sqlLen
);
if
(
pTable
->
sql
==
NULL
)
{
mgmtDestroyChildTable
(
pTable
);
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
if
(
pTable
->
sqlLen
!=
0
)
{
pTable
->
sql
=
malloc
(
pTable
->
sqlLen
);
if
(
pTable
->
sql
==
NULL
)
{
mgmtDestroyChildTable
(
pTable
);
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
}
memcpy
(
pTable
->
sql
,
pOper
->
rowData
+
len
,
pTable
->
sqlLen
);
}
memcpy
(
pTable
->
sql
,
pOper
->
rowData
+
tsChildTableUpdateSize
+
schemaSize
,
pTable
->
sqlLen
);
}
pOper
->
pObj
=
pTable
;
...
...
@@ -311,15 +329,15 @@ static int32_t mgmtChildTableActionRestored() {
static
int32_t
mgmtInitChildTables
()
{
SChildTableObj
tObj
;
tsChildTableUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
tsChildTableUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
.
info
.
type
;
SSdbTableDesc
tableDesc
=
{
.
tableId
=
SDB_TABLE_CTABLE
,
.
tableName
=
"ctables"
,
.
hashSessions
=
tsMaxTables
,
.
maxRowSize
=
sizeof
(
SChildTableObj
)
+
sizeof
(
SSchema
)
*
(
TSDB_MAX_TAGS
+
TSDB_MAX_COLUMNS
+
16
),
.
maxRowSize
=
sizeof
(
SChildTableObj
)
+
sizeof
(
SSchema
)
*
(
TSDB_MAX_TAGS
+
TSDB_MAX_COLUMNS
+
16
)
+
TSDB_TABLE_ID_LEN
+
TSDB_CQ_SQL_SIZE
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
.
keyType
=
SDB_KEY_STRING
,
.
keyType
=
SDB_KEY_
VAR_
STRING
,
.
insertFp
=
mgmtChildTableActionInsert
,
.
deleteFp
=
mgmtChildTableActionDelete
,
.
updateFp
=
mgmtChildTableActionUpdate
,
...
...
@@ -372,6 +390,7 @@ static void mgmtDestroySuperTable(SSuperTableObj *pStable) {
taosHashCleanup
(
pStable
->
vgHash
);
pStable
->
vgHash
=
NULL
;
}
tfree
(
pStable
->
info
.
tableId
);
tfree
(
pStable
->
schema
);
tfree
(
pStable
);
}
...
...
@@ -408,11 +427,13 @@ static int32_t mgmtSuperTableActionUpdate(SSdbOper *pOper) {
SSuperTableObj
*
pNew
=
pOper
->
pObj
;
SSuperTableObj
*
pTable
=
mgmtGetSuperTable
(
pNew
->
info
.
tableId
);
if
(
pTable
!=
pNew
)
{
void
*
oldTableId
=
pTable
->
info
.
tableId
;
void
*
oldSchema
=
pTable
->
schema
;
memcpy
(
pTable
,
pNew
,
pOper
->
rowSize
);
pTable
->
schema
=
pNew
->
schema
;
free
(
pNew
->
vgHash
);
free
(
pNew
);
free
(
oldTableId
);
free
(
oldSchema
);
}
mgmtDecTableRef
(
pTable
);
...
...
@@ -420,40 +441,50 @@ static int32_t mgmtSuperTableActionUpdate(SSdbOper *pOper) {
}
static
int32_t
mgmtSuperTableActionEncode
(
SSdbOper
*
pOper
)
{
const
int32_t
maxRowSize
=
sizeof
(
SChildTableObj
)
+
sizeof
(
SSchema
)
*
(
TSDB_MAX_TAGS
+
TSDB_MAX_COLUMNS
+
16
);
SSuperTableObj
*
pStable
=
pOper
->
pObj
;
assert
(
pOper
->
pObj
!=
NULL
&&
pOper
->
rowData
!=
NULL
);
int32_t
schemaSize
=
sizeof
(
SSchema
)
*
(
pStable
->
numOfColumns
+
pStable
->
numOfTags
);
int32_t
len
=
strlen
(
pStable
->
info
.
tableId
);
if
(
len
>
TSDB_TABLE_ID_LEN
)
len
=
TSDB_CODE_INVALID_TABLE_ID
;
if
(
maxRowSize
<
tsSuperTableUpdateSize
+
schemaSize
)
{
return
TSDB_CODE_INVALID_MSG_LEN
;
}
memcpy
(
pOper
->
rowData
,
pStable
->
info
.
tableId
,
len
);
memset
(
pOper
->
rowData
+
len
,
0
,
1
);
len
++
;
memcpy
(
pOper
->
rowData
+
len
,
(
char
*
)
pStable
+
sizeof
(
char
*
),
tsSuperTableUpdateSize
);
len
+=
tsSuperTableUpdateSize
;
int32_t
schemaSize
=
sizeof
(
SSchema
)
*
(
pStable
->
numOfColumns
+
pStable
->
numOfTags
);
memcpy
(
pOper
->
rowData
+
len
,
pStable
->
schema
,
schemaSize
);
len
+=
schemaSize
;
memcpy
(
pOper
->
rowData
,
pStable
,
tsSuperTableUpdateSize
);
memcpy
(
pOper
->
rowData
+
tsSuperTableUpdateSize
,
pStable
->
schema
,
schemaSize
);
pOper
->
rowSize
=
tsSuperTableUpdateSize
+
schemaSize
;
pOper
->
rowSize
=
len
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtSuperTableActionDecode
(
SSdbOper
*
pOper
)
{
assert
(
pOper
->
rowData
!=
NULL
);
SSuperTableObj
*
pStable
=
(
SSuperTableObj
*
)
calloc
(
1
,
sizeof
(
SSuperTableObj
));
if
(
pStable
==
NULL
)
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
memcpy
(
pStable
,
pOper
->
rowData
,
tsSuperTableUpdateSize
);
int32_t
len
=
strlen
(
pOper
->
rowData
);
if
(
len
>
TSDB_TABLE_ID_LEN
)
return
TSDB_CODE_INVALID_TABLE_ID
;
pStable
->
info
.
tableId
=
strdup
(
pOper
->
rowData
);
len
++
;
memcpy
((
char
*
)
pStable
+
sizeof
(
char
*
),
pOper
->
rowData
+
len
,
tsSuperTableUpdateSize
);
len
+=
tsSuperTableUpdateSize
;
int32_t
schemaSize
=
sizeof
(
SSchema
)
*
(
pStable
->
numOfColumns
+
pStable
->
numOfTags
);
pStable
->
schema
=
malloc
(
schemaSize
);
if
(
pStable
->
schema
==
NULL
)
{
mgmtDestroySuperTable
(
pStable
);
return
-
1
;
return
TSDB_CODE_NOT_SUPER_TABLE
;
}
memcpy
(
pStable
->
schema
,
pOper
->
rowData
+
tsSuperTableUpdateSize
,
schemaSize
);
memcpy
(
pStable
->
schema
,
pOper
->
rowData
+
len
,
schemaSize
);
pOper
->
pObj
=
pStable
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -465,15 +496,15 @@ static int32_t mgmtSuperTableActionRestored() {
static
int32_t
mgmtInitSuperTables
()
{
SSuperTableObj
tObj
;
tsSuperTableUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
tsSuperTableUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
.
info
.
type
;
SSdbTableDesc
tableDesc
=
{
.
tableId
=
SDB_TABLE_STABLE
,
.
tableName
=
"stables"
,
.
hashSessions
=
TSDB_MAX_SUPER_TABLES
,
.
maxRowSize
=
tsSuperTableUpdateSize
+
sizeof
(
SSchema
)
*
(
TSDB_MAX_TAGS
+
TSDB_MAX_COLUMNS
+
16
)
,
.
maxRowSize
=
sizeof
(
SSuperTableObj
)
+
sizeof
(
SSchema
)
*
(
TSDB_MAX_TAGS
+
TSDB_MAX_COLUMNS
+
16
)
+
TSDB_TABLE_ID_LEN
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
.
keyType
=
SDB_KEY_STRING
,
.
keyType
=
SDB_KEY_
VAR_
STRING
,
.
insertFp
=
mgmtSuperTableActionInsert
,
.
deleteFp
=
mgmtSuperTableActionDelete
,
.
updateFp
=
mgmtSuperTableActionUpdate
,
...
...
@@ -720,18 +751,19 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) {
static
void
mgmtProcessCreateSuperTableMsg
(
SQueuedMsg
*
pMsg
)
{
SCMCreateTableMsg
*
pCreate
=
pMsg
->
pCont
;
SSuperTableObj
*
pStable
=
(
SSuperTableObj
*
)
calloc
(
1
,
sizeof
(
SSuperTableObj
));
SSuperTableObj
*
pStable
=
calloc
(
1
,
sizeof
(
SSuperTableObj
));
if
(
pStable
==
NULL
)
{
mError
(
"table:%s, failed to create, no enough memory"
,
pCreate
->
tableId
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_SERV_OUT_OF_MEMORY
);
return
;
}
strcpy
(
pStable
->
info
.
tableId
,
pCreate
->
tableId
);
pStable
->
info
.
tableId
=
strdup
(
pCreate
->
tableId
);
pStable
->
info
.
type
=
TSDB_SUPER_TABLE
;
pStable
->
createdTime
=
taosGetTimestampMs
();
pStable
->
uid
=
(((
uint64_t
)
pStable
->
createdTime
)
<<
16
)
+
(
sdbGetVersion
()
&
((
1ul
<<
16
)
-
1ul
));
pStable
->
sversion
=
0
;
pStable
->
tversion
=
0
;
pStable
->
numOfColumns
=
htons
(
pCreate
->
numOfColumns
);
pStable
->
numOfTags
=
htons
(
pCreate
->
numOfTags
);
...
...
@@ -851,7 +883,7 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i
}
pStable
->
numOfTags
+=
ntags
;
pStable
->
s
version
++
;
pStable
->
t
version
++
;
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
...
...
@@ -878,7 +910,7 @@ static int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) {
memmove
(
pStable
->
schema
+
pStable
->
numOfColumns
+
col
,
pStable
->
schema
+
pStable
->
numOfColumns
+
col
+
1
,
sizeof
(
SSchema
)
*
(
pStable
->
numOfTags
-
col
-
1
));
pStable
->
numOfTags
--
;
pStable
->
s
version
++
;
pStable
->
t
version
++
;
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
...
...
@@ -1358,7 +1390,7 @@ static void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableOb
}
static
SChildTableObj
*
mgmtDoCreateChildTable
(
SCMCreateTableMsg
*
pCreate
,
SVgObj
*
pVgroup
,
int32_t
tid
)
{
SChildTableObj
*
pTable
=
(
SChildTableObj
*
)
calloc
(
1
,
sizeof
(
SChildTableObj
));
SChildTableObj
*
pTable
=
calloc
(
1
,
sizeof
(
SChildTableObj
));
if
(
pTable
==
NULL
)
{
mError
(
"table:%s, failed to alloc memory"
,
pCreate
->
tableId
);
terrno
=
TSDB_CODE_SERV_OUT_OF_MEMORY
;
...
...
@@ -1371,10 +1403,10 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj
pTable
->
info
.
type
=
TSDB_NORMAL_TABLE
;
}
strcpy
(
pTable
->
info
.
tableId
,
pCreate
->
tableId
);
pTable
->
createdTime
=
taosGetTimestampMs
();
pTable
->
sid
=
tid
;
pTable
->
vgId
=
pVgroup
->
vgId
;
pTable
->
info
.
tableId
=
strdup
(
pCreate
->
tableId
);
pTable
->
createdTime
=
taosGetTimestampMs
();
pTable
->
sid
=
tid
;
pTable
->
vgId
=
pVgroup
->
vgId
;
if
(
pTable
->
info
.
type
==
TSDB_CHILD_TABLE
)
{
char
*
pTagData
=
(
char
*
)
pCreate
->
schema
;
// it is a tag key
...
...
@@ -1655,7 +1687,7 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
pMeta
->
sid
=
htonl
(
pTable
->
sid
);
pMeta
->
precision
=
pDb
->
cfg
.
precision
;
pMeta
->
tableType
=
pTable
->
info
.
type
;
strncpy
(
pMeta
->
tableId
,
pTable
->
info
.
tableId
,
tListL
en
(
pTable
->
info
.
tableId
));
strncpy
(
pMeta
->
tableId
,
pTable
->
info
.
tableId
,
strl
en
(
pTable
->
info
.
tableId
));
if
(
pTable
->
info
.
type
==
TSDB_CHILD_TABLE
)
{
pMeta
->
sversion
=
htons
(
pTable
->
superTable
->
sversion
);
...
...
src/rpc/inc/rpcHead.h
浏览文件 @
4d83bb3e
...
...
@@ -49,6 +49,7 @@ typedef struct {
char
encrypt
:
3
;
// encrypt algorithm, 0: no encryption
uint16_t
tranId
;
// transcation ID
uint32_t
linkUid
;
// for unique connection ID assigned by client
uint64_t
ahandle
;
// ahandle assigned by client
uint32_t
sourceId
;
// source ID, an index for connection list
uint32_t
destId
;
// destination ID, an index for connection list
uint32_t
destIp
;
// destination IP address, for NAT scenario
...
...
src/rpc/src/rpcCache.c
浏览文件 @
4d83bb3e
...
...
@@ -146,7 +146,7 @@ void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, in
rpcUnlockCache
(
pCache
->
lockedBy
+
hash
);
pCache
->
total
++
;
tTrace
(
"%p %s:%hu:%d:%d:%p added into cache, connections:%d"
,
data
,
fqdn
,
port
,
connType
,
hash
,
pNode
,
pCache
->
count
[
hash
]);
//
tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode, pCache->count[hash]);
return
;
}
...
...
@@ -200,9 +200,9 @@ void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connTy
rpcUnlockCache
(
pCache
->
lockedBy
+
hash
);
if
(
pData
)
{
tTrace
(
"%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d"
,
pData
,
fqdn
,
port
,
connType
,
hash
,
pNode
,
pCache
->
count
[
hash
]);
//
tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode, pCache->count[hash]);
}
else
{
tTrace
(
"%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d"
,
fqdn
,
port
,
connType
,
hash
,
pCache
->
count
[
hash
]);
//
tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash, pCache->count[hash]);
}
return
pData
;
...
...
@@ -240,8 +240,8 @@ static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash
pNext
=
pNode
->
next
;
pCache
->
total
--
;
pCache
->
count
[
hash
]
--
;
tTrace
(
"%p %s:%hu:%d:%d:%p removed from cache, connections:%d"
,
pNode
->
data
,
pNode
->
fqdn
,
pNode
->
port
,
pNode
->
connType
,
hash
,
pNode
,
pCache
->
count
[
hash
]);
//
tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port, pNode->connType, hash, pNode,
//
pCache->count[hash]);
taosMemPoolFree
(
pCache
->
connHashMemPool
,
(
char
*
)
pNode
);
pNode
=
pNext
;
}
...
...
src/rpc/src/rpcMain.c
浏览文件 @
4d83bb3e
...
...
@@ -87,6 +87,7 @@ typedef struct {
}
SRpcReqContext
;
typedef
struct
SRpcConn
{
char
info
[
50
];
// debug info: label + pConn + ahandle
int
sid
;
// session ID
uint32_t
ownId
;
// own link ID
uint32_t
peerId
;
// peer link ID
...
...
@@ -275,7 +276,7 @@ void *rpcOpen(const SRpcInit *pInit) {
return
NULL
;
}
tTrace
(
"%s
RPC is openned, numOfThreads:%d"
,
pRpc
->
label
,
pRpc
->
numOfThread
s
);
tTrace
(
"%s
rpc is openned, threads:%d sessions:%d"
,
pRpc
->
label
,
pRpc
->
numOfThreads
,
pInit
->
session
s
);
return
pRpc
;
}
...
...
@@ -299,7 +300,7 @@ void rpcClose(void *param) {
tfree
(
pRpc
->
connList
);
pthread_mutex_destroy
(
&
pRpc
->
mutex
);
tTrace
(
"%s
RPC
is closed"
,
pRpc
->
label
);
tTrace
(
"%s
rpc
is closed"
,
pRpc
->
label
);
tfree
(
pRpc
);
}
...
...
@@ -361,9 +362,10 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg)
// connection type is application specific.
// for TDengine, all the query, show commands shall have TCP connection
char
type
=
pMsg
->
msgType
;
if
(
type
==
TSDB_MSG_TYPE_QUERY
||
type
==
TSDB_MSG_TYPE_CM_RETRIEVE
||
type
==
TSDB_MSG_TYPE_FETCH
||
type
==
TSDB_MSG_TYPE_CM_STABLE_VGROUP
||
type
==
TSDB_MSG_TYPE_CM_TABLES_META
||
type
==
TSDB_MSG_TYPE_CM_SHOW
)
if
(
type
==
TSDB_MSG_TYPE_QUERY
||
type
==
TSDB_MSG_TYPE_CM_RETRIEVE
||
type
==
TSDB_MSG_TYPE_FETCH
||
type
==
TSDB_MSG_TYPE_CM_STABLE_VGROUP
||
type
==
TSDB_MSG_TYPE_CM_TABLES_META
||
type
==
TSDB_MSG_TYPE_CM_TABLE_META
||
type
==
TSDB_MSG_TYPE_CM_SHOW
)
pContext
->
connType
=
RPC_CONN_TCPC
;
rpcSendReqToServer
(
pRpc
,
pContext
);
...
...
@@ -374,8 +376,6 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg)
void
rpcSendResponse
(
const
SRpcMsg
*
pRsp
)
{
int
msgLen
=
0
;
SRpcConn
*
pConn
=
(
SRpcConn
*
)
pRsp
->
handle
;
SRpcInfo
*
pRpc
=
pConn
->
pRpc
;
SRpcMsg
rpcMsg
=
*
pRsp
;
SRpcMsg
*
pMsg
=
&
rpcMsg
;
...
...
@@ -393,7 +393,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
rpcLockConn
(
pConn
);
if
(
pConn
->
inType
==
0
||
pConn
->
user
[
0
]
==
0
)
{
tTrace
(
"%s
%p, connection is already released, rsp wont be sent"
,
pRpc
->
label
,
pConn
);
tTrace
(
"%s
, connection is already released, rsp wont be sent"
,
pConn
->
info
);
rpcUnlockConn
(
pConn
);
return
;
}
...
...
@@ -409,7 +409,8 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
pHead
->
linkUid
=
pConn
->
linkUid
;
pHead
->
port
=
htons
(
pConn
->
localPort
);
pHead
->
code
=
htonl
(
pMsg
->
code
);
pHead
->
ahandle
=
(
uint64_t
)
pConn
->
ahandle
;
// set pConn parameters
pConn
->
inType
=
0
;
...
...
@@ -491,6 +492,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort,
uint32_t
peerIp
=
taosGetIpFromFqdn
(
peerFqdn
);
if
(
peerIp
==
-
1
)
{
tError
(
"%s, failed to resolve FQDN:%s"
,
pRpc
->
label
,
peerFqdn
);
terrno
=
TSDB_CODE_APP_ERROR
;
return
NULL
;
}
...
...
@@ -506,11 +508,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort,
if
(
taosOpenConn
[
connType
])
{
void
*
shandle
=
(
connType
&
RPC_CONN_TCP
)
?
pRpc
->
tcphandle
:
pRpc
->
udphandle
;
pConn
->
chandle
=
(
*
taosOpenConn
[
connType
])(
shandle
,
pConn
,
pConn
->
peerIp
,
pConn
->
peerPort
);
if
(
pConn
->
chandle
)
{
tTrace
(
"%s %p, rpc connection is set up, sid:%d id:%s %s:%hu connType:%d"
,
pRpc
->
label
,
pConn
,
pConn
->
sid
,
pRpc
->
user
,
peerFqdn
,
pConn
->
peerPort
,
pConn
->
connType
);
}
else
{
tError
(
"%s %p, failed to set up connection to %s:%hu"
,
pRpc
->
label
,
pConn
,
peerFqdn
,
pConn
->
peerPort
);
if
(
pConn
->
chandle
==
NULL
)
{
terrno
=
TSDB_CODE_NETWORK_UNAVAIL
;
rpcCloseConn
(
pConn
);
pConn
=
NULL
;
...
...
@@ -557,7 +555,7 @@ static void rpcCloseConn(void *thandle) {
taosFreeId
(
pRpc
->
idPool
,
pConn
->
sid
);
pConn
->
pContext
=
NULL
;
tTrace
(
"%s
%p, rpc connection is closed"
,
pRpc
->
label
,
pConn
);
tTrace
(
"%s
, rpc connection is closed"
,
pConn
->
info
);
rpcUnlockConn
(
pConn
);
}
...
...
@@ -619,7 +617,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
}
if
(
terrno
!=
0
)
{
tWarn
(
"%s %p, user not there or server not ready"
,
pRpc
->
label
,
pConn
);
taosFreeId
(
pRpc
->
idPool
,
sid
);
// sid shall be released
pConn
=
NULL
;
}
...
...
@@ -634,8 +631,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
}
taosHashPut
(
pRpc
->
hash
,
hashstr
,
size
,
(
char
*
)
&
pConn
,
POINTER_BYTES
);
tTrace
(
"%s %p, rpc connection is allocated, sid:%d id:%s port:%u"
,
pRpc
->
label
,
pConn
,
sid
,
pConn
->
user
,
pConn
->
localPort
);
}
return
pConn
;
...
...
@@ -660,7 +655,6 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) {
if
(
pConn
)
{
if
(
pConn
->
linkUid
!=
pHead
->
linkUid
)
{
tTrace
(
"%s %p, linkUid:0x%x not matched, received:0x%x"
,
pRpc
->
label
,
pConn
,
pConn
->
linkUid
,
pHead
->
linkUid
);
terrno
=
TSDB_CODE_MISMATCHED_METER_ID
;
pConn
=
NULL
;
}
...
...
@@ -677,21 +671,25 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
pConn
=
rpcGetConnFromCache
(
pRpc
->
pCache
,
pIpSet
->
fqdn
[
pIpSet
->
inUse
],
pIpSet
->
port
[
pIpSet
->
inUse
],
pContext
->
connType
);
if
(
pConn
==
NULL
||
pConn
->
user
[
0
]
==
0
)
{
pConn
=
rpcOpenConn
(
pRpc
,
pIpSet
->
fqdn
[
pIpSet
->
inUse
],
pIpSet
->
port
[
pIpSet
->
inUse
],
pContext
->
connType
);
}
if
(
pConn
)
{
pConn
->
ahandle
=
pContext
->
ahandle
;
sprintf
(
pConn
->
info
,
"%s %p %p"
,
pRpc
->
label
,
pConn
,
pConn
->
ahandle
);
}
else
{
t
Trace
(
"%s %p, connection is retrieved from cache"
,
pRpc
->
label
,
pConn
);
t
Error
(
"%s %p, failed to set up connection(%s)"
,
pRpc
->
label
,
pContext
->
ahandle
,
tstrerror
(
terrno
)
);
}
return
pConn
;
}
static
int
rpcProcessReqHead
(
SRpcConn
*
pConn
,
SRpcHead
*
pHead
)
{
SRpcInfo
*
pRpc
=
pConn
->
pRpc
;
if
(
pConn
->
peerId
==
0
)
{
pConn
->
peerId
=
pHead
->
sourceId
;
}
else
{
if
(
pConn
->
peerId
!=
pHead
->
sourceId
)
{
tTrace
(
"%s
%p, source Id is changed, old:0x%08x new:0x%08x"
,
pRpc
->
label
,
pConn
,
tTrace
(
"%s
, source Id is changed, old:0x%08x new:0x%08x"
,
pConn
->
info
,
pConn
->
peerId
,
pHead
->
sourceId
);
return
TSDB_CODE_INVALID_VALUE
;
}
...
...
@@ -700,17 +698,16 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
if
(
pConn
->
inTranId
==
pHead
->
tranId
)
{
if
(
pConn
->
inType
==
pHead
->
msgType
)
{
if
(
pHead
->
code
==
0
)
{
tTrace
(
"%s
%p, %s is retransmitted"
,
pRpc
->
label
,
pConn
,
taosMsg
[
pHead
->
msgType
]);
tTrace
(
"%s
, %s is retransmitted"
,
pConn
->
info
,
taosMsg
[
pHead
->
msgType
]);
rpcSendQuickRsp
(
pConn
,
TSDB_CODE_ACTION_IN_PROGRESS
);
}
else
{
// do nothing, it is heart beat from client
}
}
else
if
(
pConn
->
inType
==
0
)
{
tTrace
(
"%s %p, %s is already processed, tranId:%d"
,
pRpc
->
label
,
pConn
,
taosMsg
[
pHead
->
msgType
],
pConn
->
inTranId
);
tTrace
(
"%s, %s is already processed, tranId:%d"
,
pConn
->
info
,
taosMsg
[
pHead
->
msgType
],
pConn
->
inTranId
);
rpcSendMsgToPeer
(
pConn
,
pConn
->
pRspMsg
,
pConn
->
rspMsgLen
);
// resend the response
}
else
{
tTrace
(
"%s
%p, mismatched message %s and tranId"
,
pRpc
->
label
,
pConn
,
taosMsg
[
pHead
->
msgType
]);
tTrace
(
"%s
, mismatched message %s and tranId"
,
pConn
->
info
,
taosMsg
[
pHead
->
msgType
]);
}
// do not reply any message
...
...
@@ -718,7 +715,7 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
}
if
(
pConn
->
inType
!=
0
)
{
tTrace
(
"%s
%p, last session is not finished, inTranId:%d tranId:%d"
,
pRpc
->
label
,
pConn
,
tTrace
(
"%s
, last session is not finished, inTranId:%d tranId:%d"
,
pConn
->
info
,
pConn
->
inTranId
,
pHead
->
tranId
);
return
TSDB_CODE_LAST_SESSION_NOT_FINISHED
;
}
...
...
@@ -750,7 +747,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
if
(
pHead
->
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
if
(
pConn
->
tretry
<=
tsRpcMaxRetry
)
{
tTrace
(
"%s
%p, peer is still processing the transaction"
,
pRpc
->
label
,
pConn
);
tTrace
(
"%s
, peer is still processing the transaction"
,
pConn
->
info
);
pConn
->
tretry
++
;
rpcSendReqHead
(
pConn
);
taosTmrReset
(
rpcProcessRetryTimer
,
tsRpcTimer
,
pConn
,
pRpc
->
tmrCtrl
,
&
pConn
->
pTimer
);
...
...
@@ -789,7 +786,15 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
}
pConn
=
rpcGetConnObj
(
pRpc
,
sid
,
pRecv
);
if
(
pConn
==
NULL
)
return
NULL
;
if
(
pConn
==
NULL
)
{
tError
(
"%s %p, failed to get connection obj(%s)"
,
pRpc
->
label
,
pHead
->
ahandle
,
tstrerror
(
terrno
));
return
NULL
;
}
else
{
if
(
rpcIsReq
(
pHead
->
msgType
))
{
pConn
->
ahandle
=
(
void
*
)
pHead
->
ahandle
;
sprintf
(
pConn
->
info
,
"%s %p %p"
,
pRpc
->
label
,
pConn
,
pConn
->
ahandle
);
}
}
rpcLockConn
(
pConn
);
sid
=
pConn
->
sid
;
...
...
@@ -826,7 +831,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
static
void
rpcProcessBrokenLink
(
SRpcConn
*
pConn
)
{
SRpcInfo
*
pRpc
=
pConn
->
pRpc
;
tTrace
(
"%s
%p, link is broken"
,
pRpc
->
label
,
pConn
);
tTrace
(
"%s
, link is broken"
,
pConn
->
info
);
// pConn->chandle = NULL;
if
(
pConn
->
outType
)
{
...
...
@@ -837,7 +842,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
if
(
pConn
->
inType
)
{
// if there are pending request, notify the app
tTrace
(
"%s
%p, connection is gone, notify the app"
,
pRpc
->
label
,
pConn
);
tTrace
(
"%s
, connection is gone, notify the app"
,
pConn
->
info
);
/*
SRpcMsg rpcMsg;
rpcMsg.pCont = NULL;
...
...
@@ -872,17 +877,17 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
pConn
=
rpcProcessMsgHead
(
pRpc
,
pRecv
);
if
(
pHead
->
msgType
<
TSDB_MSG_TYPE_CM_HEARTBEAT
||
(
rpcDebugFlag
&
16
))
{
tTrace
(
"%s %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x"
,
pRpc
->
label
,
pConn
,
taosMsg
[
pHead
->
msgType
],
pRecv
->
ip
,
pRecv
->
port
,
terrno
,
tTrace
(
"%s %p
%p
, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x"
,
pRpc
->
label
,
pConn
,
(
void
*
)
pHead
->
ahandle
,
taosMsg
[
pHead
->
msgType
],
pRecv
->
ip
,
pRecv
->
port
,
terrno
,
pRecv
->
msgLen
,
pHead
->
sourceId
,
pHead
->
destId
,
pHead
->
tranId
,
pHead
->
code
);
}
int32_t
code
=
terrno
;
if
(
code
!=
TSDB_CODE_ALREADY_PROCESSED
)
{
if
(
code
!=
0
)
{
// parsing error
if
(
rpcIsReq
(
pHead
->
msgType
)
)
{
if
(
rpcIsReq
(
pHead
->
msgType
)
)
{
rpcSendErrorMsgToPeer
(
pRecv
,
code
);
tTrace
(
"%s %p
, %s is sent with error code:%x"
,
pRpc
->
label
,
pConn
,
taosMsg
[
pHead
->
msgType
+
1
],
code
);
tTrace
(
"%s %p
%p, %s is sent with error code:%x"
,
pRpc
->
label
,
pConn
,
(
void
*
)
pHead
->
ahandle
,
taosMsg
[
pHead
->
msgType
+
1
],
code
);
}
}
else
{
// parsing OK
rpcProcessIncomingMsg
(
pConn
,
pHead
);
...
...
@@ -924,6 +929,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
rpcMsg
.
pCont
=
pHead
->
content
;
rpcMsg
.
msgType
=
pHead
->
msgType
;
rpcMsg
.
code
=
pHead
->
code
;
rpcMsg
.
ahandle
=
pConn
->
ahandle
;
if
(
rpcIsReq
(
pHead
->
msgType
)
)
{
rpcMsg
.
handle
=
pConn
;
...
...
@@ -948,14 +954,14 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
pContext
->
redirect
++
;
if
(
pContext
->
redirect
>
TSDB_MAX_REPLICA
)
{
pHead
->
code
=
TSDB_CODE_NETWORK_UNAVAIL
;
tWarn
(
"%s
%p, too many redirects, quit"
,
pRpc
->
label
,
pConn
);
tWarn
(
"%s
, too many redirects, quit"
,
pConn
->
info
);
}
}
if
(
pHead
->
code
==
TSDB_CODE_REDIRECT
)
{
pContext
->
numOfTry
=
0
;
memcpy
(
&
pContext
->
ipSet
,
pHead
->
content
,
sizeof
(
pContext
->
ipSet
));
tTrace
(
"%s
%p, redirect is received, numOfIps:%d"
,
pRpc
->
label
,
pConn
,
pContext
->
ipSet
.
numOfIps
);
tTrace
(
"%s
, redirect is received, numOfIps:%d"
,
pConn
->
info
,
pContext
->
ipSet
.
numOfIps
);
for
(
int
i
=
0
;
i
<
pContext
->
ipSet
.
numOfIps
;
++
i
)
pContext
->
ipSet
.
port
[
i
]
=
htons
(
pContext
->
ipSet
.
port
[
i
]);
rpcSendReqToServer
(
pRpc
,
pContext
);
...
...
@@ -1061,6 +1067,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
return
;
}
pConn
->
ahandle
=
pContext
->
ahandle
;
rpcLockConn
(
pConn
);
// set the message header
...
...
@@ -1074,6 +1081,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pHead
->
destId
=
pConn
->
peerId
;
pHead
->
port
=
0
;
pHead
->
linkUid
=
pConn
->
linkUid
;
pHead
->
ahandle
=
(
uint64_t
)
pConn
->
ahandle
;
if
(
!
pConn
->
secured
)
memcpy
(
pHead
->
user
,
pConn
->
user
,
tListLen
(
pHead
->
user
));
// set the connection parameters
...
...
@@ -1091,29 +1099,28 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
static
void
rpcSendMsgToPeer
(
SRpcConn
*
pConn
,
void
*
msg
,
int
msgLen
)
{
int
writtenLen
=
0
;
SRpcInfo
*
pRpc
=
pConn
->
pRpc
;
SRpcHead
*
pHead
=
(
SRpcHead
*
)
msg
;
msgLen
=
rpcAddAuthPart
(
pConn
,
msg
,
msgLen
);
if
(
rpcIsReq
(
pHead
->
msgType
))
{
if
(
pHead
->
msgType
<
TSDB_MSG_TYPE_CM_HEARTBEAT
||
(
rpcDebugFlag
&
16
))
tTrace
(
"%s
%p
, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d"
,
p
Rpc
->
label
,
pConn
,
taosMsg
[
pHead
->
msgType
],
pConn
->
peerFqdn
,
pConn
->
peerPort
,
msgLen
,
pHead
->
sourceId
,
pHead
->
destId
,
pHead
->
tranId
);
tTrace
(
"%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d"
,
p
Conn
->
info
,
taosMsg
[
pHead
->
msgType
],
pConn
->
peerFqdn
,
pConn
->
peerPort
,
msgLen
,
pHead
->
sourceId
,
pHead
->
destId
,
pHead
->
tranId
);
}
else
{
if
(
pHead
->
code
==
0
)
pConn
->
secured
=
1
;
// for success response, set link as secured
if
(
pHead
->
msgType
<
TSDB_MSG_TYPE_CM_HEARTBEAT
||
(
rpcDebugFlag
&
16
))
tTrace
(
"%s %p
, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d"
,
p
Rpc
->
label
,
pConn
,
taosMsg
[
pHead
->
msgType
],
pConn
->
peerIp
,
pConn
->
peerPort
,
tTrace
(
"%s
, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d"
,
p
Conn
->
info
,
taosMsg
[
pHead
->
msgType
],
pConn
->
peerIp
,
pConn
->
peerPort
,
htonl
(
pHead
->
code
),
msgLen
,
pHead
->
sourceId
,
pHead
->
destId
,
pHead
->
tranId
);
}
tTrace
(
"connection type is: %d"
,
pConn
->
connType
);
writtenLen
=
(
*
taosSendData
[
pConn
->
connType
])(
pConn
->
peerIp
,
pConn
->
peerPort
,
pHead
,
msgLen
,
pConn
->
chandle
);
if
(
writtenLen
!=
msgLen
)
{
tError
(
"%s %p, failed to send, dataLen:%d writtenLen:%d, reason:%s"
,
pRpc
->
label
,
pConn
,
msgLen
,
writtenLen
,
strerror
(
errno
));
tError
(
"%s, failed to send, msgLen:%d written:%d, reason:%s"
,
pConn
->
info
,
msgLen
,
writtenLen
,
strerror
(
errno
));
}
tDump
(
msg
,
msgLen
);
...
...
@@ -1128,7 +1135,7 @@ static void rpcProcessConnError(void *param, void *id) {
return
;
}
tTrace
(
"%s
connection error happens"
,
pRpc
->
label
);
tTrace
(
"%s
%p, connection error happens"
,
pRpc
->
label
,
pContext
->
ahandle
);
if
(
pContext
->
numOfTry
>=
pContext
->
ipSet
.
numOfIps
)
{
rpcMsg
.
msgType
=
pContext
->
msgType
+
1
;
...
...
@@ -1154,23 +1161,21 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
rpcLockConn
(
pConn
);
if
(
pConn
->
outType
&&
pConn
->
user
[
0
])
{
tTrace
(
"%s
%p, expected %s is not received"
,
pRpc
->
label
,
pConn
,
taosMsg
[(
int
)
pConn
->
outType
+
1
]);
tTrace
(
"%s
, expected %s is not received"
,
pConn
->
info
,
taosMsg
[(
int
)
pConn
->
outType
+
1
]);
pConn
->
pTimer
=
NULL
;
pConn
->
retry
++
;
if
(
pConn
->
retry
<
4
)
{
tTrace
(
"%s %p, re-send msg:%s to %s:%hu"
,
pRpc
->
label
,
pConn
,
taosMsg
[
pConn
->
outType
],
pConn
->
peerFqdn
,
pConn
->
peerPort
);
tTrace
(
"%s, re-send msg:%s to %s:%hu"
,
pConn
->
info
,
taosMsg
[
pConn
->
outType
],
pConn
->
peerFqdn
,
pConn
->
peerPort
);
rpcSendMsgToPeer
(
pConn
,
pConn
->
pReqMsg
,
pConn
->
reqMsgLen
);
taosTmrReset
(
rpcProcessRetryTimer
,
tsRpcTimer
,
pConn
,
pRpc
->
tmrCtrl
,
&
pConn
->
pTimer
);
}
else
{
// close the connection
tTrace
(
"%s %p, failed to send msg:%s to %s:%hu"
,
pRpc
->
label
,
pConn
,
taosMsg
[
pConn
->
outType
],
pConn
->
peerFqdn
,
pConn
->
peerPort
);
tTrace
(
"%s, failed to send msg:%s to %s:%hu"
,
pConn
->
info
,
taosMsg
[
pConn
->
outType
],
pConn
->
peerFqdn
,
pConn
->
peerPort
);
reportDisc
=
1
;
}
}
else
{
tTrace
(
"%s
%p, retry timer not processed"
,
pRpc
->
label
,
pConn
);
tTrace
(
"%s
, retry timer not processed"
,
pConn
->
info
);
}
rpcUnlockConn
(
pConn
);
...
...
@@ -1187,10 +1192,10 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) {
SRpcInfo
*
pRpc
=
pConn
->
pRpc
;
if
(
pConn
->
user
[
0
])
{
tTrace
(
"%s
%p, close the connection since no activity"
,
pRpc
->
label
,
pConn
);
tTrace
(
"%s
, close the connection since no activity"
,
pConn
->
info
);
if
(
pConn
->
inType
&&
pRpc
->
cfp
)
{
// if there are pending request, notify the app
tTrace
(
"%s
%p, notify the app, connection is gone"
,
pRpc
->
label
,
pConn
);
tTrace
(
"%s
, notify the app, connection is gone"
,
pConn
->
info
);
/*
SRpcMsg rpcMsg;
rpcMsg.pCont = NULL;
...
...
@@ -1203,7 +1208,7 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) {
}
rpcCloseConn
(
pConn
);
}
else
{
tTrace
(
"%s
%p, idle timer:%p not processed"
,
pRpc
->
label
,
pConn
,
tmrId
);
tTrace
(
"%s
, idle timer:%p not processed"
,
pConn
->
info
,
tmrId
);
}
}
...
...
@@ -1214,11 +1219,11 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) {
rpcLockConn
(
pConn
);
if
(
pConn
->
inType
&&
pConn
->
user
[
0
])
{
tTrace
(
"%s
%p, progress timer expired, send progress"
,
pRpc
->
label
,
pConn
);
tTrace
(
"%s
, progress timer expired, send progress"
,
pConn
->
info
);
rpcSendQuickRsp
(
pConn
,
TSDB_CODE_ACTION_IN_PROGRESS
);
taosTmrReset
(
rpcProcessProgressTimer
,
tsRpcTimer
/
2
,
pConn
,
pRpc
->
tmrCtrl
,
&
pConn
->
pTimer
);
}
else
{
tTrace
(
"%s
%p, progress timer:%p not processed"
,
pRpc
->
label
,
pConn
,
tmrId
);
tTrace
(
"%s
, progress timer:%p not processed"
,
pConn
->
info
,
tmrId
);
}
rpcUnlockConn
(
pConn
);
...
...
@@ -1252,7 +1257,7 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
memcpy
(
pCont
+
overhead
,
buf
,
compLen
);
pHead
->
comp
=
1
;
tTrace
(
"compress rpc msg, before:%d, after:%d"
,
contLen
,
compLen
);
//
tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen);
finalLen
=
compLen
+
overhead
;
}
else
{
finalLen
=
contLen
;
...
...
@@ -1286,7 +1291,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
pNewHead
->
msgLen
=
rpcMsgLenFromCont
(
origLen
);
rpcFreeMsg
(
pHead
);
// free the compressed message buffer
pHead
=
pNewHead
;
tTrace
(
"decompress rpc msg, compLen:%d, after:%d"
,
compLen
,
contLen
);
//
tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen);
}
else
{
tError
(
"failed to allocate memory to decompress msg, contLen:%d"
,
contLen
);
}
...
...
@@ -1343,7 +1348,6 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
static
int
rpcCheckAuthentication
(
SRpcConn
*
pConn
,
char
*
msg
,
int
msgLen
)
{
SRpcHead
*
pHead
=
(
SRpcHead
*
)
msg
;
SRpcInfo
*
pRpc
=
pConn
->
pRpc
;
int
code
=
0
;
if
((
pConn
->
secured
&&
pHead
->
spi
==
0
)
||
(
pHead
->
spi
==
0
&&
pConn
->
spi
==
0
)){
...
...
@@ -1371,20 +1375,20 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
delta
=
(
int32_t
)
htonl
(
pDigest
->
timeStamp
);
delta
-=
(
int32_t
)
taosGetTimestampSec
();
if
(
abs
(
delta
)
>
900
)
{
tWarn
(
"%s
%p, time diff:%d is too big, msg discarded"
,
pRpc
->
label
,
pConn
,
delta
);
tWarn
(
"%s
, time diff:%d is too big, msg discarded"
,
pConn
->
info
,
delta
);
code
=
TSDB_CODE_INVALID_TIME_STAMP
;
}
else
{
if
(
rpcAuthenticateMsg
(
pHead
,
msgLen
-
TSDB_AUTH_LEN
,
pDigest
->
auth
,
pConn
->
secret
)
<
0
)
{
tError
(
"%s
%p, authentication failed, msg discarded"
,
pRpc
->
label
,
pConn
);
tError
(
"%s
, authentication failed, msg discarded"
,
pConn
->
info
);
code
=
TSDB_CODE_AUTH_FAILURE
;
}
else
{
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
pHead
->
msgLen
)
-
sizeof
(
SRpcDigest
);
if
(
!
rpcIsReq
(
pHead
->
msgType
)
)
pConn
->
secured
=
1
;
// link is secured for client
tTrace
(
"%s %p, message is authenticated"
,
pRpc
->
label
,
pConn
);
//tTrace("%s, message is authenticated", pConn->info
);
}
}
}
else
{
t
Trace
(
"%s %p, auth spi:%d not matched with received:%d"
,
pRpc
->
label
,
pConn
,
pConn
->
spi
,
pHead
->
spi
);
t
Error
(
"%s, auth spi:%d not matched with received:%d"
,
pConn
->
info
,
pConn
->
spi
,
pHead
->
spi
);
code
=
TSDB_CODE_AUTH_FAILURE
;
}
...
...
src/rpc/src/rpcUdp.c
浏览文件 @
4d83bb3e
...
...
@@ -218,7 +218,7 @@ static void *taosRecvUdpData(void *param) {
while
(
1
)
{
dataLen
=
recvfrom
(
pConn
->
fd
,
pConn
->
buffer
,
RPC_MAX_UDP_SIZE
,
0
,
(
struct
sockaddr
*
)
&
sourceAdd
,
&
addLen
);
port
=
ntohs
(
sourceAdd
.
sin_port
);
tTrace
(
"%s msg is recv from 0x%x:%hu len:%d"
,
pConn
->
label
,
sourceAdd
.
sin_addr
.
s_addr
,
port
,
dataLen
);
//
tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, port, dataLen);
if
(
dataLen
<
sizeof
(
SRpcHead
))
{
tError
(
"%s recvfrom failed, reason:%s
\n
"
,
pConn
->
label
,
strerror
(
errno
));
...
...
src/tsdb/src/tsdbMeta.c
浏览文件 @
4d83bb3e
...
...
@@ -80,7 +80,7 @@ STable *tsdbDecodeTable(void *cont, int contLen) {
T_READ_MEMBER
(
ptr
,
int8_t
,
pTable
->
type
);
int
len
=
*
(
int
*
)
ptr
;
ptr
=
(
char
*
)
ptr
+
sizeof
(
int
);
pTable
->
name
=
calloc
(
1
,
len
+
VARSTR_HEADER_SIZE
);
pTable
->
name
=
calloc
(
1
,
len
+
VARSTR_HEADER_SIZE
+
1
);
if
(
pTable
->
name
==
NULL
)
return
NULL
;
varDataSetLen
(
pTable
->
name
,
len
);
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
4d83bb3e
...
...
@@ -429,7 +429,7 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
static
void
vnodeNotifyFileSynced
(
void
*
ahandle
,
uint64_t
fversion
)
{
SVnodeObj
*
pVnode
=
ahandle
;
vTrace
(
"vgId:%d, data file is synced
"
,
pVnode
->
vgId
);
vTrace
(
"vgId:%d, data file is synced
, fversion:%"
PRId64
""
,
pVnode
->
vgId
,
fversion
);
pVnode
->
fversion
=
fversion
;
pVnode
->
version
=
fversion
;
...
...
tests/pytest/fulltest.sh
浏览文件 @
4d83bb3e
...
...
@@ -12,6 +12,7 @@ python3 ./test.py $1 -f insert/binary.py
python3 ./test.py
$1
-f
insert/nchar.py
python3 ./test.py
$1
-f
insert/nchar-boundary.py
python3 ./test.py
$1
-f
insert/nchar-unicode.py
python3 ./test.py
$1
-f
insert/multi.py
python3 ./test.py
$1
-f
table/column_name.py
python3 ./test.py
$1
-f
table/column_num.py
...
...
tests/pytest/insert/bool.py
浏览文件 @
4d83bb3e
...
...
@@ -58,6 +58,14 @@ class TDTestCase:
tdSql
.
query
(
'select * from tb order by ts desc'
)
tdLog
.
info
(
'tdSql.checkRow(6)'
)
tdSql
.
checkRows
(
6
)
tdLog
.
info
(
'=============== step7'
)
tdLog
.
info
(
"insert into tb values (now+6m, true)"
)
tdSql
.
execute
(
"insert into tb values (now+5m, true)"
)
tdLog
.
info
(
'select * from tb order by ts desc'
)
tdSql
.
query
(
'select * from tb order by ts desc'
)
tdLog
.
info
(
'tdSql.checkRow(7)'
)
tdSql
.
checkRows
(
7
)
# convert end
# convert end
def
stop
(
self
):
...
...
tests/pytest/smoketest.sh
浏览文件 @
4d83bb3e
...
...
@@ -21,6 +21,8 @@ python3 ./test.py $1 -f insert/date.py
python3 ./test.py
$1
-s
&&
sleep
1
python3 ./test.py
$1
-f
insert/nchar.py
python3 ./test.py
$1
-s
&&
sleep
1
python3 ./test.py
$1
-f
insert/multi.py
python3 ./test.py
$1
-s
&&
sleep
1
python3 ./test.py
$1
-f
table/column_name.py
python3 ./test.py
$1
-s
&&
sleep
1
...
...
tests/pytest/table/boundary.py
0 → 100644
浏览文件 @
4d83bb3e
# -*- coding: utf-8 -*-
import
random
import
string
import
subprocess
import
sys
from
util.log
import
*
from
util.cases
import
*
from
util.sql
import
*
class
TDTestCase
:
def
init
(
self
,
conn
):
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
tdSql
.
init
(
conn
.
cursor
())
def
getLimitFromSourceCode
(
self
,
name
):
cmd
=
"grep -w '#define %s' ../../src/inc/taosdef.h|awk '{print $3}'"
%
name
return
int
(
subprocess
.
check_output
(
cmd
,
shell
=
True
))
def
generateString
(
self
,
length
):
chars
=
string
.
ascii_uppercase
+
string
.
ascii_lowercase
v
=
""
for
i
in
range
(
length
):
v
+=
random
.
choice
(
chars
)
return
v
def
checkTagBoundaries
(
self
):
tdLog
.
debug
(
"checking tag boundaries"
)
tdSql
.
prepare
()
maxTags
=
self
.
getLimitFromSourceCode
(
'TSDB_MAX_TAGS'
)
totalTagsLen
=
self
.
getLimitFromSourceCode
(
'TSDB_MAX_TAGS_LEN'
)
tdLog
.
notice
(
"max tags is %d"
%
maxTags
)
tdLog
.
notice
(
"max total tag length is %d"
%
totalTagsLen
)
# for binary tags, 2 bytes are used for length
tagLen
=
(
totalTagsLen
-
maxTags
*
2
)
//
maxTags
firstTagLen
=
totalTagsLen
-
2
*
maxTags
-
tagLen
*
(
maxTags
-
1
)
sql
=
"create table cars(ts timestamp, f int) tags(t0 binary(%d)"
%
firstTagLen
for
i
in
range
(
1
,
maxTags
):
sql
+=
", t%d binary(%d)"
%
(
i
,
tagLen
)
sql
+=
");"
tdLog
.
debug
(
"creating super table: "
+
sql
)
tdSql
.
execute
(
sql
)
tdSql
.
query
(
'show stables'
)
tdSql
.
checkRows
(
1
)
for
i
in
range
(
10
):
sql
=
"create table car%d using cars tags('%d'"
%
(
i
,
i
)
sql
+=
", '0'"
*
(
maxTags
-
1
)
+
");"
tdLog
.
debug
(
"creating table: "
+
sql
)
tdSql
.
execute
(
sql
)
sql
=
"insert into car%d values(now, 0);"
%
i
tdLog
.
debug
(
"inserting data: "
+
sql
)
tdSql
.
execute
(
sql
)
tdSql
.
query
(
'show tables'
)
tdLog
.
info
(
'tdSql.checkRow(10)'
)
tdSql
.
checkRows
(
10
)
tdSql
.
query
(
'select * from cars;'
)
tdSql
.
checkRows
(
10
)
def
checkColumnBoundaries
(
self
):
tdLog
.
debug
(
"checking column boundaries"
)
tdSql
.
prepare
()
# one column is for timestamp
maxCols
=
self
.
getLimitFromSourceCode
(
'TSDB_MAX_COLUMNS'
)
-
1
sql
=
"create table cars (ts timestamp"
for
i
in
range
(
maxCols
):
sql
+=
", c%d int"
%
i
sql
+=
");"
tdSql
.
execute
(
sql
)
tdSql
.
query
(
'show tables'
)
tdSql
.
checkRows
(
1
)
sql
=
"insert into cars values (now"
for
i
in
range
(
maxCols
):
sql
+=
", %d"
%
i
sql
+=
");"
tdSql
.
execute
(
sql
)
tdSql
.
query
(
'select * from cars'
)
tdSql
.
checkRows
(
1
)
def
checkTableNameBoundaries
(
self
):
tdLog
.
debug
(
"checking table name boundaries"
)
tdSql
.
prepare
()
maxTableNameLen
=
self
.
getLimitFromSourceCode
(
'TSDB_TABLE_NAME_LEN'
)
tdLog
.
notice
(
"table name max length is %d"
%
maxTableNameLen
)
name
=
self
.
generateString
(
maxTableNameLen
-
1
)
tdLog
.
info
(
"table name is '%s'"
%
name
)
tdSql
.
execute
(
"create table %s (ts timestamp, value int)"
%
name
)
tdSql
.
execute
(
"insert into %s values(now, 0)"
%
name
)
tdSql
.
query
(
'show tables'
)
tdSql
.
checkRows
(
1
)
tdSql
.
query
(
'select * from %s'
%
name
)
tdSql
.
checkRows
(
1
)
def
checkRowBoundaries
(
self
):
tdLog
.
debug
(
"checking row boundaries"
)
tdSql
.
prepare
()
# 8 bytes for timestamp
maxRowSize
=
65536
-
8
maxCols
=
self
.
getLimitFromSourceCode
(
'TSDB_MAX_COLUMNS'
)
-
1
# for binary cols, 2 bytes are used for length
colLen
=
(
maxRowSize
-
maxCols
*
2
)
//
maxCols
firstColLen
=
maxRowSize
-
2
*
maxCols
-
colLen
*
(
maxCols
-
1
)
sql
=
"create table cars (ts timestamp, c0 binary(%d)"
%
firstColLen
for
i
in
range
(
1
,
maxCols
):
sql
+=
", c%d binary(%d)"
%
(
i
,
colLen
)
sql
+=
");"
tdSql
.
execute
(
sql
)
tdSql
.
query
(
'show tables'
)
tdSql
.
checkRows
(
1
)
col
=
self
.
generateString
(
firstColLen
)
sql
=
"insert into cars values (now, '%s'"
%
col
col
=
self
.
generateString
(
colLen
)
for
i
in
range
(
1
,
maxCols
):
sql
+=
", '%s'"
%
col
sql
+=
");"
tdLog
.
info
(
sql
);
tdSql
.
execute
(
sql
)
tdSql
.
query
(
"select * from cars"
)
tdSql
.
checkRows
(
1
)
def
run
(
self
):
self
.
checkTagBoundaries
()
self
.
checkColumnBoundaries
()
self
.
checkTableNameBoundaries
()
self
.
checkRowBoundaries
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tests/pytest/table/column_name.py
浏览文件 @
4d83bb3e
# -*- coding: utf-8 -*-
import
sys
import
string
import
random
import
subprocess
from
util.log
import
*
from
util.cases
import
*
from
util.sql
import
*
...
...
@@ -14,34 +17,9 @@ class TDTestCase:
def
run
(
self
):
tdSql
.
prepare
()
# TSIM: system sh/stop_dnodes.sh
# TSIM:
# TSIM: system sh/ip.sh -i 1 -s up
# TSIM: system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1
# TSIM: system sh/cfg.sh -n dnode1 -c walLevel -v 0
# TSIM: system sh/exec.sh -n dnode1 -s start
# TSIM:
# TSIM: sleep 3000
# TSIM: sql connect
# TSIM:
# TSIM: $i = 0
# TSIM: $dbPrefix = lm_cm_db
# TSIM: $tbPrefix = lm_cm_tb
# TSIM: $db = $dbPrefix . $i
# TSIM: $tb = $tbPrefix . $i
# TSIM:
# TSIM: print =============== step1
tdLog
.
info
(
'=============== step1'
)
# TSIM: sql create database $db
# TSIM: sql use $db
# TSIM:
# TSIM: sql drop table dd -x step0
tdLog
.
info
(
'drop table dd -x step0'
)
tdSql
.
error
(
'drop table dd'
)
# TSIM: return -1
# TSIM: step0:
# TSIM:
# TSIM: sql create table $tb(ts timestamp, int) -x step1
tdLog
.
info
(
'create table tb(ts timestamp, int) -x step1'
)
tdSql
.
error
(
'create table tb(ts timestamp, int)'
)
# TSIM: return -1
...
...
@@ -112,37 +90,24 @@ class TDTestCase:
tdLog
.
info
(
'=============== step4'
)
# TSIM: sql create table $tb (ts timestamp,
# a0123456789012345678901234567890123456789 int)
getMaxColNum
=
"grep -w '#define TSDB_COL_NAME_LEN' ../../src/inc/taosdef.h|awk '{print $3}'"
boundary
=
int
(
subprocess
.
check_output
(
getMaxColNum
,
shell
=
True
))
tdLog
.
info
(
"get max column name length is %d"
%
boundary
)
chars
=
string
.
ascii_uppercase
+
string
.
ascii_lowercase
# col_name = ''.join(random.choices(chars, k=boundary+1))
# tdLog.info(
# 'create table tb (ts timestamp, %s int), col_name length is %d' % (col_name, len(col_name)))
# tdSql.error(
# 'create table tb (ts timestamp, %s int)' % col_name)
col_name
=
''
.
join
(
random
.
choices
(
chars
,
k
=
boundary
))
tdLog
.
info
(
'create table tb (ts timestamp, a0123456789012345678901234567890123456789 int)'
)
'create table tb (ts timestamp, %s int), col_name length is %d'
%
(
col_name
,
len
(
col_name
)))
tdSql
.
execute
(
'create table tb (ts timestamp, a0123456789012345678901234567890123456789 int)'
)
# TSIM: sql drop table $tb
tdLog
.
info
(
'drop table tb'
)
tdSql
.
execute
(
'drop table tb'
)
# TSIM:
# TSIM: sql show tables
tdLog
.
info
(
'show tables'
)
tdSql
.
query
(
'show tables'
)
# TSIM: if $rows != 0 then
tdLog
.
info
(
'tdSql.checkRow(0)'
)
tdSql
.
checkRows
(
0
)
# TSIM: return -1
# TSIM: endi
# TSIM:
# TSIM: print =============== step5
tdLog
.
info
(
'=============== step5'
)
# TSIM: sql create table $tb (ts timestamp, a0123456789 int)
tdLog
.
info
(
'create table tb (ts timestamp, a0123456789 int)'
)
tdSql
.
execute
(
'create table tb (ts timestamp, a0123456789 int)'
)
# TSIM: sql show tables
tdLog
.
info
(
'show tables'
)
tdSql
.
query
(
'show tables'
)
# TSIM: if $rows != 1 then
tdLog
.
info
(
'tdSql.checkRow(1)'
)
tdSql
.
checkRows
(
1
)
# TSIM: return -1
# TSIM: endi
# TSIM:
'create table tb (ts timestamp, %s int)'
%
col_name
)
# TSIM: sql insert into $tb values (now , 1)
tdLog
.
info
(
"insert into tb values (now , 1)"
)
tdSql
.
execute
(
"insert into tb values (now , 1)"
)
...
...
@@ -152,24 +117,6 @@ class TDTestCase:
# TSIM: if $rows != 1 then
tdLog
.
info
(
'tdSql.checkRow(1)'
)
tdSql
.
checkRows
(
1
)
# TSIM: return -1
# TSIM: endi
# TSIM:
# TSIM: sql drop database $db
tdLog
.
info
(
'drop database db'
)
tdSql
.
execute
(
'drop database db'
)
# TSIM: sql show databases
tdLog
.
info
(
'show databases'
)
tdSql
.
query
(
'show databases'
)
# TSIM: if $rows != 0 then
tdLog
.
info
(
'tdSql.checkRow(0)'
)
tdSql
.
checkRows
(
0
)
# TSIM: return -1
# TSIM: endi
# TSIM:
# TSIM:
# TSIM:
# TSIM:
# convert end
def
stop
(
self
):
...
...
tests/pytest/table/column_num.py
浏览文件 @
4d83bb3e
...
...
@@ -76,7 +76,7 @@ class TDTestCase:
tdSql
.
checkRows
(
2
)
data
=
"now"
for
x
in
range
(
0
,
boundary
-
1
):
for
x
in
range
(
0
,
boundary
-
1
):
data
=
data
+
", %d"
%
x
tdLog
.
info
(
"insert into tb1 values (%s)"
%
data
)
tdSql
.
execute
(
"insert into tb1 values (%s)"
%
data
)
...
...
tests/pytest/table/tablename-boundary.py
浏览文件 @
4d83bb3e
...
...
@@ -21,7 +21,7 @@ class TDTestCase:
tableNameMaxLen
=
int
(
subprocess
.
check_output
(
getTableNameLen
,
shell
=
True
))
tdLog
.
notice
(
"table name max length is %d"
%
tableNameMaxLen
)
tdLog
.
info
(
"table name max length is %d"
%
tableNameMaxLen
)
chars
=
string
.
ascii_uppercase
+
string
.
ascii_lowercase
tb_name
=
''
.
join
(
random
.
choices
(
chars
,
k
=
tableNameMaxLen
))
tdLog
.
info
(
'tb_name length %d'
%
len
(
tb_name
))
...
...
tests/pytest/util/cases.py
浏览文件 @
4d83bb3e
...
...
@@ -71,7 +71,7 @@ class TDCases:
case
.
run
()
except
Exception
as
e
:
tdLog
.
notice
(
repr
(
e
))
tdLog
.
exit
(
"%s failed
: %s"
%
(
__file__
,
fileName
))
tdLog
.
exit
(
"%s failed
"
%
(
fileName
))
case
.
stop
()
runNum
+=
1
continue
...
...
tests/pytest/util/dnodes.py
浏览文件 @
4d83bb3e
...
...
@@ -223,8 +223,8 @@ class TDDnode:
self
.
running
=
1
tdLog
.
debug
(
"dnode:%d is running with %s "
%
(
self
.
index
,
cmd
))
tdLog
.
debug
(
"wait
4
seconds for the dnode:%d to start."
%
(
self
.
index
))
time
.
sleep
(
4
)
tdLog
.
debug
(
"wait
5
seconds for the dnode:%d to start."
%
(
self
.
index
))
time
.
sleep
(
5
)
def
stop
(
self
):
if
self
.
valgrind
==
0
:
...
...
@@ -233,16 +233,14 @@ class TDDnode:
toBeKilled
=
"valgrind.bin"
if
self
.
running
!=
0
:
killCmd
=
"ps -ef|grep -w %s| grep '%s' | grep -v grep | awk '{print $2}' | xargs kill -INT"
%
(
toBeKilled
,
self
.
cfgDir
)
psCmd
=
"ps -ef|grep -w %s| grep -v grep | awk '{print $2}'"
%
toBeKilled
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
.
decode
(
"utf-8"
)
while
(
processID
):
killCmd
=
"kill -INT %s"
%
processID
os
.
system
(
killCmd
)
time
.
sleep
(
1
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
.
decode
(
"utf-8"
)
self
.
running
=
0
tdLog
.
debug
(
"dnode:%d is stopped by kill -INT"
%
(
self
.
index
))
...
...
@@ -254,15 +252,14 @@ class TDDnode:
toBeKilled
=
"valgrind.bin"
if
self
.
running
!=
0
:
killCmd
=
"ps -ef|grep -w %s| grep '%s' | grep -v grep | awk '{print $2}' | xargs kill -KILL"
%
(
toBeKilled
,
self
.
cfgDir
)
psCmd
=
"ps -ef|grep -w %s| grep -v grep | awk '{print $2}'"
%
toBeKilled
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
.
decode
(
"utf-8"
)
while
(
processID
):
killCmd
=
"kill -KILL %s"
%
processID
os
.
system
(
killCmd
)
time
.
sleep
(
1
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
.
decode
(
"utf-8"
)
self
.
running
=
0
tdLog
.
debug
(
"dnode:%d is stopped by kill -KILL"
%
(
self
.
index
))
...
...
@@ -307,21 +304,21 @@ class TDDnodes:
self
.
dnodes
.
append
(
TDDnode
(
10
))
def
init
(
self
,
path
):
killCmd
=
"ps -ef|grep -w taosd | grep -v grep | awk '{print $2}' | xargs kill -KILL"
psCmd
=
"ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'"
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
.
decode
(
"utf-8"
)
while
(
processID
):
killCmd
=
"kill -KILL %s"
%
processID
os
.
system
(
killCmd
)
time
.
sleep
(
1
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
.
decode
(
"utf-8"
)
killCmd
=
"ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}' | xargs kill -KILL"
psCmd
=
"ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'"
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
.
decode
(
"utf-8"
)
while
(
processID
):
killCmd
=
"kill -KILL %s"
%
processID
os
.
system
(
killCmd
)
time
.
sleep
(
1
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
.
decode
(
"utf-8"
)
binPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
binPath
=
binPath
+
"/../../../debug/"
...
...
@@ -407,27 +404,27 @@ class TDDnodes:
self
.
dnodes
[
i
].
stop
()
psCmd
=
"ps -ef | grep -w taosd | grep 'root' | grep -v grep | awk '{print $2}'"
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
.
decode
(
"utf-8"
)
if
processID
:
cmd
=
"sudo systemctl stop taosd"
os
.
system
(
cmd
)
# if os.system(cmd) != 0 :
# tdLog.exit(cmd)
killCmd
=
"ps -ef|grep -w taosd| grep -v grep | awk '{print $2}' | xargs kill -KILL"
psCmd
=
"ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'"
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
.
decode
(
"utf-8"
)
while
(
processID
):
killCmd
=
"kill -KILL %s"
%
processID
os
.
system
(
killCmd
)
time
.
sleep
(
1
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
.
decode
(
"utf-8"
)
killCmd
=
"ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}' | xargs kill -KILL"
psCmd
=
"ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'"
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
.
decode
(
"utf-8"
)
while
(
processID
):
killCmd
=
"kill -KILL %s"
%
processID
os
.
system
(
killCmd
)
time
.
sleep
(
1
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
.
decode
(
"utf-8"
)
# if os.system(cmd) != 0 :
# tdLog.exit(cmd)
...
...
tests/pytest/util/sql.py
浏览文件 @
4d83bb3e
...
...
@@ -15,6 +15,7 @@ import sys
import
os
import
time
import
datetime
import
inspect
from
util.log
import
*
...
...
@@ -44,7 +45,12 @@ class TDSql:
except
BaseException
:
expectErrNotOccured
=
False
if
expectErrNotOccured
:
tdLog
.
exit
(
"failed: sql:%.40s, expect error not occured"
%
(
sql
))
frame
=
inspect
.
stack
()[
1
]
callerModule
=
inspect
.
getmodule
(
frame
[
0
])
callerFilename
=
callerModule
.
__file__
tdLog
.
exit
(
"%s failed: sql:%.40s, expect error not occured"
%
(
callerFilename
,
sql
))
else
:
tdLog
.
info
(
"sql:%.40s, expect error occured"
%
(
sql
))
...
...
@@ -62,33 +68,39 @@ class TDSql:
def
checkRows
(
self
,
expectRows
):
if
self
.
queryRows
!=
expectRows
:
frame
=
inspect
.
stack
()[
1
]
callerModule
=
inspect
.
getmodule
(
frame
[
0
])
callerFilename
=
callerModule
.
__file__
tdLog
.
exit
(
"failed: sql:%.40s, queryRows:%d != expect:%d"
%
(
self
.
sql
,
self
.
queryRows
,
expectRows
))
"
%s
failed: sql:%.40s, queryRows:%d != expect:%d"
%
(
callerFilename
,
self
.
sql
,
self
.
queryRows
,
expectRows
))
tdLog
.
info
(
"sql:%.40s, queryRows:%d == expect:%d"
%
(
self
.
sql
,
self
.
queryRows
,
expectRows
))
def
checkData
(
self
,
row
,
col
,
data
):
frame
=
inspect
.
stack
()[
1
]
callerModule
=
inspect
.
getmodule
(
frame
[
0
])
callerFilename
=
callerModule
.
__file__
if
row
<
0
:
tdLog
.
exit
(
"failed: sql:%.40s, row:%d is smaller than zero"
%
(
self
.
sql
,
row
))
"
%s
failed: sql:%.40s, row:%d is smaller than zero"
%
(
callerFilename
,
self
.
sql
,
row
))
if
col
<
0
:
tdLog
.
exit
(
"failed: sql:%.40s, col:%d is smaller than zero"
%
(
self
.
sql
,
col
))
"
%s
failed: sql:%.40s, col:%d is smaller than zero"
%
(
callerFilename
,
self
.
sql
,
col
))
if
row
>=
self
.
queryRows
:
tdLog
.
exit
(
"failed: sql:%.40s, row:%d is larger than queryRows:%d"
%
(
self
.
sql
,
row
,
self
.
queryRows
))
"
%s
failed: sql:%.40s, row:%d is larger than queryRows:%d"
%
(
callerFilename
,
self
.
sql
,
row
,
self
.
queryRows
))
if
col
>=
self
.
queryCols
:
tdLog
.
exit
(
"failed: sql:%.40s, col:%d is larger than queryRows:%d"
%
(
self
.
sql
,
col
,
self
.
queryCols
))
"
%s
failed: sql:%.40s, col:%d is larger than queryRows:%d"
%
(
callerFilename
,
self
.
sql
,
col
,
self
.
queryCols
))
if
self
.
queryResult
[
row
][
col
]
!=
data
:
tdLog
.
exit
(
"failed: sql:%.40s row:%d col:%d data:%s != expect:%s"
%
(
self
.
sql
,
row
,
col
,
self
.
queryResult
[
row
][
col
],
data
))
tdLog
.
exit
(
"%s failed: sql:%.40s row:%d col:%d data:%s != expect:%s"
%
(
callerFilename
,
self
.
sql
,
row
,
col
,
self
.
queryResult
[
row
][
col
],
data
))
if
data
is
None
:
tdLog
.
info
(
"sql:%.40s, row:%d col:%d data:%s == expect:%s"
%
...
...
@@ -104,22 +116,26 @@ class TDSql:
(
self
.
sql
,
row
,
col
,
self
.
queryResult
[
row
][
col
],
data
))
def
getData
(
self
,
row
,
col
):
frame
=
inspect
.
stack
()[
1
]
callerModule
=
inspect
.
getmodule
(
frame
[
0
])
callerFilename
=
callerModule
.
__file__
if
row
<
0
:
tdLog
.
exit
(
"failed: sql:%.40s, row:%d is smaller than zero"
%
(
self
.
sql
,
row
))
"
%s
failed: sql:%.40s, row:%d is smaller than zero"
%
(
callerFilename
,
self
.
sql
,
row
))
if
col
<
0
:
tdLog
.
exit
(
"failed: sql:%.40s, col:%d is smaller than zero"
%
(
self
.
sql
,
col
))
"
%s
failed: sql:%.40s, col:%d is smaller than zero"
%
(
callerFilename
,
self
.
sql
,
col
))
if
row
>=
self
.
queryRows
:
tdLog
.
exit
(
"failed: sql:%.40s, row:%d is larger than queryRows:%d"
%
(
self
.
sql
,
row
,
self
.
queryRows
))
"
%s
failed: sql:%.40s, row:%d is larger than queryRows:%d"
%
(
callerFilename
,
self
.
sql
,
row
,
self
.
queryRows
))
if
col
>=
self
.
queryCols
:
tdLog
.
exit
(
"failed: sql:%.40s, col:%d is larger than queryRows:%d"
%
(
self
.
sql
,
col
,
self
.
queryCols
))
"
%s
failed: sql:%.40s, col:%d is larger than queryRows:%d"
%
(
callerFilename
,
self
.
sql
,
col
,
self
.
queryCols
))
return
self
.
queryResult
[
row
][
col
]
def
executeTimes
(
self
,
sql
,
times
):
...
...
@@ -137,8 +153,12 @@ class TDSql:
def
checkAffectedRows
(
self
,
expectAffectedRows
):
if
self
.
affectedRows
!=
expectAffectedRows
:
tdLog
.
exit
(
"failed: sql:%.40s, affectedRows:%d != expect:%d"
%
(
self
.
sql
,
self
.
affectedRows
,
expectAffectedRows
))
frame
=
inspect
.
stack
()[
1
]
callerModule
=
inspect
.
getmodule
(
frame
[
0
])
callerFilename
=
callerModule
.
__file__
tdLog
.
exit
(
"%s failed: sql:%.40s, affectedRows:%d != expect:%d"
%
(
callerFilename
,
self
.
sql
,
self
.
affectedRows
,
expectAffectedRows
))
tdLog
.
info
(
"sql:%.40s, affectedRows:%d == expect:%d"
%
(
self
.
sql
,
self
.
affectedRows
,
expectAffectedRows
))
...
...
tests/pytest/valgrind-test.sh
浏览文件 @
4d83bb3e
#!/bin/bash
python3 ./test.py
$1
-f
insert/basic.py
python3 ./test.py
$1
-s
&&
sleep
1
python3 ./test.py
$1
-f
insert/int.py
python3 ./test.py
$1
-s
&&
sleep
1
python3 ./test.py
$1
-f
insert/float.py
python3 ./test.py
$1
-s
&&
sleep
1
python3 ./test.py
$1
-f
insert/bigint.py
python3 ./test.py
$1
-s
&&
sleep
1
python3 ./test.py
$1
-f
insert/bool.py
python3 ./test.py
$1
-s
&&
sleep
1
python3 ./test.py
$1
-f
insert/double.py
python3 ./test.py
$1
-s
&&
sleep
1
python3 ./test.py
$1
-f
insert/smallint.py
python3 ./test.py
$1
-s
&&
sleep
1
python3 ./test.py
$1
-f
insert/tinyint.py
python3 ./test.py
$1
-s
&&
sleep
1
python3 ./test.py
$1
-f
insert/binary.py
python3 ./test.py
$1
-s
&&
sleep
1
python3 ./test.py
$1
-f
insert/date.py
python3 ./test.py
$1
-s
&&
sleep
1
python3 ./test.py
$1
-f
insert/nchar.py
python3 ./test.py
$1
-s
&&
sleep
1
python3 ./test.py
-g
-f
insert/basic.py
python3 ./test.py
-g
-s
&&
sleep
1
python3 ./test.py
-g
-f
insert/int.py
python3 ./test.py
-g
-s
&&
sleep
1
python3 ./test.py
-g
-f
insert/float.py
python3 ./test.py
-g
-s
&&
sleep
1
python3 ./test.py
-g
-f
insert/bigint.py
python3 ./test.py
-g
-s
&&
sleep
1
python3 ./test.py
-g
-f
insert/bool.py
python3 ./test.py
-g
-s
&&
sleep
1
python3 ./test.py
-g
-f
insert/double.py
python3 ./test.py
-g
-s
&&
sleep
1
python3 ./test.py
-g
-f
insert/smallint.py
python3 ./test.py
-g
-s
&&
sleep
1
python3 ./test.py
-g
-f
insert/tinyint.py
python3 ./test.py
-g
-s
&&
sleep
1
python3 ./test.py
-g
-f
insert/binary.py
python3 ./test.py
-g
-s
&&
sleep
1
python3 ./test.py
-g
-f
insert/date.py
python3 ./test.py
-g
-s
&&
sleep
1
python3 ./test.py
-g
-f
insert/nchar.py
python3 ./test.py
-g
-s
&&
sleep
1
python3 ./test.py
-g
-f
insert/multi.py
python3 ./test.py
-g
-s
&&
sleep
1
python3 ./test.py
$1
-f
table/column_name.py
python3 ./test.py
$1
-s
&&
sleep
1
python3 ./test.py
$1
-f
table/column_num.py
python3 ./test.py
$1
-s
&&
sleep
1
python3 ./test.py
$1
-f
table/db_table.py
python3 ./test.py
$1
-s
&&
sleep
1
python3 ./test.py
-g
-f
table/column_name.py
python3 ./test.py
-g
-s
&&
sleep
1
python3 ./test.py
-g
-f
table/column_num.py
python3 ./test.py
-g
-s
&&
sleep
1
python3 ./test.py
-g
-f
table/db_table.py
python3 ./test.py
-g
-s
&&
sleep
1
python3 ./test.py
$1
-f
import_merge/importDataLastSub.py
python3 ./test.py
$1
-s
&&
sleep
1
python3 ./test.py
-g
-f
import_merge/importDataLastSub.py
python3 ./test.py
-g
-s
&&
sleep
1
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录