Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
a9be7a85
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a9be7a85
编写于
4月 11, 2020
作者:
J
Jeff Tao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
exit nicely
上级
0743d310
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
107 addition
and
51 deletion
+107
-51
src/dnode/src/dnodeRead.c
src/dnode/src/dnodeRead.c
+59
-27
src/dnode/src/dnodeWrite.c
src/dnode/src/dnodeWrite.c
+21
-10
src/rpc/src/rpcCache.c
src/rpc/src/rpcCache.c
+2
-1
src/rpc/src/rpcClient.c
src/rpc/src/rpcClient.c
+3
-1
src/rpc/src/rpcServer.c
src/rpc/src/rpcServer.c
+3
-4
src/rpc/src/rpcUdp.c
src/rpc/src/rpcUdp.c
+3
-1
src/util/src/ihash.c
src/util/src/ihash.c
+0
-1
src/util/src/tqueue.c
src/util/src/tqueue.c
+12
-5
src/util/src/tsched.c
src/util/src/tsched.c
+2
-0
src/vnode/main/src/vnodeMain.c
src/vnode/main/src/vnodeMain.c
+2
-1
未找到文件。
src/dnode/src/dnodeRead.c
浏览文件 @
a9be7a85
...
...
@@ -32,27 +32,51 @@ typedef struct {
SRpcMsg
rpcMsg
;
}
SReadMsg
;
typedef
struct
{
pthread_t
thread
;
// thread
int32_t
workerId
;
// worker ID
}
SReadWorker
;
typedef
struct
{
int32_t
max
;
// max number of workers
int32_t
min
;
// min number of workers
int32_t
num
;
// current number of workers
SReadWorker
*
readWorker
;
}
SReadWorkerPool
;
static
void
*
dnodeProcessReadQueue
(
void
*
param
);
static
void
dnodeHandleIdleReadWorker
();
static
void
dnodeHandleIdleReadWorker
(
SReadWorker
*
);
// module global variable
static
taos_qset
readQset
;
static
int32_t
threads
;
// number of query threads
static
int32_t
maxThreads
;
static
int32_t
minThreads
;
static
SReadWorkerPool
readPool
;
static
taos_qset
readQset
;
int32_t
dnodeInitRead
()
{
readQset
=
taosOpenQset
();
minThreads
=
3
;
maxThreads
=
tsNumOfCores
*
tsNumOfThreadsPerCore
;
if
(
maxThreads
<=
minThreads
*
2
)
maxThreads
=
2
*
minThreads
;
readPool
.
min
=
2
;
readPool
.
max
=
tsNumOfCores
*
tsNumOfThreadsPerCore
;
if
(
readPool
.
max
<=
readPool
.
min
*
2
)
readPool
.
max
=
2
*
readPool
.
min
;
readPool
.
readWorker
=
(
SReadWorker
*
)
calloc
(
sizeof
(
SReadWorker
),
readPool
.
max
);
if
(
readPool
.
readWorker
==
NULL
)
return
-
1
;
for
(
int
i
=
0
;
i
<
readPool
.
max
;
++
i
)
{
SReadWorker
*
pWorker
=
readPool
.
readWorker
+
i
;
pWorker
->
workerId
=
i
;
}
dPrint
(
"dnode read is opened"
);
return
0
;
}
void
dnodeCleanupRead
()
{
for
(
int
i
=
0
;
i
<
readPool
.
max
;
++
i
)
{
SReadWorker
*
pWorker
=
readPool
.
readWorker
+
i
;
if
(
pWorker
->
thread
)
pthread_join
(
pWorker
->
thread
,
NULL
);
}
taosCloseQset
(
readQset
);
dPrint
(
"dnode read is closed"
);
}
...
...
@@ -116,18 +140,25 @@ void *dnodeAllocateRqueue(void *pVnode) {
taosAddIntoQset
(
readQset
,
queue
,
pVnode
);
// spawn a thread to process queue
if
(
threads
<
maxThreads
)
{
pthread_t
thread
;
pthread_attr_t
thAttr
;
pthread_attr_init
(
&
thAttr
);
pthread_attr_setdetachstate
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
thread
,
&
thAttr
,
dnodeProcessReadQueue
,
readQset
)
!=
0
)
{
dError
(
"failed to create thread to process read queue, reason:%s"
,
strerror
(
errno
));
}
if
(
readPool
.
num
<
readPool
.
max
)
{
do
{
SReadWorker
*
pWorker
=
readPool
.
readWorker
+
readPool
.
num
;
pthread_attr_t
thAttr
;
pthread_attr_init
(
&
thAttr
);
pthread_attr_setdetachstate
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
pWorker
->
thread
,
&
thAttr
,
dnodeProcessReadQueue
,
pWorker
)
!=
0
)
{
dError
(
"failed to create thread to process read queue, reason:%s"
,
strerror
(
errno
));
}
pthread_attr_destroy
(
&
thAttr
);
readPool
.
num
++
;
dTrace
(
"read worker:%d is launched, total:%d"
,
pWorker
->
workerId
,
readPool
.
num
);
}
while
(
readPool
.
num
<
readPool
.
min
);
}
dTrace
(
"pVnode:%p, queue:%p is allocated"
,
pVnode
,
queue
);
dTrace
(
"pVnode:%p,
read
queue:%p is allocated"
,
pVnode
,
queue
);
return
queue
;
}
...
...
@@ -167,14 +198,14 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
}
static
void
*
dnodeProcessReadQueue
(
void
*
param
)
{
taos_qset
qset
=
(
taos_qset
)
param
;
SReadMsg
*
pReadMsg
;
int
type
;
void
*
pVnode
;
SReadWorker
*
pWorker
=
param
;
SReadMsg
*
pReadMsg
;
int
type
;
void
*
pVnode
;
while
(
1
)
{
if
(
taosReadQitemFromQset
(
qset
,
&
type
,
(
void
**
)
&
pReadMsg
,
(
void
**
)
&
pVnode
)
==
0
)
{
dnodeHandleIdleReadWorker
();
if
(
taosReadQitemFromQset
(
readQset
,
&
type
,
(
void
**
)
&
pReadMsg
,
&
pVnode
)
==
0
)
{
dnodeHandleIdleReadWorker
(
pWorker
);
continue
;
}
...
...
@@ -186,11 +217,12 @@ static void *dnodeProcessReadQueue(void *param) {
return
NULL
;
}
static
void
dnodeHandleIdleReadWorker
()
{
static
void
dnodeHandleIdleReadWorker
(
SReadWorker
*
pWorker
)
{
int32_t
num
=
taosGetQueueNumber
(
readQset
);
if
(
num
==
0
||
(
num
<=
minThreads
&&
threads
>
minThreads
))
{
threads
--
;
if
(
num
==
0
||
(
num
<=
readPool
.
min
&&
readPool
.
num
>
readPool
.
min
))
{
readPool
.
num
--
;
dTrace
(
"read worker:%d is released, total:%d"
,
pWorker
->
workerId
,
readPool
.
num
);
pthread_exit
(
NULL
);
}
else
{
usleep
(
100
);
...
...
src/dnode/src/dnodeWrite.c
浏览文件 @
a9be7a85
...
...
@@ -28,6 +28,7 @@
#include "vnode.h"
typedef
struct
{
taos_qall
qall
;
taos_qset
qset
;
// queue set
pthread_t
thread
;
// thread
int32_t
workerId
;
// worker ID
...
...
@@ -65,6 +66,14 @@ int32_t dnodeInitWrite() {
}
void
dnodeCleanupWrite
()
{
for
(
int32_t
i
=
0
;
i
<
wWorkerPool
.
max
;
++
i
)
{
SWriteWorker
*
pWorker
=
wWorkerPool
.
writeWorker
+
i
;
if
(
pWorker
->
thread
)
{
pthread_join
(
pWorker
->
thread
,
NULL
);
}
}
free
(
wWorkerPool
.
writeWorker
);
dPrint
(
"dnode write is closed"
);
}
...
...
@@ -113,6 +122,7 @@ void *dnodeAllocateWqueue(void *pVnode) {
if
(
pWorker
->
qset
==
NULL
)
return
NULL
;
taosAddIntoQset
(
pWorker
->
qset
,
queue
,
pVnode
);
pWorker
->
qall
=
taosAllocateQall
();
wWorkerPool
.
nextId
=
(
wWorkerPool
.
nextId
+
1
)
%
wWorkerPool
.
max
;
pthread_attr_t
thAttr
;
...
...
@@ -122,13 +132,17 @@ void *dnodeAllocateWqueue(void *pVnode) {
if
(
pthread_create
(
&
pWorker
->
thread
,
&
thAttr
,
dnodeProcessWriteQueue
,
pWorker
)
!=
0
)
{
dError
(
"failed to create thread to process read queue, reason:%s"
,
strerror
(
errno
));
taosCloseQset
(
pWorker
->
qset
);
}
else
{
dTrace
(
"write worker:%d is launched"
,
pWorker
->
workerId
);
}
pthread_attr_destroy
(
&
thAttr
);
}
else
{
taosAddIntoQset
(
pWorker
->
qset
,
queue
,
pVnode
);
wWorkerPool
.
nextId
=
(
wWorkerPool
.
nextId
+
1
)
%
wWorkerPool
.
max
;
}
dTrace
(
"pVnode:%p, queue:%p is allocated"
,
pVnode
,
queue
);
dTrace
(
"pVnode:%p,
write
queue:%p is allocated"
,
pVnode
,
queue
);
return
queue
;
}
...
...
@@ -160,17 +174,14 @@ void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code) {
static
void
*
dnodeProcessWriteQueue
(
void
*
param
)
{
SWriteWorker
*
pWorker
=
(
SWriteWorker
*
)
param
;
taos_qall
qall
;
SWriteMsg
*
pWrite
;
SWalHead
*
pHead
;
int32_t
numOfMsgs
;
int
type
;
void
*
pVnode
,
*
item
;
qall
=
taosAllocateQall
();
while
(
1
)
{
numOfMsgs
=
taosReadAllQitemsFromQset
(
pWorker
->
qset
,
qall
,
&
pVnode
);
numOfMsgs
=
taosReadAllQitemsFromQset
(
pWorker
->
qset
,
pWorker
->
qall
,
&
pVnode
);
if
(
numOfMsgs
<=
0
)
{
dnodeHandleIdleWorker
(
pWorker
);
// thread exit if no queues anymore
continue
;
...
...
@@ -178,7 +189,7 @@ static void *dnodeProcessWriteQueue(void *param) {
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
pWrite
=
NULL
;
taosGetQitem
(
qall
,
&
type
,
&
item
);
taosGetQitem
(
pWorker
->
qall
,
&
type
,
&
item
);
if
(
type
==
TAOS_QTYPE_RPC
)
{
pWrite
=
(
SWriteMsg
*
)
item
;
pHead
=
(
SWalHead
*
)(
pWrite
->
pCont
-
sizeof
(
SWalHead
));
...
...
@@ -196,9 +207,9 @@ static void *dnodeProcessWriteQueue(void *param) {
walFsync
(
vnodeGetWal
(
pVnode
));
// browse all items, and process them one by one
taosResetQitems
(
qall
);
taosResetQitems
(
pWorker
->
qall
);
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
taosGetQitem
(
qall
,
&
type
,
&
item
);
taosGetQitem
(
pWorker
->
qall
,
&
type
,
&
item
);
if
(
type
==
TAOS_QTYPE_RPC
)
{
pWrite
=
(
SWriteMsg
*
)
item
;
dnodeSendRpcWriteRsp
(
pVnode
,
item
,
pWrite
->
rpcMsg
.
code
);
...
...
@@ -209,8 +220,6 @@ static void *dnodeProcessWriteQueue(void *param) {
}
}
taosFreeQall
(
qall
);
return
NULL
;
}
...
...
@@ -221,8 +230,10 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
usleep
(
1000
);
sched_yield
();
}
else
{
taosFreeQall
(
pWorker
->
qall
);
taosCloseQset
(
pWorker
->
qset
);
pWorker
->
qset
=
NULL
;
dTrace
(
"write worker:%d is released"
,
pWorker
->
workerId
);
pthread_exit
(
NULL
);
}
}
src/rpc/src/rpcCache.c
浏览文件 @
a9be7a85
...
...
@@ -103,7 +103,8 @@ void rpcCloseConnCache(void *handle) {
if
(
pCache
->
connHashMemPool
)
taosMemPoolCleanUp
(
pCache
->
connHashMemPool
);
tfree
(
pCache
->
connHashList
);
tfree
(
pCache
->
count
)
tfree
(
pCache
->
count
);
tfree
(
pCache
->
lockedBy
);
pthread_mutex_unlock
(
&
pCache
->
mutex
);
...
...
src/rpc/src/rpcClient.c
浏览文件 @
a9be7a85
...
...
@@ -84,7 +84,9 @@ void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp,
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
(
pTcp
->
thread
),
&
thattr
,
taosReadTcpData
,
(
void
*
)(
pTcp
))
!=
0
)
{
int
code
=
pthread_create
(
&
(
pTcp
->
thread
),
&
thattr
,
taosReadTcpData
,
(
void
*
)(
pTcp
));
pthread_attr_destroy
(
&
thattr
);
if
(
code
!=
0
)
{
tError
(
"%s failed to create TCP read data thread, reason:%s"
,
label
,
strerror
(
errno
));
return
NULL
;
}
...
...
src/rpc/src/rpcServer.c
浏览文件 @
a9be7a85
...
...
@@ -83,6 +83,9 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
}
memset
(
pServerObj
->
pThreadObj
,
0
,
sizeof
(
SThreadObj
)
*
(
size_t
)
numOfThreads
);
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
pThreadObj
=
pServerObj
->
pThreadObj
;
for
(
i
=
0
;
i
<
numOfThreads
;
++
i
)
{
pThreadObj
->
processData
=
fp
;
...
...
@@ -105,8 +108,6 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
return
NULL
;
}
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
(
pThreadObj
->
thread
),
&
thattr
,
(
void
*
)
taosProcessTcpData
,
(
void
*
)(
pThreadObj
))
!=
0
)
{
tError
(
"%s failed to create TCP process data thread, reason:%s"
,
label
,
strerror
(
errno
));
return
NULL
;
...
...
@@ -116,8 +117,6 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
pThreadObj
++
;
}
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
(
pServerObj
->
thread
),
&
thattr
,
(
void
*
)
taosAcceptTcpConnection
,
(
void
*
)(
pServerObj
))
!=
0
)
{
tError
(
"%s failed to create TCP accept thread, reason:%s"
,
label
,
strerror
(
errno
));
return
NULL
;
...
...
src/rpc/src/rpcUdp.c
浏览文件 @
a9be7a85
...
...
@@ -146,10 +146,12 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
pConn
->
tmrCtrl
=
pSet
->
tmrCtrl
;
}
if
(
pthread_create
(
&
pConn
->
thread
,
&
thAttr
,
taosRecvUdpData
,
pConn
)
!=
0
)
{
int
code
=
pthread_create
(
&
pConn
->
thread
,
&
thAttr
,
taosRecvUdpData
,
pConn
);
if
(
code
!=
0
)
{
tError
(
"%s failed to create thread to process UDP data, reason:%s"
,
label
,
strerror
(
errno
));
taosCloseSocket
(
pConn
->
fd
);
taosCleanUpUdpConnection
(
pSet
);
pthread_attr_destroy
(
&
thAttr
);
return
NULL
;
}
...
...
src/util/src/ihash.c
浏览文件 @
a9be7a85
...
...
@@ -189,7 +189,6 @@ void taosCleanUpIntHash(void *handle) {
free
(
pObj
->
hashList
);
}
memset
(
pObj
,
0
,
sizeof
(
IHashObj
));
free
(
pObj
->
lockedBy
);
free
(
pObj
);
}
...
...
src/util/src/tqueue.c
浏览文件 @
a9be7a85
...
...
@@ -117,7 +117,7 @@ int taosWriteQitem(taos_queue param, int type, void *item) {
queue
->
numOfItems
++
;
if
(
queue
->
qset
)
atomic_add_fetch_32
(
&
queue
->
qset
->
numOfItems
,
1
);
pTrace
(
"item:%p is put into queue
, type:%d items:%d"
,
item
,
type
,
queue
->
numOfItems
);
pTrace
(
"item:%p is put into queue
:%p, type:%d items:%d"
,
item
,
queue
,
type
,
queue
->
numOfItems
);
pthread_mutex_unlock
(
&
queue
->
mutex
);
...
...
@@ -297,14 +297,16 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
STaosQset
*
qset
=
(
STaosQset
*
)
param
;
STaosQnode
*
pNode
=
NULL
;
int
code
=
0
;
pthread_mutex_lock
(
&
qset
->
mutex
);
for
(
int
i
=
0
;
i
<
qset
->
numOfQueues
;
++
i
)
{
pthread_mutex_lock
(
&
qset
->
mutex
);
//
pthread_mutex_lock(&qset->mutex);
if
(
qset
->
current
==
NULL
)
qset
->
current
=
qset
->
head
;
STaosQueue
*
queue
=
qset
->
current
;
if
(
queue
)
qset
->
current
=
queue
->
next
;
pthread_mutex_unlock
(
&
qset
->
mutex
);
//
pthread_mutex_unlock(&qset->mutex);
if
(
queue
==
NULL
)
break
;
pthread_mutex_lock
(
&
queue
->
mutex
);
...
...
@@ -326,6 +328,8 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
if
(
pNode
)
break
;
}
pthread_mutex_unlock
(
&
qset
->
mutex
);
return
code
;
}
...
...
@@ -335,13 +339,15 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) {
STaosQall
*
qall
=
(
STaosQall
*
)
p2
;
int
code
=
0
;
pthread_mutex_lock
(
&
qset
->
mutex
);
for
(
int
i
=
0
;
i
<
qset
->
numOfQueues
;
++
i
)
{
pthread_mutex_lock
(
&
qset
->
mutex
);
//
pthread_mutex_lock(&qset->mutex);
if
(
qset
->
current
==
NULL
)
qset
->
current
=
qset
->
head
;
queue
=
qset
->
current
;
if
(
queue
)
qset
->
current
=
queue
->
next
;
pthread_mutex_unlock
(
&
qset
->
mutex
);
//
pthread_mutex_unlock(&qset->mutex);
if
(
queue
==
NULL
)
break
;
pthread_mutex_lock
(
&
queue
->
mutex
);
...
...
@@ -365,6 +371,7 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) {
if
(
code
!=
0
)
break
;
}
pthread_mutex_unlock
(
&
qset
->
mutex
);
return
code
;
}
...
...
src/util/src/tsched.c
浏览文件 @
a9be7a85
...
...
@@ -94,10 +94,12 @@ void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
}
pTrace
(
"%s scheduler is initialized, numOfThreads:%d"
,
pSched
->
label
,
pSched
->
numOfThreads
);
pthread_attr_destroy
(
&
attr
);
return
(
void
*
)
pSched
;
_error:
pthread_attr_destroy
(
&
attr
);
taosCleanUpScheduler
(
pSched
);
return
NULL
;
}
...
...
src/vnode/main/src/vnodeMain.c
浏览文件 @
a9be7a85
...
...
@@ -224,10 +224,11 @@ void vnodeRelease(void *pVnodeRaw) {
// remove the whole directory
}
dTrace
(
"pVnode:%p vgId:%d, vnode is released"
,
pVnode
,
pVnode
->
vgId
);
free
(
pVnode
);
int32_t
count
=
atomic_sub_fetch_32
(
&
tsOpennedVnodes
,
1
);
dTrace
(
"pVnode:%p vgId:%d, vnode is released, vnodes:%d"
,
pVnode
,
vgId
,
count
);
if
(
count
<=
0
)
{
taosCleanUpIntHash
(
tsDnodeVnodesHash
);
vnodeModuleInit
=
PTHREAD_ONCE_INIT
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录