Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a0983cf8
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a0983cf8
编写于
11月 04, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh: adjust tqueue and tworker log
上级
a2692a3d
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
77 addition
and
125 deletion
+77
-125
include/util/tqueue.h
include/util/tqueue.h
+41
-0
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
+16
-4
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+11
-6
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+0
-69
source/util/src/tqueue.c
source/util/src/tqueue.c
+0
-40
source/util/src/tworker.c
source/util/src/tworker.c
+9
-6
未找到文件。
include/util/tqueue.h
浏览文件 @
a0983cf8
...
...
@@ -59,6 +59,47 @@ typedef enum {
typedef
void
(
*
FItem
)(
SQueueInfo
*
pInfo
,
void
*
pItem
);
typedef
void
(
*
FItems
)(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfItems
);
typedef
struct
STaosQnode
STaosQnode
;
typedef
struct
STaosQnode
{
STaosQnode
*
next
;
STaosQueue
*
queue
;
int64_t
timestamp
;
int32_t
size
;
int8_t
itype
;
int8_t
reserved
[
3
];
char
item
[];
}
STaosQnode
;
typedef
struct
STaosQueue
{
STaosQnode
*
head
;
STaosQnode
*
tail
;
STaosQueue
*
next
;
// for queue set
STaosQset
*
qset
;
// for queue set
void
*
ahandle
;
// for queue set
FItem
itemFp
;
FItems
itemsFp
;
TdThreadMutex
mutex
;
int64_t
memOfItems
;
int32_t
numOfItems
;
int64_t
threadId
;
}
STaosQueue
;
typedef
struct
STaosQset
{
STaosQueue
*
head
;
STaosQueue
*
current
;
TdThreadMutex
mutex
;
tsem_t
sem
;
int32_t
numOfQueues
;
int32_t
numOfItems
;
}
STaosQset
;
typedef
struct
STaosQall
{
STaosQnode
*
current
;
STaosQnode
*
start
;
int32_t
numOfItems
;
}
STaosQall
;
STaosQueue
*
taosOpenQueue
();
void
taosCloseQueue
(
STaosQueue
*
queue
);
void
taosSetQueueFp
(
STaosQueue
*
queue
,
FItem
itemFp
,
FItems
itemsFp
);
...
...
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
浏览文件 @
a0983cf8
...
...
@@ -86,22 +86,34 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
taosThreadRwlockUnlock
(
&
pMgmt
->
lock
);
vmReleaseVnode
(
pMgmt
,
pVnode
);
d
Trace
(
"vgId:%d, wait for vnode ref become 0"
,
pVnode
->
vgId
);
d
Info
(
"vgId:%d, wait for vnode ref become 0"
,
pVnode
->
vgId
);
while
(
pVnode
->
refCount
>
0
)
taosMsleep
(
10
);
dTrace
(
"vgId:%d, wait for vnode queue is empty"
,
pVnode
->
vgId
);
dInfo
(
"vgId:%d, wait for vnode write queue:%p is empty, thread:%08"
PRId64
,
pVnode
->
vgId
,
pVnode
->
pWriteQ
,
pVnode
->
pWriteQ
->
threadId
);
while
(
!
taosQueueEmpty
(
pVnode
->
pWriteQ
))
taosMsleep
(
10
);
dInfo
(
"vgId:%d, wait for vnode sync queue:%p is empty, thread:%08"
PRId64
,
pVnode
->
vgId
,
pVnode
->
pSyncQ
,
pVnode
->
pWriteQ
->
threadId
);
while
(
!
taosQueueEmpty
(
pVnode
->
pSyncQ
))
taosMsleep
(
10
);
dInfo
(
"vgId:%d, wait for vnode sync ctrl queue:%p is empty, thread:%08"
PRId64
,
pVnode
->
vgId
,
pVnode
->
pSyncCtrlQ
,
pVnode
->
pWriteQ
->
threadId
);
while
(
!
taosQueueEmpty
(
pVnode
->
pSyncCtrlQ
))
taosMsleep
(
10
);
dInfo
(
"vgId:%d, wait for vnode apply queue:%p is empty, thread:%08"
PRId64
,
pVnode
->
vgId
,
pVnode
->
pApplyQ
,
pVnode
->
pWriteQ
->
threadId
);
while
(
!
taosQueueEmpty
(
pVnode
->
pApplyQ
))
taosMsleep
(
10
);
dInfo
(
"vgId:%d, wait for vnode query queue:%p is empty"
,
pVnode
->
vgId
,
pVnode
->
pQueryQ
);
while
(
!
taosQueueEmpty
(
pVnode
->
pQueryQ
))
taosMsleep
(
10
);
dInfo
(
"vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08"
PRId64
,
pVnode
->
vgId
,
pVnode
->
pFetchQ
,
pVnode
->
pWriteQ
->
threadId
);
while
(
!
taosQueueEmpty
(
pVnode
->
pFetchQ
))
taosMsleep
(
10
);
dInfo
(
"vgId:%d, wait for vnode stream queue:%p is empty"
,
pVnode
->
vgId
,
pVnode
->
pStreamQ
);
while
(
!
taosQueueEmpty
(
pVnode
->
pStreamQ
))
taosMsleep
(
10
);
d
Trace
(
"vgId:%d, vnode queue
is empty"
,
pVnode
->
vgId
);
d
Info
(
"vgId:%d, all vnode queues
is empty"
,
pVnode
->
vgId
);
vmFreeQueue
(
pMgmt
,
pVnode
);
vnodeClose
(
pVnode
->
pImpl
);
pVnode
->
pImpl
=
NULL
;
d
Debug
(
"vgId:%d, vnode is closed"
,
pVnode
->
vgId
);
d
Info
(
"vgId:%d, vnode is closed"
,
pVnode
->
vgId
);
if
(
pVnode
->
dropped
)
{
dInfo
(
"vgId:%d, vnode is destroyed, dropped:%d"
,
pVnode
->
vgId
,
pVnode
->
dropped
);
...
...
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
a0983cf8
...
...
@@ -320,12 +320,17 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
return
-
1
;
}
dDebug
(
"vgId:%d, write-queue:%p is alloced"
,
pVnode
->
vgId
,
pVnode
->
pWriteQ
);
dDebug
(
"vgId:%d, sync-queue:%p is alloced"
,
pVnode
->
vgId
,
pVnode
->
pSyncQ
);
dDebug
(
"vgId:%d, apply-queue:%p is alloced"
,
pVnode
->
vgId
,
pVnode
->
pApplyQ
);
dDebug
(
"vgId:%d, query-queue:%p is alloced"
,
pVnode
->
vgId
,
pVnode
->
pQueryQ
);
dDebug
(
"vgId:%d, stream-queue:%p is alloced"
,
pVnode
->
vgId
,
pVnode
->
pStreamQ
);
dDebug
(
"vgId:%d, fetch-queue:%p is alloced"
,
pVnode
->
vgId
,
pVnode
->
pFetchQ
);
dInfo
(
"vgId:%d, write-queue:%p is alloced, thread:%08"
PRId64
,
pVnode
->
vgId
,
pVnode
->
pWriteQ
,
pVnode
->
pWriteQ
->
threadId
);
dInfo
(
"vgId:%d, sync-queue:%p is alloced, thread:%08"
PRId64
,
pVnode
->
vgId
,
pVnode
->
pSyncQ
,
pVnode
->
pSyncQ
->
threadId
);
dInfo
(
"vgId:%d, sync-ctrl-queue:%p is alloced, thread:%08"
PRId64
,
pVnode
->
vgId
,
pVnode
->
pSyncCtrlQ
,
pVnode
->
pSyncCtrlQ
->
threadId
);
dInfo
(
"vgId:%d, apply-queue:%p is alloced, thread:%08"
PRId64
,
pVnode
->
vgId
,
pVnode
->
pApplyQ
,
pVnode
->
pApplyQ
->
threadId
);
dInfo
(
"vgId:%d, query-queue:%p is alloced"
,
pVnode
->
vgId
,
pVnode
->
pQueryQ
);
dInfo
(
"vgId:%d, fetch-queue:%p is alloced, thread:%08"
PRId64
,
pVnode
->
vgId
,
pVnode
->
pFetchQ
,
pVnode
->
pFetchQ
->
threadId
);
dInfo
(
"vgId:%d, stream-queue:%p is alloced"
,
pVnode
->
vgId
,
pVnode
->
pStreamQ
);
return
0
;
}
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
a0983cf8
...
...
@@ -244,16 +244,6 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
}
static
int32_t
vnodeSyncEqCtrlMsg
(
const
SMsgCb
*
msgcb
,
SRpcMsg
*
pMsg
)
{
if
(
pMsg
==
NULL
||
pMsg
->
pCont
==
NULL
)
{
return
-
1
;
}
if
(
msgcb
==
NULL
||
msgcb
->
putToQueueFp
==
NULL
)
{
rpcFreeCont
(
pMsg
->
pCont
);
pMsg
->
pCont
=
NULL
;
return
-
1
;
}
int32_t
code
=
tmsgPutToQueue
(
msgcb
,
SYNC_CTRL_QUEUE
,
pMsg
);
if
(
code
!=
0
)
{
rpcFreeCont
(
pMsg
->
pCont
);
...
...
@@ -263,16 +253,6 @@ static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
}
static
int32_t
vnodeSyncEqMsg
(
const
SMsgCb
*
msgcb
,
SRpcMsg
*
pMsg
)
{
if
(
pMsg
==
NULL
||
pMsg
->
pCont
==
NULL
)
{
return
-
1
;
}
if
(
msgcb
==
NULL
||
msgcb
->
putToQueueFp
==
NULL
)
{
rpcFreeCont
(
pMsg
->
pCont
);
pMsg
->
pCont
=
NULL
;
return
-
1
;
}
int32_t
code
=
tmsgPutToQueue
(
msgcb
,
SYNC_QUEUE
,
pMsg
);
if
(
code
!=
0
)
{
rpcFreeCont
(
pMsg
->
pCont
);
...
...
@@ -342,52 +322,26 @@ static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, cons
TMSG_INFO
(
pMsg
->
msgType
));
}
#define USE_TSDB_SNAPSHOT
static
int32_t
vnodeSnapshotStartRead
(
const
SSyncFSM
*
pFsm
,
void
*
pParam
,
void
**
ppReader
)
{
#ifdef USE_TSDB_SNAPSHOT
SVnode
*
pVnode
=
pFsm
->
data
;
SSnapshotParam
*
pSnapshotParam
=
pParam
;
int32_t
code
=
vnodeSnapReaderOpen
(
pVnode
,
pSnapshotParam
->
start
,
pSnapshotParam
->
end
,
(
SVSnapReader
**
)
ppReader
);
return
code
;
#else
*
ppReader
=
taosMemoryMalloc
(
32
);
return
0
;
#endif
}
static
int32_t
vnodeSnapshotStopRead
(
const
SSyncFSM
*
pFsm
,
void
*
pReader
)
{
#ifdef USE_TSDB_SNAPSHOT
SVnode
*
pVnode
=
pFsm
->
data
;
int32_t
code
=
vnodeSnapReaderClose
(
pReader
);
return
code
;
#else
taosMemoryFree
(
pReader
);
return
0
;
#endif
}
static
int32_t
vnodeSnapshotDoRead
(
const
SSyncFSM
*
pFsm
,
void
*
pReader
,
void
**
ppBuf
,
int32_t
*
len
)
{
#ifdef USE_TSDB_SNAPSHOT
SVnode
*
pVnode
=
pFsm
->
data
;
int32_t
code
=
vnodeSnapRead
(
pReader
,
(
uint8_t
**
)
ppBuf
,
len
);
return
code
;
#else
static
int32_t
times
=
0
;
if
(
times
++
<
5
)
{
*
len
=
64
;
*
ppBuf
=
taosMemoryMalloc
(
*
len
);
snprintf
(
*
ppBuf
,
*
len
,
"snapshot block %d"
,
times
);
}
else
{
*
len
=
0
;
*
ppBuf
=
NULL
;
}
return
0
;
#endif
}
static
int32_t
vnodeSnapshotStartWrite
(
const
SSyncFSM
*
pFsm
,
void
*
pParam
,
void
**
ppWriter
)
{
#ifdef USE_TSDB_SNAPSHOT
SVnode
*
pVnode
=
pFsm
->
data
;
SSnapshotParam
*
pSnapshotParam
=
pParam
;
...
...
@@ -404,14 +358,9 @@ static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void
int32_t
code
=
vnodeSnapWriterOpen
(
pVnode
,
pSnapshotParam
->
start
,
pSnapshotParam
->
end
,
(
SVSnapWriter
**
)
ppWriter
);
return
code
;
#else
*
ppWriter
=
taosMemoryMalloc
(
32
);
return
0
;
#endif
}
static
int32_t
vnodeSnapshotStopWrite
(
const
SSyncFSM
*
pFsm
,
void
*
pWriter
,
bool
isApply
,
SSnapshot
*
pSnapshot
)
{
#ifdef USE_TSDB_SNAPSHOT
SVnode
*
pVnode
=
pFsm
->
data
;
vInfo
(
"vgId:%d, stop write vnode snapshot, apply:%d, index:%"
PRId64
" term:%"
PRIu64
" config:%"
PRId64
,
pVnode
->
config
.
vgId
,
isApply
,
pSnapshot
->
lastApplyIndex
,
pSnapshot
->
lastApplyTerm
,
pSnapshot
->
lastConfigIndex
);
...
...
@@ -419,22 +368,14 @@ static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool
int32_t
code
=
vnodeSnapWriterClose
(
pWriter
,
!
isApply
,
pSnapshot
);
vInfo
(
"vgId:%d, apply vnode snapshot finished, code:0x%x"
,
pVnode
->
config
.
vgId
,
code
);
return
code
;
#else
taosMemoryFree
(
pWriter
);
return
0
;
#endif
}
static
int32_t
vnodeSnapshotDoWrite
(
const
SSyncFSM
*
pFsm
,
void
*
pWriter
,
void
*
pBuf
,
int32_t
len
)
{
#ifdef USE_TSDB_SNAPSHOT
SVnode
*
pVnode
=
pFsm
->
data
;
vDebug
(
"vgId:%d, continue write vnode snapshot, len:%d"
,
pVnode
->
config
.
vgId
,
len
);
int32_t
code
=
vnodeSnapWrite
(
pWriter
,
pBuf
,
len
);
vDebug
(
"vgId:%d, continue write vnode snapshot finished, len:%d"
,
pVnode
->
config
.
vgId
,
len
);
return
code
;
#else
return
0
;
#endif
}
static
void
vnodeRestoreFinish
(
const
SSyncFSM
*
pFsm
)
{
...
...
@@ -461,7 +402,6 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
SVnode
*
pVnode
=
pFsm
->
data
;
vDebug
(
"vgId:%d, become follower"
,
pVnode
->
config
.
vgId
);
// clear old leader resource
taosThreadMutexLock
(
&
pVnode
->
lock
);
if
(
pVnode
->
blocked
)
{
pVnode
->
blocked
=
false
;
...
...
@@ -474,15 +414,6 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
static
void
vnodeBecomeLeader
(
const
SSyncFSM
*
pFsm
)
{
SVnode
*
pVnode
=
pFsm
->
data
;
vDebug
(
"vgId:%d, become leader"
,
pVnode
->
config
.
vgId
);
#if 0
taosThreadMutexLock(&pVnode->lock);
if (pVnode->blocked) {
pVnode->blocked = false;
tsem_post(&pVnode->syncSem);
}
taosThreadMutexUnlock(&pVnode->lock);
#endif
}
static
SSyncFSM
*
vnodeSyncMakeFsm
(
SVnode
*
pVnode
)
{
...
...
source/util/src/tqueue.c
浏览文件 @
a0983cf8
...
...
@@ -21,46 +21,6 @@
int64_t
tsRpcQueueMemoryAllowed
=
0
;
int64_t
tsRpcQueueMemoryUsed
=
0
;
typedef
struct
STaosQnode
STaosQnode
;
typedef
struct
STaosQnode
{
STaosQnode
*
next
;
STaosQueue
*
queue
;
int64_t
timestamp
;
int32_t
size
;
int8_t
itype
;
int8_t
reserved
[
3
];
char
item
[];
}
STaosQnode
;
typedef
struct
STaosQueue
{
STaosQnode
*
head
;
STaosQnode
*
tail
;
STaosQueue
*
next
;
// for queue set
STaosQset
*
qset
;
// for queue set
void
*
ahandle
;
// for queue set
FItem
itemFp
;
FItems
itemsFp
;
TdThreadMutex
mutex
;
int64_t
memOfItems
;
int32_t
numOfItems
;
}
STaosQueue
;
typedef
struct
STaosQset
{
STaosQueue
*
head
;
STaosQueue
*
current
;
TdThreadMutex
mutex
;
tsem_t
sem
;
int32_t
numOfQueues
;
int32_t
numOfItems
;
}
STaosQset
;
typedef
struct
STaosQall
{
STaosQnode
*
current
;
STaosQnode
*
start
;
int32_t
numOfItems
;
}
STaosQall
;
STaosQueue
*
taosOpenQueue
()
{
STaosQueue
*
queue
=
taosMemoryCalloc
(
1
,
sizeof
(
STaosQueue
));
if
(
queue
==
NULL
)
{
...
...
source/util/src/tworker.c
浏览文件 @
a0983cf8
...
...
@@ -36,7 +36,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) {
worker
->
pool
=
pool
;
}
u
Info
(
"worker:%s is initialized, min:%d max:%d"
,
pool
->
name
,
pool
->
min
,
pool
->
max
);
u
Debug
(
"worker:%s is initialized, min:%d max:%d"
,
pool
->
name
,
pool
->
min
,
pool
->
max
);
return
0
;
}
...
...
@@ -73,11 +73,12 @@ static void *tQWorkerThreadFp(SQWorker *worker) {
taosBlockSIGPIPE
();
setThreadName
(
pool
->
name
);
u
Debug
(
"worker:%s:%d is running"
,
pool
->
name
,
worker
->
id
);
u
Info
(
"worker:%s:%d is running, thread:%08"
PRId64
,
pool
->
name
,
worker
->
id
,
taosGetSelfPthreadId
()
);
while
(
1
)
{
if
(
taosReadQitemFromQset
(
pool
->
qset
,
(
void
**
)
&
msg
,
&
qinfo
)
==
0
)
{
uDebug
(
"worker:%s:%d qset:%p, got no message and exiting"
,
pool
->
name
,
worker
->
id
,
pool
->
qset
);
uInfo
(
"worker:%s:%d qset:%p, got no message and exiting, thread:%08"
PRId64
,
pool
->
name
,
worker
->
id
,
pool
->
qset
,
taosGetSelfPthreadId
());
break
;
}
...
...
@@ -191,12 +192,13 @@ static void *tWWorkerThreadFp(SWWorker *worker) {
taosBlockSIGPIPE
();
setThreadName
(
pool
->
name
);
u
Debug
(
"worker:%s:%d is running"
,
pool
->
name
,
worker
->
id
);
u
Info
(
"worker:%s:%d is running, thread:%08"
PRId64
,
pool
->
name
,
worker
->
id
,
taosGetSelfPthreadId
()
);
while
(
1
)
{
numOfMsgs
=
taosReadAllQitemsFromQset
(
worker
->
qset
,
worker
->
qall
,
&
qinfo
);
if
(
numOfMsgs
==
0
)
{
uDebug
(
"worker:%s:%d qset:%p, got no message and exiting"
,
pool
->
name
,
worker
->
id
,
worker
->
qset
);
uInfo
(
"worker:%s:%d qset:%p, got no message and exiting, thread:%08"
PRId64
,
pool
->
name
,
worker
->
id
,
worker
->
qset
,
taosGetSelfPthreadId
());
break
;
}
...
...
@@ -244,7 +246,8 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
pool
->
nextId
=
(
pool
->
nextId
+
1
)
%
pool
->
max
;
}
uDebug
(
"worker:%s, queue:%p is allocated, ahandle:%p"
,
pool
->
name
,
queue
,
ahandle
);
queue
->
threadId
=
taosGetPthreadId
(
worker
->
thread
);
uDebug
(
"worker:%s, queue:%p is allocated, ahandle:%p thread:%08"
PRId64
,
pool
->
name
,
queue
,
ahandle
,
queue
->
threadId
);
code
=
0
;
_OVER:
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录