Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7680fd0a
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
7680fd0a
编写于
12月 28, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
12月 28, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #19210 from taosdata/enh/TD-21585
enh: adjusting the operation mode of the stream thread pool
上级
72425ebd
367b6512
变更
8
显示空白变更内容
内联
并排
Showing
8 changed file
with
187 addition
and
46 deletion
+187
-46
docs/en/14-reference/12-config/index.md
docs/en/14-reference/12-config/index.md
+1
-1
docs/zh/14-reference/12-config/index.md
docs/zh/14-reference/12-config/index.md
+1
-1
include/common/tglobal.h
include/common/tglobal.h
+1
-1
include/util/tworker.h
include/util/tworker.h
+18
-4
source/common/src/tglobal.c
source/common/src/tglobal.c
+5
-9
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
+14
-14
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+6
-7
source/util/src/tworker.c
source/util/src/tworker.c
+141
-9
未找到文件。
docs/en/14-reference/12-config/index.md
浏览文件 @
7680fd0a
...
@@ -733,7 +733,7 @@ To prevent system resource from being exhausted by multiple concurrent streams,
...
@@ -733,7 +733,7 @@ To prevent system resource from being exhausted by multiple concurrent streams,
| 42 | numOfCommitThreads | Yes | Yes |
| 42 | numOfCommitThreads | Yes | Yes |
| 43 | numOfMnodeReadThreads | No | Yes |
| 43 | numOfMnodeReadThreads | No | Yes |
| 44 | numOfVnodeQueryThreads | No | Yes |
| 44 | numOfVnodeQueryThreads | No | Yes |
| 45 |
num
OfVnodeStreamThreads | No | Yes |
| 45 |
ratio
OfVnodeStreamThreads | No | Yes |
| 46 | numOfVnodeFetchThreads | No | Yes |
| 46 | numOfVnodeFetchThreads | No | Yes |
| 47 | numOfVnodeRsmaThreads | No | Yes |
| 47 | numOfVnodeRsmaThreads | No | Yes |
| 48 | numOfQnodeQueryThreads | No | Yes |
| 48 | numOfQnodeQueryThreads | No | Yes |
...
...
docs/zh/14-reference/12-config/index.md
浏览文件 @
7680fd0a
...
@@ -709,7 +709,7 @@ charset 的有效值是 UTF-8。
...
@@ -709,7 +709,7 @@ charset 的有效值是 UTF-8。
| 42 | numOfCommitThreads | 是 | 是 | |
| 42 | numOfCommitThreads | 是 | 是 | |
| 43 | numOfMnodeReadThreads | 否 | 是 | |
| 43 | numOfMnodeReadThreads | 否 | 是 | |
| 44 | numOfVnodeQueryThreads | 否 | 是 | |
| 44 | numOfVnodeQueryThreads | 否 | 是 | |
| 45 |
numOfVnodeStreamThreads | 否
| 是 | |
| 45 |
ratioOfVnodeStreamThreads | 否
| 是 | |
| 46 | numOfVnodeFetchThreads | 否 | 是 | |
| 46 | numOfVnodeFetchThreads | 否 | 是 | |
| 47 | numOfVnodeRsmaThreads | 否 | 是 | |
| 47 | numOfVnodeRsmaThreads | 否 | 是 | |
| 48 | numOfQnodeQueryThreads | 否 | 是 | |
| 48 | numOfQnodeQueryThreads | 否 | 是 | |
...
...
include/common/tglobal.h
浏览文件 @
7680fd0a
...
@@ -55,7 +55,7 @@ extern int32_t tsNumOfMnodeQueryThreads;
...
@@ -55,7 +55,7 @@ extern int32_t tsNumOfMnodeQueryThreads;
extern
int32_t
tsNumOfMnodeFetchThreads
;
extern
int32_t
tsNumOfMnodeFetchThreads
;
extern
int32_t
tsNumOfMnodeReadThreads
;
extern
int32_t
tsNumOfMnodeReadThreads
;
extern
int32_t
tsNumOfVnodeQueryThreads
;
extern
int32_t
tsNumOfVnodeQueryThreads
;
extern
int32_t
tsNum
OfVnodeStreamThreads
;
extern
float
tsRatio
OfVnodeStreamThreads
;
extern
int32_t
tsNumOfVnodeFetchThreads
;
extern
int32_t
tsNumOfVnodeFetchThreads
;
extern
int32_t
tsNumOfVnodeRsmaThreads
;
extern
int32_t
tsNumOfVnodeRsmaThreads
;
extern
int32_t
tsNumOfQnodeQueryThreads
;
extern
int32_t
tsNumOfQnodeQueryThreads
;
...
...
include/util/tworker.h
浏览文件 @
7680fd0a
...
@@ -17,6 +17,7 @@
...
@@ -17,6 +17,7 @@
#define _TD_UTIL_WORKER_H_
#define _TD_UTIL_WORKER_H_
#include "tqueue.h"
#include "tqueue.h"
#include "tarray.h"
#ifdef __cplusplus
#ifdef __cplusplus
extern
"C"
{
extern
"C"
{
...
@@ -29,7 +30,7 @@ typedef struct SQWorker {
...
@@ -29,7 +30,7 @@ typedef struct SQWorker {
int32_t
id
;
// worker id
int32_t
id
;
// worker id
int64_t
pid
;
// thread pid
int64_t
pid
;
// thread pid
TdThread
thread
;
// thread id
TdThread
thread
;
// thread id
SQWorkerPool
*
pool
;
void
*
pool
;
}
SQWorker
;
}
SQWorker
;
typedef
struct
SQWorkerPool
{
typedef
struct
SQWorkerPool
{
...
@@ -42,6 +43,14 @@ typedef struct SQWorkerPool {
...
@@ -42,6 +43,14 @@ typedef struct SQWorkerPool {
TdThreadMutex
mutex
;
TdThreadMutex
mutex
;
}
SQWorkerPool
;
}
SQWorkerPool
;
typedef
struct
SAutoQWorkerPool
{
float
ratio
;
STaosQset
*
qset
;
const
char
*
name
;
SArray
*
workers
;
TdThreadMutex
mutex
;
}
SAutoQWorkerPool
;
typedef
struct
SWWorker
{
typedef
struct
SWWorker
{
int32_t
id
;
// worker id
int32_t
id
;
// worker id
int64_t
pid
;
// thread pid
int64_t
pid
;
// thread pid
...
@@ -65,6 +74,11 @@ void tQWorkerCleanup(SQWorkerPool *pool);
...
@@ -65,6 +74,11 @@ void tQWorkerCleanup(SQWorkerPool *pool);
STaosQueue
*
tQWorkerAllocQueue
(
SQWorkerPool
*
pool
,
void
*
ahandle
,
FItem
fp
);
STaosQueue
*
tQWorkerAllocQueue
(
SQWorkerPool
*
pool
,
void
*
ahandle
,
FItem
fp
);
void
tQWorkerFreeQueue
(
SQWorkerPool
*
pool
,
STaosQueue
*
queue
);
void
tQWorkerFreeQueue
(
SQWorkerPool
*
pool
,
STaosQueue
*
queue
);
int32_t
tAutoQWorkerInit
(
SAutoQWorkerPool
*
pool
);
void
tAutoQWorkerCleanup
(
SAutoQWorkerPool
*
pool
);
STaosQueue
*
tAutoQWorkerAllocQueue
(
SAutoQWorkerPool
*
pool
,
void
*
ahandle
,
FItem
fp
);
void
tAutoQWorkerFreeQueue
(
SAutoQWorkerPool
*
pool
,
STaosQueue
*
queue
);
int32_t
tWWorkerInit
(
SWWorkerPool
*
pool
);
int32_t
tWWorkerInit
(
SWWorkerPool
*
pool
);
void
tWWorkerCleanup
(
SWWorkerPool
*
pool
);
void
tWWorkerCleanup
(
SWWorkerPool
*
pool
);
STaosQueue
*
tWWorkerAllocQueue
(
SWWorkerPool
*
pool
,
void
*
ahandle
,
FItems
fp
);
STaosQueue
*
tWWorkerAllocQueue
(
SWWorkerPool
*
pool
,
void
*
ahandle
,
FItems
fp
);
...
...
source/common/src/tglobal.c
浏览文件 @
7680fd0a
...
@@ -47,7 +47,7 @@ int32_t tsNumOfMnodeQueryThreads = 4;
...
@@ -47,7 +47,7 @@ int32_t tsNumOfMnodeQueryThreads = 4;
int32_t
tsNumOfMnodeFetchThreads
=
1
;
int32_t
tsNumOfMnodeFetchThreads
=
1
;
int32_t
tsNumOfMnodeReadThreads
=
1
;
int32_t
tsNumOfMnodeReadThreads
=
1
;
int32_t
tsNumOfVnodeQueryThreads
=
4
;
int32_t
tsNumOfVnodeQueryThreads
=
4
;
int32_t
tsNumOfVnodeStreamThreads
=
2
;
float
tsRatioOfVnodeStreamThreads
=
1
.
0
;
int32_t
tsNumOfVnodeFetchThreads
=
4
;
int32_t
tsNumOfVnodeFetchThreads
=
4
;
int32_t
tsNumOfVnodeRsmaThreads
=
2
;
int32_t
tsNumOfVnodeRsmaThreads
=
2
;
int32_t
tsNumOfQnodeQueryThreads
=
4
;
int32_t
tsNumOfQnodeQueryThreads
=
4
;
...
@@ -392,9 +392,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
...
@@ -392,9 +392,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeQueryThreads
=
TMAX
(
tsNumOfVnodeQueryThreads
,
4
);
tsNumOfVnodeQueryThreads
=
TMAX
(
tsNumOfVnodeQueryThreads
,
4
);
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeQueryThreads"
,
tsNumOfVnodeQueryThreads
,
4
,
1024
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeQueryThreads"
,
tsNumOfVnodeQueryThreads
,
4
,
1024
,
0
)
!=
0
)
return
-
1
;
tsNumOfVnodeStreamThreads
=
tsNumOfCores
/
4
;
if
(
cfgAddFloat
(
pCfg
,
"ratioOfVnodeStreamThreads"
,
tsRatioOfVnodeStreamThreads
,
0
.
01
,
100
,
0
)
!=
0
)
return
-
1
;
tsNumOfVnodeStreamThreads
=
TMAX
(
tsNumOfVnodeStreamThreads
,
4
);
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeStreamThreads"
,
tsNumOfVnodeStreamThreads
,
4
,
1024
,
0
)
!=
0
)
return
-
1
;
tsNumOfVnodeFetchThreads
=
tsNumOfCores
/
4
;
tsNumOfVnodeFetchThreads
=
tsNumOfCores
/
4
;
tsNumOfVnodeFetchThreads
=
TMAX
(
tsNumOfVnodeFetchThreads
,
4
);
tsNumOfVnodeFetchThreads
=
TMAX
(
tsNumOfVnodeFetchThreads
,
4
);
...
@@ -513,11 +511,9 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
...
@@ -513,11 +511,9 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem
->
stype
=
stype
;
pItem
->
stype
=
stype
;
}
}
pItem
=
cfgGetItem
(
tsCfg
,
"
num
OfVnodeStreamThreads"
);
pItem
=
cfgGetItem
(
tsCfg
,
"
ratio
OfVnodeStreamThreads"
);
if
(
pItem
!=
NULL
&&
pItem
->
stype
==
CFG_STYPE_DEFAULT
)
{
if
(
pItem
!=
NULL
&&
pItem
->
stype
==
CFG_STYPE_DEFAULT
)
{
tsNumOfVnodeStreamThreads
=
numOfCores
/
4
;
pItem
->
fval
=
tsRatioOfVnodeStreamThreads
;
tsNumOfVnodeStreamThreads
=
TMAX
(
tsNumOfVnodeStreamThreads
,
4
);
pItem
->
i32
=
tsNumOfVnodeStreamThreads
;
pItem
->
stype
=
stype
;
pItem
->
stype
=
stype
;
}
}
...
@@ -710,7 +706,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
...
@@ -710,7 +706,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfCommitThreads
=
cfgGetItem
(
pCfg
,
"numOfCommitThreads"
)
->
i32
;
tsNumOfCommitThreads
=
cfgGetItem
(
pCfg
,
"numOfCommitThreads"
)
->
i32
;
tsNumOfMnodeReadThreads
=
cfgGetItem
(
pCfg
,
"numOfMnodeReadThreads"
)
->
i32
;
tsNumOfMnodeReadThreads
=
cfgGetItem
(
pCfg
,
"numOfMnodeReadThreads"
)
->
i32
;
tsNumOfVnodeQueryThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeQueryThreads"
)
->
i32
;
tsNumOfVnodeQueryThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeQueryThreads"
)
->
i32
;
ts
NumOfVnodeStreamThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeStreamThreads"
)
->
i32
;
ts
RatioOfVnodeStreamThreads
=
cfgGetItem
(
pCfg
,
"ratioOfVnodeStreamThreads"
)
->
fval
;
tsNumOfVnodeFetchThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeFetchThreads"
)
->
i32
;
tsNumOfVnodeFetchThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeFetchThreads"
)
->
i32
;
tsNumOfVnodeRsmaThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeRsmaThreads"
)
->
i32
;
tsNumOfVnodeRsmaThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeRsmaThreads"
)
->
i32
;
tsNumOfQnodeQueryThreads
=
cfgGetItem
(
pCfg
,
"numOfQnodeQueryThreads"
)
->
i32
;
tsNumOfQnodeQueryThreads
=
cfgGetItem
(
pCfg
,
"numOfQnodeQueryThreads"
)
->
i32
;
...
...
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
浏览文件 @
7680fd0a
...
@@ -31,7 +31,7 @@ typedef struct SVnodeMgmt {
...
@@ -31,7 +31,7 @@ typedef struct SVnodeMgmt {
const
char
*
path
;
const
char
*
path
;
const
char
*
name
;
const
char
*
name
;
SQWorkerPool
queryPool
;
SQWorkerPool
queryPool
;
S
QWorkerPool
streamPool
;
S
AutoQWorkerPool
streamPool
;
SWWorkerPool
fetchPool
;
SWWorkerPool
fetchPool
;
SSingleWorker
mgmtWorker
;
SSingleWorker
mgmtWorker
;
SHashObj
*
hash
;
SHashObj
*
hash
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
7680fd0a
...
@@ -318,7 +318,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
...
@@ -318,7 +318,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
(
void
)
tMultiWorkerInit
(
&
pVnode
->
pApplyW
,
&
acfg
);
(
void
)
tMultiWorkerInit
(
&
pVnode
->
pApplyW
,
&
acfg
);
pVnode
->
pQueryQ
=
tQWorkerAllocQueue
(
&
pMgmt
->
queryPool
,
pVnode
,
(
FItem
)
vmProcessQueryQueue
);
pVnode
->
pQueryQ
=
tQWorkerAllocQueue
(
&
pMgmt
->
queryPool
,
pVnode
,
(
FItem
)
vmProcessQueryQueue
);
pVnode
->
pStreamQ
=
tQWorkerAllocQueue
(
&
pMgmt
->
streamPool
,
pVnode
,
(
FItem
)
vmProcessStreamQueue
);
pVnode
->
pStreamQ
=
t
Auto
QWorkerAllocQueue
(
&
pMgmt
->
streamPool
,
pVnode
,
(
FItem
)
vmProcessStreamQueue
);
pVnode
->
pFetchQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
fetchPool
,
pVnode
,
(
FItems
)
vmProcessFetchQueue
);
pVnode
->
pFetchQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
fetchPool
,
pVnode
,
(
FItems
)
vmProcessFetchQueue
);
if
(
pVnode
->
pWriteW
.
queue
==
NULL
||
pVnode
->
pSyncW
.
queue
==
NULL
||
pVnode
->
pSyncCtrlW
.
queue
==
NULL
||
if
(
pVnode
->
pWriteW
.
queue
==
NULL
||
pVnode
->
pSyncW
.
queue
==
NULL
||
pVnode
->
pSyncCtrlW
.
queue
==
NULL
||
...
@@ -344,7 +344,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
...
@@ -344,7 +344,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
void
vmFreeQueue
(
SVnodeMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
)
{
void
vmFreeQueue
(
SVnodeMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
)
{
tQWorkerFreeQueue
(
&
pMgmt
->
queryPool
,
pVnode
->
pQueryQ
);
tQWorkerFreeQueue
(
&
pMgmt
->
queryPool
,
pVnode
->
pQueryQ
);
tQWorkerFreeQueue
(
&
pMgmt
->
streamPool
,
pVnode
->
pStreamQ
);
t
Auto
QWorkerFreeQueue
(
&
pMgmt
->
streamPool
,
pVnode
->
pStreamQ
);
tWWorkerFreeQueue
(
&
pMgmt
->
fetchPool
,
pVnode
->
pFetchQ
);
tWWorkerFreeQueue
(
&
pMgmt
->
fetchPool
,
pVnode
->
pFetchQ
);
pVnode
->
pQueryQ
=
NULL
;
pVnode
->
pQueryQ
=
NULL
;
pVnode
->
pStreamQ
=
NULL
;
pVnode
->
pStreamQ
=
NULL
;
...
@@ -359,11 +359,10 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
...
@@ -359,11 +359,10 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
pQPool
->
max
=
tsNumOfVnodeQueryThreads
;
pQPool
->
max
=
tsNumOfVnodeQueryThreads
;
if
(
tQWorkerInit
(
pQPool
)
!=
0
)
return
-
1
;
if
(
tQWorkerInit
(
pQPool
)
!=
0
)
return
-
1
;
SQWorkerPool
*
pStreamPool
=
&
pMgmt
->
streamPool
;
S
Auto
QWorkerPool
*
pStreamPool
=
&
pMgmt
->
streamPool
;
pStreamPool
->
name
=
"vnode-stream"
;
pStreamPool
->
name
=
"vnode-stream"
;
pStreamPool
->
min
=
tsNumOfVnodeStreamThreads
;
pStreamPool
->
ratio
=
tsRatioOfVnodeStreamThreads
;
pStreamPool
->
max
=
tsNumOfVnodeStreamThreads
;
if
(
tAutoQWorkerInit
(
pStreamPool
)
!=
0
)
return
-
1
;
if
(
tQWorkerInit
(
pStreamPool
)
!=
0
)
return
-
1
;
SWWorkerPool
*
pFPool
=
&
pMgmt
->
fetchPool
;
SWWorkerPool
*
pFPool
=
&
pMgmt
->
fetchPool
;
pFPool
->
name
=
"vnode-fetch"
;
pFPool
->
name
=
"vnode-fetch"
;
...
@@ -385,7 +384,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
...
@@ -385,7 +384,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
void
vmStopWorker
(
SVnodeMgmt
*
pMgmt
)
{
void
vmStopWorker
(
SVnodeMgmt
*
pMgmt
)
{
tQWorkerCleanup
(
&
pMgmt
->
queryPool
);
tQWorkerCleanup
(
&
pMgmt
->
queryPool
);
tQWorkerCleanup
(
&
pMgmt
->
streamPool
);
t
Auto
QWorkerCleanup
(
&
pMgmt
->
streamPool
);
tWWorkerCleanup
(
&
pMgmt
->
fetchPool
);
tWWorkerCleanup
(
&
pMgmt
->
fetchPool
);
dDebug
(
"vnode workers are closed"
);
dDebug
(
"vnode workers are closed"
);
}
}
source/util/src/tworker.c
浏览文件 @
7680fd0a
...
@@ -36,7 +36,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) {
...
@@ -36,7 +36,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) {
worker
->
pool
=
pool
;
worker
->
pool
=
pool
;
}
}
u
Debug
(
"worker:%s is initialized, min:%d max:%d"
,
pool
->
name
,
pool
->
min
,
pool
->
max
);
u
Info
(
"worker:%s is initialized, min:%d max:%d"
,
pool
->
name
,
pool
->
min
,
pool
->
max
);
return
0
;
return
0
;
}
}
...
@@ -51,8 +51,10 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
...
@@ -51,8 +51,10 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
for
(
int32_t
i
=
0
;
i
<
pool
->
max
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pool
->
max
;
++
i
)
{
SQWorker
*
worker
=
pool
->
workers
+
i
;
SQWorker
*
worker
=
pool
->
workers
+
i
;
if
(
taosCheckPthreadValid
(
worker
->
thread
))
{
if
(
taosCheckPthreadValid
(
worker
->
thread
))
{
uInfo
(
"worker:%s:%d is stopping"
,
pool
->
name
,
worker
->
id
);
taosThreadJoin
(
worker
->
thread
,
NULL
);
taosThreadJoin
(
worker
->
thread
,
NULL
);
taosThreadClear
(
&
worker
->
thread
);
taosThreadClear
(
&
worker
->
thread
);
uInfo
(
"worker:%s:%d is stopped"
,
pool
->
name
,
worker
->
id
);
}
}
}
}
...
@@ -60,7 +62,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
...
@@ -60,7 +62,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
taosCloseQset
(
pool
->
qset
);
taosCloseQset
(
pool
->
qset
);
taosThreadMutexDestroy
(
&
pool
->
mutex
);
taosThreadMutexDestroy
(
&
pool
->
mutex
);
u
Debug
(
"worker:%s is closed"
,
pool
->
name
);
u
Info
(
"worker:%s is closed"
,
pool
->
name
);
}
}
static
void
*
tQWorkerThreadFp
(
SQWorker
*
worker
)
{
static
void
*
tQWorkerThreadFp
(
SQWorker
*
worker
)
{
...
@@ -119,7 +121,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
...
@@ -119,7 +121,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
taosThreadAttrDestroy
(
&
thAttr
);
taosThreadAttrDestroy
(
&
thAttr
);
pool
->
num
++
;
pool
->
num
++
;
u
Debug
(
"worker:%s:%d is launched, total:%d"
,
pool
->
name
,
worker
->
id
,
pool
->
num
);
u
Info
(
"worker:%s:%d is launched, total:%d"
,
pool
->
name
,
worker
->
id
,
pool
->
num
);
}
while
(
pool
->
num
<
pool
->
min
);
}
while
(
pool
->
num
<
pool
->
min
);
}
}
...
@@ -130,7 +132,134 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
...
@@ -130,7 +132,134 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
}
}
void
tQWorkerFreeQueue
(
SQWorkerPool
*
pool
,
STaosQueue
*
queue
)
{
void
tQWorkerFreeQueue
(
SQWorkerPool
*
pool
,
STaosQueue
*
queue
)
{
uDebug
(
"worker:%s, queue:%p is freed"
,
pool
->
name
,
queue
);
uInfo
(
"worker:%s, queue:%p is freed"
,
pool
->
name
,
queue
);
taosCloseQueue
(
queue
);
}
int32_t
tAutoQWorkerInit
(
SAutoQWorkerPool
*
pool
)
{
pool
->
qset
=
taosOpenQset
();
pool
->
workers
=
taosArrayInit
(
2
,
sizeof
(
SQWorker
*
));
if
(
pool
->
workers
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
(
void
)
taosThreadMutexInit
(
&
pool
->
mutex
,
NULL
);
uInfo
(
"worker:%s is initialized as auto"
,
pool
->
name
);
return
0
;
}
void
tAutoQWorkerCleanup
(
SAutoQWorkerPool
*
pool
)
{
int32_t
size
=
taosArrayGetSize
(
pool
->
workers
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SQWorker
*
worker
=
taosArrayGetP
(
pool
->
workers
,
i
);
if
(
taosCheckPthreadValid
(
worker
->
thread
))
{
taosQsetThreadResume
(
pool
->
qset
);
}
}
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SQWorker
*
worker
=
taosArrayGetP
(
pool
->
workers
,
i
);
if
(
taosCheckPthreadValid
(
worker
->
thread
))
{
uInfo
(
"worker:%s:%d is stopping"
,
pool
->
name
,
worker
->
id
);
taosThreadJoin
(
worker
->
thread
,
NULL
);
taosThreadClear
(
&
worker
->
thread
);
uInfo
(
"worker:%s:%d is stopped"
,
pool
->
name
,
worker
->
id
);
}
taosMemoryFree
(
worker
);
}
taosArrayDestroy
(
pool
->
workers
);
taosCloseQset
(
pool
->
qset
);
taosThreadMutexDestroy
(
&
pool
->
mutex
);
uInfo
(
"worker:%s is closed"
,
pool
->
name
);
}
static
void
*
tAutoQWorkerThreadFp
(
SQWorker
*
worker
)
{
SAutoQWorkerPool
*
pool
=
worker
->
pool
;
SQueueInfo
qinfo
=
{
0
};
void
*
msg
=
NULL
;
int32_t
code
=
0
;
taosBlockSIGPIPE
();
setThreadName
(
pool
->
name
);
worker
->
pid
=
taosGetSelfPthreadId
();
uInfo
(
"worker:%s:%d is running, thread:%08"
PRId64
,
pool
->
name
,
worker
->
id
,
worker
->
pid
);
while
(
1
)
{
if
(
taosReadQitemFromQset
(
pool
->
qset
,
(
void
**
)
&
msg
,
&
qinfo
)
==
0
)
{
uInfo
(
"worker:%s:%d qset:%p, got no message and exiting, thread:%08"
PRId64
,
pool
->
name
,
worker
->
id
,
pool
->
qset
,
worker
->
pid
);
break
;
}
if
(
qinfo
.
fp
!=
NULL
)
{
qinfo
.
workerId
=
worker
->
id
;
qinfo
.
threadNum
=
taosArrayGetSize
(
pool
->
workers
);
(
*
((
FItem
)
qinfo
.
fp
))(
&
qinfo
,
msg
);
}
taosUpdateItemSize
(
qinfo
.
queue
,
1
);
}
return
NULL
;
}
STaosQueue
*
tAutoQWorkerAllocQueue
(
SAutoQWorkerPool
*
pool
,
void
*
ahandle
,
FItem
fp
)
{
STaosQueue
*
queue
=
taosOpenQueue
();
if
(
queue
==
NULL
)
return
NULL
;
taosThreadMutexLock
(
&
pool
->
mutex
);
taosSetQueueFp
(
queue
,
fp
,
NULL
);
taosAddIntoQset
(
pool
->
qset
,
queue
,
ahandle
);
int32_t
queueNum
=
taosGetQueueNumber
(
pool
->
qset
);
int32_t
curWorkerNum
=
taosArrayGetSize
(
pool
->
workers
);
int32_t
dstWorkerNum
=
ceil
(
queueNum
*
pool
->
ratio
);
if
(
dstWorkerNum
<
1
)
dstWorkerNum
=
1
;
// spawn a thread to process queue
while
(
curWorkerNum
<
dstWorkerNum
)
{
SQWorker
*
worker
=
taosMemoryCalloc
(
1
,
sizeof
(
SQWorker
));
if
(
worker
==
NULL
||
taosArrayPush
(
pool
->
workers
,
&
worker
)
==
NULL
)
{
uError
(
"worker:%s:%d failed to create"
,
pool
->
name
,
curWorkerNum
);
taosMemoryFree
(
worker
);
taosCloseQueue
(
queue
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
worker
->
id
=
curWorkerNum
;
worker
->
pool
=
pool
;
TdThreadAttr
thAttr
;
taosThreadAttrInit
(
&
thAttr
);
taosThreadAttrSetDetachState
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
taosThreadCreate
(
&
worker
->
thread
,
&
thAttr
,
(
ThreadFp
)
tAutoQWorkerThreadFp
,
worker
)
!=
0
)
{
uError
(
"worker:%s:%d failed to create thread, total:%d"
,
pool
->
name
,
worker
->
id
,
curWorkerNum
);
(
void
)
taosArrayPop
(
pool
->
workers
);
taosMemoryFree
(
worker
);
taosCloseQueue
(
queue
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
taosThreadAttrDestroy
(
&
thAttr
);
uInfo
(
"worker:%s:%d is launched, total:%d"
,
pool
->
name
,
worker
->
id
,
(
int32_t
)
taosArrayGetSize
(
pool
->
workers
));
curWorkerNum
++
;
}
taosThreadMutexUnlock
(
&
pool
->
mutex
);
uInfo
(
"worker:%s, queue:%p is allocated, ahandle:%p"
,
pool
->
name
,
queue
,
ahandle
);
return
queue
;
}
void
tAutoQWorkerFreeQueue
(
SAutoQWorkerPool
*
pool
,
STaosQueue
*
queue
)
{
uInfo
(
"worker:%s, queue:%p is freed"
,
pool
->
name
,
queue
);
taosCloseQueue
(
queue
);
taosCloseQueue
(
queue
);
}
}
...
@@ -152,7 +281,7 @@ int32_t tWWorkerInit(SWWorkerPool *pool) {
...
@@ -152,7 +281,7 @@ int32_t tWWorkerInit(SWWorkerPool *pool) {
worker
->
pool
=
pool
;
worker
->
pool
=
pool
;
}
}
u
Debug
(
"worker:%s is initialized, max:%d"
,
pool
->
name
,
pool
->
max
);
u
Info
(
"worker:%s is initialized, max:%d"
,
pool
->
name
,
pool
->
max
);
return
0
;
return
0
;
}
}
...
@@ -169,17 +298,19 @@ void tWWorkerCleanup(SWWorkerPool *pool) {
...
@@ -169,17 +298,19 @@ void tWWorkerCleanup(SWWorkerPool *pool) {
for
(
int32_t
i
=
0
;
i
<
pool
->
max
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pool
->
max
;
++
i
)
{
SWWorker
*
worker
=
pool
->
workers
+
i
;
SWWorker
*
worker
=
pool
->
workers
+
i
;
if
(
taosCheckPthreadValid
(
worker
->
thread
))
{
if
(
taosCheckPthreadValid
(
worker
->
thread
))
{
uInfo
(
"worker:%s:%d is stopping"
,
pool
->
name
,
worker
->
id
);
taosThreadJoin
(
worker
->
thread
,
NULL
);
taosThreadJoin
(
worker
->
thread
,
NULL
);
taosThreadClear
(
&
worker
->
thread
);
taosThreadClear
(
&
worker
->
thread
);
taosFreeQall
(
worker
->
qall
);
taosFreeQall
(
worker
->
qall
);
taosCloseQset
(
worker
->
qset
);
taosCloseQset
(
worker
->
qset
);
uInfo
(
"worker:%s:%d is stopped"
,
pool
->
name
,
worker
->
id
);
}
}
}
}
taosMemoryFreeClear
(
pool
->
workers
);
taosMemoryFreeClear
(
pool
->
workers
);
taosThreadMutexDestroy
(
&
pool
->
mutex
);
taosThreadMutexDestroy
(
&
pool
->
mutex
);
u
Debug
(
"worker:%s is closed"
,
pool
->
name
);
u
Info
(
"worker:%s is closed"
,
pool
->
name
);
}
}
static
void
*
tWWorkerThreadFp
(
SWWorker
*
worker
)
{
static
void
*
tWWorkerThreadFp
(
SWWorker
*
worker
)
{
...
@@ -235,7 +366,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
...
@@ -235,7 +366,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
taosThreadAttrSetDetachState
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
taosThreadAttrSetDetachState
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
taosThreadCreate
(
&
worker
->
thread
,
&
thAttr
,
(
ThreadFp
)
tWWorkerThreadFp
,
worker
)
!=
0
)
goto
_OVER
;
if
(
taosThreadCreate
(
&
worker
->
thread
,
&
thAttr
,
(
ThreadFp
)
tWWorkerThreadFp
,
worker
)
!=
0
)
goto
_OVER
;
u
Debug
(
"worker:%s:%d is launched, max:%d"
,
pool
->
name
,
worker
->
id
,
pool
->
max
);
u
Info
(
"worker:%s:%d is launched, max:%d"
,
pool
->
name
,
worker
->
id
,
pool
->
max
);
pool
->
nextId
=
(
pool
->
nextId
+
1
)
%
pool
->
max
;
pool
->
nextId
=
(
pool
->
nextId
+
1
)
%
pool
->
max
;
taosThreadAttrDestroy
(
&
thAttr
);
taosThreadAttrDestroy
(
&
thAttr
);
...
@@ -259,13 +390,14 @@ _OVER:
...
@@ -259,13 +390,14 @@ _OVER:
}
else
{
}
else
{
while
(
worker
->
pid
<=
0
)
taosMsleep
(
10
);
while
(
worker
->
pid
<=
0
)
taosMsleep
(
10
);
queue
->
threadId
=
worker
->
pid
;
queue
->
threadId
=
worker
->
pid
;
uInfo
(
"worker:%s, queue:%p is allocated, ahandle:%p thread:%08"
PRId64
,
pool
->
name
,
queue
,
ahandle
,
queue
->
threadId
);
uInfo
(
"worker:%s, queue:%p is allocated, ahandle:%p thread:%08"
PRId64
,
pool
->
name
,
queue
,
ahandle
,
queue
->
threadId
);
return
queue
;
return
queue
;
}
}
}
}
void
tWWorkerFreeQueue
(
SWWorkerPool
*
pool
,
STaosQueue
*
queue
)
{
void
tWWorkerFreeQueue
(
SWWorkerPool
*
pool
,
STaosQueue
*
queue
)
{
u
Debug
(
"worker:%s, queue:%p is freed"
,
pool
->
name
,
queue
);
u
Info
(
"worker:%s, queue:%p is freed"
,
pool
->
name
,
queue
);
taosCloseQueue
(
queue
);
taosCloseQueue
(
queue
);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录