Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
43739e32
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
43739e32
编写于
7月 06, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/develop' into feature/vnode
上级
ac67adc8
1abb0432
变更
11
显示空白变更内容
内联
并排
Showing
11 changed file
with
117 addition
and
44 deletion
+117
-44
cmake/define.inc
cmake/define.inc
+4
-0
cmake/input.inc
cmake/input.inc
+5
-0
src/dnode/src/dnodeMgmt.c
src/dnode/src/dnodeMgmt.c
+10
-6
src/inc/vnode.h
src/inc/vnode.h
+2
-0
src/os/linux/inc/os.h
src/os/linux/inc/os.h
+19
-0
src/os/linux/src/linuxPlatform.c
src/os/linux/src/linuxPlatform.c
+46
-0
src/os/linux/src/linuxSysPara.c
src/os/linux/src/linuxSysPara.c
+9
-10
src/rpc/src/rpcMain.c
src/rpc/src/rpcMain.c
+7
-2
src/util/src/tfile.c
src/util/src/tfile.c
+4
-4
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+10
-21
tests/comparisonTest/opentsdb/opentsdbtest/pom.xml
tests/comparisonTest/opentsdb/opentsdbtest/pom.xml
+1
-1
未找到文件。
cmake/define.inc
浏览文件 @
43739e32
...
...
@@ -28,3 +28,7 @@ ENDIF ()
IF
(
TD_RANDOM_FILE_FAIL
)
ADD_DEFINITIONS
(
-
DTAOS_RANDOM_FILE_FAIL
)
ENDIF
()
IF
(
TD_RANDOM_NETWORK_FAIL
)
ADD_DEFINITIONS
(
-
DTAOS_RANDOM_NETWORK_FAIL
)
ENDIF
()
cmake/input.inc
浏览文件 @
43739e32
...
...
@@ -36,3 +36,8 @@ IF (${RANDOM_FILE_FAIL} MATCHES "true")
SET
(
TD_RANDOM_FILE_FAIL
TRUE
)
MESSAGE
(
STATUS
"build with random-file-fail enabled"
)
ENDIF
()
IF
(
$
{
RANDOM_NETWORK_FAIL
}
MATCHES
"true"
)
SET
(
TD_RANDOM_NETWORK_FAIL
TRUE
)
MESSAGE
(
STATUS
"build with random-network-fail enabled"
)
ENDIF
()
src/dnode/src/dnodeMgmt.c
浏览文件 @
43739e32
...
...
@@ -106,6 +106,12 @@ int32_t dnodeInitMgmt() {
}
}
int32_t
code
=
vnodeInitResources
();
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
dnodeCleanupMgmt
();
return
-
1
;
}
// create the queue and thread to handle the message
tsMgmtQset
=
taosOpenQset
();
if
(
tsMgmtQset
==
NULL
)
{
...
...
@@ -127,7 +133,7 @@ int32_t dnodeInitMgmt() {
pthread_attr_init
(
&
thAttr
);
pthread_attr_setdetachstate
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
int32_t
code
=
pthread_create
(
&
tsQthread
,
&
thAttr
,
dnodeProcessMgmtQueue
,
NULL
);
code
=
pthread_create
(
&
tsQthread
,
&
thAttr
,
dnodeProcessMgmtQueue
,
NULL
);
pthread_attr_destroy
(
&
thAttr
);
if
(
code
!=
0
)
{
dError
(
"failed to create thread to process mgmt queue, reason:%s"
,
strerror
(
errno
));
...
...
@@ -282,13 +288,12 @@ static void *dnodeOpenVnode(void *param) {
}
static
int32_t
dnodeOpenVnodes
()
{
int32_t
*
vnodeList
=
calloc
(
TSDB_MAX_VNODES
,
sizeof
(
int32_t
))
;
int32_t
vnodeList
[
TSDB_MAX_VNODES
]
=
{
0
}
;
int32_t
numOfVnodes
=
0
;
int32_t
status
=
dnodeGetVnodeList
(
vnodeList
,
&
numOfVnodes
);
if
(
status
!=
TSDB_CODE_SUCCESS
)
{
dInfo
(
"get dnode list failed"
);
free
(
vnodeList
);
return
status
;
}
...
...
@@ -334,7 +339,6 @@ static int32_t dnodeOpenVnodes() {
free
(
pThread
->
vnodeList
);
}
free
(
vnodeList
);
free
(
threads
);
dInfo
(
"there are total vnodes:%d, openned:%d failed:%d"
,
numOfVnodes
,
openVnodes
,
failedVnodes
);
...
...
@@ -342,7 +346,7 @@ static int32_t dnodeOpenVnodes() {
}
void
dnodeStartStream
()
{
int32_t
vnodeList
[
TSDB_MAX_VNODES
];
int32_t
vnodeList
[
TSDB_MAX_VNODES
]
=
{
0
}
;
int32_t
numOfVnodes
=
0
;
int32_t
status
=
vnodeGetVnodeList
(
vnodeList
,
&
numOfVnodes
);
...
...
@@ -359,7 +363,7 @@ void dnodeStartStream() {
}
static
void
dnodeCloseVnodes
()
{
int32_t
vnodeList
[
TSDB_MAX_VNODES
];
int32_t
vnodeList
[
TSDB_MAX_VNODES
]
=
{
0
}
;
int32_t
numOfVnodes
=
0
;
int32_t
status
;
...
...
src/inc/vnode.h
浏览文件 @
43739e32
...
...
@@ -61,6 +61,8 @@ int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
int32_t
vnodeGetVnodeList
(
int32_t
vnodeList
[],
int32_t
*
numOfVnodes
);
void
vnodeBuildStatusMsg
(
void
*
param
);
void
vnodeSetAccess
(
SDMVgroupAccess
*
pAccess
,
int32_t
numOfVnodes
);
int32_t
vnodeInitResources
();
void
vnodeCleanupResources
();
int32_t
vnodeProcessRead
(
void
*
pVnode
,
SReadMsg
*
pReadMsg
);
...
...
src/os/linux/inc/os.h
浏览文件 @
43739e32
...
...
@@ -86,9 +86,28 @@ extern "C" {
} \
}
#ifdef TAOS_RANDOM_NETWORK_FAIL
ssize_t
taos_send_random_fail
(
int
sockfd
,
const
void
*
buf
,
size_t
len
,
int
flags
);
ssize_t
taos_sendto_random_fail
(
int
sockfd
,
const
void
*
buf
,
size_t
len
,
int
flags
,
const
struct
sockaddr
*
dest_addr
,
socklen_t
addrlen
);
ssize_t
taos_read_random_fail
(
int
fd
,
void
*
buf
,
size_t
count
);
ssize_t
taos_write_random_fail
(
int
fd
,
const
void
*
buf
,
size_t
count
);
#define send(sockfd, buf, len, flags) taos_send_random_fail(sockfd, buf, len, flags)
#define sendto(sockfd, buf, len, flags, dest_addr, addrlen) \
taos_sendto_random_fail(sockfd, buf, len, flags, dest_addr, addrlen)
#define taosWriteSocket(fd, buf, len) taos_write_random_fail(fd, buf, len)
#define taosReadSocket(fd, buf, len) taos_read_random_fail(fd, buf, len)
#else
#define taosWriteSocket(fd, buf, len) write(fd, buf, len)
#define taosReadSocket(fd, buf, len) read(fd, buf, len)
#endif
/* TAOS_RANDOM_NETWORK_FAIL */
#define atomic_load_8(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_16(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_32(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
...
...
src/os/linux/src/linuxPlatform.c
浏览文件 @
43739e32
...
...
@@ -270,3 +270,49 @@ int tSystem(const char * cmd)
}
}
#ifdef TAOS_RANDOM_NETWORK_FAIL
#define RANDOM_NETWORK_FAIL_FACTOR 20
ssize_t
taos_send_random_fail
(
int
sockfd
,
const
void
*
buf
,
size_t
len
,
int
flags
)
{
if
(
rand
()
%
RANDOM_NETWORK_FAIL_FACTOR
==
0
)
{
errno
=
ECONNRESET
;
return
-
1
;
}
return
send
(
sockfd
,
buf
,
len
,
flags
);
}
ssize_t
taos_sendto_random_fail
(
int
sockfd
,
const
void
*
buf
,
size_t
len
,
int
flags
,
const
struct
sockaddr
*
dest_addr
,
socklen_t
addrlen
)
{
if
(
rand
()
%
RANDOM_NETWORK_FAIL_FACTOR
==
0
)
{
errno
=
ECONNRESET
;
return
-
1
;
}
return
sendto
(
sockfd
,
buf
,
len
,
flags
,
dest_addr
,
addrlen
);
}
ssize_t
taos_read_random_fail
(
int
fd
,
void
*
buf
,
size_t
count
)
{
if
(
rand
()
%
RANDOM_NETWORK_FAIL_FACTOR
==
0
)
{
errno
=
ECONNRESET
;
return
-
1
;
}
return
read
(
fd
,
buf
,
count
);
}
ssize_t
taos_write_random_fail
(
int
fd
,
const
void
*
buf
,
size_t
count
)
{
if
(
rand
()
%
RANDOM_NETWORK_FAIL_FACTOR
==
0
)
{
errno
=
EINTR
;
return
-
1
;
}
return
write
(
fd
,
buf
,
count
);
}
#endif
/* TAOS_RANDOM_NETWORK_FAIL */
src/os/linux/src/linuxSysPara.c
浏览文件 @
43739e32
...
...
@@ -160,7 +160,7 @@ static void taosGetSystemTimezone() {
/* load time zone string from /etc/timezone */
FILE
*
f
=
fopen
(
"/etc/timezone"
,
"r"
);
char
buf
[
6
5
]
=
{
0
};
char
buf
[
6
8
]
=
{
0
};
if
(
f
!=
NULL
)
{
int
len
=
fread
(
buf
,
64
,
1
,
f
);
if
(
len
<
64
&&
ferror
(
f
))
{
...
...
@@ -170,7 +170,6 @@ static void taosGetSystemTimezone() {
}
fclose
(
f
);
}
char
*
lineEnd
=
strstr
(
buf
,
"
\n
"
);
if
(
lineEnd
!=
NULL
)
{
...
...
@@ -181,7 +180,7 @@ static void taosGetSystemTimezone() {
if
(
strlen
(
buf
)
>
0
)
{
setenv
(
"TZ"
,
buf
,
1
);
}
}
// get and set default timezone
tzset
();
...
...
src/rpc/src/rpcMain.c
浏览文件 @
43739e32
...
...
@@ -73,6 +73,7 @@ typedef struct {
SRpcInfo
*
pRpc
;
// associated SRpcInfo
SRpcIpSet
ipSet
;
// ip list provided by app
void
*
ahandle
;
// handle provided by app
void
*
signature
;
// for validation
struct
SRpcConn
*
pConn
;
// pConn allocated
char
msgType
;
// message type
uint8_t
*
pCont
;
// content provided by app
...
...
@@ -361,6 +362,7 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, SRpcMsg *pMsg) {
int
contLen
=
rpcCompressRpcMsg
(
pMsg
->
pCont
,
pMsg
->
contLen
);
pContext
=
(
SRpcReqContext
*
)
(
pMsg
->
pCont
-
sizeof
(
SRpcHead
)
-
sizeof
(
SRpcReqContext
));
pContext
->
ahandle
=
pMsg
->
ahandle
;
pContext
->
signature
=
pContext
;
pContext
->
pRpc
=
(
SRpcInfo
*
)
shandle
;
pContext
->
ipSet
=
*
pIpSet
;
pContext
->
contLen
=
contLen
;
...
...
@@ -527,11 +529,13 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) {
return
code
;
}
/* todo: cancel process may have race condition, pContext may have been released
just before app calls the rpcCancelRequest */
void
rpcCancelRequest
(
void
*
handle
)
{
SRpcReqContext
*
pContext
=
handle
;
// signature is used to check if pContext is freed.
// pContext may have been released just before app calls the rpcCancelRequest
if
(
pContext
->
signature
!=
pContext
)
return
;
if
(
pContext
->
pConn
)
{
tDebug
(
"%s, app trys to cancel request"
,
pContext
->
pConn
->
info
);
rpcCloseConn
(
pContext
->
pConn
);
...
...
@@ -1005,6 +1009,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
static
void
rpcNotifyClient
(
SRpcReqContext
*
pContext
,
SRpcMsg
*
pMsg
)
{
SRpcInfo
*
pRpc
=
pContext
->
pRpc
;
pContext
->
signature
=
NULL
;
pContext
->
pConn
=
NULL
;
if
(
pContext
->
pRsp
)
{
// for synchronous API
...
...
src/util/src/tfile.c
浏览文件 @
43739e32
...
...
@@ -26,12 +26,12 @@
#include "os.h"
#define RANDOM_FACTOR 5
#define RANDOM_F
ILE_FAIL_F
ACTOR 5
ssize_t
taos_tread
(
int
fd
,
void
*
buf
,
size_t
count
)
{
#ifdef TAOS_RANDOM_FILE_FAIL
if
(
rand
()
%
RANDOM_FACTOR
==
0
)
{
if
(
rand
()
%
RANDOM_F
ILE_FAIL_F
ACTOR
==
0
)
{
errno
=
EIO
;
return
-
1
;
}
...
...
@@ -43,7 +43,7 @@ ssize_t taos_tread(int fd, void *buf, size_t count)
ssize_t
taos_twrite
(
int
fd
,
void
*
buf
,
size_t
count
)
{
#ifdef TAOS_RANDOM_FILE_FAIL
if
(
rand
()
%
RANDOM_FACTOR
==
0
)
{
if
(
rand
()
%
RANDOM_F
ILE_FAIL_F
ACTOR
==
0
)
{
errno
=
EIO
;
return
-
1
;
}
...
...
@@ -55,7 +55,7 @@ ssize_t taos_twrite(int fd, void *buf, size_t count)
off_t
taos_lseek
(
int
fd
,
off_t
offset
,
int
whence
)
{
#ifdef TAOS_RANDOM_FILE_FAIL
if
(
rand
()
%
RANDOM_FACTOR
==
0
)
{
if
(
rand
()
%
RANDOM_F
ILE_FAIL_F
ACTOR
==
0
)
{
errno
=
EIO
;
return
-
1
;
}
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
43739e32
...
...
@@ -34,8 +34,7 @@
#define TSDB_VNODE_VERSION_CONTENT_LEN 31
static
int32_t
tsOpennedVnodes
;
static
void
*
tsDnodeVnodesHash
;
static
SHashObj
*
tsDnodeVnodesHash
;
static
void
vnodeCleanUp
(
SVnodeObj
*
pVnode
);
static
int32_t
vnodeSaveCfg
(
SMDCreateVnodeMsg
*
pVnodeCfg
);
static
int32_t
vnodeReadCfg
(
SVnodeObj
*
pVnode
);
...
...
@@ -47,8 +46,6 @@ static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static
void
vnodeNotifyRole
(
void
*
ahandle
,
int8_t
role
);
static
void
vnodeNotifyFileSynced
(
void
*
ahandle
,
uint64_t
fversion
);
static
pthread_once_t
vnodeModuleInit
=
PTHREAD_ONCE_INIT
;
#ifndef _SYNC
tsync_h
syncStart
(
const
SSyncInfo
*
info
)
{
return
NULL
;
}
int32_t
syncForwardToPeer
(
tsync_h
shandle
,
void
*
pHead
,
void
*
mhandle
,
int
qtype
)
{
return
0
;
}
...
...
@@ -58,25 +55,28 @@ int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; }
void
syncConfirmForward
(
tsync_h
shandle
,
uint64_t
version
,
int32_t
code
)
{}
#endif
static
void
vnodeInit
()
{
int32_t
vnodeInitResources
()
{
vnodeInitWriteFp
();
vnodeInitReadFp
();
tsDnodeVnodesHash
=
taosHashInit
(
TSDB_MAX_VNODES
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
);
if
(
tsDnodeVnodesHash
==
NULL
)
{
vError
(
"failed to init vnode list"
);
return
TSDB_CODE_VND_OUT_OF_MEMORY
;
}
return
TSDB_CODE_SUCCESS
;
}
void
vnodeCleanupResources
()
{
if
(
tsDnodeVnodesHash
!=
NULL
)
{
taosHashCleanup
(
tsDnodeVnodesHash
);
vnodeModuleInit
=
PTHREAD_ONCE_INIT
;
tsDnodeVnodesHash
=
NULL
;
}
}
int32_t
vnodeCreate
(
SMDCreateVnodeMsg
*
pVnodeCfg
)
{
int32_t
code
;
pthread_once
(
&
vnodeModuleInit
,
vnodeInit
);
SVnodeObj
*
pTemp
=
(
SVnodeObj
*
)
taosHashGet
(
tsDnodeVnodesHash
,
(
const
char
*
)
&
pVnodeCfg
->
cfg
.
vgId
,
sizeof
(
int32_t
));
if
(
pTemp
!=
NULL
)
{
...
...
@@ -144,11 +144,6 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
}
int32_t
vnodeDrop
(
int32_t
vgId
)
{
if
(
tsDnodeVnodesHash
==
NULL
)
{
vDebug
(
"vgId:%d, failed to drop, vgId not exist"
,
vgId
);
return
TSDB_CODE_VND_INVALID_VGROUP_ID
;
}
SVnodeObj
**
ppVnode
=
(
SVnodeObj
**
)
taosHashGet
(
tsDnodeVnodesHash
,
(
const
char
*
)
&
vgId
,
sizeof
(
int32_t
));
if
(
ppVnode
==
NULL
||
*
ppVnode
==
NULL
)
{
vDebug
(
"vgId:%d, failed to drop, vgId not find"
,
vgId
);
...
...
@@ -187,7 +182,6 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
int32_t
vnodeOpen
(
int32_t
vnode
,
char
*
rootDir
)
{
char
temp
[
TSDB_FILENAME_LEN
];
pthread_once
(
&
vnodeModuleInit
,
vnodeInit
);
SVnodeObj
*
pVnode
=
calloc
(
sizeof
(
SVnodeObj
),
1
);
if
(
pVnode
==
NULL
)
{
...
...
@@ -195,7 +189,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
return
TAOS_SYSTEM_ERROR
(
errno
);
}
atomic_add_fetch_32
(
&
tsOpennedVnodes
,
1
);
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
pVnode
->
vgId
=
vnode
;
...
...
@@ -366,13 +359,11 @@ void vnodeRelease(void *pVnodeRaw) {
free
(
pVnode
);
int32_t
count
=
atomic_sub_fetch_32
(
&
tsOpennedVnodes
,
1
);
int32_t
count
=
taosHashGetSize
(
tsDnodeVnodesHash
);
vDebug
(
"vgId:%d, vnode is released, vnodes:%d"
,
vgId
,
count
);
}
void
*
vnodeGetVnode
(
int32_t
vgId
)
{
if
(
tsDnodeVnodesHash
==
NULL
)
return
NULL
;
SVnodeObj
**
ppVnode
=
(
SVnodeObj
**
)
taosHashGet
(
tsDnodeVnodesHash
,
(
const
char
*
)
&
vgId
,
sizeof
(
int32_t
));
if
(
ppVnode
==
NULL
||
*
ppVnode
==
NULL
)
{
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
...
...
@@ -434,8 +425,6 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SDMStatusMsg *pStatus) {
}
int32_t
vnodeGetVnodeList
(
int32_t
vnodeList
[],
int32_t
*
numOfVnodes
)
{
if
(
tsDnodeVnodesHash
==
NULL
)
return
TSDB_CODE_SUCCESS
;
SHashMutableIterator
*
pIter
=
taosHashCreateIter
(
tsDnodeVnodesHash
);
while
(
taosHashIterNext
(
pIter
))
{
SVnodeObj
**
pVnode
=
taosHashIterGet
(
pIter
);
...
...
tests/comparisonTest/opentsdb/opentsdbtest/pom.xml
浏览文件 @
43739e32
...
...
@@ -94,7 +94,7 @@
<dependency>
<groupId>
com.google.guava
</groupId>
<artifactId>
guava
</artifactId>
<version>
18.0
</version>
<version>
24.1.1
</version>
</dependency>
<dependency>
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录