Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
259e2941
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
259e2941
编写于
12月 09, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-2393
上级
d9da9fc9
变更
8
显示空白变更内容
内联
并排
Showing
8 changed file
with
60 addition
and
236 deletion
+60
-236
src/dnode/inc/dnodeVRead.h
src/dnode/inc/dnodeVRead.h
+4
-2
src/dnode/src/dnodeVRead.c
src/dnode/src/dnodeVRead.c
+30
-13
src/inc/dnode.h
src/inc/dnode.h
+4
-2
src/vnode/inc/vnodeCancel.h
src/vnode/inc/vnodeCancel.h
+0
-33
src/vnode/inc/vnodeInt.h
src/vnode/inc/vnodeInt.h
+3
-2
src/vnode/src/vnodeCancel.c
src/vnode/src/vnodeCancel.c
+0
-169
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+11
-6
src/vnode/src/vnodeRead.c
src/vnode/src/vnodeRead.c
+8
-9
未找到文件。
src/dnode/inc/dnodeVRead.h
浏览文件 @
259e2941
...
...
@@ -24,8 +24,10 @@ extern "C" {
int32_t
dnodeInitVRead
();
void
dnodeCleanupVRead
();
void
dnodeDispatchToVReadQueue
(
SRpcMsg
*
pMsg
);
void
*
dnodeAllocVReadQueue
(
void
*
pVnode
);
void
dnodeFreeVReadQueue
(
void
*
pRqueue
);
void
*
dnodeAllocVQueryQueue
(
void
*
pVnode
);
void
*
dnodeAllocVFetchQueue
(
void
*
pVnode
);
void
dnodeFreeVQueryQueue
(
void
*
pQqueue
);
void
dnodeFreeVFetchQueue
(
void
*
pFqueue
);
#ifdef __cplusplus
}
...
...
src/dnode/src/dnodeVRead.c
浏览文件 @
259e2941
...
...
@@ -22,20 +22,29 @@
static
void
*
dnodeProcessReadQueue
(
void
*
pWorker
);
// module global variable
static
SWorkerPool
tsVReadWP
;
static
SWorkerPool
tsVQueryWP
;
static
SWorkerPool
tsVFetchWP
;
int32_t
dnodeInitVRead
()
{
tsVReadWP
.
name
=
"vquery"
;
tsVReadWP
.
workerFp
=
dnodeProcessReadQueue
;
tsVReadWP
.
min
=
tsNumOfCores
;
tsVReadWP
.
max
=
tsNumOfCores
*
tsNumOfThreadsPerCore
;
if
(
tsVReadWP
.
max
<=
tsVReadWP
.
min
*
2
)
tsVReadWP
.
max
=
2
*
tsVReadWP
.
min
;
return
tWorkerInit
(
&
tsVReadWP
);
tsVQueryWP
.
name
=
"vquery"
;
tsVQueryWP
.
workerFp
=
dnodeProcessReadQueue
;
tsVQueryWP
.
min
=
tsNumOfCores
;
tsVQueryWP
.
max
=
tsNumOfCores
*
tsNumOfThreadsPerCore
;
if
(
tsVQueryWP
.
max
<=
tsVQueryWP
.
min
*
2
)
tsVQueryWP
.
max
=
2
*
tsVQueryWP
.
min
;
if
(
tWorkerInit
(
&
tsVQueryWP
)
!=
0
)
return
-
1
;
tsVFetchWP
.
name
=
"vfetch"
;
tsVFetchWP
.
workerFp
=
dnodeProcessReadQueue
;
tsVFetchWP
.
min
=
1
;
tsVFetchWP
.
max
=
1
;
if
(
tWorkerInit
(
&
tsVFetchWP
)
!=
0
)
return
-
1
;
return
0
;
}
void
dnodeCleanupVRead
()
{
tWorkerCleanup
(
&
tsVReadWP
);
tWorkerCleanup
(
&
tsVFetchWP
);
tWorkerCleanup
(
&
tsVQueryWP
);
}
void
dnodeDispatchToVReadQueue
(
SRpcMsg
*
pMsg
)
{
...
...
@@ -68,12 +77,20 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
rpcFreeCont
(
pMsg
->
pCont
);
}
void
*
dnodeAllocVReadQueue
(
void
*
pVnode
)
{
return
tWorkerAllocQueue
(
&
tsVReadWP
,
pVnode
);
void
*
dnodeAllocVQueryQueue
(
void
*
pVnode
)
{
return
tWorkerAllocQueue
(
&
tsVQueryWP
,
pVnode
);
}
void
*
dnodeAllocVFetchQueue
(
void
*
pVnode
)
{
return
tWorkerAllocQueue
(
&
tsVFetchWP
,
pVnode
);
}
void
dnodeFreeVQueryQueue
(
void
*
pQqueue
)
{
tWorkerFreeQueue
(
&
tsVQueryWP
,
pQqueue
);
}
void
dnodeFreeV
ReadQueue
(
void
*
pR
queue
)
{
tWorkerFreeQueue
(
&
tsV
ReadWP
,
pR
queue
);
void
dnodeFreeV
FetchQueue
(
void
*
pF
queue
)
{
tWorkerFreeQueue
(
&
tsV
FetchWP
,
pF
queue
);
}
void
dnodeSendRpcVReadRsp
(
void
*
pVnode
,
SVReadMsg
*
pRead
,
int32_t
code
)
{
...
...
src/inc/dnode.h
浏览文件 @
259e2941
...
...
@@ -56,8 +56,10 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid);
void
*
dnodeAllocVWriteQueue
(
void
*
pVnode
);
void
dnodeFreeVWriteQueue
(
void
*
pWqueue
);
void
dnodeSendRpcVWriteRsp
(
void
*
pVnode
,
void
*
pWrite
,
int32_t
code
);
void
*
dnodeAllocVReadQueue
(
void
*
pVnode
);
void
dnodeFreeVReadQueue
(
void
*
pRqueue
);
void
*
dnodeAllocVQueryQueue
(
void
*
pVnode
);
void
*
dnodeAllocVFetchQueue
(
void
*
pVnode
);
void
dnodeFreeVQueryQueue
(
void
*
pQqueue
);
void
dnodeFreeVFetchQueue
(
void
*
pFqueue
);
int32_t
dnodeAllocateMPeerQueue
();
void
dnodeFreeMPeerQueue
();
...
...
src/vnode/inc/vnodeCancel.h
已删除
100644 → 0
浏览文件 @
d9da9fc9
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_CANCEL_H
#define TDENGINE_VNODE_CANCEL_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "vnode.h"
#include "vnodeInt.h"
int32_t
vnodeInitCWorker
();
void
vnodeCleanupCWorker
();
int32_t
vnodeWriteIntoCQueue
(
SVnodeObj
*
pVnode
,
SVReadMsg
*
pRead
);
#ifdef __cplusplus
}
#endif
#endif
src/vnode/inc/vnodeInt.h
浏览文件 @
259e2941
...
...
@@ -47,8 +47,9 @@ typedef struct {
int8_t
isCommiting
;
uint64_t
version
;
// current version
uint64_t
fversion
;
// version on saved data file
void
*
wqueue
;
void
*
rqueue
;
void
*
wqueue
;
// write queue
void
*
qqueue
;
// read query queue
void
*
fqueue
;
// read fetch/cancel queue
void
*
wal
;
void
*
tsdb
;
int64_t
sync
;
...
...
src/vnode/src/vnodeCancel.c
已删除
100644 → 0
浏览文件 @
d9da9fc9
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "tqueue.h"
#include "dnode.h"
#include "tsdb.h"
#include "vnodeCancel.h"
typedef
struct
{
pthread_t
thread
;
int32_t
workerId
;
}
SVCWorker
;
typedef
struct
{
int32_t
curNum
;
int32_t
maxNum
;
SVCWorker
*
worker
;
}
SVCWorkerPool
;
static
SVCWorkerPool
tsVCWorkerPool
;
static
taos_qset
tsVCWorkerQset
;
static
taos_queue
tsVCWorkerQueue
;
static
void
*
vnodeCWorkerFunc
(
void
*
param
);
static
int32_t
vnodeStartCWorker
()
{
tsVCWorkerQueue
=
taosOpenQueue
();
if
(
tsVCWorkerQueue
==
NULL
)
return
TSDB_CODE_DND_OUT_OF_MEMORY
;
taosAddIntoQset
(
tsVCWorkerQset
,
tsVCWorkerQueue
,
NULL
);
for
(
int32_t
i
=
tsVCWorkerPool
.
curNum
;
i
<
tsVCWorkerPool
.
maxNum
;
++
i
)
{
SVCWorker
*
pWorker
=
tsVCWorkerPool
.
worker
+
i
;
pWorker
->
workerId
=
i
;
pthread_attr_t
thAttr
;
pthread_attr_init
(
&
thAttr
);
pthread_attr_setdetachstate
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
pWorker
->
thread
,
&
thAttr
,
vnodeCWorkerFunc
,
pWorker
)
!=
0
)
{
vError
(
"failed to create thread to process vcworker queue, reason:%s"
,
strerror
(
errno
));
}
pthread_attr_destroy
(
&
thAttr
);
tsVCWorkerPool
.
curNum
=
i
+
1
;
vDebug
(
"vcworker:%d is launched, total:%d"
,
pWorker
->
workerId
,
tsVCWorkerPool
.
maxNum
);
}
vDebug
(
"vcworker queue:%p is allocated"
,
tsVCWorkerQueue
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
vnodeInitCWorker
()
{
tsVCWorkerQset
=
taosOpenQset
();
tsVCWorkerPool
.
maxNum
=
1
;
tsVCWorkerPool
.
curNum
=
0
;
tsVCWorkerPool
.
worker
=
calloc
(
sizeof
(
SVCWorker
),
tsVCWorkerPool
.
maxNum
);
if
(
tsVCWorkerPool
.
worker
==
NULL
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
tsVCWorkerPool
.
maxNum
;
++
i
)
{
SVCWorker
*
pWorker
=
tsVCWorkerPool
.
worker
+
i
;
pWorker
->
workerId
=
i
;
vDebug
(
"vcworker:%d is created"
,
i
);
}
vDebug
(
"vcworker is initialized, num:%d qset:%p"
,
tsVCWorkerPool
.
maxNum
,
tsVCWorkerQset
);
return
vnodeStartCWorker
();
}
static
void
vnodeStopCWorker
()
{
vDebug
(
"vcworker queue:%p is freed"
,
tsVCWorkerQueue
);
taosCloseQueue
(
tsVCWorkerQueue
);
tsVCWorkerQueue
=
NULL
;
}
void
vnodeCleanupCWorker
()
{
for
(
int32_t
i
=
0
;
i
<
tsVCWorkerPool
.
maxNum
;
++
i
)
{
SVCWorker
*
pWorker
=
tsVCWorkerPool
.
worker
+
i
;
if
(
pWorker
->
thread
)
{
taosQsetThreadResume
(
tsVCWorkerQset
);
}
vDebug
(
"vcworker:%d is closed"
,
i
);
}
for
(
int32_t
i
=
0
;
i
<
tsVCWorkerPool
.
maxNum
;
++
i
)
{
SVCWorker
*
pWorker
=
tsVCWorkerPool
.
worker
+
i
;
vDebug
(
"vcworker:%d start to join"
,
i
);
if
(
pWorker
->
thread
)
{
pthread_join
(
pWorker
->
thread
,
NULL
);
}
vDebug
(
"vcworker:%d join success"
,
i
);
}
vDebug
(
"vcworker is closed, qset:%p"
,
tsVCWorkerQset
);
taosCloseQset
(
tsVCWorkerQset
);
tsVCWorkerQset
=
NULL
;
tfree
(
tsVCWorkerPool
.
worker
);
vnodeStopCWorker
();
}
int32_t
vnodeWriteIntoCQueue
(
SVnodeObj
*
pVnode
,
SVReadMsg
*
pRead
)
{
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
pRead
->
pVnode
=
pVnode
;
vTrace
(
"vgId:%d, write into vcqueue, refCount:%d queued:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
->
queuedRMsg
);
return
taosWriteQitem
(
tsVCWorkerQueue
,
pRead
->
qtype
,
pRead
);
}
static
void
vnodeFreeFromCQueue
(
SVnodeObj
*
pVnode
,
SVReadMsg
*
pRead
)
{
vTrace
(
"vgId:%d, free from vcqueue, refCount:%d queued:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
->
queuedRMsg
);
taosFreeQitem
(
pRead
);
vnodeRelease
(
pVnode
);
}
static
void
vnodeSendVCancelRpcRsp
(
SVnodeObj
*
pVnode
,
SVReadMsg
*
pRead
,
int32_t
code
)
{
SRpcMsg
rpcRsp
=
{
.
handle
=
pRead
->
rpcHandle
,
.
pCont
=
pRead
->
rspRet
.
rsp
,
.
contLen
=
pRead
->
rspRet
.
len
,
.
code
=
code
,
};
rpcSendResponse
(
&
rpcRsp
);
vnodeFreeFromCQueue
(
pVnode
,
pRead
);
}
static
void
*
vnodeCWorkerFunc
(
void
*
param
)
{
int32_t
qtype
;
SVReadMsg
*
pRead
;
SVnodeObj
*
pVnode
;
while
(
1
)
{
if
(
taosReadQitemFromQset
(
tsVCWorkerQset
,
&
qtype
,
(
void
**
)
&
pRead
,
(
void
**
)
&
pVnode
)
==
0
)
{
vDebug
(
"qset:%p, vcworker got no message from qset, exiting"
,
tsVCWorkerQset
);
break
;
}
assert
(
qtype
==
TAOS_QTYPE_RPC
);
assert
(
pVnode
==
NULL
);
assert
(
pRead
->
pVnode
!=
NULL
);
int32_t
code
=
vnodeProcessRead
(
pRead
->
pVnode
,
pRead
);
vnodeSendVCancelRpcRsp
(
pRead
->
pVnode
,
pRead
,
code
);
}
return
NULL
;
}
src/vnode/src/vnodeMain.c
浏览文件 @
259e2941
...
...
@@ -28,7 +28,6 @@
#include "vnodeMgmt.h"
#include "vnodeWorker.h"
#include "vnodeMain.h"
#include "vnodeCancel.h"
static
int32_t
vnodeProcessTsdbStatus
(
void
*
arg
,
int32_t
status
,
int32_t
eno
);
...
...
@@ -213,8 +212,9 @@ int32_t vnodeOpen(int32_t vgId) {
pVnode
->
fversion
=
pVnode
->
version
;
pVnode
->
wqueue
=
dnodeAllocVWriteQueue
(
pVnode
);
pVnode
->
rqueue
=
dnodeAllocVReadQueue
(
pVnode
);
if
(
pVnode
->
wqueue
==
NULL
||
pVnode
->
rqueue
==
NULL
)
{
pVnode
->
qqueue
=
dnodeAllocVQueryQueue
(
pVnode
);
pVnode
->
fqueue
=
dnodeAllocVFetchQueue
(
pVnode
);
if
(
pVnode
->
wqueue
==
NULL
||
pVnode
->
qqueue
==
NULL
||
pVnode
->
fqueue
==
NULL
)
{
vnodeCleanUp
(
pVnode
);
return
terrno
;
}
...
...
@@ -374,9 +374,14 @@ void vnodeDestroy(SVnodeObj *pVnode) {
pVnode
->
wqueue
=
NULL
;
}
if
(
pVnode
->
rqueue
)
{
dnodeFreeVReadQueue
(
pVnode
->
rqueue
);
pVnode
->
rqueue
=
NULL
;
if
(
pVnode
->
qqueue
)
{
dnodeFreeVQueryQueue
(
pVnode
->
qqueue
);
pVnode
->
qqueue
=
NULL
;
}
if
(
pVnode
->
fqueue
)
{
dnodeFreeVFetchQueue
(
pVnode
->
fqueue
);
pVnode
->
fqueue
=
NULL
;
}
tfree
(
pVnode
->
rootDir
);
...
...
src/vnode/src/vnodeRead.c
浏览文件 @
259e2941
...
...
@@ -14,11 +14,9 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "tqueue.h"
#include "vnodeCancel.h"
#include "tglobal.h"
#include "query.h"
#include "vnodeStatus.h"
...
...
@@ -119,15 +117,16 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt
}
pRead
->
qtype
=
qtype
;
if
(
pRead
->
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
||
pRead
->
msgType
==
TSDB_MSG_TYPE_CANCEL_QUERY
)
{
pRead
->
msgType
=
TSDB_MSG_TYPE_CANCEL_QUERY
;
return
vnodeWriteIntoCQueue
(
pVnode
,
pRead
);
}
else
{
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
atomic_add_fetch_32
(
&
pVnode
->
queuedRMsg
,
1
);
vTrace
(
"vgId:%d, write into vrqueue, refCount:%d queued:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
->
queuedRMsg
);
return
taosWriteQitem
(
pVnode
->
rqueue
,
qtype
,
pRead
);
if
(
pRead
->
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
||
pRead
->
msgType
==
TSDB_MSG_TYPE_CANCEL_QUERY
||
pRead
->
msgType
==
TSDB_MSG_TYPE_FETCH
)
{
vTrace
(
"vgId:%d, write into vfetch queue, refCount:%d queued:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
->
queuedRMsg
);
return
taosWriteQitem
(
pVnode
->
fqueue
,
qtype
,
pRead
);
}
else
{
vTrace
(
"vgId:%d, write into vquery queue, refCount:%d queued:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
->
queuedRMsg
);
return
taosWriteQitem
(
pVnode
->
qqueue
,
qtype
,
pRead
);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录