Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8789ae77
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8789ae77
编写于
12月 30, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
set queue to a named type instead of void
上级
60b56c66
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
146 addition
and
173 deletion
+146
-173
include/util/tqueue.h
include/util/tqueue.h
+37
-39
include/util/tworker.h
include/util/tworker.h
+11
-11
source/dnode/mgmt/daemon/src/daemon.c
source/dnode/mgmt/daemon/src/daemon.c
+1
-1
source/dnode/mgmt/impl/inc/dndInt.h
source/dnode/mgmt/impl/inc/dndInt.h
+2
-2
source/dnode/mgmt/impl/src/dndBnode.c
source/dnode/mgmt/impl/src/dndBnode.c
+3
-3
source/dnode/mgmt/impl/src/dndVnodes.c
source/dnode/mgmt/impl/src/dndVnodes.c
+21
-21
source/util/src/tqueue.c
source/util/src/tqueue.c
+65
-90
source/util/src/tworker.c
source/util/src/tworker.c
+6
-6
未找到文件。
include/util/tqueue.h
浏览文件 @
8789ae77
...
...
@@ -22,59 +22,57 @@ extern "C" {
/*
This set of API for queue is designed specially for vnode/mnode. The main purpose is to
consume all the items instead of one item from a queue by one single read. Also, it can
combine multiple queues into a queue set, a consumer thread can consume a queue set via
This set of API for queue is designed specially for vnode/mnode. The main purpose is to
consume all the items instead of one item from a queue by one single read. Also, it can
combine multiple queues into a queue set, a consumer thread can consume a queue set via
a single API instead of looping every queue by itself.
Notes:
1: taosOpenQueue/taosCloseQueue, taosOpenQset/taosCloseQset is NOT multi-thread safe
1: taosOpenQueue/taosCloseQueue, taosOpenQset/taosCloseQset is NOT multi-thread safe
2: after taosCloseQueue/taosCloseQset is called, read/write operation APIs are not safe.
3: read/write operation APIs are multi-thread safe
To remove the limitation and make this set of queue APIs multi-thread safe, REF(tref.c)
shall be used to set up the protection.
shall be used to set up the protection.
*/
typedef
void
*
taos_q
ueue
;
typedef
void
*
taos_q
set
;
typedef
void
*
taos_q
all
;
typedef
struct
STaosQueue
STaosQ
ueue
;
typedef
struct
STaosQset
STaosQ
set
;
typedef
struct
STaosQall
STaosQ
all
;
typedef
void
(
*
FProcessItem
)(
void
*
ahandle
,
void
*
pItem
);
typedef
void
(
*
FProcessItems
)(
void
*
ahandle
,
taos_qall
qall
,
in
t
numOfItems
);
taos_queue
taosOpenQueue
();
void
taosCloseQueue
(
taos_
queue
);
void
taosSetQueueFp
(
taos_queue
,
FProcessItem
,
FProcessItems
);
void
*
taosAllocateQitem
(
in
t
size
);
void
taosFreeQitem
(
void
*
pItem
);
int
taosWriteQitem
(
taos_
queue
,
void
*
pItem
);
int
taosReadQitem
(
taos_queue
,
void
**
pItem
);
bool
taosQueueEmpty
(
taos_
queue
);
taos_qall
taosAllocateQall
();
void
taosFreeQall
(
taos_
qall
);
int
taosReadAllQitems
(
taos_queue
,
taos_
qall
);
int
taosGetQitem
(
taos_qall
,
void
**
pItem
);
void
taosResetQitems
(
taos_
qall
);
taos_qset
taosOpenQset
();
void
taosCloseQset
();
void
taosQsetThreadResume
(
taos_qset
param
);
int
taosAddIntoQset
(
taos_qset
,
taos_
queue
,
void
*
ahandle
);
void
taosRemoveFromQset
(
taos_qset
,
taos_
queue
);
int
taosGetQueueNumber
(
taos_
qset
);
int
taosReadQitemFromQset
(
taos_qset
,
void
**
pItem
,
void
**
ahandle
,
FProcessItem
*
);
int
taosReadAllQitemsFromQset
(
taos_qset
,
taos_qall
,
void
**
ahandle
,
FProcessItems
*
);
int
taosGetQueueItemsNumber
(
taos_queue
param
);
int
taosGetQsetItemsNumber
(
taos_qset
param
);
typedef
void
(
*
FProcessItems
)(
void
*
ahandle
,
STaosQall
*
qall
,
int32_
t
numOfItems
);
STaosQueue
*
taosOpenQueue
();
void
taosCloseQueue
(
STaosQueue
*
queue
);
void
taosSetQueueFp
(
STaosQueue
*
queue
,
FProcessItem
itemFp
,
FProcessItems
itemsFp
);
void
*
taosAllocateQitem
(
int32_
t
size
);
void
taosFreeQitem
(
void
*
pItem
);
int
32_t
taosWriteQitem
(
STaosQueue
*
queue
,
void
*
pItem
);
int
32_t
taosReadQitem
(
STaosQueue
*
queue
,
void
**
p
pItem
);
bool
taosQueueEmpty
(
STaosQueue
*
queue
);
STaosQall
*
taosAllocateQall
();
void
taosFreeQall
(
STaosQall
*
qall
);
int
32_t
taosReadAllQitems
(
STaosQueue
*
queue
,
STaosQall
*
qall
);
int
32_t
taosGetQitem
(
STaosQall
*
qall
,
void
**
p
pItem
);
void
taosResetQitems
(
STaosQall
*
qall
);
STaosQset
*
taosOpenQset
();
void
taosCloseQset
(
STaosQset
*
qset
);
void
taosQsetThreadResume
(
STaosQset
*
qset
);
int
32_t
taosAddIntoQset
(
STaosQset
*
qset
,
STaosQueue
*
queue
,
void
*
ahandle
);
void
taosRemoveFromQset
(
STaosQset
*
qset
,
STaosQueue
*
queue
);
int
32_t
taosGetQueueNumber
(
STaosQset
*
qset
);
int
32_t
taosReadQitemFromQset
(
STaosQset
*
qset
,
void
**
ppItem
,
void
**
ahandle
,
FProcessItem
*
itemFp
);
int
32_t
taosReadAllQitemsFromQset
(
STaosQset
*
qset
,
STaosQall
*
qall
,
void
**
ahandle
,
FProcessItems
*
itemsFp
);
int
32_t
taosGetQueueItemsNumber
(
STaosQueue
*
queue
);
int
32_t
taosGetQsetItemsNumber
(
STaosQset
*
qset
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_UTIL_QUEUE_H*/
include/util/tworker.h
浏览文件 @
8789ae77
...
...
@@ -35,7 +35,7 @@ typedef struct SWorkerPool {
int32_t
max
;
// max number of workers
int32_t
min
;
// min number of workers
int32_t
num
;
// current number of workers
taos_qset
qset
;
STaosQset
*
qset
;
const
char
*
name
;
SWorker
*
workers
;
pthread_mutex_t
mutex
;
...
...
@@ -44,8 +44,8 @@ typedef struct SWorkerPool {
typedef
struct
SMWorker
{
int32_t
id
;
// worker id
pthread_t
thread
;
// thread
taos_qall
qall
;
taos_qset
qset
;
// queue set
STaosQall
*
qall
;
STaosQset
*
qset
;
// queue set
SMWorkerPool
*
pool
;
}
SMWorker
;
...
...
@@ -57,15 +57,15 @@ typedef struct SMWorkerPool {
pthread_mutex_t
mutex
;
}
SMWorkerPool
;
int32_t
tWorkerInit
(
SWorkerPool
*
pool
);
void
tWorkerCleanup
(
SWorkerPool
*
pool
);
taos_queue
tWorkerAllocQueue
(
SWorkerPool
*
pool
,
void
*
ahandle
,
FProcessItem
fp
);
void
tWorkerFreeQueue
(
SWorkerPool
*
pool
,
taos_queue
queue
);
int32_t
tWorkerInit
(
SWorkerPool
*
pool
);
void
tWorkerCleanup
(
SWorkerPool
*
pool
);
STaosQueue
*
tWorkerAllocQueue
(
SWorkerPool
*
pool
,
void
*
ahandle
,
FProcessItem
fp
);
void
tWorkerFreeQueue
(
SWorkerPool
*
pool
,
STaosQueue
*
queue
);
int32_t
tMWorkerInit
(
SMWorkerPool
*
pool
);
void
tMWorkerCleanup
(
SMWorkerPool
*
pool
);
taos_queue
tMWorkerAllocQueue
(
SMWorkerPool
*
pool
,
void
*
ahandle
,
FProcessItems
fp
);
void
tMWorkerFreeQueue
(
SMWorkerPool
*
pool
,
taos_queue
queue
);
int32_t
tMWorkerInit
(
SMWorkerPool
*
pool
);
void
tMWorkerCleanup
(
SMWorkerPool
*
pool
);
STaosQueue
*
tMWorkerAllocQueue
(
SMWorkerPool
*
pool
,
void
*
ahandle
,
FProcessItems
fp
);
void
tMWorkerFreeQueue
(
SMWorkerPool
*
pool
,
STaosQueue
*
queue
);
#ifdef __cplusplus
}
...
...
source/dnode/mgmt/daemon/src/daemon.c
浏览文件 @
8789ae77
...
...
@@ -140,7 +140,7 @@ void dmnInitOption(SDnodeOpt *pOption) {
pOption
->
sver
=
30000000
;
//3.0.0.0
pOption
->
numOfCores
=
tsNumOfCores
;
pOption
->
numOfSupportVnodes
=
tsNumOfSupportVnodes
;
pOption
->
numOfCommitThreads
=
1
;
pOption
->
numOfCommitThreads
=
tsNumOfCommitThreads
;
pOption
->
statusInterval
=
tsStatusInterval
;
pOption
->
numOfThreadsPerCore
=
tsNumOfThreadsPerCore
;
pOption
->
ratioOfQueryCores
=
tsRatioOfQueryCores
;
...
...
source/dnode/mgmt/impl/inc/dndInt.h
浏览文件 @
8789ae77
...
...
@@ -64,7 +64,7 @@ typedef struct {
int32_t
maxNum
;
void
*
queueFp
;
SDnode
*
pDnode
;
taos_queue
queue
;
STaosQueue
*
queue
;
union
{
SWorkerPool
pool
;
SMWorkerPool
mpool
;
...
...
@@ -92,7 +92,7 @@ typedef struct {
SDnodeEps
*
dnodeEps
;
pthread_t
*
threadId
;
SRWLatch
latch
;
taos_queue
pMgmtQ
;
STaosQueue
*
pMgmtQ
;
SWorkerPool
mgmtPool
;
}
SDnodeMgmt
;
...
...
source/dnode/mgmt/impl/src/dndBnode.c
浏览文件 @
8789ae77
...
...
@@ -19,7 +19,7 @@
#include "dndTransport.h"
#include "dndWorker.h"
static
void
dndProcessBnodeQueue
(
SDnode
*
pDnode
,
taos_qall
qall
,
int32_t
numOfMsgs
);
static
void
dndProcessBnodeQueue
(
SDnode
*
pDnode
,
STaosQall
*
qall
,
int32_t
numOfMsgs
);
static
SBnode
*
dndAcquireBnode
(
SDnode
*
pDnode
)
{
SBnodeMgmt
*
pMgmt
=
&
pDnode
->
bmgmt
;
...
...
@@ -286,7 +286,7 @@ static void dndSendBnodeErrorRsp(SRpcMsg *pMsg, int32_t code) {
taosFreeQitem
(
pMsg
);
}
static
void
dndSendBnodeErrorRsps
(
taos_qall
qall
,
int32_t
numOfMsgs
,
int32_t
code
)
{
static
void
dndSendBnodeErrorRsps
(
STaosQall
*
qall
,
int32_t
numOfMsgs
,
int32_t
code
)
{
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
SRpcMsg
*
pMsg
=
NULL
;
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
);
...
...
@@ -294,7 +294,7 @@ static void dndSendBnodeErrorRsps(taos_qall qall, int32_t numOfMsgs, int32_t cod
}
}
static
void
dndProcessBnodeQueue
(
SDnode
*
pDnode
,
taos_qall
qall
,
int32_t
numOfMsgs
)
{
static
void
dndProcessBnodeQueue
(
SDnode
*
pDnode
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
SBnode
*
pBnode
=
dndAcquireBnode
(
pDnode
);
if
(
pBnode
==
NULL
)
{
dndSendBnodeErrorRsps
(
qall
,
numOfMsgs
,
TSDB_CODE_OUT_OF_MEMORY
);
...
...
source/dnode/mgmt/impl/src/dndVnodes.c
浏览文件 @
8789ae77
...
...
@@ -27,20 +27,20 @@ typedef struct {
}
SWrapperCfg
;
typedef
struct
{
int32_t
vgId
;
int32_t
refCount
;
int32_t
vgVersion
;
int8_t
dropped
;
int8_t
accessState
;
uint64_t
dbUid
;
char
*
db
;
char
*
path
;
SVnode
*
pImpl
;
taos_queue
pWriteQ
;
taos_queue
pSyncQ
;
taos_queue
pApplyQ
;
taos_queue
pQueryQ
;
taos_queue
pFetchQ
;
int32_t
vgId
;
int32_t
refCount
;
int32_t
vgVersion
;
int8_t
dropped
;
int8_t
accessState
;
uint64_t
dbUid
;
char
*
db
;
char
*
path
;
SVnode
*
pImpl
;
STaosQueue
*
pWriteQ
;
STaosQueue
*
pSyncQ
;
STaosQueue
*
pApplyQ
;
STaosQueue
*
pQueryQ
;
STaosQueue
*
pFetchQ
;
}
SVnodeObj
;
typedef
struct
{
...
...
@@ -72,9 +72,9 @@ static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode);
static
void
dndProcessVnodeQueryQueue
(
SVnodeObj
*
pVnode
,
SRpcMsg
*
pMsg
);
static
void
dndProcessVnodeFetchQueue
(
SVnodeObj
*
pVnode
,
SRpcMsg
*
pMsg
);
static
void
dndProcessVnodeWriteQueue
(
SVnodeObj
*
pVnode
,
taos_qall
qall
,
int32_t
numOfMsgs
);
static
void
dndProcessVnodeApplyQueue
(
SVnodeObj
*
pVnode
,
taos_qall
qall
,
int32_t
numOfMsgs
);
static
void
dndProcessVnodeSyncQueue
(
SVnodeObj
*
pVnode
,
taos_qall
qall
,
int32_t
numOfMsgs
);
static
void
dndProcessVnodeWriteQueue
(
SVnodeObj
*
pVnode
,
STaosQall
*
qall
,
int32_t
numOfMsgs
);
static
void
dndProcessVnodeApplyQueue
(
SVnodeObj
*
pVnode
,
STaosQall
*
qall
,
int32_t
numOfMsgs
);
static
void
dndProcessVnodeSyncQueue
(
SVnodeObj
*
pVnode
,
STaosQall
*
qall
,
int32_t
numOfMsgs
);
void
dndProcessVnodeQueryMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
void
dndProcessVnodeFetchMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
void
dndProcessVnodeWriteMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
...
...
@@ -768,7 +768,7 @@ static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
vnodeProcessFetchReq
(
pVnode
->
pImpl
,
pMsg
,
&
pRsp
);
}
static
void
dndProcessVnodeWriteQueue
(
SVnodeObj
*
pVnode
,
taos_qall
qall
,
int32_t
numOfMsgs
)
{
static
void
dndProcessVnodeWriteQueue
(
SVnodeObj
*
pVnode
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
SArray
*
pArray
=
taosArrayInit
(
numOfMsgs
,
sizeof
(
SRpcMsg
*
));
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
...
...
@@ -804,7 +804,7 @@ static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t
taosArrayDestroy
(
pArray
);
}
static
void
dndProcessVnodeApplyQueue
(
SVnodeObj
*
pVnode
,
taos_qall
qall
,
int32_t
numOfMsgs
)
{
static
void
dndProcessVnodeApplyQueue
(
SVnodeObj
*
pVnode
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
SRpcMsg
*
pMsg
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
...
...
@@ -815,7 +815,7 @@ static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t
}
}
static
void
dndProcessVnodeSyncQueue
(
SVnodeObj
*
pVnode
,
taos_qall
qall
,
int32_t
numOfMsgs
)
{
static
void
dndProcessVnodeSyncQueue
(
SVnodeObj
*
pVnode
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
SRpcMsg
*
pMsg
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
...
...
@@ -826,7 +826,7 @@ static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t
}
}
static
int32_t
dndWriteRpcMsgToVnodeQueue
(
taos_queue
pQueue
,
SRpcMsg
*
pRpcMsg
)
{
static
int32_t
dndWriteRpcMsgToVnodeQueue
(
STaosQueue
*
pQueue
,
SRpcMsg
*
pRpcMsg
)
{
int32_t
code
=
0
;
if
(
pQueue
==
NULL
)
{
...
...
source/util/src/tqueue.c
浏览文件 @
8789ae77
...
...
@@ -14,26 +14,29 @@
*/
#include "os.h"
#include "ulog.h"
#include "taoserror.h"
#include "tqueue.h"
#include "ulog.h"
typedef
struct
STaosQnode
STaosQnode
;
typedef
struct
STaosQnode
{
struct
STaosQnode
*
next
;
char
item
[];
STaosQnode
*
next
;
char
item
[];
}
STaosQnode
;
typedef
struct
STaosQueue
{
int32_t
itemSize
;
int32_t
numOfItems
;
struct
STaosQnode
*
head
;
struct
STaosQnode
*
tail
;
struct
STaosQueue
*
next
;
// for queue set
struct
STaosQset
*
qset
;
// for queue set
void
*
ahandle
;
// for queue set
FProcessItem
itemFp
;
FProcessItems
itemsFp
;
pthread_mutex_t
mutex
;
int32_t
itemSize
;
int32_t
numOfItems
;
STaosQnode
*
head
;
STaosQnode
*
tail
;
STaosQueue
*
next
;
// for queue set
STaosQset
*
qset
;
// for queue set
void
*
ahandle
;
// for queue set
FProcessItem
itemFp
;
FProcessItems
itemsFp
;
pthread_mutex_t
mutex
;
}
STaosQueue
;
typedef
struct
STaosQset
{
...
...
@@ -52,8 +55,8 @@ typedef struct STaosQall {
int32_t
numOfItems
;
}
STaosQall
;
taos_queue
taosOpenQueue
()
{
STaosQueue
*
queue
=
(
STaosQueue
*
)
calloc
(
sizeof
(
STaosQueue
),
1
);
STaosQueue
*
taosOpenQueue
()
{
STaosQueue
*
queue
=
calloc
(
sizeof
(
STaosQueue
),
1
);
if
(
queue
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
...
...
@@ -65,16 +68,14 @@ taos_queue taosOpenQueue() {
return
queue
;
}
void
taosSetQueueFp
(
taos_queue
param
,
FProcessItem
itemFp
,
FProcessItems
itemsFp
)
{
if
(
param
==
NULL
)
return
;
STaosQueue
*
queue
=
(
STaosQueue
*
)
param
;
void
taosSetQueueFp
(
STaosQueue
*
queue
,
FProcessItem
itemFp
,
FProcessItems
itemsFp
)
{
if
(
queue
==
NULL
)
return
;
queue
->
itemFp
=
itemFp
;
queue
->
itemsFp
=
itemsFp
;
}
void
taosCloseQueue
(
taos_queue
param
)
{
if
(
param
==
NULL
)
return
;
STaosQueue
*
queue
=
(
STaosQueue
*
)
param
;
void
taosCloseQueue
(
STaosQueue
*
queue
)
{
if
(
queue
==
NULL
)
return
;
STaosQnode
*
pTemp
;
STaosQset
*
qset
;
...
...
@@ -98,9 +99,8 @@ void taosCloseQueue(taos_queue param) {
uTrace
(
"queue:%p is closed"
,
queue
);
}
bool
taosQueueEmpty
(
taos_queue
param
)
{
if
(
param
==
NULL
)
return
true
;
STaosQueue
*
queue
=
(
STaosQueue
*
)
param
;
bool
taosQueueEmpty
(
STaosQueue
*
queue
)
{
if
(
queue
==
NULL
)
return
true
;
bool
empty
=
false
;
pthread_mutex_lock
(
&
queue
->
mutex
);
...
...
@@ -112,7 +112,7 @@ bool taosQueueEmpty(taos_queue param) {
return
empty
;
}
void
*
taosAllocateQitem
(
int
size
)
{
void
*
taosAllocateQitem
(
int
32_t
size
)
{
STaosQnode
*
pNode
=
(
STaosQnode
*
)
calloc
(
sizeof
(
STaosQnode
)
+
size
,
1
);
if
(
pNode
==
NULL
)
return
NULL
;
...
...
@@ -129,9 +129,8 @@ void taosFreeQitem(void *param) {
free
(
temp
);
}
int
taosWriteQitem
(
taos_queue
param
,
void
*
item
)
{
STaosQueue
*
queue
=
(
STaosQueue
*
)
param
;
STaosQnode
*
pNode
=
(
STaosQnode
*
)(((
char
*
)
item
)
-
sizeof
(
STaosQnode
));
int32_t
taosWriteQitem
(
STaosQueue
*
queue
,
void
*
pItem
)
{
STaosQnode
*
pNode
=
(
STaosQnode
*
)(((
char
*
)
pItem
)
-
sizeof
(
STaosQnode
));
pNode
->
next
=
NULL
;
pthread_mutex_lock
(
&
queue
->
mutex
);
...
...
@@ -146,7 +145,7 @@ int taosWriteQitem(taos_queue param, void *item) {
queue
->
numOfItems
++
;
if
(
queue
->
qset
)
atomic_add_fetch_32
(
&
queue
->
qset
->
numOfItems
,
1
);
uTrace
(
"item:%p is put into queue:%p, items:%d"
,
i
tem
,
queue
,
queue
->
numOfItems
);
uTrace
(
"item:%p is put into queue:%p, items:%d"
,
pI
tem
,
queue
,
queue
->
numOfItems
);
pthread_mutex_unlock
(
&
queue
->
mutex
);
...
...
@@ -155,22 +154,21 @@ int taosWriteQitem(taos_queue param, void *item) {
return
0
;
}
int
taosReadQitem
(
taos_queue
param
,
void
**
pitem
)
{
STaosQueue
*
queue
=
(
STaosQueue
*
)
param
;
int32_t
taosReadQitem
(
STaosQueue
*
queue
,
void
**
ppItem
)
{
STaosQnode
*
pNode
=
NULL
;
int
code
=
0
;
int
32_t
code
=
0
;
pthread_mutex_lock
(
&
queue
->
mutex
);
if
(
queue
->
head
)
{
pNode
=
queue
->
head
;
*
p
i
tem
=
pNode
->
item
;
*
p
pI
tem
=
pNode
->
item
;
queue
->
head
=
pNode
->
next
;
if
(
queue
->
head
==
NULL
)
queue
->
tail
=
NULL
;
queue
->
numOfItems
--
;
if
(
queue
->
qset
)
atomic_sub_fetch_32
(
&
queue
->
qset
->
numOfItems
,
1
);
code
=
1
;
uDebug
(
"item:%p is read out from queue:%p, items:%d"
,
*
p
i
tem
,
queue
,
queue
->
numOfItems
);
uDebug
(
"item:%p is read out from queue:%p, items:%d"
,
*
p
pI
tem
,
queue
,
queue
->
numOfItems
);
}
pthread_mutex_unlock
(
&
queue
->
mutex
);
...
...
@@ -178,18 +176,13 @@ int taosReadQitem(taos_queue param, void **pitem) {
return
code
;
}
void
*
taosAllocateQall
()
{
void
*
p
=
calloc
(
sizeof
(
STaosQall
),
1
);
return
p
;
}
STaosQall
*
taosAllocateQall
()
{
return
calloc
(
sizeof
(
STaosQall
),
1
);
}
void
taosFreeQall
(
void
*
param
)
{
free
(
param
);
}
void
taosFreeQall
(
STaosQall
*
qall
)
{
free
(
qall
);
}
int
taosReadAllQitems
(
taos_queue
param
,
taos_qall
p2
)
{
STaosQueue
*
queue
=
(
STaosQueue
*
)
param
;
STaosQall
*
qall
=
(
STaosQall
*
)
p2
;
int
code
=
0
;
bool
empty
;
int32_t
taosReadAllQitems
(
STaosQueue
*
queue
,
STaosQall
*
qall
)
{
int32_t
code
=
0
;
bool
empty
;
pthread_mutex_lock
(
&
queue
->
mutex
);
...
...
@@ -219,29 +212,25 @@ int taosReadAllQitems(taos_queue param, taos_qall p2) {
return
code
;
}
int
taosGetQitem
(
taos_qall
param
,
void
**
pitem
)
{
STaosQall
*
qall
=
(
STaosQall
*
)
param
;
int32_t
taosGetQitem
(
STaosQall
*
qall
,
void
**
ppItem
)
{
STaosQnode
*
pNode
;
int
num
=
0
;
int
32_t
num
=
0
;
pNode
=
qall
->
current
;
if
(
pNode
)
qall
->
current
=
pNode
->
next
;
if
(
pNode
)
{
*
p
i
tem
=
pNode
->
item
;
*
p
pI
tem
=
pNode
->
item
;
num
=
1
;
uTrace
(
"item:%p is fetched"
,
*
p
i
tem
);
uTrace
(
"item:%p is fetched"
,
*
p
pI
tem
);
}
return
num
;
}
void
taosResetQitems
(
taos_qall
param
)
{
STaosQall
*
qall
=
(
STaosQall
*
)
param
;
qall
->
current
=
qall
->
start
;
}
void
taosResetQitems
(
STaosQall
*
qall
)
{
qall
->
current
=
qall
->
start
;
}
taos_qset
taosOpenQset
()
{
STaosQset
*
taosOpenQset
()
{
STaosQset
*
qset
=
(
STaosQset
*
)
calloc
(
sizeof
(
STaosQset
),
1
);
if
(
qset
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -255,9 +244,8 @@ taos_qset taosOpenQset() {
return
qset
;
}
void
taosCloseQset
(
taos_qset
param
)
{
if
(
param
==
NULL
)
return
;
STaosQset
*
qset
=
(
STaosQset
*
)
param
;
void
taosCloseQset
(
STaosQset
*
qset
)
{
if
(
qset
==
NULL
)
return
;
// remove all the queues from qset
pthread_mutex_lock
(
&
qset
->
mutex
);
...
...
@@ -279,16 +267,12 @@ void taosCloseQset(taos_qset param) {
// tsem_post 'qset->sem', so that reader threads waiting for it
// resumes execution and return, should only be used to signal the
// thread to exit.
void
taosQsetThreadResume
(
taos_qset
param
)
{
STaosQset
*
qset
=
(
STaosQset
*
)
param
;
void
taosQsetThreadResume
(
STaosQset
*
qset
)
{
uDebug
(
"qset:%p, it will exit"
,
qset
);
tsem_post
(
&
qset
->
sem
);
}
int
taosAddIntoQset
(
taos_qset
p1
,
taos_queue
p2
,
void
*
ahandle
)
{
STaosQueue
*
queue
=
(
STaosQueue
*
)
p2
;
STaosQset
*
qset
=
(
STaosQset
*
)
p1
;
int32_t
taosAddIntoQset
(
STaosQset
*
qset
,
STaosQueue
*
queue
,
void
*
ahandle
)
{
if
(
queue
->
qset
)
return
-
1
;
pthread_mutex_lock
(
&
qset
->
mutex
);
...
...
@@ -309,10 +293,7 @@ int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) {
return
0
;
}
void
taosRemoveFromQset
(
taos_qset
p1
,
taos_queue
p2
)
{
STaosQueue
*
queue
=
(
STaosQueue
*
)
p2
;
STaosQset
*
qset
=
(
STaosQset
*
)
p1
;
void
taosRemoveFromQset
(
STaosQset
*
qset
,
STaosQueue
*
queue
)
{
STaosQueue
*
tqueue
=
NULL
;
pthread_mutex_lock
(
&
qset
->
mutex
);
...
...
@@ -353,18 +334,17 @@ void taosRemoveFromQset(taos_qset p1, taos_queue p2) {
uTrace
(
"queue:%p is removed from qset:%p"
,
queue
,
qset
);
}
int
taosGetQueueNumber
(
taos_qset
param
)
{
return
((
STaosQset
*
)
param
)
->
numOfQueues
;
}
int
32_t
taosGetQueueNumber
(
STaosQset
*
qset
)
{
return
qset
->
numOfQueues
;
}
int
taosReadQitemFromQset
(
taos_qset
param
,
void
**
pitem
,
void
**
ahandle
,
FProcessItem
*
itemFp
)
{
STaosQset
*
qset
=
(
STaosQset
*
)
param
;
int32_t
taosReadQitemFromQset
(
STaosQset
*
qset
,
void
**
ppItem
,
void
**
ahandle
,
FProcessItem
*
itemFp
)
{
STaosQnode
*
pNode
=
NULL
;
int
code
=
0
;
int
32_t
code
=
0
;
tsem_wait
(
&
qset
->
sem
);
pthread_mutex_lock
(
&
qset
->
mutex
);
for
(
int
i
=
0
;
i
<
qset
->
numOfQueues
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
qset
->
numOfQueues
;
++
i
)
{
if
(
qset
->
current
==
NULL
)
qset
->
current
=
qset
->
head
;
STaosQueue
*
queue
=
qset
->
current
;
if
(
queue
)
qset
->
current
=
queue
->
next
;
...
...
@@ -375,7 +355,7 @@ int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProces
if
(
queue
->
head
)
{
pNode
=
queue
->
head
;
*
p
i
tem
=
pNode
->
item
;
*
p
pI
tem
=
pNode
->
item
;
if
(
ahandle
)
*
ahandle
=
queue
->
ahandle
;
if
(
itemFp
)
*
itemFp
=
queue
->
itemFp
;
queue
->
head
=
pNode
->
next
;
...
...
@@ -383,7 +363,7 @@ int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProces
queue
->
numOfItems
--
;
atomic_sub_fetch_32
(
&
qset
->
numOfItems
,
1
);
code
=
1
;
uTrace
(
"item:%p is read out from queue:%p, items:%d"
,
*
p
i
tem
,
queue
,
queue
->
numOfItems
);
uTrace
(
"item:%p is read out from queue:%p, items:%d"
,
*
p
pI
tem
,
queue
,
queue
->
numOfItems
);
}
pthread_mutex_unlock
(
&
queue
->
mutex
);
...
...
@@ -395,18 +375,15 @@ int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProces
return
code
;
}
int
taosReadAllQitemsFromQset
(
taos_qset
param
,
taos_qall
p2
,
void
**
ahandle
,
FProcessItems
*
itemsFp
)
{
STaosQset
*
qset
=
(
STaosQset
*
)
param
;
int32_t
taosReadAllQitemsFromQset
(
STaosQset
*
qset
,
STaosQall
*
qall
,
void
**
ahandle
,
FProcessItems
*
itemsFp
)
{
STaosQueue
*
queue
;
STaosQall
*
qall
=
(
STaosQall
*
)
p2
;
int
code
=
0
;
int32_t
code
=
0
;
tsem_wait
(
&
qset
->
sem
);
pthread_mutex_lock
(
&
qset
->
mutex
);
for
(
int
i
=
0
;
i
<
qset
->
numOfQueues
;
++
i
)
{
if
(
qset
->
current
==
NULL
)
qset
->
current
=
qset
->
head
;
for
(
int32_t
i
=
0
;
i
<
qset
->
numOfQueues
;
++
i
)
{
if
(
qset
->
current
==
NULL
)
qset
->
current
=
qset
->
head
;
queue
=
qset
->
current
;
if
(
queue
)
qset
->
current
=
queue
->
next
;
if
(
queue
==
NULL
)
break
;
...
...
@@ -427,34 +404,32 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **ahandle, FPr
queue
->
tail
=
NULL
;
queue
->
numOfItems
=
0
;
atomic_sub_fetch_32
(
&
qset
->
numOfItems
,
qall
->
numOfItems
);
for
(
int
j
=
1
;
j
<
qall
->
numOfItems
;
++
j
)
tsem_wait
(
&
qset
->
sem
);
}
for
(
int
32_t
j
=
1
;
j
<
qall
->
numOfItems
;
++
j
)
tsem_wait
(
&
qset
->
sem
);
}
pthread_mutex_unlock
(
&
queue
->
mutex
);
if
(
code
!=
0
)
break
;
if
(
code
!=
0
)
break
;
}
pthread_mutex_unlock
(
&
qset
->
mutex
);
return
code
;
}
int
taosGetQueueItemsNumber
(
taos_queue
param
)
{
STaosQueue
*
queue
=
(
STaosQueue
*
)
param
;
int32_t
taosGetQueueItemsNumber
(
STaosQueue
*
queue
)
{
if
(
!
queue
)
return
0
;
int
num
;
int
32_t
num
;
pthread_mutex_lock
(
&
queue
->
mutex
);
num
=
queue
->
numOfItems
;
pthread_mutex_unlock
(
&
queue
->
mutex
);
return
num
;
}
int
taosGetQsetItemsNumber
(
taos_qset
param
)
{
STaosQset
*
qset
=
(
STaosQset
*
)
param
;
int32_t
taosGetQsetItemsNumber
(
STaosQset
*
qset
)
{
if
(
!
qset
)
return
0
;
int
num
=
0
;
int
32_t
num
=
0
;
pthread_mutex_lock
(
&
qset
->
mutex
);
num
=
qset
->
numOfItems
;
pthread_mutex_unlock
(
&
qset
->
mutex
);
...
...
source/util/src/tworker.c
浏览文件 @
8789ae77
...
...
@@ -85,9 +85,9 @@ static void *tWorkerThreadFp(SWorker *worker) {
return
NULL
;
}
taos_queue
tWorkerAllocQueue
(
SWorkerPool
*
pool
,
void
*
ahandle
,
FProcessItem
fp
)
{
STaosQueue
*
tWorkerAllocQueue
(
SWorkerPool
*
pool
,
void
*
ahandle
,
FProcessItem
fp
)
{
pthread_mutex_lock
(
&
pool
->
mutex
);
taos_queue
queue
=
taosOpenQueue
();
STaosQueue
*
queue
=
taosOpenQueue
();
if
(
queue
==
NULL
)
{
pthread_mutex_unlock
(
&
pool
->
mutex
);
return
NULL
;
...
...
@@ -121,7 +121,7 @@ taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp)
return
queue
;
}
void
tWorkerFreeQueue
(
SWorkerPool
*
pool
,
void
*
queue
)
{
void
tWorkerFreeQueue
(
SWorkerPool
*
pool
,
STaosQueue
*
queue
)
{
taosCloseQueue
(
queue
);
uDebug
(
"worker:%s, queue:%p is freed"
,
pool
->
name
,
queue
);
}
...
...
@@ -195,11 +195,11 @@ static void *tWriteWorkerThreadFp(SMWorker *worker) {
return
NULL
;
}
taos_queue
tMWorkerAllocQueue
(
SMWorkerPool
*
pool
,
void
*
ahandle
,
FProcessItems
fp
)
{
STaosQueue
*
tMWorkerAllocQueue
(
SMWorkerPool
*
pool
,
void
*
ahandle
,
FProcessItems
fp
)
{
pthread_mutex_lock
(
&
pool
->
mutex
);
SMWorker
*
worker
=
pool
->
workers
+
pool
->
nextId
;
taos_q
ueue
*
queue
=
taosOpenQueue
();
STaosQ
ueue
*
queue
=
taosOpenQueue
();
if
(
queue
==
NULL
)
{
pthread_mutex_unlock
(
&
pool
->
mutex
);
return
NULL
;
...
...
@@ -250,7 +250,7 @@ taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems f
return
queue
;
}
void
tMWorkerFreeQueue
(
SMWorkerPool
*
pool
,
taos_queue
queue
)
{
void
tMWorkerFreeQueue
(
SMWorkerPool
*
pool
,
STaosQueue
*
queue
)
{
taosCloseQueue
(
queue
);
uDebug
(
"worker:%s, queue:%p is freed"
,
pool
->
name
,
queue
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录