Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
4bc9284a
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4bc9284a
编写于
11月 06, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-1915
上级
048d0a46
变更
20
隐藏空白更改
内联
并排
Showing
20 changed file
with
316 addition
and
327 deletion
+316
-327
src/dnode/inc/dnodeMPeer.h
src/dnode/inc/dnodeMPeer.h
+5
-5
src/dnode/inc/dnodeMRead.h
src/dnode/inc/dnodeMRead.h
+5
-5
src/dnode/inc/dnodeMWrite.h
src/dnode/inc/dnodeMWrite.h
+5
-5
src/dnode/src/dnodeMPeer.c
src/dnode/src/dnodeMPeer.c
+30
-30
src/dnode/src/dnodeMRead.c
src/dnode/src/dnodeMRead.c
+38
-38
src/dnode/src/dnodeMWrite.c
src/dnode/src/dnodeMWrite.c
+37
-37
src/dnode/src/dnodeMain.c
src/dnode/src/dnodeMain.c
+13
-13
src/dnode/src/dnodePeer.c
src/dnode/src/dnodePeer.c
+21
-21
src/dnode/src/dnodeShell.c
src/dnode/src/dnodeShell.c
+44
-44
src/dnode/src/dnodeVWrite.c
src/dnode/src/dnodeVWrite.c
+1
-1
src/inc/dnode.h
src/inc/dnode.h
+16
-16
src/mnode/src/mnodeMain.c
src/mnode/src/mnodeMain.c
+6
-6
src/mnode/src/mnodeSdb.c
src/mnode/src/mnodeSdb.c
+1
-1
src/mnode/src/mnodeTable.c
src/mnode/src/mnodeTable.c
+12
-12
src/mnode/src/mnodeVgroup.c
src/mnode/src/mnodeVgroup.c
+5
-5
src/plugins/monitor/src/monitorMain.c
src/plugins/monitor/src/monitorMain.c
+32
-32
src/vnode/src/vnodeCfg.c
src/vnode/src/vnodeCfg.c
+0
-1
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+12
-18
src/vnode/src/vnodeRead.c
src/vnode/src/vnodeRead.c
+32
-35
src/vnode/src/vnodeWrite.c
src/vnode/src/vnodeWrite.c
+1
-2
未找到文件。
src/dnode/inc/dnodeMPeer.h
浏览文件 @
4bc9284a
...
...
@@ -20,11 +20,11 @@
extern
"C"
{
#endif
int32_t
dnodeInitM
node
Peer
();
void
dnodeCleanupM
node
Peer
();
int32_t
dnodeAllocateM
nodePq
ueue
();
void
dnodeFreeM
nodePq
ueue
();
void
dnodeDispatchToM
node
PeerQueue
(
SRpcMsg
*
pMsg
);
int32_t
dnodeInitMPeer
();
void
dnodeCleanupMPeer
();
int32_t
dnodeAllocateM
PeerQ
ueue
();
void
dnodeFreeM
PeerQ
ueue
();
void
dnodeDispatchToMPeerQueue
(
SRpcMsg
*
pMsg
);
#ifdef __cplusplus
}
...
...
src/dnode/inc/dnodeMRead.h
浏览文件 @
4bc9284a
...
...
@@ -20,11 +20,11 @@
extern
"C"
{
#endif
int32_t
dnodeInitM
node
Read
();
void
dnodeCleanupM
node
Read
();
int32_t
dnodeAlloc
ateMnodeRq
ueue
();
void
dnodeFreeM
nodeRq
ueue
();
void
dnodeDispatchToM
node
ReadQueue
(
SRpcMsg
*
rpcMsg
);
int32_t
dnodeInitMRead
();
void
dnodeCleanupMRead
();
int32_t
dnodeAlloc
MReadQ
ueue
();
void
dnodeFreeM
ReadQ
ueue
();
void
dnodeDispatchToMReadQueue
(
SRpcMsg
*
rpcMsg
);
#ifdef __cplusplus
}
...
...
src/dnode/inc/dnodeMWrite.h
浏览文件 @
4bc9284a
...
...
@@ -20,11 +20,11 @@
extern
"C"
{
#endif
int32_t
dnodeInitM
node
Write
();
void
dnodeCleanupM
node
Write
();
int32_t
dnodeAlloc
ateMnodeW
queue
();
void
dnodeFreeM
nodeW
queue
();
void
dnodeDispatchToM
node
WriteQueue
(
SRpcMsg
*
pMsg
);
int32_t
dnodeInitMWrite
();
void
dnodeCleanupMWrite
();
int32_t
dnodeAlloc
MWrite
queue
();
void
dnodeFreeM
Write
queue
();
void
dnodeDispatchToMWriteQueue
(
SRpcMsg
*
pMsg
);
#ifdef __cplusplus
}
...
...
src/dnode/src/dnodeMPeer.c
浏览文件 @
4bc9284a
...
...
@@ -35,44 +35,44 @@ typedef struct {
typedef
struct
{
int32_t
curNum
;
int32_t
maxNum
;
SMPeerWorker
*
peerW
orker
;
SMPeerWorker
*
w
orker
;
}
SMPeerWorkerPool
;
static
SMPeerWorkerPool
tsMPeer
Pool
;
static
SMPeerWorkerPool
tsMPeer
WP
;
static
taos_qset
tsMPeerQset
;
static
taos_queue
tsMPeerQueue
;
static
void
*
dnodeProcessM
node
PeerQueue
(
void
*
param
);
static
void
*
dnodeProcessMPeerQueue
(
void
*
param
);
int32_t
dnodeInitM
node
Peer
()
{
int32_t
dnodeInitMPeer
()
{
tsMPeerQset
=
taosOpenQset
();
tsMPeer
Pool
.
maxNum
=
1
;
tsMPeer
Pool
.
curNum
=
0
;
tsMPeer
Pool
.
peerWorker
=
(
SMPeerWorker
*
)
calloc
(
sizeof
(
SMPeerWorker
),
tsMPeerPool
.
maxNum
);
tsMPeer
WP
.
maxNum
=
1
;
tsMPeer
WP
.
curNum
=
0
;
tsMPeer
WP
.
worker
=
(
SMPeerWorker
*
)
calloc
(
sizeof
(
SMPeerWorker
),
tsMPeerWP
.
maxNum
);
if
(
tsMPeer
Pool
.
peerW
orker
==
NULL
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
tsMPeer
Pool
.
maxNum
;
++
i
)
{
SMPeerWorker
*
pWorker
=
tsMPeer
Pool
.
peerW
orker
+
i
;
if
(
tsMPeer
WP
.
w
orker
==
NULL
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
tsMPeer
WP
.
maxNum
;
++
i
)
{
SMPeerWorker
*
pWorker
=
tsMPeer
WP
.
w
orker
+
i
;
pWorker
->
workerId
=
i
;
dDebug
(
"dnode mpeer worker:%d is created"
,
i
);
}
dDebug
(
"dnode mpeer is initialized, workers:%d qset:%p"
,
tsMPeer
Pool
.
maxNum
,
tsMPeerQset
);
dDebug
(
"dnode mpeer is initialized, workers:%d qset:%p"
,
tsMPeer
WP
.
maxNum
,
tsMPeerQset
);
return
0
;
}
void
dnodeCleanupM
node
Peer
()
{
for
(
int32_t
i
=
0
;
i
<
tsMPeer
Pool
.
maxNum
;
++
i
)
{
SMPeerWorker
*
pWorker
=
tsMPeer
Pool
.
peerW
orker
+
i
;
void
dnodeCleanupMPeer
()
{
for
(
int32_t
i
=
0
;
i
<
tsMPeer
WP
.
maxNum
;
++
i
)
{
SMPeerWorker
*
pWorker
=
tsMPeer
WP
.
w
orker
+
i
;
if
(
pWorker
->
thread
)
{
taosQsetThreadResume
(
tsMPeerQset
);
}
dDebug
(
"dnode mpeer worker:%d is closed"
,
i
);
}
for
(
int32_t
i
=
0
;
i
<
tsMPeer
Pool
.
maxNum
;
++
i
)
{
SMPeerWorker
*
pWorker
=
tsMPeer
Pool
.
peerW
orker
+
i
;
for
(
int32_t
i
=
0
;
i
<
tsMPeer
WP
.
maxNum
;
++
i
)
{
SMPeerWorker
*
pWorker
=
tsMPeer
WP
.
w
orker
+
i
;
dDebug
(
"dnode mpeer worker:%d start to join"
,
i
);
if
(
pWorker
->
thread
)
{
pthread_join
(
pWorker
->
thread
,
NULL
);
...
...
@@ -84,44 +84,44 @@ void dnodeCleanupMnodePeer() {
taosCloseQset
(
tsMPeerQset
);
tsMPeerQset
=
NULL
;
taosTFree
(
tsMPeer
Pool
.
peerW
orker
);
taosTFree
(
tsMPeer
WP
.
w
orker
);
}
int32_t
dnodeAllocateM
nodePq
ueue
()
{
int32_t
dnodeAllocateM
PeerQ
ueue
()
{
tsMPeerQueue
=
taosOpenQueue
();
if
(
tsMPeerQueue
==
NULL
)
return
TSDB_CODE_DND_OUT_OF_MEMORY
;
taosAddIntoQset
(
tsMPeerQset
,
tsMPeerQueue
,
NULL
);
for
(
int32_t
i
=
tsMPeer
Pool
.
curNum
;
i
<
tsMPeerPool
.
maxNum
;
++
i
)
{
SMPeerWorker
*
pWorker
=
tsMPeer
Pool
.
peerW
orker
+
i
;
for
(
int32_t
i
=
tsMPeer
WP
.
curNum
;
i
<
tsMPeerWP
.
maxNum
;
++
i
)
{
SMPeerWorker
*
pWorker
=
tsMPeer
WP
.
w
orker
+
i
;
pWorker
->
workerId
=
i
;
pthread_attr_t
thAttr
;
pthread_attr_init
(
&
thAttr
);
pthread_attr_setdetachstate
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
pWorker
->
thread
,
&
thAttr
,
dnodeProcessM
node
PeerQueue
,
pWorker
)
!=
0
)
{
if
(
pthread_create
(
&
pWorker
->
thread
,
&
thAttr
,
dnodeProcessMPeerQueue
,
pWorker
)
!=
0
)
{
dError
(
"failed to create thread to process mpeer queue, reason:%s"
,
strerror
(
errno
));
}
pthread_attr_destroy
(
&
thAttr
);
tsMPeer
Pool
.
curNum
=
i
+
1
;
dDebug
(
"dnode mpeer worker:%d is launched, total:%d"
,
pWorker
->
workerId
,
tsMPeer
Pool
.
maxNum
);
tsMPeer
WP
.
curNum
=
i
+
1
;
dDebug
(
"dnode mpeer worker:%d is launched, total:%d"
,
pWorker
->
workerId
,
tsMPeer
WP
.
maxNum
);
}
dDebug
(
"dnode mpeer queue:%p is allocated"
,
tsMPeerQueue
);
return
TSDB_CODE_SUCCESS
;
}
void
dnodeFreeM
nodePq
ueue
()
{
void
dnodeFreeM
PeerQ
ueue
()
{
dDebug
(
"dnode mpeer queue:%p is freed"
,
tsMPeerQueue
);
taosCloseQueue
(
tsMPeerQueue
);
tsMPeerQueue
=
NULL
;
}
void
dnodeDispatchToM
node
PeerQueue
(
SRpcMsg
*
pMsg
)
{
void
dnodeDispatchToMPeerQueue
(
SRpcMsg
*
pMsg
)
{
if
(
!
mnodeIsRunning
()
||
tsMPeerQueue
==
NULL
)
{
dnodeSendRedirectMsg
(
pMsg
,
false
);
rpcFreeCont
(
pMsg
->
pCont
);
...
...
@@ -133,12 +133,12 @@ void dnodeDispatchToMnodePeerQueue(SRpcMsg *pMsg) {
taosWriteQitem
(
tsMPeerQueue
,
TAOS_QTYPE_RPC
,
pPeer
);
}
static
void
dnodeFreeM
node
PeerMsg
(
SMnodeMsg
*
pPeer
)
{
static
void
dnodeFreeMPeerMsg
(
SMnodeMsg
*
pPeer
)
{
mnodeCleanupMsg
(
pPeer
);
taosFreeQitem
(
pPeer
);
}
static
void
dnodeSendRpcM
node
PeerRsp
(
SMnodeMsg
*
pPeer
,
int32_t
code
)
{
static
void
dnodeSendRpcMPeerRsp
(
SMnodeMsg
*
pPeer
,
int32_t
code
)
{
if
(
code
==
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
return
;
SRpcMsg
rpcRsp
=
{
...
...
@@ -149,10 +149,10 @@ static void dnodeSendRpcMnodePeerRsp(SMnodeMsg *pPeer, int32_t code) {
};
rpcSendResponse
(
&
rpcRsp
);
dnodeFreeM
node
PeerMsg
(
pPeer
);
dnodeFreeMPeerMsg
(
pPeer
);
}
static
void
*
dnodeProcessM
node
PeerQueue
(
void
*
param
)
{
static
void
*
dnodeProcessMPeerQueue
(
void
*
param
)
{
SMnodeMsg
*
pPeerMsg
;
int32_t
type
;
void
*
unUsed
;
...
...
@@ -165,7 +165,7 @@ static void *dnodeProcessMnodePeerQueue(void *param) {
dDebug
(
"msg:%s will be processed in mpeer queue"
,
taosMsg
[
pPeerMsg
->
rpcMsg
.
msgType
]);
int32_t
code
=
mnodeProcessPeerReq
(
pPeerMsg
);
dnodeSendRpcM
node
PeerRsp
(
pPeerMsg
,
code
);
dnodeSendRpcMPeerRsp
(
pPeerMsg
,
code
);
}
return
NULL
;
...
...
src/dnode/src/dnodeMRead.c
浏览文件 @
4bc9284a
...
...
@@ -35,46 +35,46 @@ typedef struct {
typedef
struct
{
int32_t
curNum
;
int32_t
maxNum
;
SMReadWorker
*
readW
orker
;
SMReadWorker
*
w
orker
;
}
SMReadWorkerPool
;
static
SMReadWorkerPool
tsMRead
Pool
;
static
SMReadWorkerPool
tsMRead
WP
;
static
taos_qset
tsMReadQset
;
static
taos_queue
tsMReadQueue
;
static
void
*
dnodeProcessM
node
ReadQueue
(
void
*
param
);
static
void
*
dnodeProcessMReadQueue
(
void
*
param
);
int32_t
dnodeInitM
node
Read
()
{
int32_t
dnodeInitMRead
()
{
tsMReadQset
=
taosOpenQset
();
tsMRead
Pool
.
maxNum
=
tsNumOfCores
*
tsNumOfThreadsPerCore
/
2
;
tsMRead
Pool
.
maxNum
=
MAX
(
2
,
tsMReadPool
.
maxNum
);
tsMRead
Pool
.
maxNum
=
MIN
(
4
,
tsMReadPool
.
maxNum
);
tsMRead
Pool
.
curNum
=
0
;
tsMRead
Pool
.
readWorker
=
(
SMReadWorker
*
)
calloc
(
sizeof
(
SMReadWorker
),
tsMReadPool
.
maxNum
);
tsMRead
WP
.
maxNum
=
tsNumOfCores
*
tsNumOfThreadsPerCore
/
2
;
tsMRead
WP
.
maxNum
=
MAX
(
2
,
tsMReadWP
.
maxNum
);
tsMRead
WP
.
maxNum
=
MIN
(
4
,
tsMReadWP
.
maxNum
);
tsMRead
WP
.
curNum
=
0
;
tsMRead
WP
.
worker
=
(
SMReadWorker
*
)
calloc
(
sizeof
(
SMReadWorker
),
tsMReadWP
.
maxNum
);
if
(
tsMRead
Pool
.
readW
orker
==
NULL
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
tsMRead
Pool
.
maxNum
;
++
i
)
{
SMReadWorker
*
pWorker
=
tsMRead
Pool
.
readW
orker
+
i
;
if
(
tsMRead
WP
.
w
orker
==
NULL
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
tsMRead
WP
.
maxNum
;
++
i
)
{
SMReadWorker
*
pWorker
=
tsMRead
WP
.
w
orker
+
i
;
pWorker
->
workerId
=
i
;
dDebug
(
"dnode mread worker:%d is created"
,
i
);
}
dDebug
(
"dnode mread is initialized, workers:%d qset:%p"
,
tsMRead
Pool
.
maxNum
,
tsMReadQset
);
dDebug
(
"dnode mread is initialized, workers:%d qset:%p"
,
tsMRead
WP
.
maxNum
,
tsMReadQset
);
return
0
;
}
void
dnodeCleanupM
node
Read
()
{
for
(
int32_t
i
=
0
;
i
<
tsMRead
Pool
.
maxNum
;
++
i
)
{
SMReadWorker
*
pWorker
=
tsMRead
Pool
.
readW
orker
+
i
;
void
dnodeCleanupMRead
()
{
for
(
int32_t
i
=
0
;
i
<
tsMRead
WP
.
maxNum
;
++
i
)
{
SMReadWorker
*
pWorker
=
tsMRead
WP
.
w
orker
+
i
;
if
(
pWorker
->
thread
)
{
taosQsetThreadResume
(
tsMReadQset
);
}
dDebug
(
"dnode mread worker:%d is closed"
,
i
);
}
for
(
int32_t
i
=
0
;
i
<
tsMRead
Pool
.
maxNum
;
++
i
)
{
SMReadWorker
*
pWorker
=
tsMRead
Pool
.
readW
orker
+
i
;
for
(
int32_t
i
=
0
;
i
<
tsMRead
WP
.
maxNum
;
++
i
)
{
SMReadWorker
*
pWorker
=
tsMRead
WP
.
w
orker
+
i
;
dDebug
(
"dnode mread worker:%d start to join"
,
i
);
if
(
pWorker
->
thread
)
{
pthread_join
(
pWorker
->
thread
,
NULL
);
...
...
@@ -86,43 +86,43 @@ void dnodeCleanupMnodeRead() {
taosCloseQset
(
tsMReadQset
);
tsMReadQset
=
NULL
;
free
(
tsMRead
Pool
.
readW
orker
);
free
(
tsMRead
WP
.
w
orker
);
}
int32_t
dnodeAlloc
ateMnodeRq
ueue
()
{
int32_t
dnodeAlloc
MReadQ
ueue
()
{
tsMReadQueue
=
taosOpenQueue
();
if
(
tsMReadQueue
==
NULL
)
return
TSDB_CODE_DND_OUT_OF_MEMORY
;
taosAddIntoQset
(
tsMReadQset
,
tsMReadQueue
,
NULL
);
for
(
int32_t
i
=
tsMRead
Pool
.
curNum
;
i
<
tsMReadPool
.
maxNum
;
++
i
)
{
SMReadWorker
*
pWorker
=
tsMRead
Pool
.
readW
orker
+
i
;
for
(
int32_t
i
=
tsMRead
WP
.
curNum
;
i
<
tsMReadWP
.
maxNum
;
++
i
)
{
SMReadWorker
*
pWorker
=
tsMRead
WP
.
w
orker
+
i
;
pWorker
->
workerId
=
i
;
pthread_attr_t
thAttr
;
pthread_attr_init
(
&
thAttr
);
pthread_attr_setdetachstate
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
pWorker
->
thread
,
&
thAttr
,
dnodeProcessM
node
ReadQueue
,
pWorker
)
!=
0
)
{
if
(
pthread_create
(
&
pWorker
->
thread
,
&
thAttr
,
dnodeProcessMReadQueue
,
pWorker
)
!=
0
)
{
dError
(
"failed to create thread to process mread queue, reason:%s"
,
strerror
(
errno
));
}
pthread_attr_destroy
(
&
thAttr
);
tsMRead
Pool
.
curNum
=
i
+
1
;
dDebug
(
"dnode mread worker:%d is launched, total:%d"
,
pWorker
->
workerId
,
tsMRead
Pool
.
maxNum
);
tsMRead
WP
.
curNum
=
i
+
1
;
dDebug
(
"dnode mread worker:%d is launched, total:%d"
,
pWorker
->
workerId
,
tsMRead
WP
.
maxNum
);
}
dDebug
(
"dnode mread queue:%p is allocated"
,
tsMReadQueue
);
return
TSDB_CODE_SUCCESS
;
}
void
dnodeFreeM
nodeRq
ueue
()
{
void
dnodeFreeM
ReadQ
ueue
()
{
dDebug
(
"dnode mread queue:%p is freed"
,
tsMReadQueue
);
taosCloseQueue
(
tsMReadQueue
);
tsMReadQueue
=
NULL
;
}
void
dnodeDispatchToM
node
ReadQueue
(
SRpcMsg
*
pMsg
)
{
void
dnodeDispatchToMReadQueue
(
SRpcMsg
*
pMsg
)
{
if
(
!
mnodeIsRunning
()
||
tsMReadQueue
==
NULL
)
{
dnodeSendRedirectMsg
(
pMsg
,
true
);
rpcFreeCont
(
pMsg
->
pCont
);
...
...
@@ -134,16 +134,16 @@ void dnodeDispatchToMnodeReadQueue(SRpcMsg *pMsg) {
taosWriteQitem
(
tsMReadQueue
,
TAOS_QTYPE_RPC
,
pRead
);
}
static
void
dnodeFreeM
node
ReadMsg
(
SMnodeMsg
*
pRead
)
{
static
void
dnodeFreeMReadMsg
(
SMnodeMsg
*
pRead
)
{
mnodeCleanupMsg
(
pRead
);
taosFreeQitem
(
pRead
);
}
static
void
dnodeSendRpcM
node
ReadRsp
(
SMnodeMsg
*
pRead
,
int32_t
code
)
{
static
void
dnodeSendRpcMReadRsp
(
SMnodeMsg
*
pRead
,
int32_t
code
)
{
if
(
code
==
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
return
;
if
(
code
==
TSDB_CODE_MND_ACTION_NEED_REPROCESSED
)
{
// may be a auto create req, should put into write queue
dnodeReprocessM
node
WriteMsg
(
pRead
);
dnodeReprocessMWriteMsg
(
pRead
);
return
;
}
...
...
@@ -155,23 +155,23 @@ static void dnodeSendRpcMnodeReadRsp(SMnodeMsg *pRead, int32_t code) {
};
rpcSendResponse
(
&
rpcRsp
);
dnodeFreeM
node
ReadMsg
(
pRead
);
dnodeFreeMReadMsg
(
pRead
);
}
static
void
*
dnodeProcessM
node
ReadQueue
(
void
*
param
)
{
SMnodeMsg
*
pRead
Msg
;
static
void
*
dnodeProcessMReadQueue
(
void
*
param
)
{
SMnodeMsg
*
pRead
;
int32_t
type
;
void
*
unUsed
;
while
(
1
)
{
if
(
taosReadQitemFromQset
(
tsMReadQset
,
&
type
,
(
void
**
)
&
pRead
Msg
,
&
unUsed
)
==
0
)
{
if
(
taosReadQitemFromQset
(
tsMReadQset
,
&
type
,
(
void
**
)
&
pRead
,
&
unUsed
)
==
0
)
{
dDebug
(
"qset:%p, mnode read got no message from qset, exiting"
,
tsMReadQset
);
break
;
}
dDebug
(
"%p, msg:%s will be processed in mread queue"
,
pRead
Msg
->
rpcMsg
.
ahandle
,
taosMsg
[
pReadMsg
->
rpcMsg
.
msgType
]);
int32_t
code
=
mnodeProcessRead
(
pRead
Msg
);
dnodeSendRpcM
nodeReadRsp
(
pReadMsg
,
code
);
dDebug
(
"%p, msg:%s will be processed in mread queue"
,
pRead
->
rpcMsg
.
ahandle
,
taosMsg
[
pRead
->
rpcMsg
.
msgType
]);
int32_t
code
=
mnodeProcessRead
(
pRead
);
dnodeSendRpcM
ReadRsp
(
pRead
,
code
);
}
return
NULL
;
...
...
src/dnode/src/dnodeMWrite.c
浏览文件 @
4bc9284a
...
...
@@ -36,45 +36,45 @@ typedef struct {
typedef
struct
{
int32_t
curNum
;
int32_t
maxNum
;
SMWriteWorker
*
w
riteW
orker
;
SMWriteWorker
*
worker
;
}
SMWriteWorkerPool
;
static
SMWriteWorkerPool
tsMWrite
Pool
;
static
SMWriteWorkerPool
tsMWrite
WP
;
static
taos_qset
tsMWriteQset
;
static
taos_queue
tsMWriteQueue
;
extern
void
*
tsDnodeTmr
;
static
void
*
dnodeProcessM
node
WriteQueue
(
void
*
param
);
static
void
*
dnodeProcessMWriteQueue
(
void
*
param
);
int32_t
dnodeInitM
node
Write
()
{
int32_t
dnodeInitMWrite
()
{
tsMWriteQset
=
taosOpenQset
();
tsMWrite
Pool
.
maxNum
=
1
;
tsMWrite
Pool
.
curNum
=
0
;
tsMWrite
Pool
.
writeWorker
=
(
SMWriteWorker
*
)
calloc
(
sizeof
(
SMWriteWorker
),
tsMWritePool
.
maxNum
);
tsMWrite
WP
.
maxNum
=
1
;
tsMWrite
WP
.
curNum
=
0
;
tsMWrite
WP
.
worker
=
(
SMWriteWorker
*
)
calloc
(
sizeof
(
SMWriteWorker
),
tsMWriteWP
.
maxNum
);
if
(
tsMWrite
Pool
.
writeW
orker
==
NULL
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
tsMWrite
Pool
.
maxNum
;
++
i
)
{
SMWriteWorker
*
pWorker
=
tsMWrite
Pool
.
writeW
orker
+
i
;
if
(
tsMWrite
WP
.
w
orker
==
NULL
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
tsMWrite
WP
.
maxNum
;
++
i
)
{
SMWriteWorker
*
pWorker
=
tsMWrite
WP
.
w
orker
+
i
;
pWorker
->
workerId
=
i
;
dDebug
(
"dnode mwrite worker:%d is created"
,
i
);
}
dDebug
(
"dnode mwrite is initialized, workers:%d qset:%p"
,
tsMWrite
Pool
.
maxNum
,
tsMWriteQset
);
dDebug
(
"dnode mwrite is initialized, workers:%d qset:%p"
,
tsMWrite
WP
.
maxNum
,
tsMWriteQset
);
return
0
;
}
void
dnodeCleanupM
node
Write
()
{
for
(
int32_t
i
=
0
;
i
<
tsMWrite
Pool
.
maxNum
;
++
i
)
{
SMWriteWorker
*
pWorker
=
tsMWrite
Pool
.
writeW
orker
+
i
;
void
dnodeCleanupMWrite
()
{
for
(
int32_t
i
=
0
;
i
<
tsMWrite
WP
.
maxNum
;
++
i
)
{
SMWriteWorker
*
pWorker
=
tsMWrite
WP
.
w
orker
+
i
;
if
(
pWorker
->
thread
)
{
taosQsetThreadResume
(
tsMWriteQset
);
}
dDebug
(
"dnode mwrite worker:%d is closed"
,
i
);
}
for
(
int32_t
i
=
0
;
i
<
tsMWrite
Pool
.
maxNum
;
++
i
)
{
SMWriteWorker
*
pWorker
=
tsMWrite
Pool
.
writeW
orker
+
i
;
for
(
int32_t
i
=
0
;
i
<
tsMWrite
WP
.
maxNum
;
++
i
)
{
SMWriteWorker
*
pWorker
=
tsMWrite
WP
.
w
orker
+
i
;
dDebug
(
"dnode mwrite worker:%d start to join"
,
i
);
if
(
pWorker
->
thread
)
{
pthread_join
(
pWorker
->
thread
,
NULL
);
...
...
@@ -86,43 +86,43 @@ void dnodeCleanupMnodeWrite() {
taosCloseQset
(
tsMWriteQset
);
tsMWriteQset
=
NULL
;
taosTFree
(
tsMWrite
Pool
.
writeW
orker
);
taosTFree
(
tsMWrite
WP
.
w
orker
);
}
int32_t
dnodeAlloc
ateMnodeW
queue
()
{
int32_t
dnodeAlloc
MWrite
queue
()
{
tsMWriteQueue
=
taosOpenQueue
();
if
(
tsMWriteQueue
==
NULL
)
return
TSDB_CODE_DND_OUT_OF_MEMORY
;
taosAddIntoQset
(
tsMWriteQset
,
tsMWriteQueue
,
NULL
);
for
(
int32_t
i
=
tsMWrite
Pool
.
curNum
;
i
<
tsMWritePool
.
maxNum
;
++
i
)
{
SMWriteWorker
*
pWorker
=
tsMWrite
Pool
.
writeW
orker
+
i
;
for
(
int32_t
i
=
tsMWrite
WP
.
curNum
;
i
<
tsMWriteWP
.
maxNum
;
++
i
)
{
SMWriteWorker
*
pWorker
=
tsMWrite
WP
.
w
orker
+
i
;
pWorker
->
workerId
=
i
;
pthread_attr_t
thAttr
;
pthread_attr_init
(
&
thAttr
);
pthread_attr_setdetachstate
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
pWorker
->
thread
,
&
thAttr
,
dnodeProcessM
node
WriteQueue
,
pWorker
)
!=
0
)
{
if
(
pthread_create
(
&
pWorker
->
thread
,
&
thAttr
,
dnodeProcessMWriteQueue
,
pWorker
)
!=
0
)
{
dError
(
"failed to create thread to process mwrite queue, reason:%s"
,
strerror
(
errno
));
}
pthread_attr_destroy
(
&
thAttr
);
tsMWrite
Pool
.
curNum
=
i
+
1
;
dDebug
(
"dnode mwrite worker:%d is launched, total:%d"
,
pWorker
->
workerId
,
tsMWrite
Pool
.
maxNum
);
tsMWrite
WP
.
curNum
=
i
+
1
;
dDebug
(
"dnode mwrite worker:%d is launched, total:%d"
,
pWorker
->
workerId
,
tsMWrite
WP
.
maxNum
);
}
dDebug
(
"dnode mwrite queue:%p is allocated"
,
tsMWriteQueue
);
return
TSDB_CODE_SUCCESS
;
}
void
dnodeFreeM
nodeW
queue
()
{
void
dnodeFreeM
Write
queue
()
{
dDebug
(
"dnode mwrite queue:%p is freed"
,
tsMWriteQueue
);
taosCloseQueue
(
tsMWriteQueue
);
tsMWriteQueue
=
NULL
;
}
void
dnodeDispatchToM
node
WriteQueue
(
SRpcMsg
*
pMsg
)
{
void
dnodeDispatchToMWriteQueue
(
SRpcMsg
*
pMsg
)
{
if
(
!
mnodeIsRunning
()
||
tsMWriteQueue
==
NULL
)
{
dnodeSendRedirectMsg
(
pMsg
,
true
);
rpcFreeCont
(
pMsg
->
pCont
);
...
...
@@ -137,7 +137,7 @@ void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) {
taosWriteQitem
(
tsMWriteQueue
,
TAOS_QTYPE_RPC
,
pWrite
);
}
static
void
dnodeFreeM
node
WriteMsg
(
SMnodeMsg
*
pWrite
)
{
static
void
dnodeFreeMWriteMsg
(
SMnodeMsg
*
pWrite
)
{
dDebug
(
"app:%p:%p, msg:%s is freed from mwrite queue:%p"
,
pWrite
->
rpcMsg
.
ahandle
,
pWrite
,
taosMsg
[
pWrite
->
rpcMsg
.
msgType
],
tsMWriteQueue
);
...
...
@@ -145,12 +145,12 @@ static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) {
taosFreeQitem
(
pWrite
);
}
void
dnodeSendRpcM
node
WriteRsp
(
void
*
pMsg
,
int32_t
code
)
{
void
dnodeSendRpcMWriteRsp
(
void
*
pMsg
,
int32_t
code
)
{
SMnodeMsg
*
pWrite
=
pMsg
;
if
(
pWrite
==
NULL
)
return
;
if
(
code
==
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
return
;
if
(
code
==
TSDB_CODE_MND_ACTION_NEED_REPROCESSED
)
{
dnodeReprocessM
node
WriteMsg
(
pWrite
);
dnodeReprocessMWriteMsg
(
pWrite
);
return
;
}
...
...
@@ -162,10 +162,10 @@ void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code) {
};
rpcSendResponse
(
&
rpcRsp
);
dnodeFreeM
node
WriteMsg
(
pWrite
);
dnodeFreeMWriteMsg
(
pWrite
);
}
static
void
*
dnodeProcessM
node
WriteQueue
(
void
*
param
)
{
static
void
*
dnodeProcessMWriteQueue
(
void
*
param
)
{
SMnodeMsg
*
pWrite
;
int32_t
type
;
void
*
unUsed
;
...
...
@@ -180,13 +180,13 @@ static void *dnodeProcessMnodeWriteQueue(void *param) {
taosMsg
[
pWrite
->
rpcMsg
.
msgType
]);
int32_t
code
=
mnodeProcessWrite
(
pWrite
);
dnodeSendRpcM
node
WriteRsp
(
pWrite
,
code
);
dnodeSendRpcMWriteRsp
(
pWrite
,
code
);
}
return
NULL
;
}
void
dnodeReprocessM
node
WriteMsg
(
void
*
pMsg
)
{
void
dnodeReprocessMWriteMsg
(
void
*
pMsg
)
{
SMnodeMsg
*
pWrite
=
pMsg
;
if
(
!
mnodeIsRunning
()
||
tsMWriteQueue
==
NULL
)
{
...
...
@@ -194,7 +194,7 @@ void dnodeReprocessMnodeWriteMsg(void *pMsg) {
taosMsg
[
pWrite
->
rpcMsg
.
msgType
],
pWrite
->
retry
);
dnodeSendRedirectMsg
(
pMsg
,
true
);
dnodeFreeM
node
WriteMsg
(
pWrite
);
dnodeFreeMWriteMsg
(
pWrite
);
}
else
{
dDebug
(
"app:%p:%p, msg:%s is reput into mwrite queue:%p, retry times:%d"
,
pWrite
->
rpcMsg
.
ahandle
,
pWrite
,
taosMsg
[
pWrite
->
rpcMsg
.
msgType
],
tsMWriteQueue
,
pWrite
->
retry
);
...
...
@@ -203,12 +203,12 @@ void dnodeReprocessMnodeWriteMsg(void *pMsg) {
}
}
static
void
dnodeDoDelayReprocessM
node
WriteMsg
(
void
*
param
,
void
*
tmrId
)
{
dnodeReprocessM
node
WriteMsg
(
param
);
static
void
dnodeDoDelayReprocessMWriteMsg
(
void
*
param
,
void
*
tmrId
)
{
dnodeReprocessMWriteMsg
(
param
);
}
void
dnodeDelayReprocessM
node
WriteMsg
(
void
*
pMsg
)
{
void
dnodeDelayReprocessMWriteMsg
(
void
*
pMsg
)
{
SMnodeMsg
*
mnodeMsg
=
pMsg
;
void
*
unUsed
=
NULL
;
taosTmrReset
(
dnodeDoDelayReprocessM
node
WriteMsg
,
300
,
mnodeMsg
,
tsDnodeTmr
,
&
unUsed
);
taosTmrReset
(
dnodeDoDelayReprocessMWriteMsg
,
300
,
mnodeMsg
,
tsDnodeTmr
,
&
unUsed
);
}
src/dnode/src/dnodeMain.c
浏览文件 @
4bc9284a
...
...
@@ -37,11 +37,11 @@
#include "dnodeShell.h"
#include "dnodeTelemetry.h"
static
S
DnodeRunStatus
tsDnodeRunStatus
=
TSDB_DNODE
_RUN_STATUS_STOPPED
;
static
S
RunStatus
tsRunStatus
=
TSDB
_RUN_STATUS_STOPPED
;
static
int32_t
dnodeInitStorage
();
static
void
dnodeCleanupStorage
();
static
void
dnodeSetRunStatus
(
S
Dnode
RunStatus
status
);
static
void
dnodeSetRunStatus
(
SRunStatus
status
);
static
void
dnodeCheckDataDirOpenned
(
char
*
dir
);
static
int32_t
dnodeInitComponents
();
static
void
dnodeCleanupComponents
(
int32_t
stepId
);
...
...
@@ -63,9 +63,9 @@ static const SDnodeComponent tsDnodeComponents[] = {
{
"check"
,
dnodeInitCheck
,
dnodeCleanupCheck
},
// NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
{
"vread"
,
dnodeInitVRead
,
dnodeCleanupVRead
},
{
"vwrite"
,
dnodeInitVWrite
,
dnodeCleanupVWrite
},
{
"mread"
,
dnodeInitM
nodeRead
,
dnodeCleanupMnode
Read
},
{
"mwrite"
,
dnodeInitM
nodeWrite
,
dnodeCleanupMnode
Write
},
{
"mpeer"
,
dnodeInitM
nodePeer
,
dnodeCleanupMnode
Peer
},
{
"mread"
,
dnodeInitM
Read
,
dnodeCleanupM
Read
},
{
"mwrite"
,
dnodeInitM
Write
,
dnodeCleanupM
Write
},
{
"mpeer"
,
dnodeInitM
Peer
,
dnodeCleanupM
Peer
},
{
"client"
,
dnodeInitClient
,
dnodeCleanupClient
},
{
"server"
,
dnodeInitServer
,
dnodeCleanupServer
},
{
"mgmt"
,
dnodeInitMgmt
,
dnodeCleanupMgmt
},
...
...
@@ -104,7 +104,7 @@ static int32_t dnodeInitComponents() {
}
int32_t
dnodeInitSystem
()
{
dnodeSetRunStatus
(
TSDB_
DNODE_
RUN_STATUS_INITIALIZE
);
dnodeSetRunStatus
(
TSDB_RUN_STATUS_INITIALIZE
);
tscEmbedded
=
1
;
taosBlockSIGPIPE
();
taosResolveCRC
();
...
...
@@ -137,7 +137,7 @@ int32_t dnodeInitSystem() {
}
dnodeStartModules
();
dnodeSetRunStatus
(
TSDB_
DNODE_
RUN_STATUS_RUNING
);
dnodeSetRunStatus
(
TSDB_RUN_STATUS_RUNING
);
dInfo
(
"TDengine is initialized successfully"
);
...
...
@@ -145,20 +145,20 @@ int32_t dnodeInitSystem() {
}
void
dnodeCleanUpSystem
()
{
if
(
dnodeGetRunStatus
()
!=
TSDB_
DNODE_
RUN_STATUS_STOPPED
)
{
dnodeSetRunStatus
(
TSDB_
DNODE_
RUN_STATUS_STOPPED
);
if
(
dnodeGetRunStatus
()
!=
TSDB_RUN_STATUS_STOPPED
)
{
dnodeSetRunStatus
(
TSDB_RUN_STATUS_STOPPED
);
dnodeCleanupComponents
(
sizeof
(
tsDnodeComponents
)
/
sizeof
(
tsDnodeComponents
[
0
])
-
1
);
taos_cleanup
();
taosCloseLog
();
}
}
S
Dnode
RunStatus
dnodeGetRunStatus
()
{
return
ts
Dnode
RunStatus
;
SRunStatus
dnodeGetRunStatus
()
{
return
tsRunStatus
;
}
static
void
dnodeSetRunStatus
(
S
Dnode
RunStatus
status
)
{
ts
Dnode
RunStatus
=
status
;
static
void
dnodeSetRunStatus
(
SRunStatus
status
)
{
tsRunStatus
=
status
;
}
static
void
dnodeCheckDataDirOpenned
(
char
*
dir
)
{
...
...
src/dnode/src/dnodePeer.c
浏览文件 @
4bc9284a
...
...
@@ -34,8 +34,8 @@ static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static
void
dnodeProcessReqMsgFromDnode
(
SRpcMsg
*
pMsg
,
SRpcEpSet
*
);
static
void
(
*
dnodeProcessRspMsgFp
[
TSDB_MSG_TYPE_MAX
])(
SRpcMsg
*
rpcMsg
);
static
void
dnodeProcessRspFromDnode
(
SRpcMsg
*
pMsg
,
SRpcEpSet
*
pEpSet
);
static
void
*
ts
Dnode
ServerRpc
=
NULL
;
static
void
*
ts
Dnode
ClientRpc
=
NULL
;
static
void
*
tsServerRpc
=
NULL
;
static
void
*
tsClientRpc
=
NULL
;
int32_t
dnodeInitServer
()
{
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_MD_CREATE_TABLE
]
=
dnodeDispatchToVWriteQueue
;
...
...
@@ -50,11 +50,11 @@ int32_t dnodeInitServer() {
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_MD_CONFIG_DNODE
]
=
dnodeDispatchToMgmtQueue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_MD_CREATE_MNODE
]
=
dnodeDispatchToMgmtQueue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_DM_CONFIG_TABLE
]
=
dnodeDispatchToM
node
PeerQueue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_DM_CONFIG_VNODE
]
=
dnodeDispatchToM
node
PeerQueue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_DM_AUTH
]
=
dnodeDispatchToM
node
PeerQueue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_DM_GRANT
]
=
dnodeDispatchToM
node
PeerQueue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_DM_STATUS
]
=
dnodeDispatchToM
node
PeerQueue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_DM_CONFIG_TABLE
]
=
dnodeDispatchToMPeerQueue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_DM_CONFIG_VNODE
]
=
dnodeDispatchToMPeerQueue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_DM_AUTH
]
=
dnodeDispatchToMPeerQueue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_DM_GRANT
]
=
dnodeDispatchToMPeerQueue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_DM_STATUS
]
=
dnodeDispatchToMPeerQueue
;
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
...
...
@@ -66,8 +66,8 @@ int32_t dnodeInitServer() {
rpcInit
.
connType
=
TAOS_CONN_SERVER
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
ts
Dnode
ServerRpc
=
rpcOpen
(
&
rpcInit
);
if
(
ts
Dnode
ServerRpc
==
NULL
)
{
tsServerRpc
=
rpcOpen
(
&
rpcInit
);
if
(
tsServerRpc
==
NULL
)
{
dError
(
"failed to init inter-dnodes RPC server"
);
return
-
1
;
}
...
...
@@ -77,9 +77,9 @@ int32_t dnodeInitServer() {
}
void
dnodeCleanupServer
()
{
if
(
ts
Dnode
ServerRpc
)
{
rpcClose
(
ts
Dnode
ServerRpc
);
ts
Dnode
ServerRpc
=
NULL
;
if
(
tsServerRpc
)
{
rpcClose
(
tsServerRpc
);
tsServerRpc
=
NULL
;
dInfo
(
"inter-dnodes RPC server is closed"
);
}
}
...
...
@@ -93,7 +93,7 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
if
(
pMsg
->
pCont
==
NULL
)
return
;
if
(
dnodeGetRunStatus
()
!=
TSDB_
DNODE_
RUN_STATUS_RUNING
)
{
if
(
dnodeGetRunStatus
()
!=
TSDB_RUN_STATUS_RUNING
)
{
rspMsg
.
code
=
TSDB_CODE_APP_NOT_READY
;
rpcSendResponse
(
&
rspMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
...
...
@@ -131,8 +131,8 @@ int32_t dnodeInitClient() {
rpcInit
.
ckey
=
"key"
;
rpcInit
.
secret
=
secret
;
ts
Dnode
ClientRpc
=
rpcOpen
(
&
rpcInit
);
if
(
ts
Dnode
ClientRpc
==
NULL
)
{
tsClientRpc
=
rpcOpen
(
&
rpcInit
);
if
(
tsClientRpc
==
NULL
)
{
dError
(
"failed to init mnode rpc client"
);
return
-
1
;
}
...
...
@@ -142,9 +142,9 @@ int32_t dnodeInitClient() {
}
void
dnodeCleanupClient
()
{
if
(
ts
Dnode
ClientRpc
)
{
rpcClose
(
ts
Dnode
ClientRpc
);
ts
Dnode
ClientRpc
=
NULL
;
if
(
tsClientRpc
)
{
rpcClose
(
tsClientRpc
);
tsClientRpc
=
NULL
;
dInfo
(
"dnode inter-dnodes rpc client is closed"
);
}
}
...
...
@@ -168,15 +168,15 @@ void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
}
void
dnodeSendMsgToDnode
(
SRpcEpSet
*
epSet
,
SRpcMsg
*
rpcMsg
)
{
rpcSendRequest
(
ts
Dnode
ClientRpc
,
epSet
,
rpcMsg
);
rpcSendRequest
(
tsClientRpc
,
epSet
,
rpcMsg
);
}
void
dnodeSendMsgToMnodeRecv
(
SRpcMsg
*
rpcMsg
,
SRpcMsg
*
rpcRsp
)
{
SRpcEpSet
epSet
=
{
0
};
dnodeGetEpSetForPeer
(
&
epSet
);
rpcSendRecv
(
ts
Dnode
ClientRpc
,
&
epSet
,
rpcMsg
,
rpcRsp
);
rpcSendRecv
(
tsClientRpc
,
&
epSet
,
rpcMsg
,
rpcRsp
);
}
void
dnodeSendMsgToDnodeRecv
(
SRpcMsg
*
rpcMsg
,
SRpcMsg
*
rpcRsp
,
SRpcEpSet
*
epSet
)
{
rpcSendRecv
(
ts
Dnode
ClientRpc
,
epSet
,
rpcMsg
,
rpcRsp
);
rpcSendRecv
(
tsClientRpc
,
epSet
,
rpcMsg
,
rpcRsp
);
}
\ No newline at end of file
src/dnode/src/dnodeShell.c
浏览文件 @
4bc9284a
...
...
@@ -33,9 +33,9 @@
static
void
(
*
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_MAX
])(
SRpcMsg
*
);
static
void
dnodeProcessMsgFromShell
(
SRpcMsg
*
pMsg
,
SRpcEpSet
*
);
static
int
dnodeRetrieveUserAuthInfo
(
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
static
void
*
ts
Dnode
ShellRpc
=
NULL
;
static
int32_t
ts
Dnode
QueryReqNum
=
0
;
static
int32_t
ts
Dnode
SubmitReqNum
=
0
;
static
void
*
tsShellRpc
=
NULL
;
static
int32_t
tsQueryReqNum
=
0
;
static
int32_t
tsSubmitReqNum
=
0
;
int32_t
dnodeInitShell
()
{
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_SUBMIT
]
=
dnodeDispatchToVWriteQueue
;
...
...
@@ -44,35 +44,35 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_UPDATE_TAG_VAL
]
=
dnodeDispatchToVWriteQueue
;
// the following message shall be treated as mnode write
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_CREATE_ACCT
]
=
dnodeDispatchToM
node
WriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_ALTER_ACCT
]
=
dnodeDispatchToM
node
WriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_DROP_ACCT
]
=
dnodeDispatchToM
node
WriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_CREATE_USER
]
=
dnodeDispatchToM
node
WriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_ALTER_USER
]
=
dnodeDispatchToM
node
WriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_DROP_USER
]
=
dnodeDispatchToM
node
WriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_CREATE_DNODE
]
=
dnodeDispatchToM
node
WriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_DROP_DNODE
]
=
dnodeDispatchToM
node
WriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_CREATE_DB
]
=
dnodeDispatchToM
node
WriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_DROP_DB
]
=
dnodeDispatchToM
node
WriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_ALTER_DB
]
=
dnodeDispatchToM
node
WriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_CREATE_TABLE
]
=
dnodeDispatchToM
node
WriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_DROP_TABLE
]
=
dnodeDispatchToM
node
WriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_ALTER_TABLE
]
=
dnodeDispatchToM
node
WriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_ALTER_STREAM
]
=
dnodeDispatchToM
node
WriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_KILL_QUERY
]
=
dnodeDispatchToM
node
WriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_KILL_STREAM
]
=
dnodeDispatchToM
node
WriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_KILL_CONN
]
=
dnodeDispatchToM
node
WriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_CONFIG_DNODE
]
=
dnodeDispatchToM
node
WriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_CREATE_ACCT
]
=
dnodeDispatchToMWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_ALTER_ACCT
]
=
dnodeDispatchToMWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_DROP_ACCT
]
=
dnodeDispatchToMWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_CREATE_USER
]
=
dnodeDispatchToMWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_ALTER_USER
]
=
dnodeDispatchToMWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_DROP_USER
]
=
dnodeDispatchToMWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_CREATE_DNODE
]
=
dnodeDispatchToMWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_DROP_DNODE
]
=
dnodeDispatchToMWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_CREATE_DB
]
=
dnodeDispatchToMWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_DROP_DB
]
=
dnodeDispatchToMWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_ALTER_DB
]
=
dnodeDispatchToMWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_CREATE_TABLE
]
=
dnodeDispatchToMWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_DROP_TABLE
]
=
dnodeDispatchToMWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_ALTER_TABLE
]
=
dnodeDispatchToMWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_ALTER_STREAM
]
=
dnodeDispatchToMWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_KILL_QUERY
]
=
dnodeDispatchToMWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_KILL_STREAM
]
=
dnodeDispatchToMWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_KILL_CONN
]
=
dnodeDispatchToMWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_CONFIG_DNODE
]
=
dnodeDispatchToMWriteQueue
;
// the following message shall be treated as mnode query
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_HEARTBEAT
]
=
dnodeDispatchToM
node
ReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_CONNECT
]
=
dnodeDispatchToM
node
ReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_USE_DB
]
=
dnodeDispatchToM
node
ReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_TABLE_META
]
=
dnodeDispatchToM
node
ReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_STABLE_VGROUP
]
=
dnodeDispatchToM
node
ReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_TABLES_META
]
=
dnodeDispatchToM
node
ReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_SHOW
]
=
dnodeDispatchToM
node
ReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_RETRIEVE
]
=
dnodeDispatchToM
node
ReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_HEARTBEAT
]
=
dnodeDispatchToMReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_CONNECT
]
=
dnodeDispatchToMReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_USE_DB
]
=
dnodeDispatchToMReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_TABLE_META
]
=
dnodeDispatchToMReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_STABLE_VGROUP
]
=
dnodeDispatchToMReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_TABLES_META
]
=
dnodeDispatchToMReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_SHOW
]
=
dnodeDispatchToMReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_RETRIEVE
]
=
dnodeDispatchToMReadQueue
;
int32_t
numOfThreads
=
tsNumOfCores
*
tsNumOfThreadsPerCore
;
numOfThreads
=
(
int32_t
)
((
1
.
0
-
tsRatioOfQueryThreads
)
*
numOfThreads
/
2
.
0
);
...
...
@@ -91,8 +91,8 @@ int32_t dnodeInitShell() {
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
afp
=
dnodeRetrieveUserAuthInfo
;
ts
Dnode
ShellRpc
=
rpcOpen
(
&
rpcInit
);
if
(
ts
Dnode
ShellRpc
==
NULL
)
{
tsShellRpc
=
rpcOpen
(
&
rpcInit
);
if
(
tsShellRpc
==
NULL
)
{
dError
(
"failed to init shell rpc server"
);
return
-
1
;
}
...
...
@@ -102,13 +102,13 @@ int32_t dnodeInitShell() {
}
void
dnodeCleanupShell
()
{
if
(
ts
Dnode
ShellRpc
)
{
rpcClose
(
ts
Dnode
ShellRpc
);
ts
Dnode
ShellRpc
=
NULL
;
if
(
tsShellRpc
)
{
rpcClose
(
tsShellRpc
);
tsShellRpc
=
NULL
;
}
}
void
dnodeProcessMsgFromShell
(
SRpcMsg
*
pMsg
,
SRpcEpSet
*
pEpSet
)
{
static
void
dnodeProcessMsgFromShell
(
SRpcMsg
*
pMsg
,
SRpcEpSet
*
pEpSet
)
{
SRpcMsg
rpcMsg
=
{
.
handle
=
pMsg
->
handle
,
.
pCont
=
NULL
,
...
...
@@ -117,7 +117,7 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
if
(
pMsg
->
pCont
==
NULL
)
return
;
if
(
dnodeGetRunStatus
()
!=
TSDB_
DNODE_
RUN_STATUS_RUNING
)
{
if
(
dnodeGetRunStatus
()
!=
TSDB_RUN_STATUS_RUNING
)
{
dError
(
"RPC %p, shell msg:%s is ignored since dnode not running"
,
pMsg
->
handle
,
taosMsg
[
pMsg
->
msgType
]);
rpcMsg
.
code
=
TSDB_CODE_APP_NOT_READY
;
rpcSendResponse
(
&
rpcMsg
);
...
...
@@ -126,9 +126,9 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
}
if
(
pMsg
->
msgType
==
TSDB_MSG_TYPE_QUERY
)
{
atomic_fetch_add_32
(
&
ts
Dnode
QueryReqNum
,
1
);
atomic_fetch_add_32
(
&
tsQueryReqNum
,
1
);
}
else
if
(
pMsg
->
msgType
==
TSDB_MSG_TYPE_SUBMIT
)
{
atomic_fetch_add_32
(
&
ts
Dnode
SubmitReqNum
,
1
);
atomic_fetch_add_32
(
&
tsSubmitReqNum
,
1
);
}
else
{}
if
(
dnodeProcessShellMsgFp
[
pMsg
->
msgType
]
)
{
...
...
@@ -211,12 +211,12 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid) {
}
}
S
Dnode
StatisInfo
dnodeGetStatisInfo
()
{
S
Dnode
StatisInfo
info
=
{
0
};
if
(
dnodeGetRunStatus
()
==
TSDB_
DNODE_
RUN_STATUS_RUNING
)
{
SStatisInfo
dnodeGetStatisInfo
()
{
SStatisInfo
info
=
{
0
};
if
(
dnodeGetRunStatus
()
==
TSDB_RUN_STATUS_RUNING
)
{
info
.
httpReqNum
=
httpGetReqCount
();
info
.
queryReqNum
=
atomic_exchange_32
(
&
ts
Dnode
QueryReqNum
,
0
);
info
.
submitReqNum
=
atomic_exchange_32
(
&
ts
Dnode
SubmitReqNum
,
0
);
info
.
queryReqNum
=
atomic_exchange_32
(
&
tsQueryReqNum
,
0
);
info
.
submitReqNum
=
atomic_exchange_32
(
&
tsSubmitReqNum
,
0
);
}
return
info
;
...
...
src/dnode/src/dnodeVWrite.c
浏览文件 @
4bc9284a
...
...
@@ -113,7 +113,7 @@ void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) {
void
*
dnodeAllocVWriteQueue
(
void
*
pVnode
)
{
pthread_mutex_lock
(
&
tsVWriteWP
.
mutex
);
SVWriteWorker
*
pWorker
=
tsVWriteWP
.
worker
+
tsVWriteWP
.
nextId
;
void
*
queue
=
taosOpenQueue
();
taos_queue
*
queue
=
taosOpenQueue
();
if
(
queue
==
NULL
)
{
pthread_mutex_unlock
(
&
tsVWriteWP
.
mutex
);
return
NULL
;
...
...
src/inc/dnode.h
浏览文件 @
4bc9284a
...
...
@@ -27,16 +27,16 @@ typedef struct {
int32_t
queryReqNum
;
int32_t
submitReqNum
;
int32_t
httpReqNum
;
}
S
Dnode
StatisInfo
;
}
SStatisInfo
;
typedef
enum
{
TSDB_
DNODE_
RUN_STATUS_INITIALIZE
,
TSDB_
DNODE_
RUN_STATUS_RUNING
,
TSDB_
DNODE_
RUN_STATUS_STOPPED
}
S
Dnode
RunStatus
;
TSDB_RUN_STATUS_INITIALIZE
,
TSDB_RUN_STATUS_RUNING
,
TSDB_RUN_STATUS_STOPPED
}
SRunStatus
;
S
DnodeRunStatus
dnodeGetRunStatus
();
S
Dnode
StatisInfo
dnodeGetStatisInfo
();
S
RunStatus
dnodeGetRunStatus
();
SStatisInfo
dnodeGetStatisInfo
();
bool
dnodeIsFirstDeploy
();
bool
dnodeIsMasterEp
(
char
*
ep
);
...
...
@@ -59,15 +59,15 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
void
*
dnodeAllocVReadQueue
(
void
*
pVnode
);
void
dnodeFreeVReadQueue
(
void
*
rqueue
);
int32_t
dnodeAllocateM
nodePq
ueue
();
void
dnodeFreeM
nodePq
ueue
();
int32_t
dnodeAlloc
ateMnodeRq
ueue
();
void
dnodeFreeM
nodeRq
ueue
();
int32_t
dnodeAlloc
ateMnodeW
queue
();
void
dnodeFreeM
nodeW
queue
();
void
dnodeSendRpcM
node
WriteRsp
(
void
*
pMsg
,
int32_t
code
);
void
dnodeReprocessM
node
WriteMsg
(
void
*
pMsg
);
void
dnodeDelayReprocessM
node
WriteMsg
(
void
*
pMsg
);
int32_t
dnodeAllocateM
PeerQ
ueue
();
void
dnodeFreeM
PeerQ
ueue
();
int32_t
dnodeAlloc
MReadQ
ueue
();
void
dnodeFreeM
ReadQ
ueue
();
int32_t
dnodeAlloc
MWrite
queue
();
void
dnodeFreeM
Write
queue
();
void
dnodeSendRpcMWriteRsp
(
void
*
pMsg
,
int32_t
code
);
void
dnodeReprocessMWriteMsg
(
void
*
pMsg
);
void
dnodeDelayReprocessMWriteMsg
(
void
*
pMsg
);
void
dnodeSendStatusMsgToMnode
();
...
...
src/mnode/src/mnodeMain.c
浏览文件 @
4bc9284a
...
...
@@ -96,9 +96,9 @@ int32_t mnodeStartSystem() {
return
-
1
;
}
dnodeAlloc
ateMnodeW
queue
();
dnodeAlloc
ateMnodeRq
ueue
();
dnodeAllocateM
nodePq
ueue
();
dnodeAlloc
MWrite
queue
();
dnodeAlloc
MReadQ
ueue
();
dnodeAllocateM
PeerQ
ueue
();
if
(
mnodeInitComponents
()
!=
0
)
{
return
-
1
;
...
...
@@ -127,9 +127,9 @@ void mnodeCleanupSystem() {
mInfo
(
"starting to clean up mnode"
);
tsMgmtIsRunning
=
false
;
dnodeFreeM
nodeW
queue
();
dnodeFreeM
nodeRq
ueue
();
dnodeFreeM
nodePq
ueue
();
dnodeFreeM
Write
queue
();
dnodeFreeM
ReadQ
ueue
();
dnodeFreeM
PeerQ
ueue
();
mnodeCleanupTimer
();
mnodeCleanupComponents
(
sizeof
(
tsMnodeComponents
)
/
sizeof
(
tsMnodeComponents
[
0
])
-
1
);
...
...
src/mnode/src/mnodeSdb.c
浏览文件 @
4bc9284a
...
...
@@ -295,7 +295,7 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
if
(
pOper
->
writeCb
!=
NULL
)
{
pOper
->
retCode
=
(
*
pOper
->
writeCb
)(
pMsg
,
pOper
->
retCode
);
}
dnodeSendRpcM
node
WriteRsp
(
pMsg
,
pOper
->
retCode
);
dnodeSendRpcMWriteRsp
(
pMsg
,
pOper
->
retCode
);
// if ahandle, means this func is called by sdb write
if
(
ahandle
==
NULL
)
{
...
...
src/mnode/src/mnodeTable.c
浏览文件 @
4bc9284a
...
...
@@ -1679,12 +1679,12 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
pTable
->
info
.
tableId
,
pMsg
->
rpcMsg
.
handle
);
pMsg
->
retry
=
0
;
dnodeReprocessM
node
WriteMsg
(
pMsg
);
dnodeReprocessMWriteMsg
(
pMsg
);
}
else
{
mDebug
(
"app:%p:%p, table:%s, created in dnode, thandle:%p"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pTable
->
info
.
tableId
,
pMsg
->
rpcMsg
.
handle
);
dnodeSendRpcM
node
WriteRsp
(
pMsg
,
TSDB_CODE_SUCCESS
);
dnodeSendRpcMWriteRsp
(
pMsg
,
TSDB_CODE_SUCCESS
);
}
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
}
else
{
...
...
@@ -2351,14 +2351,14 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
mError
(
"app:%p:%p, table:%s, failed to drop in dnode, vgId:%d sid:%d uid:%"
PRIu64
", reason:%s"
,
mnodeMsg
->
rpcMsg
.
ahandle
,
mnodeMsg
,
pTable
->
info
.
tableId
,
pTable
->
vgId
,
pTable
->
tid
,
pTable
->
uid
,
tstrerror
(
rpcMsg
->
code
));
dnodeSendRpcM
node
WriteRsp
(
mnodeMsg
,
rpcMsg
->
code
);
dnodeSendRpcMWriteRsp
(
mnodeMsg
,
rpcMsg
->
code
);
return
;
}
if
(
mnodeMsg
->
pVgroup
==
NULL
)
mnodeMsg
->
pVgroup
=
mnodeGetVgroup
(
pTable
->
vgId
);
if
(
mnodeMsg
->
pVgroup
==
NULL
)
{
mError
(
"app:%p:%p, table:%s, failed to get vgroup"
,
mnodeMsg
->
rpcMsg
.
ahandle
,
mnodeMsg
,
pTable
->
info
.
tableId
);
dnodeSendRpcM
node
WriteRsp
(
mnodeMsg
,
TSDB_CODE_MND_VGROUP_NOT_EXIST
);
dnodeSendRpcMWriteRsp
(
mnodeMsg
,
TSDB_CODE_MND_VGROUP_NOT_EXIST
);
return
;
}
...
...
@@ -2368,7 +2368,7 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
mnodeDropVgroup
(
mnodeMsg
->
pVgroup
,
NULL
);
}
dnodeSendRpcM
node
WriteRsp
(
mnodeMsg
,
TSDB_CODE_SUCCESS
);
dnodeSendRpcMWriteRsp
(
mnodeMsg
,
TSDB_CODE_SUCCESS
);
}
/*
...
...
@@ -2399,7 +2399,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
mnodeSendDropChildTableMsg
(
mnodeMsg
,
false
);
rpcMsg
->
code
=
TSDB_CODE_SUCCESS
;
dnodeSendRpcM
node
WriteRsp
(
mnodeMsg
,
rpcMsg
->
code
);
dnodeSendRpcMWriteRsp
(
mnodeMsg
,
rpcMsg
->
code
);
return
;
}
...
...
@@ -2416,7 +2416,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
if
(
code
!=
TSDB_CODE_SUCCESS
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mnodeMsg
->
pTable
=
NULL
;
mnodeDestroyChildTable
(
pTable
);
dnodeSendRpcM
node
WriteRsp
(
mnodeMsg
,
code
);
dnodeSendRpcMWriteRsp
(
mnodeMsg
,
code
);
}
}
else
{
if
(
mnodeMsg
->
retry
++
<
10
)
{
...
...
@@ -2425,7 +2425,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
mnodeMsg
->
rpcMsg
.
ahandle
,
mnodeMsg
,
pTable
->
info
.
tableId
,
mnodeMsg
->
retry
,
pTable
->
vgId
,
pTable
->
tid
,
pTable
->
uid
,
tstrerror
(
rpcMsg
->
code
),
mnodeMsg
->
rpcMsg
.
handle
);
dnodeDelayReprocessM
node
WriteMsg
(
mnodeMsg
);
dnodeDelayReprocessMWriteMsg
(
mnodeMsg
);
}
else
{
mError
(
"app:%p:%p, table:%s, failed to create in dnode, vgId:%d sid:%d uid:%"
PRIu64
", result:%s thandle:%p"
,
mnodeMsg
->
rpcMsg
.
ahandle
,
mnodeMsg
,
pTable
->
info
.
tableId
,
pTable
->
vgId
,
pTable
->
tid
,
pTable
->
uid
,
...
...
@@ -2434,7 +2434,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
SSdbOper
oper
=
{.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsChildTableSdb
,
.
pObj
=
pTable
};
sdbDeleteRow
(
&
oper
);
dnodeSendRpcM
node
WriteRsp
(
mnodeMsg
,
rpcMsg
->
code
);
dnodeSendRpcMWriteRsp
(
mnodeMsg
,
rpcMsg
->
code
);
}
}
}
...
...
@@ -2452,18 +2452,18 @@ static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) {
mDebug
(
"app:%p:%p, ctable:%s, altered in dnode, thandle:%p result:%s"
,
mnodeMsg
->
rpcMsg
.
ahandle
,
mnodeMsg
,
pTable
->
info
.
tableId
,
mnodeMsg
->
rpcMsg
.
handle
,
tstrerror
(
rpcMsg
->
code
));
dnodeSendRpcM
node
WriteRsp
(
mnodeMsg
,
TSDB_CODE_SUCCESS
);
dnodeSendRpcMWriteRsp
(
mnodeMsg
,
TSDB_CODE_SUCCESS
);
}
else
{
if
(
mnodeMsg
->
retry
++
<
3
)
{
mDebug
(
"app:%p:%p, table:%s, alter table rsp received, need retry, times:%d result:%s thandle:%p"
,
mnodeMsg
->
rpcMsg
.
ahandle
,
mnodeMsg
,
pTable
->
info
.
tableId
,
mnodeMsg
->
retry
,
tstrerror
(
rpcMsg
->
code
),
mnodeMsg
->
rpcMsg
.
handle
);
dnodeDelayReprocessM
node
WriteMsg
(
mnodeMsg
);
dnodeDelayReprocessMWriteMsg
(
mnodeMsg
);
}
else
{
mError
(
"app:%p:%p, table:%s, failed to alter in dnode, result:%s thandle:%p"
,
mnodeMsg
->
rpcMsg
.
ahandle
,
mnodeMsg
,
pTable
->
info
.
tableId
,
tstrerror
(
rpcMsg
->
code
),
mnodeMsg
->
rpcMsg
.
handle
);
dnodeSendRpcM
node
WriteRsp
(
mnodeMsg
,
rpcMsg
->
code
);
dnodeSendRpcMWriteRsp
(
mnodeMsg
,
rpcMsg
->
code
);
}
}
}
...
...
src/mnode/src/mnodeVgroup.c
浏览文件 @
4bc9284a
...
...
@@ -529,7 +529,7 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
SSdbOper
desc
=
{.
type
=
SDB_OPER_GLOBAL
,
.
pObj
=
pVgroup
,
.
table
=
tsVgroupSdb
};
(
void
)
sdbUpdateRow
(
&
desc
);
dnodeReprocessM
node
WriteMsg
(
pMsg
);
dnodeReprocessMWriteMsg
(
pMsg
);
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
// if (pVgroup->status == TAOS_VG_STATUS_CREATING || pVgroup->status == TAOS_VG_STATUS_READY) {
// mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
...
...
@@ -537,7 +537,7 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
// pVgroup->status = TAOS_VG_STATUS_READY;
// SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
// (void)sdbUpdateRow(&desc);
// dnodeReprocessM
node
WriteMsg(pMsg);
// dnodeReprocessMWriteMsg(pMsg);
// return TSDB_CODE_MND_ACTION_IN_PROGRESS;
// } else {
// mError("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d, but vgroup is dropping", pMsg->rpcMsg.ahandle,
...
...
@@ -969,7 +969,7 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
if
(
code
!=
TSDB_CODE_SUCCESS
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mnodeMsg
->
pVgroup
=
NULL
;
mnodeDestroyVgroup
(
pVgroup
);
dnodeSendRpcM
node
WriteRsp
(
mnodeMsg
,
code
);
dnodeSendRpcMWriteRsp
(
mnodeMsg
,
code
);
}
}
else
{
SSdbOper
oper
=
{
...
...
@@ -978,7 +978,7 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
.
pObj
=
pVgroup
};
sdbDeleteRow
(
&
oper
);
dnodeSendRpcM
node
WriteRsp
(
mnodeMsg
,
mnodeMsg
->
code
);
dnodeSendRpcMWriteRsp
(
mnodeMsg
,
mnodeMsg
->
code
);
}
}
...
...
@@ -1040,7 +1040,7 @@ static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
code
=
TSDB_CODE_MND_SDB_ERROR
;
}
dnodeReprocessM
node
WriteMsg
(
mnodeMsg
);
dnodeReprocessMWriteMsg
(
mnodeMsg
);
}
static
int32_t
mnodeProcessVnodeCfgMsg
(
SMnodeMsg
*
pMsg
)
{
...
...
src/plugins/monitor/src/monitorMain.c
浏览文件 @
4bc9284a
...
...
@@ -27,12 +27,12 @@
#include "monitor.h"
#include "taoserror.h"
#define m
onitor
Fatal(...) { if (monitorDebugFlag & DEBUG_FATAL) { taosPrintLog("MON FATAL ", 255, __VA_ARGS__); }}
#define m
onitor
Error(...) { if (monitorDebugFlag & DEBUG_ERROR) { taosPrintLog("MON ERROR ", 255, __VA_ARGS__); }}
#define m
onitor
Warn(...) { if (monitorDebugFlag & DEBUG_WARN) { taosPrintLog("MON WARN ", 255, __VA_ARGS__); }}
#define m
onitor
Info(...) { if (monitorDebugFlag & DEBUG_INFO) { taosPrintLog("MON ", 255, __VA_ARGS__); }}
#define m
onitor
Debug(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }}
#define m
onitor
Trace(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }}
#define m
n
Fatal(...) { if (monitorDebugFlag & DEBUG_FATAL) { taosPrintLog("MON FATAL ", 255, __VA_ARGS__); }}
#define m
n
Error(...) { if (monitorDebugFlag & DEBUG_ERROR) { taosPrintLog("MON ERROR ", 255, __VA_ARGS__); }}
#define m
n
Warn(...) { if (monitorDebugFlag & DEBUG_WARN) { taosPrintLog("MON WARN ", 255, __VA_ARGS__); }}
#define m
n
Info(...) { if (monitorDebugFlag & DEBUG_INFO) { taosPrintLog("MON ", 255, __VA_ARGS__); }}
#define m
n
Debug(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }}
#define m
n
Trace(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }}
#define SQL_LENGTH 1030
#define LOG_LEN_STR 100
...
...
@@ -91,12 +91,12 @@ int32_t monitorInitSystem() {
pthread_attr_setdetachstate
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
tsMonitor
.
thread
,
&
thAttr
,
monitorThreadFunc
,
NULL
))
{
m
onitor
Error
(
"failed to create thread to for monitor module, reason:%s"
,
strerror
(
errno
));
m
n
Error
(
"failed to create thread to for monitor module, reason:%s"
,
strerror
(
errno
));
return
-
1
;
}
pthread_attr_destroy
(
&
thAttr
);
m
onitor
Debug
(
"monitor thread is launched"
);
m
n
Debug
(
"monitor thread is launched"
);
monitorStartSystemFp
=
monitorStartSystem
;
monitorStopSystemFp
=
monitorStopSystem
;
...
...
@@ -107,12 +107,12 @@ int32_t monitorStartSystem() {
taos_init
();
tsMonitor
.
start
=
1
;
monitorExecuteSQLFp
=
monitorExecuteSQL
;
m
onitor
Info
(
"monitor module start"
);
m
n
Info
(
"monitor module start"
);
return
0
;
}
static
void
*
monitorThreadFunc
(
void
*
param
)
{
m
onitor
Debug
(
"starting to initialize monitor module ..."
);
m
n
Debug
(
"starting to initialize monitor module ..."
);
while
(
1
)
{
static
int32_t
accessTimes
=
0
;
...
...
@@ -121,7 +121,7 @@ static void *monitorThreadFunc(void *param) {
if
(
tsMonitor
.
quiting
)
{
tsMonitor
.
state
=
MON_STATE_NOT_INIT
;
m
onitor
Info
(
"monitor thread will quit, for taosd is quiting"
);
m
n
Info
(
"monitor thread will quit, for taosd is quiting"
);
break
;
}
else
{
taosGetDisk
();
...
...
@@ -132,7 +132,7 @@ static void *monitorThreadFunc(void *param) {
}
if
(
dnodeGetDnodeId
()
<=
0
)
{
m
onitor
Debug
(
"dnode not initialized, waiting for 3000 ms to start monitor module"
);
m
n
Debug
(
"dnode not initialized, waiting for 3000 ms to start monitor module"
);
continue
;
}
...
...
@@ -140,10 +140,10 @@ static void *monitorThreadFunc(void *param) {
tsMonitor
.
state
=
MON_STATE_NOT_INIT
;
tsMonitor
.
conn
=
taos_connect
(
NULL
,
"monitor"
,
tsInternalPass
,
""
,
0
);
if
(
tsMonitor
.
conn
==
NULL
)
{
m
onitor
Error
(
"failed to connect to database, reason:%s"
,
tstrerror
(
terrno
));
m
n
Error
(
"failed to connect to database, reason:%s"
,
tstrerror
(
terrno
));
continue
;
}
else
{
m
onitor
Debug
(
"connect to database success"
);
m
n
Debug
(
"connect to database success"
);
}
}
...
...
@@ -155,10 +155,10 @@ static void *monitorThreadFunc(void *param) {
taos_free_result
(
res
);
if
(
code
!=
0
)
{
m
onitor
Error
(
"failed to exec sql:%s, reason:%s"
,
tsMonitor
.
sql
,
tstrerror
(
code
));
m
n
Error
(
"failed to exec sql:%s, reason:%s"
,
tsMonitor
.
sql
,
tstrerror
(
code
));
break
;
}
else
{
m
onitor
Debug
(
"successfully to exec sql:%s"
,
tsMonitor
.
sql
);
m
n
Debug
(
"successfully to exec sql:%s"
,
tsMonitor
.
sql
);
}
}
...
...
@@ -174,7 +174,7 @@ static void *monitorThreadFunc(void *param) {
}
}
m
onitor
Info
(
"monitor thread is stopped"
);
m
n
Info
(
"monitor thread is stopped"
);
return
NULL
;
}
...
...
@@ -238,7 +238,7 @@ void monitorStopSystem() {
tsMonitor
.
start
=
0
;
tsMonitor
.
state
=
MON_STATE_NOT_INIT
;
monitorExecuteSQLFp
=
NULL
;
m
onitor
Info
(
"monitor module stopped"
);
m
n
Info
(
"monitor module stopped"
);
}
void
monitorCleanUpSystem
()
{
...
...
@@ -249,7 +249,7 @@ void monitorCleanUpSystem() {
taos_close
(
tsMonitor
.
conn
);
tsMonitor
.
conn
=
NULL
;
}
m
onitor
Info
(
"monitor module is cleaned up"
);
m
n
Info
(
"monitor module is cleaned up"
);
}
// unit is MB
...
...
@@ -257,13 +257,13 @@ static int32_t monitorBuildMemorySql(char *sql) {
float
sysMemoryUsedMB
=
0
;
bool
suc
=
taosGetSysMemory
(
&
sysMemoryUsedMB
);
if
(
!
suc
)
{
m
onitor
Debug
(
"failed to get sys memory info"
);
m
n
Debug
(
"failed to get sys memory info"
);
}
float
procMemoryUsedMB
=
0
;
suc
=
taosGetProcMemory
(
&
procMemoryUsedMB
);
if
(
!
suc
)
{
m
onitor
Debug
(
"failed to get proc memory info"
);
m
n
Debug
(
"failed to get proc memory info"
);
}
return
sprintf
(
sql
,
", %f, %f, %d"
,
procMemoryUsedMB
,
sysMemoryUsedMB
,
tsTotalMemoryMB
);
...
...
@@ -274,7 +274,7 @@ static int32_t monitorBuildCpuSql(char *sql) {
float
sysCpuUsage
=
0
,
procCpuUsage
=
0
;
bool
suc
=
taosGetCpuUsage
(
&
sysCpuUsage
,
&
procCpuUsage
);
if
(
!
suc
)
{
m
onitor
Debug
(
"failed to get cpu usage"
);
m
n
Debug
(
"failed to get cpu usage"
);
}
if
(
sysCpuUsage
<=
procCpuUsage
)
{
...
...
@@ -294,14 +294,14 @@ static int32_t monitorBuildBandSql(char *sql) {
float
bandSpeedKb
=
0
;
bool
suc
=
taosGetBandSpeed
(
&
bandSpeedKb
);
if
(
!
suc
)
{
m
onitor
Debug
(
"failed to get bandwidth speed"
);
m
n
Debug
(
"failed to get bandwidth speed"
);
}
return
sprintf
(
sql
,
", %f"
,
bandSpeedKb
);
}
static
int32_t
monitorBuildReqSql
(
char
*
sql
)
{
S
Dnode
StatisInfo
info
=
dnodeGetStatisInfo
();
SStatisInfo
info
=
dnodeGetStatisInfo
();
return
sprintf
(
sql
,
", %d, %d, %d)"
,
info
.
httpReqNum
,
info
.
queryReqNum
,
info
.
submitReqNum
);
}
...
...
@@ -309,7 +309,7 @@ static int32_t monitorBuildIoSql(char *sql) {
float
readKB
=
0
,
writeKB
=
0
;
bool
suc
=
taosGetProcIO
(
&
readKB
,
&
writeKB
);
if
(
!
suc
)
{
m
onitor
Debug
(
"failed to get io info"
);
m
n
Debug
(
"failed to get io info"
);
}
return
sprintf
(
sql
,
", %f, %f"
,
readKB
,
writeKB
);
...
...
@@ -332,19 +332,19 @@ static void monitorSaveSystemInfo() {
taos_free_result
(
res
);
if
(
code
!=
0
)
{
m
onitor
Error
(
"failed to save system info, reason:%s, sql:%s"
,
tstrerror
(
code
),
tsMonitor
.
sql
);
m
n
Error
(
"failed to save system info, reason:%s, sql:%s"
,
tstrerror
(
code
),
tsMonitor
.
sql
);
}
else
{
m
onitor
Debug
(
"successfully to save system info, sql:%s"
,
tsMonitor
.
sql
);
m
n
Debug
(
"successfully to save system info, sql:%s"
,
tsMonitor
.
sql
);
}
}
static
void
montiorExecSqlCb
(
void
*
param
,
TAOS_RES
*
result
,
int32_t
code
)
{
int32_t
c
=
taos_errno
(
result
);
if
(
c
!=
TSDB_CODE_SUCCESS
)
{
m
onitor
Error
(
"save %s failed, reason:%s"
,
(
char
*
)
param
,
tstrerror
(
c
));
m
n
Error
(
"save %s failed, reason:%s"
,
(
char
*
)
param
,
tstrerror
(
c
));
}
else
{
int32_t
rows
=
taos_affected_rows
(
result
);
m
onitor
Debug
(
"save %s succ, rows:%d"
,
(
char
*
)
param
,
rows
);
m
n
Debug
(
"save %s succ, rows:%d"
,
(
char
*
)
param
,
rows
);
}
taos_free_result
(
result
);
...
...
@@ -380,7 +380,7 @@ void monitorSaveAcctLog(SAcctMonitorObj *pMon) {
pMon
->
totalConns
,
pMon
->
maxConns
,
pMon
->
accessState
);
m
onitor
Debug
(
"save account info, sql:%s"
,
sql
);
m
n
Debug
(
"save account info, sql:%s"
,
sql
);
taos_query_a
(
tsMonitor
.
conn
,
sql
,
montiorExecSqlCb
,
"account info"
);
}
...
...
@@ -401,13 +401,13 @@ void monitorSaveLog(int32_t level, const char *const format, ...) {
len
+=
sprintf
(
sql
+
len
,
"', '%s')"
,
tsLocalEp
);
sql
[
len
++
]
=
0
;
m
onitor
Debug
(
"save log, sql: %s"
,
sql
);
m
n
Debug
(
"save log, sql: %s"
,
sql
);
taos_query_a
(
tsMonitor
.
conn
,
sql
,
montiorExecSqlCb
,
"log"
);
}
void
monitorExecuteSQL
(
char
*
sql
)
{
if
(
tsMonitor
.
state
!=
MON_STATE_INITED
)
return
;
m
onitor
Debug
(
"execute sql:%s"
,
sql
);
m
n
Debug
(
"execute sql:%s"
,
sql
);
taos_query_a
(
tsMonitor
.
conn
,
sql
,
montiorExecSqlCb
,
"sql"
);
}
src/vnode/src/vnodeCfg.c
浏览文件 @
4bc9284a
...
...
@@ -22,7 +22,6 @@
#include "tsdb.h"
#include "dnode.h"
#include "vnodeInt.h"
#include "vnodeVersion.h"
#include "vnodeCfg.h"
static
void
vnodeLoadCfg
(
SVnodeObj
*
pVnode
,
SCreateVnodeMsg
*
vnodeMsg
)
{
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
4bc9284a
...
...
@@ -15,17 +15,11 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tcache.h"
#include "cJSON.h"
#include "dnode.h"
#include "hash.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "trpc.h"
#include "tsdb.h"
#include "ttimer.h"
#include "tutil.h"
#include "vnode.h"
#include "vnodeInt.h"
...
...
@@ -34,7 +28,7 @@
#include "vnodeCfg.h"
#include "vnodeVersion.h"
static
SHashObj
*
ts
Dnode
VnodesHash
;
static
SHashObj
*
tsVnodesHash
;
static
void
vnodeCleanUp
(
SVnodeObj
*
pVnode
);
static
int
vnodeProcessTsdbStatus
(
void
*
arg
,
int
status
);
static
uint32_t
vnodeGetFileInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int64_t
*
size
,
uint64_t
*
fversion
);
...
...
@@ -67,8 +61,8 @@ int32_t vnodeInitResources() {
vnodeInitWriteFp
();
vnodeInitReadFp
();
ts
Dnode
VnodesHash
=
taosHashInit
(
TSDB_MIN_VNODES
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
true
);
if
(
ts
Dnode
VnodesHash
==
NULL
)
{
tsVnodesHash
=
taosHashInit
(
TSDB_MIN_VNODES
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
true
);
if
(
tsVnodesHash
==
NULL
)
{
vError
(
"failed to init vnode list"
);
return
TSDB_CODE_VND_OUT_OF_MEMORY
;
}
...
...
@@ -77,10 +71,10 @@ int32_t vnodeInitResources() {
}
void
vnodeCleanupResources
()
{
if
(
ts
Dnode
VnodesHash
!=
NULL
)
{
if
(
tsVnodesHash
!=
NULL
)
{
vDebug
(
"vnode list is cleanup"
);
taosHashCleanup
(
ts
Dnode
VnodesHash
);
ts
Dnode
VnodesHash
=
NULL
;
taosHashCleanup
(
tsVnodesHash
);
tsVnodesHash
=
NULL
;
}
syncCleanUp
();
...
...
@@ -348,7 +342,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode
->
status
=
TAOS_VN_STATUS_READY
;
vDebug
(
"vgId:%d, vnode is opened in %s, pVnode:%p"
,
pVnode
->
vgId
,
rootDir
,
pVnode
);
taosHashPut
(
ts
Dnode
VnodesHash
,
(
const
char
*
)
&
pVnode
->
vgId
,
sizeof
(
int32_t
),
(
char
*
)(
&
pVnode
),
sizeof
(
SVnodeObj
*
));
taosHashPut
(
tsVnodesHash
,
(
const
char
*
)
&
pVnode
->
vgId
,
sizeof
(
int32_t
),
(
char
*
)(
&
pVnode
),
sizeof
(
SVnodeObj
*
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -440,7 +434,7 @@ void vnodeRelease(void *pVnodeRaw) {
tsem_destroy
(
&
pVnode
->
sem
);
free
(
pVnode
);
int32_t
count
=
taosHashGetSize
(
ts
Dnode
VnodesHash
);
int32_t
count
=
taosHashGetSize
(
tsVnodesHash
);
vDebug
(
"vgId:%d, vnode is destroyed, vnodes:%d"
,
vgId
,
count
);
}
...
...
@@ -457,7 +451,7 @@ static void vnodeIncRef(void *ptNode) {
}
void
*
vnodeAcquire
(
int32_t
vgId
)
{
SVnodeObj
**
ppVnode
=
taosHashGetCB
(
ts
Dnode
VnodesHash
,
&
vgId
,
sizeof
(
int32_t
),
vnodeIncRef
,
NULL
,
sizeof
(
void
*
));
SVnodeObj
**
ppVnode
=
taosHashGetCB
(
tsVnodesHash
,
&
vgId
,
sizeof
(
int32_t
),
vnodeIncRef
,
NULL
,
sizeof
(
void
*
));
if
(
ppVnode
==
NULL
||
*
ppVnode
==
NULL
)
{
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
...
...
@@ -496,7 +490,7 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) {
}
int32_t
vnodeGetVnodeList
(
int32_t
vnodeList
[],
int32_t
*
numOfVnodes
)
{
SHashMutableIterator
*
pIter
=
taosHashCreateIter
(
ts
Dnode
VnodesHash
);
SHashMutableIterator
*
pIter
=
taosHashCreateIter
(
tsVnodesHash
);
while
(
taosHashIterNext
(
pIter
))
{
SVnodeObj
**
pVnode
=
taosHashIterGet
(
pIter
);
if
(
pVnode
==
NULL
)
continue
;
...
...
@@ -517,7 +511,7 @@ int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
void
vnodeBuildStatusMsg
(
void
*
param
)
{
SStatusMsg
*
pStatus
=
param
;
SHashMutableIterator
*
pIter
=
taosHashCreateIter
(
ts
Dnode
VnodesHash
);
SHashMutableIterator
*
pIter
=
taosHashCreateIter
(
tsVnodesHash
);
while
(
taosHashIterNext
(
pIter
))
{
SVnodeObj
**
pVnode
=
taosHashIterGet
(
pIter
);
...
...
@@ -546,7 +540,7 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) {
static
void
vnodeCleanUp
(
SVnodeObj
*
pVnode
)
{
// remove from hash, so new messages wont be consumed
taosHashRemove
(
ts
Dnode
VnodesHash
,
(
const
char
*
)
&
pVnode
->
vgId
,
sizeof
(
int32_t
));
taosHashRemove
(
tsVnodesHash
,
(
const
char
*
)
&
pVnode
->
vgId
,
sizeof
(
int32_t
));
if
(
pVnode
->
status
!=
TAOS_VN_STATUS_INIT
)
{
// it may be in updateing or reset state, then it shall wait
...
...
src/vnode/src/vnodeRead.c
浏览文件 @
4bc9284a
...
...
@@ -15,13 +15,10 @@
#define _DEFAULT_SOURCE
#define _NON_BLOCKING_RETRIEVE 0
#include "os.h"
#include "tglobal.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tcache.h"
#include "query.h"
#include "trpc.h"
#include "tsdb.h"
...
...
@@ -29,9 +26,9 @@
#include "vnodeInt.h"
#include "tqueue.h"
static
int32_t
(
*
vnodeProcessReadMsgFp
[
TSDB_MSG_TYPE_MAX
])(
SVnodeObj
*
pVnode
,
SVReadMsg
*
pRead
Msg
);
static
int32_t
vnodeProcessQueryMsg
(
SVnodeObj
*
pVnode
,
SVReadMsg
*
pRead
Msg
);
static
int32_t
vnodeProcessFetchMsg
(
SVnodeObj
*
pVnode
,
SVReadMsg
*
pRead
Msg
);
static
int32_t
(
*
vnodeProcessReadMsgFp
[
TSDB_MSG_TYPE_MAX
])(
SVnodeObj
*
pVnode
,
SVReadMsg
*
pRead
);
static
int32_t
vnodeProcessQueryMsg
(
SVnodeObj
*
pVnode
,
SVReadMsg
*
pRead
);
static
int32_t
vnodeProcessFetchMsg
(
SVnodeObj
*
pVnode
,
SVReadMsg
*
pRead
);
static
int32_t
vnodeNotifyCurrentQhandle
(
void
*
handle
,
void
*
qhandle
,
int32_t
vgId
);
void
vnodeInitReadFp
(
void
)
{
...
...
@@ -44,16 +41,16 @@ void vnodeInitReadFp(void) {
// still required, or there will be a deadlock, so we don’t do any check here, but put the check codes before the
// request enters the queue
//
int32_t
vnodeProcessRead
(
void
*
param
,
SVReadMsg
*
pRead
Msg
)
{
int32_t
vnodeProcessRead
(
void
*
param
,
SVReadMsg
*
pRead
)
{
SVnodeObj
*
pVnode
=
(
SVnodeObj
*
)
param
;
int32_t
msgType
=
pRead
Msg
->
msgType
;
int32_t
msgType
=
pRead
->
msgType
;
if
(
vnodeProcessReadMsgFp
[
msgType
]
==
NULL
)
{
vDebug
(
"vgId:%d, msgType:%s not processed, no handle"
,
pVnode
->
vgId
,
taosMsg
[
msgType
]);
return
TSDB_CODE_VND_MSG_NOT_PROCESSED
;
}
return
(
*
vnodeProcessReadMsgFp
[
msgType
])(
pVnode
,
pRead
Msg
);
return
(
*
vnodeProcessReadMsgFp
[
msgType
])(
pVnode
,
pRead
);
}
static
int32_t
vnodeCheckRead
(
void
*
param
)
{
...
...
@@ -180,27 +177,27 @@ static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) {
pRsp
->
completed
=
true
;
}
static
int32_t
vnodeProcessQueryMsg
(
SVnodeObj
*
pVnode
,
SVReadMsg
*
pRead
Msg
)
{
void
*
pCont
=
pRead
Msg
->
pCont
;
int32_t
contLen
=
pRead
Msg
->
contLen
;
SRspRet
*
pRet
=
&
pRead
Msg
->
rspRet
;
static
int32_t
vnodeProcessQueryMsg
(
SVnodeObj
*
pVnode
,
SVReadMsg
*
pRead
)
{
void
*
pCont
=
pRead
->
pCont
;
int32_t
contLen
=
pRead
->
contLen
;
SRspRet
*
pRet
=
&
pRead
->
rspRet
;
SQueryTableMsg
*
pQueryTableMsg
=
(
SQueryTableMsg
*
)
pCont
;
memset
(
pRet
,
0
,
sizeof
(
SRspRet
));
// qHandle needs to be freed correctly
if
(
pRead
Msg
->
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
{
SRetrieveTableMsg
*
killQueryMsg
=
(
SRetrieveTableMsg
*
)
pRead
Msg
->
pCont
;
if
(
pRead
->
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
{
SRetrieveTableMsg
*
killQueryMsg
=
(
SRetrieveTableMsg
*
)
pRead
->
pCont
;
killQueryMsg
->
free
=
htons
(
killQueryMsg
->
free
);
killQueryMsg
->
qhandle
=
htobe64
(
killQueryMsg
->
qhandle
);
vWarn
(
"QInfo:%p connection %p broken, kill query"
,
(
void
*
)
killQueryMsg
->
qhandle
,
pRead
Msg
->
rpcHandle
);
assert
(
pRead
Msg
->
contLen
>
0
&&
killQueryMsg
->
free
==
1
);
vWarn
(
"QInfo:%p connection %p broken, kill query"
,
(
void
*
)
killQueryMsg
->
qhandle
,
pRead
->
rpcHandle
);
assert
(
pRead
->
contLen
>
0
&&
killQueryMsg
->
free
==
1
);
void
**
qhandle
=
qAcquireQInfo
(
pVnode
->
qMgmt
,
(
uint64_t
)
killQueryMsg
->
qhandle
);
if
(
qhandle
==
NULL
||
*
qhandle
==
NULL
)
{
vWarn
(
"QInfo:%p invalid qhandle, no matched query handle, conn:%p"
,
(
void
*
)
killQueryMsg
->
qhandle
,
pRead
Msg
->
rpcHandle
);
pRead
->
rpcHandle
);
}
else
{
assert
(
*
qhandle
==
(
void
*
)
killQueryMsg
->
qhandle
);
...
...
@@ -242,9 +239,9 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
}
if
(
handle
!=
NULL
&&
vnodeNotifyCurrentQhandle
(
pRead
Msg
->
rpcHandle
,
*
handle
,
pVnode
->
vgId
)
!=
TSDB_CODE_SUCCESS
)
{
vnodeNotifyCurrentQhandle
(
pRead
->
rpcHandle
,
*
handle
,
pVnode
->
vgId
)
!=
TSDB_CODE_SUCCESS
)
{
vError
(
"vgId:%d, QInfo:%p, query discarded since link is broken, %p"
,
pVnode
->
vgId
,
*
handle
,
pRead
Msg
->
rpcHandle
);
pRead
->
rpcHandle
);
pRsp
->
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
true
);
return
pRsp
->
code
;
...
...
@@ -255,7 +252,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
if
(
handle
!=
NULL
)
{
vDebug
(
"vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app"
,
vgId
,
*
handle
);
code
=
vnodePutItemIntoReadQueue
(
pVnode
,
handle
,
pRead
Msg
->
rpcHandle
);
code
=
vnodePutItemIntoReadQueue
(
pVnode
,
handle
,
pRead
->
rpcHandle
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pRsp
->
code
=
code
;
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
true
);
...
...
@@ -264,7 +261,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
}
}
else
{
assert
(
pCont
!=
NULL
);
void
**
qhandle
=
(
void
**
)
pRead
Msg
->
qhandle
;
void
**
qhandle
=
(
void
**
)
pRead
->
qhandle
;
vDebug
(
"vgId:%d, QInfo:%p, dnode continues to exec query"
,
pVnode
->
vgId
,
*
qhandle
);
...
...
@@ -276,14 +273,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
// build query rsp, the retrieve request has reached here already
if
(
buildRes
)
{
// update the connection info according to the retrieve connection
pRead
Msg
->
rpcHandle
=
qGetResultRetrieveMsg
(
*
qhandle
);
assert
(
pRead
Msg
->
rpcHandle
!=
NULL
);
pRead
->
rpcHandle
=
qGetResultRetrieveMsg
(
*
qhandle
);
assert
(
pRead
->
rpcHandle
!=
NULL
);
vDebug
(
"vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p"
,
pVnode
->
vgId
,
*
qhandle
,
pRead
Msg
->
rpcHandle
);
pRead
->
rpcHandle
);
// set the real rsp error code
pRead
Msg
->
code
=
vnodeDumpQueryResult
(
&
pRead
->
rspRet
,
pVnode
,
qhandle
,
&
freehandle
,
pReadMsg
->
rpcHandle
);
pRead
->
code
=
vnodeDumpQueryResult
(
&
pRead
->
rspRet
,
pVnode
,
qhandle
,
&
freehandle
,
pRead
->
rpcHandle
);
// NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client
code
=
TSDB_CODE_QRY_HAS_RSP
;
...
...
@@ -308,16 +305,16 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
return
code
;
}
static
int32_t
vnodeProcessFetchMsg
(
SVnodeObj
*
pVnode
,
SVReadMsg
*
pRead
Msg
)
{
void
*
pCont
=
pRead
Msg
->
pCont
;
SRspRet
*
pRet
=
&
pRead
Msg
->
rspRet
;
static
int32_t
vnodeProcessFetchMsg
(
SVnodeObj
*
pVnode
,
SVReadMsg
*
pRead
)
{
void
*
pCont
=
pRead
->
pCont
;
SRspRet
*
pRet
=
&
pRead
->
rspRet
;
SRetrieveTableMsg
*
pRetrieve
=
pCont
;
pRetrieve
->
free
=
htons
(
pRetrieve
->
free
);
pRetrieve
->
qhandle
=
htobe64
(
pRetrieve
->
qhandle
);
vDebug
(
"vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p"
,
pVnode
->
vgId
,
(
void
*
)
pRetrieve
->
qhandle
,
pRetrieve
->
free
,
pRead
Msg
->
rpcHandle
);
pRetrieve
->
free
,
pRead
->
rpcHandle
);
memset
(
pRet
,
0
,
sizeof
(
SRspRet
));
...
...
@@ -348,8 +345,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
}
// register the qhandle to connect to quit query immediate if connection is broken
if
(
vnodeNotifyCurrentQhandle
(
pRead
Msg
->
rpcHandle
,
*
handle
,
pVnode
->
vgId
)
!=
TSDB_CODE_SUCCESS
)
{
vError
(
"vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p"
,
pVnode
->
vgId
,
*
handle
,
pRead
Msg
->
rpcHandle
);
if
(
vnodeNotifyCurrentQhandle
(
pRead
->
rpcHandle
,
*
handle
,
pVnode
->
vgId
)
!=
TSDB_CODE_SUCCESS
)
{
vError
(
"vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p"
,
pVnode
->
vgId
,
*
handle
,
pRead
->
rpcHandle
);
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
qKillQuery
(
*
handle
);
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
true
);
...
...
@@ -359,7 +356,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
bool
freeHandle
=
true
;
bool
buildRes
=
false
;
code
=
qRetrieveQueryResultInfo
(
*
handle
,
&
buildRes
,
pRead
Msg
->
rpcHandle
);
code
=
qRetrieveQueryResultInfo
(
*
handle
,
&
buildRes
,
pRead
->
rpcHandle
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
// TODO handle malloc failure
pRet
->
rsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
...
...
@@ -370,7 +367,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
assert
(
buildRes
==
true
);
#if _NON_BLOCKING_RETRIEVE
if
(
!
buildRes
)
{
assert
(
pRead
Msg
->
rpcHandle
!=
NULL
);
assert
(
pRead
->
rpcHandle
!=
NULL
);
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
false
);
return
TSDB_CODE_QRY_NOT_READY
;
...
...
@@ -378,7 +375,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
#endif
// ahandle is the sqlObj pointer
code
=
vnodeDumpQueryResult
(
pRet
,
pVnode
,
handle
,
&
freeHandle
,
pRead
Msg
->
rpcHandle
);
code
=
vnodeDumpQueryResult
(
pRet
,
pVnode
,
handle
,
&
freeHandle
,
pRead
->
rpcHandle
);
}
// If qhandle is not added into vread queue, the query should be completed already or paused with error.
...
...
src/vnode/src/vnodeWrite.c
浏览文件 @
4bc9284a
...
...
@@ -19,7 +19,6 @@
#include "taoserror.h"
#include "tqueue.h"
#include "trpc.h"
#include "tutil.h"
#include "tsdb.h"
#include "twal.h"
#include "tsync.h"
...
...
@@ -185,7 +184,7 @@ static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
static
int32_t
vnodeProcessDropStableMsg
(
SVnodeObj
*
pVnode
,
void
*
pCont
,
SRspRet
*
pRet
)
{
SDropSTableMsg
*
pTable
=
pCont
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
vDebug
(
"vgId:%d, stable:%s, start to drop"
,
pVnode
->
vgId
,
pTable
->
tableId
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录