Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
364c1289
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1185
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
364c1289
编写于
6月 22, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into feature/query
上级
6246a5fd
2c3afd83
变更
26
隐藏空白更改
内联
并排
Showing
26 changed file
with
386 addition
and
283 deletion
+386
-283
.travis.yml
.travis.yml
+0
-86
README.md
README.md
+1
-0
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+25
-21
src/inc/ttokendef.h
src/inc/ttokendef.h
+10
-0
src/mnode/src/mnodeAcct.c
src/mnode/src/mnodeAcct.c
+1
-1
src/mnode/src/mnodeDnode.c
src/mnode/src/mnodeDnode.c
+4
-1
src/mnode/src/mnodeVgroup.c
src/mnode/src/mnodeVgroup.c
+3
-2
src/os/linux/src/linuxPlatform.c
src/os/linux/src/linuxPlatform.c
+0
-77
src/query/src/qtokenizer.c
src/query/src/qtokenizer.c
+1
-1
src/rpc/inc/rpcTcp.h
src/rpc/inc/rpcTcp.h
+2
-0
src/rpc/inc/rpcUdp.h
src/rpc/inc/rpcUdp.h
+1
-0
src/rpc/src/rpcMain.c
src/rpc/src/rpcMain.c
+23
-11
src/rpc/src/rpcTcp.c
src/rpc/src/rpcTcp.c
+46
-22
src/rpc/src/rpcUdp.c
src/rpc/src/rpcUdp.c
+20
-9
src/tsdb/src/tsdbBuffer.c
src/tsdb/src/tsdbBuffer.c
+1
-0
src/tsdb/src/tsdbFile.c
src/tsdb/src/tsdbFile.c
+3
-1
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+6
-5
src/tsdb/src/tsdbMemTable.c
src/tsdb/src/tsdbMemTable.c
+5
-7
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+2
-1
src/util/inc/tstoken.h
src/util/inc/tstoken.h
+1
-8
src/util/src/tsocket.c
src/util/src/tsocket.c
+1
-28
tests/pytest/table/boundary.py
tests/pytest/table/boundary.py
+1
-1
tests/script/general/stable/refcount.sim
tests/script/general/stable/refcount.sim
+132
-0
tests/script/jenkins/unique.txt
tests/script/jenkins/unique.txt
+57
-0
tests/script/loop.sh
tests/script/loop.sh
+39
-0
tests/script/unique/cluster/balance2.sim
tests/script/unique/cluster/balance2.sim
+1
-1
未找到文件。
.travis.yml
浏览文件 @
364c1289
...
...
@@ -135,92 +135,6 @@ matrix:
# https://scan.coverity.com/faq#frequency
branch_pattern
:
coverity_scan
-
os
:
linux
dist
:
bionic
language
:
c
compiler
:
gcc
env
:
ENV_COVER=true
git
:
-
depth
:
1
addons
:
apt
:
packages
:
-
build-essential
-
cmake
-
net-tools
-
python-pip
-
python-setuptools
-
python3-pip
-
python3-setuptools
-
lcov
-
psmisc
before_script
:
-
cd ${TRAVIS_BUILD_DIR}
-
mkdir debug
-
cd debug
script
:
-
cmake -DCOVER=true .. > /dev/null
-
make > /dev/null
after_success
:
-
|-
case $TRAVIS_OS_NAME in
linux)
cd ${TRAVIS_BUILD_DIR}/debug
make install > /dev/null || travis_terminate $?
pip install numpy
pip install --user ${TRAVIS_BUILD_DIR}/src/connector/python/linux/python2/
pip3 install numpy
pip3 install --user ${TRAVIS_BUILD_DIR}/src/connector/python/linux/python3/
cd ${TRAVIS_BUILD_DIR}/tests
./test-all.sh smoke COVER
TEST_RESULT=$?
pkill taosd
sleep 1
cd ${TRAVIS_BUILD_DIR}
lcov -d . --capture --rc lcov_branch_coverage=1 -o coverage.info
lcov --remove coverage.info '*/tests/*' '*/test/*' '*/deps/*' '*/plugins/*' -o coverage.info
lcov -l --rc lcov_branch_coverage=1 coverage.info || travis_terminate $?
gem install coveralls-lcov
# Color setting
RED='\033[0;31m'
GREEN='\033[1;32m'
GREEN_DARK='\033[0;32m'
GREEN_UNDERLINE='\033[4;32m'
NC='\033[0m'
coveralls-lcov coverage.info
if [ "$?" -eq "0" ]; then
echo -e "${GREEN} ## Uploaded to Coveralls.io! ## ${NC}"
else
echo -e "${RED} ## Coveralls.io not collect coverage report! ## ${NC} "
fi
bash <(curl -s https://codecov.io/bash) -y .codecov.yml -f coverage.info
if [ "$?" -eq "0" ]; then
echo -e "${GREEN} ## Uploaded to Codecov! ## ${NC} "
else
echo -e "${RED} ## Codecov did not collect coverage report! ## ${NC} "
fi
if [ "$TEST_RESULT" -ne "0" ]; then
travis_terminate $?
fi
;;
esac
-
os
:
linux
dist
:
trusty
language
:
c
...
...
README.md
浏览文件 @
364c1289
[
![Build Status
](
https://travis-ci.org/taosdata/TDengine.svg?branch=master
)
](https://travis-ci.org/taosdata/TDengine)
[
![Build status
](
https://ci.appveyor.com/api/projects/status/kf3pwh2or5afsgl9/branch/master?svg=true
)
](https://ci.appveyor.com/project/sangshuduo/tdengine-2n8ge/branch/master)
[
![Coverage Status
](
https://coveralls.io/repos/github/taosdata/TDengine/badge.svg?branch=develop
)
](https://coveralls.io/github/taosdata/TDengine?branch=develop)
[
![TDengine
](
TDenginelogo.png
)
](https://www.taosdata.com)
...
...
src/client/src/tscParseInsert.c
浏览文件 @
364c1289
...
...
@@ -46,18 +46,20 @@ static int32_t tscToInteger(SSQLToken *pToken, int64_t *value, char **endPtr) {
return
TK_ILLEGAL
;
}
int32_t
radix
=
10
;
int32_t
radixList
[
3
]
=
{
16
,
8
,
2
};
// the integer number with different radix: hex, oct, bin
if
(
pToken
->
type
==
TK_HEX
||
pToken
->
type
==
TK_OCT
||
pToken
->
type
==
TK_BIN
)
{
radix
=
radixList
[
pToken
->
type
-
TK_HEX
];
}
errno
=
0
;
*
value
=
strtoll
(
pToken
->
z
,
endPtr
,
radix
);
*
value
=
strtoll
(
pToken
->
z
,
endPtr
,
0
);
if
(
**
endPtr
==
'e'
||
**
endPtr
==
'E'
||
**
endPtr
==
'.'
)
{
errno
=
0
;
double
v
=
round
(
strtod
(
pToken
->
z
,
endPtr
));
if
(
v
>
INT64_MAX
||
v
<=
INT64_MIN
)
{
errno
=
ERANGE
;
}
else
{
*
value
=
v
;
}
}
// not a valid integer number, return error
if
(
(
pToken
->
type
==
TK_STRING
||
pToken
->
type
==
TK_ID
)
&&
((
*
endPtr
-
pToken
->
z
)
!=
pToken
->
n
)
)
{
if
(
*
endPtr
-
pToken
->
z
!=
pToken
->
n
)
{
return
TK_ILLEGAL
;
}
...
...
@@ -73,11 +75,11 @@ static int32_t tscToDouble(SSQLToken *pToken, double *value, char **endPtr) {
*
value
=
strtod
(
pToken
->
z
,
endPtr
);
// not a valid integer number, return error
if
((
pToken
->
type
==
TK_STRING
||
pToken
->
type
==
TK_ID
)
&&
((
*
endPtr
-
pToken
->
z
)
!=
pToken
->
n
)
)
{
if
((
*
endPtr
-
pToken
->
z
)
!=
pToken
->
n
)
{
return
TK_ILLEGAL
;
}
else
{
return
pToken
->
type
;
}
return
pToken
->
type
;
}
int
tsParseTime
(
SSQLToken
*
pToken
,
int64_t
*
time
,
char
**
next
,
char
*
error
,
int16_t
timePrec
)
{
...
...
@@ -986,14 +988,14 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
return
code
;
}
int
validateTableName
(
char
*
tblName
,
int
len
)
{
char
buf
[
TSDB_TABLE_ID_LEN
]
=
{
0
};
tstrncpy
(
buf
,
tblName
,
sizeof
(
buf
));
int
validateTableName
(
char
*
tblName
,
int
len
,
SSQLToken
*
psTblToken
)
{
tstrncpy
(
psTblToken
->
z
,
tblName
,
TSDB_TABLE_ID_LEN
);
SSQLToken
token
=
{.
n
=
len
,
.
type
=
TK_ID
,
.
z
=
buf
};
tSQLGetToken
(
buf
,
&
token
.
type
);
psTblToken
->
n
=
len
;
psTblToken
->
type
=
TK_ID
;
tSQLGetToken
(
psTblToken
->
z
,
&
psTblToken
->
type
);
return
tscValidateName
(
&
t
oken
);
return
tscValidateName
(
psTblT
oken
);
}
static
int32_t
validateDataSource
(
SSqlCmd
*
pCmd
,
int8_t
type
,
const
char
*
sql
)
{
...
...
@@ -1076,14 +1078,16 @@ int tsParseInsertSql(SSqlObj *pSql) {
}
pCmd
->
curSql
=
sToken
.
z
;
char
buf
[
TSDB_TABLE_ID_LEN
];
SSQLToken
sTblToken
;
sTblToken
.
z
=
buf
;
// Check if the table name available or not
if
(
validateTableName
(
sToken
.
z
,
sToken
.
n
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
validateTableName
(
sToken
.
z
,
sToken
.
n
,
&
sTblToken
)
!=
TSDB_CODE_SUCCESS
)
{
code
=
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"table name invalid"
,
sToken
.
z
);
goto
_error
;
}
if
((
code
=
tscSetTableFullName
(
pTableMetaInfo
,
&
sToken
,
pSql
))
!=
TSDB_CODE_SUCCESS
)
{
if
((
code
=
tscSetTableFullName
(
pTableMetaInfo
,
&
sT
blT
oken
,
pSql
))
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
src/inc/ttokendef.h
浏览文件 @
364c1289
...
...
@@ -221,6 +221,16 @@
#define TK_INTO 203
#define TK_VALUES 204
#define TK_SPACE 300
#define TK_COMMENT 301
#define TK_ILLEGAL 302
#define TK_HEX 303 // hex number 0x123
#define TK_OCT 304 // oct number
#define TK_BIN 305 // bin format data 0b111
#define TK_FILE 306
#define TK_QUESTION 307 // denoting the placeholder of "?",when invoking statement bind query
#endif
src/mnode/src/mnodeAcct.c
浏览文件 @
364c1289
...
...
@@ -126,8 +126,8 @@ int32_t mnodeInitAccts() {
}
void
mnodeCleanupAccts
()
{
sdbCloseTable
(
tsAcctSdb
);
acctCleanUp
();
sdbCloseTable
(
tsAcctSdb
);
}
void
*
mnodeGetAcct
(
char
*
name
)
{
...
...
src/mnode/src/mnodeDnode.c
浏览文件 @
364c1289
...
...
@@ -348,7 +348,6 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
pRsp
->
dnodeCfg
.
dnodeId
=
htonl
(
pDnode
->
dnodeId
);
pRsp
->
dnodeCfg
.
moduleStatus
=
htonl
((
int32_t
)
pDnode
->
isMgmt
);
pRsp
->
dnodeCfg
.
numOfVnodes
=
htonl
(
openVnodes
);
mnodeGetMnodeInfos
(
&
pRsp
->
mnodes
);
SDMVgroupAccess
*
pAccess
=
(
SDMVgroupAccess
*
)((
char
*
)
pRsp
+
sizeof
(
SDMStatusRsp
));
for
(
int32_t
j
=
0
;
j
<
openVnodes
;
++
j
)
{
...
...
@@ -392,6 +391,10 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
}
pDnode
->
lastAccess
=
tsAccessSquence
;
//this func should be called after sdb replica changed
mnodeGetMnodeInfos
(
&
pRsp
->
mnodes
);
mnodeDecDnodeRef
(
pDnode
);
pMsg
->
rpcRsp
.
len
=
contLen
;
...
...
src/mnode/src/mnodeVgroup.c
浏览文件 @
364c1289
...
...
@@ -678,8 +678,9 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
SMnodeMsg
*
mnodeMsg
=
rpcMsg
->
handle
;
mnodeMsg
->
received
++
;
if
(
rpcMsg
->
code
==
TSDB_CODE_SUCCESS
)
{
mnodeMsg
->
code
=
rpcMsg
->
code
;
mnodeMsg
->
successed
++
;
}
else
{
mnodeMsg
->
code
=
rpcMsg
->
code
;
}
SVgObj
*
pVgroup
=
mnodeMsg
->
pVgroup
;
...
...
@@ -702,7 +703,7 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
code
=
TSDB_CODE_MND_SDB_ERROR
;
}
dnodeSendRpcMnodeWriteRsp
(
mnodeMsg
,
code
);
dnodeSendRpcMnodeWriteRsp
(
mnodeMsg
,
mnodeMsg
->
code
);
}
}
...
...
src/os/linux/src/linuxPlatform.c
浏览文件 @
364c1289
...
...
@@ -56,70 +56,8 @@ void taosMsleep(int mseconds) {
bool
taosCheckPthreadValid
(
pthread_t
thread
)
{
return
thread
!=
0
;
}
void
taosResetPthread
(
pthread_t
*
thread
)
{
*
thread
=
0
;
}
int64_t
taosGetPthreadId
()
{
return
(
int64_t
)
pthread_self
();
}
/*
* Function to get the private ip address of current machine. If get IP
* successfully, return 0, else, return -1. The return values is ip.
*
* Use:
* if (taosGetPrivateIp(ip) != 0) {
* perror("Fail to get private IP address\n");
* exit(EXIT_FAILURE);
* }
*/
int
taosGetPrivateIp
(
char
*
const
ip
)
{
bool
hasLoCard
=
false
;
struct
ifaddrs
*
ifaddr
,
*
ifa
;
int
family
,
s
;
char
host
[
NI_MAXHOST
];
if
(
getifaddrs
(
&
ifaddr
)
==
-
1
)
{
return
-
1
;
}
/* Walk through linked list, maintaining head pointer so we can free list later */
int
flag
=
0
;
for
(
ifa
=
ifaddr
;
ifa
!=
NULL
;
ifa
=
ifa
->
ifa_next
)
{
if
(
ifa
->
ifa_addr
==
NULL
)
continue
;
family
=
ifa
->
ifa_addr
->
sa_family
;
if
(
strcmp
(
"lo"
,
ifa
->
ifa_name
)
==
0
)
{
hasLoCard
=
true
;
continue
;
}
if
(
family
==
AF_INET
)
{
/* printf("%-8s", ifa->ifa_name); */
s
=
getnameinfo
(
ifa
->
ifa_addr
,
(
family
==
AF_INET
)
?
sizeof
(
struct
sockaddr_in
)
:
sizeof
(
struct
sockaddr_in6
),
host
,
NI_MAXHOST
,
NULL
,
0
,
NI_NUMERICHOST
);
if
(
s
!=
0
)
{
freeifaddrs
(
ifaddr
);
return
-
1
;
}
strcpy
(
ip
,
host
);
flag
=
1
;
break
;
}
}
freeifaddrs
(
ifaddr
);
if
(
flag
)
{
return
0
;
}
else
{
if
(
hasLoCard
)
{
uPrint
(
"no net card was found, use lo:127.0.0.1 as default"
);
strcpy
(
ip
,
"127.0.0.1"
);
return
0
;
}
return
-
1
;
}
}
int
taosSetNonblocking
(
int
sock
,
int
on
)
{
int
flags
=
0
;
if
((
flags
=
fcntl
(
sock
,
F_GETFL
,
0
))
<
0
)
{
...
...
@@ -294,21 +232,6 @@ ssize_t twrite(int fd, void *buf, size_t n) {
return
n
;
}
bool
taosSkipSocketCheck
()
{
struct
utsname
buf
;
if
(
uname
(
&
buf
))
{
uPrint
(
"can't fetch os info"
);
return
false
;
}
if
(
strstr
(
buf
.
release
,
"Microsoft"
)
!=
0
)
{
uPrint
(
"using WSLv1"
);
return
true
;
}
return
false
;
}
void
taosBlockSIGPIPE
()
{
sigset_t
signal_mask
;
sigemptyset
(
&
signal_mask
);
...
...
src/query/src/qtokenizer.c
浏览文件 @
364c1289
...
...
@@ -25,7 +25,7 @@
// All the keywords of the SQL language are stored in a hash table
typedef
struct
SKeyword
{
const
char
*
name
;
// The keyword name
uint
8
_t
type
;
// type
uint
16
_t
type
;
// type
uint8_t
len
;
// length
}
SKeyword
;
...
...
src/rpc/inc/rpcTcp.h
浏览文件 @
364c1289
...
...
@@ -21,9 +21,11 @@ extern "C" {
#endif
void
*
taosInitTcpServer
(
uint32_t
ip
,
uint16_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
);
void
taosStopTcpServer
(
void
*
param
);
void
taosCleanUpTcpServer
(
void
*
param
);
void
*
taosInitTcpClient
(
uint32_t
ip
,
uint16_t
port
,
char
*
label
,
int
num
,
void
*
fp
,
void
*
shandle
);
void
taosStopTcpClient
(
void
*
chandle
);
void
taosCleanUpTcpClient
(
void
*
chandle
);
void
*
taosOpenTcpClientConnection
(
void
*
shandle
,
void
*
thandle
,
uint32_t
ip
,
uint16_t
port
);
...
...
src/rpc/inc/rpcUdp.h
浏览文件 @
364c1289
...
...
@@ -23,6 +23,7 @@ extern "C" {
#include "taosdef.h"
void
*
taosInitUdpConnection
(
uint32_t
ip
,
uint16_t
port
,
char
*
label
,
int
,
void
*
fp
,
void
*
shandle
);
void
taosStopUdpConnection
(
void
*
handle
);
void
taosCleanUpUdpConnection
(
void
*
handle
);
int
taosSendUdpData
(
uint32_t
ip
,
uint16_t
port
,
void
*
data
,
int
dataLen
,
void
*
chandle
);
void
*
taosOpenUdpConnection
(
void
*
shandle
,
void
*
thandle
,
uint32_t
ip
,
uint16_t
port
);
...
...
src/rpc/src/rpcMain.c
浏览文件 @
364c1289
...
...
@@ -153,6 +153,13 @@ void (*taosCleanUpConn[])(void *thandle) = {
taosCleanUpTcpClient
};
void
(
*
taosStopConn
[])(
void
*
thandle
)
=
{
taosStopUdpConnection
,
taosStopUdpConnection
,
taosStopTcpServer
,
taosStopTcpClient
,
};
int
(
*
taosSendData
[])(
uint32_t
ip
,
uint16_t
port
,
void
*
data
,
int
len
,
void
*
chandle
)
=
{
taosSendUdpData
,
taosSendUdpData
,
...
...
@@ -289,12 +296,18 @@ void *rpcOpen(const SRpcInit *pInit) {
void
rpcClose
(
void
*
param
)
{
SRpcInfo
*
pRpc
=
(
SRpcInfo
*
)
param
;
// stop connection to outside first
(
*
taosStopConn
[
pRpc
->
connType
|
RPC_CONN_TCP
])(
pRpc
->
tcphandle
);
(
*
taosStopConn
[
pRpc
->
connType
])(
pRpc
->
udphandle
);
// close all connections
for
(
int
i
=
0
;
i
<
pRpc
->
sessions
;
++
i
)
{
if
(
pRpc
->
connList
&&
pRpc
->
connList
[
i
].
user
[
0
])
{
rpcCloseConn
((
void
*
)(
pRpc
->
connList
+
i
));
}
}
// clean up
(
*
taosCleanUpConn
[
pRpc
->
connType
|
RPC_CONN_TCP
])(
pRpc
->
tcphandle
);
(
*
taosCleanUpConn
[
pRpc
->
connType
])(
pRpc
->
udphandle
);
...
...
@@ -588,6 +601,7 @@ static void rpcReleaseConn(SRpcConn *pConn) {
pConn
->
inTranId
=
0
;
pConn
->
outTranId
=
0
;
pConn
->
secured
=
0
;
pConn
->
peerId
=
0
;
pConn
->
peerIp
=
0
;
pConn
->
peerPort
=
0
;
pConn
->
pReqMsg
=
NULL
;
...
...
@@ -627,6 +641,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
pConn
->
spi
=
pRpc
->
spi
;
pConn
->
encrypt
=
pRpc
->
encrypt
;
if
(
pConn
->
spi
)
memcpy
(
pConn
->
secret
,
pRpc
->
secret
,
TSDB_KEY_LEN
);
tTrace
(
"%s %p client connection is allocated"
,
pRpc
->
label
,
pConn
);
}
return
pConn
;
...
...
@@ -681,6 +696,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
}
taosHashPut
(
pRpc
->
hash
,
hashstr
,
size
,
(
char
*
)
&
pConn
,
POINTER_BYTES
);
tTrace
(
"%s %p server connection is allocated"
,
pRpc
->
label
,
pConn
);
}
return
pConn
;
...
...
@@ -948,11 +964,9 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
terrno
=
0
;
pConn
=
rpcProcessMsgHead
(
pRpc
,
pRecv
);
if
(
pHead
->
msgType
<
TSDB_MSG_TYPE_CM_HEARTBEAT
||
(
rpcDebugFlag
&
16
))
{
tTrace
(
"%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x"
,
tTrace
(
"%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x"
,
pRpc
->
label
,
pConn
,
(
void
*
)
pHead
->
ahandle
,
taosMsg
[
pHead
->
msgType
],
pRecv
->
ip
,
pRecv
->
port
,
terrno
,
pRecv
->
msgLen
,
pHead
->
sourceId
,
pHead
->
destId
,
pHead
->
tranId
,
pHead
->
code
);
}
int32_t
code
=
terrno
;
if
(
code
!=
TSDB_CODE_RPC_ALREADY_PROCESSED
)
{
...
...
@@ -1180,16 +1194,14 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
msgLen
=
rpcAddAuthPart
(
pConn
,
msg
,
msgLen
);
if
(
rpcIsReq
(
pHead
->
msgType
))
{
if
(
pHead
->
msgType
<
TSDB_MSG_TYPE_CM_HEARTBEAT
||
(
rpcDebugFlag
&
16
))
tTrace
(
"%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d"
,
pConn
->
info
,
taosMsg
[
pHead
->
msgType
],
pConn
->
peerFqdn
,
pConn
->
peerPort
,
msgLen
,
pHead
->
sourceId
,
pHead
->
destId
,
pHead
->
tranId
);
tTrace
(
"%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d"
,
pConn
->
info
,
taosMsg
[
pHead
->
msgType
],
pConn
->
peerFqdn
,
pConn
->
peerPort
,
msgLen
,
pHead
->
sourceId
,
pHead
->
destId
,
pHead
->
tranId
);
}
else
{
if
(
pHead
->
code
==
0
)
pConn
->
secured
=
1
;
// for success response, set link as secured
if
(
pHead
->
msgType
<
TSDB_MSG_TYPE_CM_HEARTBEAT
||
(
rpcDebugFlag
&
16
))
tTrace
(
"%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d"
,
pConn
->
info
,
taosMsg
[
pHead
->
msgType
],
pConn
->
peerIp
,
pConn
->
peerPort
,
htonl
(
pHead
->
code
),
msgLen
,
pHead
->
sourceId
,
pHead
->
destId
,
pHead
->
tranId
);
tTrace
(
"%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d"
,
pConn
->
info
,
taosMsg
[
pHead
->
msgType
],
pConn
->
peerIp
,
pConn
->
peerPort
,
htonl
(
pHead
->
code
),
msgLen
,
pHead
->
sourceId
,
pHead
->
destId
,
pHead
->
tranId
);
}
//tTrace("connection type is: %d", pConn->connType);
...
...
src/rpc/src/rpcTcp.c
浏览文件 @
364c1289
...
...
@@ -190,22 +190,28 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) {
}
}
void
taosCleanUpTcpServer
(
void
*
handle
)
{
void
taosStopTcpServer
(
void
*
handle
)
{
SServerObj
*
pServerObj
=
handle
;
SThreadObj
*
pThreadObj
;
if
(
pServerObj
==
NULL
)
return
;
if
(
pServerObj
->
fd
>=
0
)
shutdown
(
pServerObj
->
fd
,
SHUT_RD
);
if
(
pServerObj
->
thread
)
pthread_join
(
pServerObj
->
thread
,
NULL
);
tTrace
(
"%s TCP server is stopped"
,
pServerObj
->
label
);
}
void
taosCleanUpTcpServer
(
void
*
handle
)
{
SServerObj
*
pServerObj
=
handle
;
SThreadObj
*
pThreadObj
;
if
(
pServerObj
==
NULL
)
return
;
for
(
int
i
=
0
;
i
<
pServerObj
->
numOfThreads
;
++
i
)
{
pThreadObj
=
pServerObj
->
pThreadObj
+
i
;
taosStopTcpThread
(
pThreadObj
);
pthread_mutex_destroy
(
&
(
pThreadObj
->
mutex
));
}
tTrace
(
"
TCP:%s,
TCP server is cleaned up"
,
pServerObj
->
label
);
tTrace
(
"
%s
TCP server is cleaned up"
,
pServerObj
->
label
);
tfree
(
pServerObj
->
pThreadObj
);
tfree
(
pServerObj
);
...
...
@@ -226,7 +232,7 @@ static void *taosAcceptTcpConnection(void *arg) {
connFd
=
accept
(
pServerObj
->
fd
,
(
struct
sockaddr
*
)
&
caddr
,
&
addrlen
);
if
(
connFd
==
-
1
)
{
if
(
errno
==
EINVAL
)
{
tTrace
(
"%s TCP server s
ocket was shutdown, exiting...
"
,
pServerObj
->
label
);
tTrace
(
"%s TCP server s
top accepting new connections, exiting
"
,
pServerObj
->
label
);
break
;
}
...
...
@@ -242,13 +248,13 @@ static void *taosAcceptTcpConnection(void *arg) {
SFdObj
*
pFdObj
=
taosMallocFdObj
(
pThreadObj
,
connFd
);
if
(
pFdObj
)
{
pFdObj
->
ip
=
caddr
.
sin_addr
.
s_addr
;
pFdObj
->
port
=
caddr
.
sin_port
;
tTrace
(
"%s new
connection from %s:%hu, FD:%p,
numOfFds:%d"
,
pServerObj
->
label
,
inet_ntoa
(
caddr
.
sin_addr
),
pFdObj
->
port
,
pFdObj
,
pThreadObj
->
numOfFds
);
pFdObj
->
port
=
htons
(
caddr
.
sin_port
)
;
tTrace
(
"%s new
TCP connection from %s:%hu, fd:%d FD:%p
numOfFds:%d"
,
pServerObj
->
label
,
inet_ntoa
(
caddr
.
sin_addr
),
pFdObj
->
port
,
connFd
,
pFdObj
,
pThreadObj
->
numOfFds
);
}
else
{
close
(
connFd
);
tError
(
"%s failed to malloc FdObj(%s) for connection from:%s:%hu"
,
pServerObj
->
label
,
strerror
(
errno
),
inet_ntoa
(
caddr
.
sin_addr
),
caddr
.
sin_port
);
inet_ntoa
(
caddr
.
sin_addr
),
htons
(
caddr
.
sin_port
)
);
}
// pick up next thread for next connection
...
...
@@ -304,12 +310,19 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
return
pThreadObj
;
}
void
taosStopTcpClient
(
void
*
chandle
)
{
SThreadObj
*
pThreadObj
=
chandle
;
if
(
pThreadObj
==
NULL
)
return
;
tTrace
(
"%s TCP client is stopped"
,
pThreadObj
->
label
);
}
void
taosCleanUpTcpClient
(
void
*
chandle
)
{
SThreadObj
*
pThreadObj
=
chandle
;
if
(
pThreadObj
==
NULL
)
return
;
taosStopTcpThread
(
pThreadObj
);
tTrace
(
"%s
, all connections are
cleaned up"
,
pThreadObj
->
label
);
tTrace
(
"%s
TCP client is
cleaned up"
,
pThreadObj
->
label
);
tfree
(
pThreadObj
);
}
...
...
@@ -320,14 +333,22 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
int
fd
=
taosOpenTcpClientSocket
(
ip
,
port
,
pThreadObj
->
ip
);
if
(
fd
<
0
)
return
NULL
;
struct
sockaddr_in
sin
;
uint16_t
localPort
=
0
;
unsigned
int
addrlen
=
sizeof
(
sin
);
if
(
getsockname
(
fd
,
(
struct
sockaddr
*
)
&
sin
,
&
addrlen
)
==
0
&&
sin
.
sin_family
==
AF_INET
&&
addrlen
==
sizeof
(
sin
))
{
localPort
=
(
uint16_t
)
ntohs
(
sin
.
sin_port
);
}
SFdObj
*
pFdObj
=
taosMallocFdObj
(
pThreadObj
,
fd
);
if
(
pFdObj
)
{
pFdObj
->
thandle
=
thandle
;
pFdObj
->
port
=
port
;
pFdObj
->
ip
=
ip
;
tTrace
(
"%s %p
, TCP connection to 0x%x:%hu is created,
FD:%p numOfFds:%d"
,
pThreadObj
->
label
,
thandle
,
ip
,
port
,
pFdObj
,
pThreadObj
->
numOfFds
);
tTrace
(
"%s %p
TCP connection to 0x%x:%hu is created, localPort:%hu
FD:%p numOfFds:%d"
,
pThreadObj
->
label
,
thandle
,
ip
,
port
,
localPort
,
pFdObj
,
pThreadObj
->
numOfFds
);
}
else
{
close
(
fd
);
tError
(
"%s failed to malloc client FdObj(%s)"
,
pThreadObj
->
label
,
strerror
(
errno
));
...
...
@@ -340,7 +361,10 @@ void taosCloseTcpConnection(void *chandle) {
SFdObj
*
pFdObj
=
chandle
;
if
(
pFdObj
==
NULL
)
return
;
pFdObj
->
thandle
=
NULL
;
SThreadObj
*
pThreadObj
=
pFdObj
->
pThreadObj
;
tTrace
(
"%s %p TCP connection will be closed, FD:%p"
,
pThreadObj
->
label
,
pFdObj
->
thandle
,
pFdObj
);
// pFdObj->thandle = NULL;
pFdObj
->
closedByApp
=
1
;
shutdown
(
pFdObj
->
fd
,
SHUT_WR
);
}
...
...
@@ -385,14 +409,14 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
headLen
=
taosReadMsg
(
pFdObj
->
fd
,
&
rpcHead
,
sizeof
(
SRpcHead
));
if
(
headLen
!=
sizeof
(
SRpcHead
))
{
tTrace
(
"%s %p
,
read error, headLen:%d"
,
pThreadObj
->
label
,
pFdObj
->
thandle
,
headLen
);
tTrace
(
"%s %p read error, headLen:%d"
,
pThreadObj
->
label
,
pFdObj
->
thandle
,
headLen
);
return
-
1
;
}
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
rpcHead
.
msgLen
);
buffer
=
malloc
(
msgLen
+
tsRpcOverhead
);
if
(
NULL
==
buffer
)
{
tError
(
"%s %p
,
TCP malloc(size:%d) fail"
,
pThreadObj
->
label
,
pFdObj
->
thandle
,
msgLen
);
tError
(
"%s %p TCP malloc(size:%d) fail"
,
pThreadObj
->
label
,
pFdObj
->
thandle
,
msgLen
);
return
-
1
;
}
...
...
@@ -401,8 +425,8 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
retLen
=
taosReadMsg
(
pFdObj
->
fd
,
msg
+
headLen
,
leftLen
);
if
(
leftLen
!=
retLen
)
{
tError
(
"%s %p
, read error, leftLen:%d retLen:%d
"
,
pThreadObj
->
label
,
pFdObj
->
thandle
,
leftLen
,
retLen
);
tError
(
"%s %p
read error, leftLen:%d retLen:%d FD:%p
"
,
pThreadObj
->
label
,
pFdObj
->
thandle
,
leftLen
,
retLen
,
pFdObj
);
free
(
buffer
);
return
-
1
;
}
...
...
@@ -446,19 +470,19 @@ static void *taosProcessTcpData(void *param) {
pFdObj
=
events
[
i
].
data
.
ptr
;
if
(
events
[
i
].
events
&
EPOLLERR
)
{
tTrace
(
"%s %p
, error happened on FD"
,
pThreadObj
->
label
,
pFdObj
->
thandle
);
tTrace
(
"%s %p
FD:%p epoll errors"
,
pThreadObj
->
label
,
pFdObj
->
thandle
,
pFdObj
);
taosReportBrokenLink
(
pFdObj
);
continue
;
}
if
(
events
[
i
].
events
&
EPOLLRDHUP
)
{
tTrace
(
"%s %p
, FD RD hang up"
,
pThreadObj
->
label
,
pFdObj
->
thandle
);
tTrace
(
"%s %p
FD:%p RD hang up"
,
pThreadObj
->
label
,
pFdObj
->
thandle
,
pFdObj
);
taosReportBrokenLink
(
pFdObj
);
continue
;
}
if
(
events
[
i
].
events
&
EPOLLHUP
)
{
tTrace
(
"%s %p
, FD hang up"
,
pThreadObj
->
label
,
pFdObj
->
thandle
);
tTrace
(
"%s %p
FD:%p hang up"
,
pThreadObj
->
label
,
pFdObj
->
thandle
,
pFdObj
);
taosReportBrokenLink
(
pFdObj
);
continue
;
}
...
...
@@ -527,7 +551,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
pThreadObj
->
numOfFds
--
;
if
(
pThreadObj
->
numOfFds
<
0
)
tError
(
"%s %p
,
TCP thread:%d, number of FDs is negative!!!"
,
tError
(
"%s %p TCP thread:%d, number of FDs is negative!!!"
,
pThreadObj
->
label
,
pFdObj
->
thandle
,
pThreadObj
->
threadId
);
if
(
pFdObj
->
prev
)
{
...
...
@@ -542,7 +566,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
pthread_mutex_unlock
(
&
pThreadObj
->
mutex
);
tTrace
(
"%s %p
, FD:%p is cleaned,
numOfFds:%d"
,
tTrace
(
"%s %p
TCP connection is closed, FD:%p
numOfFds:%d"
,
pThreadObj
->
label
,
pFdObj
->
thandle
,
pFdObj
,
pThreadObj
->
numOfFds
);
tfree
(
pFdObj
);
...
...
src/rpc/src/rpcUdp.c
浏览文件 @
364c1289
...
...
@@ -30,7 +30,6 @@
#define RPC_MAX_UDP_SIZE 65480
typedef
struct
{
void
*
signature
;
int
index
;
int
fd
;
uint16_t
port
;
// peer port
...
...
@@ -111,7 +110,6 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
pConn
->
processData
=
fp
;
pConn
->
index
=
i
;
pConn
->
pSet
=
pSet
;
pConn
->
signature
=
pConn
;
int
code
=
pthread_create
(
&
pConn
->
thread
,
&
thAttr
,
taosRecvUdpData
,
pConn
);
if
(
code
!=
0
)
{
...
...
@@ -132,7 +130,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
return
pSet
;
}
void
taos
CleanU
pUdpConnection
(
void
*
handle
)
{
void
taos
Sto
pUdpConnection
(
void
*
handle
)
{
SUdpConnSet
*
pSet
=
(
SUdpConnSet
*
)
handle
;
SUdpConn
*
pConn
;
...
...
@@ -140,8 +138,6 @@ void taosCleanUpUdpConnection(void *handle) {
for
(
int
i
=
0
;
i
<
pSet
->
threads
;
++
i
)
{
pConn
=
pSet
->
udpConn
+
i
;
pConn
->
signature
=
NULL
;
if
(
pConn
->
fd
>=
0
)
shutdown
(
pConn
->
fd
,
SHUT_RDWR
);
if
(
pConn
->
fd
>=
0
)
taosCloseSocket
(
pConn
->
fd
);
}
...
...
@@ -150,9 +146,24 @@ void taosCleanUpUdpConnection(void *handle) {
pConn
=
pSet
->
udpConn
+
i
;
if
(
pConn
->
thread
)
pthread_join
(
pConn
->
thread
,
NULL
);
tfree
(
pConn
->
buffer
);
tTrace
(
"%s UDP thread is closed, inedx:%d"
,
pConn
->
label
,
i
);
// tTrace("%s UDP thread is closed, index:%d", pConn->label, i);
}
tTrace
(
"%s UDP is stopped"
,
pSet
->
label
);
}
void
taosCleanUpUdpConnection
(
void
*
handle
)
{
SUdpConnSet
*
pSet
=
(
SUdpConnSet
*
)
handle
;
SUdpConn
*
pConn
;
if
(
pSet
==
NULL
)
return
;
for
(
int
i
=
0
;
i
<
pSet
->
threads
;
++
i
)
{
pConn
=
pSet
->
udpConn
+
i
;
if
(
pConn
->
fd
>=
0
)
taosCloseSocket
(
pConn
->
fd
);
}
tTrace
(
"%s UDP is cleaned up"
,
pSet
->
label
);
tfree
(
pSet
);
}
...
...
@@ -164,7 +175,7 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t
SUdpConn
*
pConn
=
pSet
->
udpConn
+
pSet
->
index
;
pConn
->
port
=
port
;
tTrace
(
"%s UDP connection is setup, ip:%x:%hu
"
,
pConn
->
label
,
ip
,
p
ort
);
tTrace
(
"%s UDP connection is setup, ip:%x:%hu
localPort:%hu"
,
pConn
->
label
,
ip
,
port
,
pConn
->
localP
ort
);
return
pConn
;
}
...
...
@@ -185,7 +196,7 @@ static void *taosRecvUdpData(void *param) {
while
(
1
)
{
dataLen
=
recvfrom
(
pConn
->
fd
,
pConn
->
buffer
,
RPC_MAX_UDP_SIZE
,
0
,
(
struct
sockaddr
*
)
&
sourceAdd
,
&
addLen
);
if
(
dataLen
<=
0
)
{
tTrace
(
"%s UDP socket was closed, exiting
"
,
pConn
->
label
);
tTrace
(
"%s UDP socket was closed, exiting
(%s)"
,
pConn
->
label
,
strerror
(
errno
)
);
break
;
}
...
...
@@ -221,7 +232,7 @@ static void *taosRecvUdpData(void *param) {
int
taosSendUdpData
(
uint32_t
ip
,
uint16_t
port
,
void
*
data
,
int
dataLen
,
void
*
chandle
)
{
SUdpConn
*
pConn
=
(
SUdpConn
*
)
chandle
;
if
(
pConn
==
NULL
||
pConn
->
signature
!=
pConn
)
return
-
1
;
if
(
pConn
==
NULL
)
return
-
1
;
struct
sockaddr_in
destAdd
;
memset
(
&
destAdd
,
0
,
sizeof
(
destAdd
));
...
...
src/tsdb/src/tsdbBuffer.c
浏览文件 @
364c1289
...
...
@@ -124,6 +124,7 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
}
SListNode
*
pNode
=
tdListPopHead
(
pBufPool
->
bufBlockList
);
ASSERT
(
pNode
!=
NULL
);
STsdbBufBlock
*
pBufBlock
=
NULL
;
tdListNodeGetData
(
pBufPool
->
bufBlockList
,
pNode
,
(
void
*
)(
&
pBufBlock
));
...
...
src/tsdb/src/tsdbFile.c
浏览文件 @
364c1289
...
...
@@ -358,7 +358,9 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
ASSERT
(
pFileH
->
nFGroups
>=
0
);
for
(
int
type
=
TSDB_FILE_TYPE_HEAD
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
remove
(
fileGroup
.
files
[
type
].
fname
);
if
(
remove
(
fileGroup
.
files
[
type
].
fname
)
<
0
)
{
tsdbError
(
"vgId:%d failed to remove file %s"
,
REPO_ID
(
pRepo
),
fileGroup
.
files
[
type
].
fname
);
}
tsdbDestroyFile
(
&
fileGroup
.
files
[
type
]);
}
}
...
...
src/tsdb/src/tsdbMain.c
浏览文件 @
364c1289
...
...
@@ -213,10 +213,10 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
SFileGroup
*
pFGroup
=
taosbsearch
(
&
fid
,
pFileH
->
pFGroup
,
pFileH
->
nFGroups
,
sizeof
(
SFileGroup
),
keyFGroupCompFunc
,
TD_GE
);
if
(
pFGroup
->
fileId
==
fid
)
{
strcpy
(
fname
,
pFGroup
->
files
[(
*
index
)
%
3
].
fname
);
fname
=
strdup
(
pFGroup
->
files
[(
*
index
)
%
3
].
fname
);
}
else
{
if
(
pFGroup
->
fileId
*
3
+
2
<
eindex
)
{
strcpy
(
fname
,
pFGroup
->
files
[
0
].
fname
);
fname
=
strdup
(
pFGroup
->
files
[
0
].
fname
);
*
index
=
pFGroup
->
fileId
*
3
;
}
else
{
tfree
(
sdup
);
...
...
@@ -237,12 +237,13 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
}
SFile
*
pFile
=
&
pFGroup
->
files
[(
*
index
)
%
3
];
strcpy
(
fname
,
pFile
->
fname
);
fname
=
strdup
(
pFile
->
fname
);
}
}
if
(
stat
(
fname
,
&
fState
)
<
0
)
{
tfree
(
sdup
);
tfree
(
fname
);
return
0
;
}
...
...
@@ -566,7 +567,7 @@ static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) {
_err:
tfree
(
fname
);
if
(
fd
>
0
)
close
(
fd
);
if
(
fd
>
=
0
)
close
(
fd
);
return
-
1
;
}
...
...
@@ -609,7 +610,7 @@ static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg) {
_err:
tfree
(
fname
);
if
(
fd
>
0
)
close
(
fd
);
if
(
fd
>
=
0
)
close
(
fd
);
return
-
1
;
}
...
...
src/tsdb/src/tsdbMemTable.c
浏览文件 @
364c1289
...
...
@@ -99,6 +99,7 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
if
(
tSkipListPut
(
pTableData
->
pData
,
pNode
)
==
NULL
)
{
tsdbFreeBytes
(
pRepo
,
(
void
*
)
pNode
,
bytes
);
}
else
{
if
(
TABLE_LASTKEY
(
pTable
)
<
key
)
TABLE_LASTKEY
(
pTable
)
=
key
;
if
(
pMemTable
->
keyFirst
>
key
)
pMemTable
->
keyFirst
=
key
;
if
(
pMemTable
->
keyLast
<
key
)
pMemTable
->
keyLast
=
key
;
pMemTable
->
numOfRows
++
;
...
...
@@ -222,11 +223,12 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
pRepo
->
commit
=
0
;
}
ASSERT
(
pRepo
->
commit
==
0
);
if
(
pRepo
->
appH
.
notifyStatus
)
pRepo
->
appH
.
notifyStatus
(
pRepo
->
appH
.
appH
,
TSDB_STATUS_COMMIT_START
);
if
(
pRepo
->
mem
!=
NULL
)
{
if
(
pRepo
->
appH
.
notifyStatus
)
pRepo
->
appH
.
notifyStatus
(
pRepo
->
appH
.
appH
,
TSDB_STATUS_COMMIT_START
);
if
(
tsdbLockRepo
(
pRepo
)
<
0
)
return
-
1
;
pRepo
->
imem
=
pRepo
->
mem
;
pRepo
->
mem
=
NULL
;
...
...
@@ -468,9 +470,6 @@ _err:
static
void
tsdbEndCommit
(
STsdbRepo
*
pRepo
)
{
ASSERT
(
pRepo
->
commit
==
1
);
tsdbLockRepo
(
pRepo
);
pRepo
->
commit
=
0
;
tsdbUnlockRepo
(
pRepo
);
if
(
pRepo
->
appH
.
notifyStatus
)
pRepo
->
appH
.
notifyStatus
(
pRepo
->
appH
.
appH
,
TSDB_STATUS_COMMIT_OVER
);
}
...
...
@@ -526,8 +525,6 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
goto
_err
;
}
free
(
dataDir
);
// Open files for write/read
if
(
tsdbSetAndOpenHelperFile
(
pHelper
,
pGroup
)
<
0
)
{
tsdbError
(
"vgId:%d failed to set helper file since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
...
...
@@ -590,6 +587,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
goto
_err
;
}
tfree
(
dataDir
);
tsdbCloseHelperFile
(
pHelper
,
0
);
pthread_rwlock_wrlock
(
&
(
pFileH
->
fhlock
));
...
...
@@ -601,7 +599,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
return
0
;
_err:
// ASSERT(false
);
tfree
(
dataDir
);
tsdbCloseHelperFile
(
pHelper
,
1
);
return
-
1
;
}
...
...
src/tsdb/src/tsdbMeta.c
浏览文件 @
364c1289
...
...
@@ -147,6 +147,7 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) {
tsdbInsertTableAct
(
pRepo
,
TSDB_DROP_META
,
buf
,
tTable
);
tsdbRemoveTableFromMeta
(
pRepo
,
tTable
,
false
,
true
);
}
tSkipListDestroyIter
(
pIter
);
}
tsdbRemoveTableFromMeta
(
pRepo
,
pTable
,
true
,
true
);
...
...
@@ -269,7 +270,6 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) {
_err:
tdDestroyTSchemaBuilder
(
&
schemaBuilder
);
tsdbClearTableCfg
(
pCfg
);
tfree
(
pCfg
);
return
NULL
;
}
...
...
@@ -308,6 +308,7 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
int32_t
code
=
tsdbUpdateTable
(
pRepo
,
super
,
pTableCfg
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tsdbClearTableCfg
(
pTableCfg
);
return
code
;
}
tsdbClearTableCfg
(
pTableCfg
);
...
...
src/util/inc/tstoken.h
浏览文件 @
364c1289
...
...
@@ -24,14 +24,7 @@ extern "C" {
#include "tutil.h"
#include "ttokendef.h"
#define TK_SPACE 200
#define TK_COMMENT 201
#define TK_ILLEGAL 202
#define TK_HEX 203 // hex number 0x123
#define TK_OCT 204 // oct number
#define TK_BIN 205 // bin format data 0b111
#define TK_FILE 206
#define TK_QUESTION 207 // denoting the placeholder of "?",when invoking statement bind query
#define TSQL_TBNAME "TBNAME"
#define TSQL_TBNAME_L "tbname"
...
...
src/util/src/tsocket.c
浏览文件 @
364c1289
...
...
@@ -222,9 +222,7 @@ int taosReadn(int fd, char *ptr, int nbytes) {
int
taosOpenUdpSocket
(
uint32_t
ip
,
uint16_t
port
)
{
struct
sockaddr_in
localAddr
;
int
sockFd
;
int
ttl
=
128
;
int
reuse
,
nocheck
;
int
bufSize
=
8192000
;
int
bufSize
=
1024000
;
uTrace
(
"open udp socket:0x%x:%hu"
,
ip
,
port
);
...
...
@@ -238,31 +236,6 @@ int taosOpenUdpSocket(uint32_t ip, uint16_t port) {
return
-
1
;
}
reuse
=
1
;
if
(
taosSetSockOpt
(
sockFd
,
SOL_SOCKET
,
SO_REUSEADDR
,
(
void
*
)
&
reuse
,
sizeof
(
reuse
))
<
0
)
{
uError
(
"setsockopt SO_REUSEADDR failed): %d (%s)"
,
errno
,
strerror
(
errno
));
close
(
sockFd
);
return
-
1
;
};
nocheck
=
1
;
if
(
taosSetSockOpt
(
sockFd
,
SOL_SOCKET
,
SO_NO_CHECK
,
(
void
*
)
&
nocheck
,
sizeof
(
nocheck
))
<
0
)
{
if
(
!
taosSkipSocketCheck
())
{
uError
(
"setsockopt SO_NO_CHECK failed: %d (%s)"
,
errno
,
strerror
(
errno
));
close
(
sockFd
);
return
-
1
;
}
else
{
uPrint
(
"Skipping setsockopt SO_NO_CHECK error: %d (%s)"
,
errno
,
strerror
(
errno
));
}
}
ttl
=
128
;
if
(
taosSetSockOpt
(
sockFd
,
IPPROTO_IP
,
IP_TTL
,
(
void
*
)
&
ttl
,
sizeof
(
ttl
))
<
0
)
{
uError
(
"setsockopt IP_TTL failed: %d (%s)"
,
errno
,
strerror
(
errno
));
close
(
sockFd
);
return
-
1
;
}
if
(
taosSetSockOpt
(
sockFd
,
SOL_SOCKET
,
SO_SNDBUF
,
(
void
*
)
&
bufSize
,
sizeof
(
bufSize
))
!=
0
)
{
uError
(
"failed to set the send buffer size for UDP socket
\n
"
);
close
(
sockFd
);
...
...
tests/pytest/table/boundary.py
浏览文件 @
364c1289
...
...
@@ -141,7 +141,7 @@ class TDTestCase:
tdSql
.
prepare
()
# 8 bytes for timestamp
maxRowSize
=
65535
-
8
maxRowSize
=
self
.
getLimitFromSourceCode
(
'TSDB_MAX_BYTES_PER_ROW'
)
-
8
maxCols
=
self
.
getLimitFromSourceCode
(
'TSDB_MAX_COLUMNS'
)
-
1
# for binary cols, 2 bytes are used for length
...
...
tests/script/general/stable/refcount.sim
0 → 100644
浏览文件 @
364c1289
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 4
system sh/exec.sh -n dnode1 -s start
sleep 3000
sql connect
print =============== step1
sql create database d1;
sql use d1;
sql create table d1.t1 (ts timestamp, i int);
sql create table d1.t2 (ts timestamp, i int);
sql create table d1.t3 (ts timestamp, i int);
sql insert into d1.t1 values(now, 1);
sql insert into d1.t2 values(now, 1);
sql drop table d1.t1;
sql drop database d1;
sql show databases;
if $rows != 0 then
return -1
endi
print =============== step2
sql create database d2;
sql use d2;
sql create table d2.t1 (ts timestamp, i int);
sql create table d2.t2 (ts timestamp, i int);
sql create table d2.t3 (ts timestamp, i int);
sql insert into d2.t1 values(now, 1);
sql insert into d2.t2 values(now, 1);
sql drop table d2.t1;
sql drop table d2.t2;
sql drop table d2.t3;
sql show d2.tables;
if $rows != 0 then
return -1
endi
sql show d2.vgroups;
if $rows != 0 then
return -1
endi
sql drop database d2;
sql show databases;
if $rows != 0 then
return -1
endi
print =============== step3
sql create database d3;
sql use d3;
sql create table d3.st (ts timestamp, i int) tags (j int);
sql create table d3.t1 using d3.st tags(1);
sql create table d3.t2 using d3.st tags(1);
sql create table d3.t3 using d3.st tags(1);
sql insert into d3.t1 values(now, 1);
sql drop table d3.t1;
sql drop table d3.t2;
sql drop table d3.t3;
sql show d3.tables;
if $rows != 0 then
return -1
endi
sql show d3.vgroups;
if $rows != 0 then
return -1
endi
sql drop database d3;
sql show databases;
if $rows != 0 then
return -1
endi
print =============== step4
sql create database d4;
sql use d4;
sql create table d4.st (ts timestamp, i int) tags (j int);
sql create table d4.t1 using d4.st tags(1);
sql create table d4.t2 using d4.st tags(1);
sql create table d4.t3 using d4.st tags(1);
sql insert into d4.t1 values(now, 1);
sql drop table d4.t1;
sql drop table d4.st;
sql show d4.tables;
if $rows != 0 then
return -1
endi
sql show d4.stables;
if $rows != 0 then
return -1
endi
sql drop database d4;
sql show databases;
if $rows != 0 then
return -1
endi
print =============== step5
sql create database d5;
sql use d5;
sql create table d5.st (ts timestamp, i int) tags (j int);
sql create table d5.t1 using d5.st tags(1);
sql create table d5.t2 using d5.st tags(1);
sql create table d5.t3 using d5.st tags(1);
sql insert into d5.t1 values(now, 1);
sql drop table d5.t1;
sql drop database d5;
sql show databases;
if $rows != 0 then
return -1
endi
print =============== step6
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
tests/script/jenkins/unique.txt
浏览文件 @
364c1289
...
...
@@ -35,6 +35,7 @@ cd ../../../debug; make
./test.sh -f unique/db/replica_reduce31.sim
./test.sh -f unique/db/replica_part.sim
./test.sh -f unique/dnode/alternativeRole.sim
./test.sh -f unique/dnode/balance1.sim
./test.sh -f unique/dnode/balance2.sim
./test.sh -f unique/dnode/balance3.sim
...
...
@@ -75,3 +76,59 @@ cd ../../../debug; make
./test.sh -f unique/vnode/replica3_basic.sim
./test.sh -f unique/vnode/replica3_repeat.sim
./test.sh -f unique/vnode/replica3_vgroup.sim
./test.sh -f general/stream/metrics_1.sim
./test.sh -f general/stream/metrics_del.sim
./test.sh -f general/stream/metrics_n.sim
./test.sh -f general/stream/metrics_replica1_vnoden.sim
#./test.sh -f general/stream/new_stream.sim
./test.sh -f general/stream/restart_stream.sim
./test.sh -f general/stream/stream_1.sim
./test.sh -f general/stream/stream_2.sim
./test.sh -f general/stream/stream_3.sim
./test.sh -f general/stream/stream_restart.sim
./test.sh -f general/stream/table_1.sim
./test.sh -f general/stream/table_del.sim
./test.sh -f general/stream/table_n.sim
./test.sh -f general/stream/table_replica1_vnoden.sim
./test.sh -f unique/arbitrator/check_cluster_cfg_para.sim
./test.sh -f unique/arbitrator/dn2_mn1_cache_file_sync.sim
./test.sh -f unique/arbitrator/dn3_mn1_full_createTableFail.sim
./test.sh -f unique/arbitrator/dn3_mn1_full_dropDnodeFail.sim
./test.sh -f unique/arbitrator/dn3_mn1_multiCreateDropTable.sim
./test.sh -f unique/arbitrator/dn3_mn1_nw_disable_timeout_autoDropDnode.sim
./test.sh -f unique/arbitrator/dn3_mn1_replica2_wal1_AddDelDnode.sim
./test.sh -f unique/arbitrator/dn3_mn1_replica_change_dropDnod.sim
./test.sh -f unique/arbitrator/dn3_mn1_replica_change.sim
./test.sh -f unique/arbitrator/dn3_mn1_stopDnode_timeout.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_change.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_offline.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_online.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_noCorruptFile_offline.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_delDir.sim
./test.sh -f unique/arbitrator/dn3_mn1_r2_vnode_delDir.sim
./test.sh -f unique/arbitrator/dn3_mn1_r3_vnode_delDir.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_nomaster.sim
./test.sh -f unique/arbitrator/dn3_mn2_killDnode.sim
./test.sh -f unique/arbitrator/insert_duplicationTs.sim
./test.sh -f unique/arbitrator/offline_replica2_alterTable_online.sim
./test.sh -f unique/arbitrator/offline_replica2_alterTag_online.sim
./test.sh -f unique/arbitrator/offline_replica2_createTable_online.sim
./test.sh -f unique/arbitrator/offline_replica2_dropDb_online.sim
./test.sh -f unique/arbitrator/offline_replica2_dropTable_online.sim
./test.sh -f unique/arbitrator/offline_replica3_alterTable_online.sim
./test.sh -f unique/arbitrator/offline_replica3_alterTag_online.sim
./test.sh -f unique/arbitrator/offline_replica3_createTable_online.sim
./test.sh -f unique/arbitrator/offline_replica3_dropDb_online.sim
./test.sh -f unique/arbitrator/offline_replica3_dropTable_online.sim
./test.sh -f unique/arbitrator/replica_changeWithArbitrator.sim
./test.sh -f unique/arbitrator/sync_replica2_alterTable_add.sim
./test.sh -f unique/arbitrator/sync_replica2_alterTable_drop.sim
./test.sh -f unique/arbitrator/sync_replica2_dropDb.sim
./test.sh -f unique/arbitrator/sync_replica2_dropTable.sim
./test.sh -f unique/arbitrator/sync_replica3_alterTable_add.sim
./test.sh -f unique/arbitrator/sync_replica3_alterTable_drop.sim
./test.sh -f unique/arbitrator/sync_replica3_dropDb.sim
./test.sh -f unique/arbitrator/sync_replica3_dropTable.sim
tests/script/loop.sh
0 → 100755
浏览文件 @
364c1289
#!/bin/bash
##################################################
#
# Do simulation test
#
##################################################
set
-e
#set -x
CMD_NAME
=
LOOP_TIMES
=
5
while
getopts
"f:t:"
arg
do
case
$arg
in
f
)
CMD_NAME
=
$OPTARG
;;
t
)
LOOP_TIMES
=
$OPTARG
;;
?
)
echo
"unknow argument"
;;
esac
done
echo
LOOP_TIMES
${
LOOP_TIMES
}
echo
CMD_NAME
${
CMD_NAME
}
for
((
i
=
0
;
i<
$LOOP_TIMES
;
i++
))
do
echo
loop
$i
echo
cmd
$CMD_NAME
$CMD_NAME
sleep
2
done
tests/script/unique/cluster/balance2.sim
浏览文件 @
364c1289
...
...
@@ -328,7 +328,7 @@ $x = 0
show6:
$x = $x + 1
sleep 2000
if $x ==
3
0 then
if $x ==
1
0 then
return -1
endi
sql show dnodes -x show6
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录