Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c4264da2
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c4264da2
编写于
5月 08, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: vnodes mgmt
上级
2ada10a1
变更
8
显示空白变更内容
内联
并排
Showing
8 changed file
with
202 addition
and
235 deletion
+202
-235
source/dnode/mgmt/interface/inc/dmDef.h
source/dnode/mgmt/interface/inc/dmDef.h
+2
-0
source/dnode/mgmt/interface/src/dmEnv.c
source/dnode/mgmt/interface/src/dmEnv.c
+1
-0
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
+10
-9
source/dnode/mgmt/mgmt_vnode/src/vmFile.c
source/dnode/mgmt/mgmt_vnode/src/vmFile.c
+16
-16
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+76
-52
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
+16
-67
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+80
-90
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+1
-1
未找到文件。
source/dnode/mgmt/interface/inc/dmDef.h
浏览文件 @
c4264da2
...
...
@@ -41,6 +41,8 @@
#include "monitor.h"
#include "sync.h"
#include "libs/function/function.h"
#ifdef __cplusplus
extern
"C"
{
#endif
...
...
source/dnode/mgmt/interface/src/dmEnv.c
浏览文件 @
c4264da2
...
...
@@ -55,6 +55,7 @@ void dmCleanup() {
monCleanup
();
syncCleanUp
();
walCleanUp
();
udfcClose
();
taosStopCacheRefreshWorker
();
dInfo
(
"dnode env is cleaned up"
);
}
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
浏览文件 @
c4264da2
...
...
@@ -29,15 +29,15 @@ typedef struct SVnodesMgmt {
SHashObj
*
hash
;
SRWLatch
latch
;
SVnodesStat
state
;
const
char
*
path
;
SDnode
*
pDnode
;
SMgmtWrapper
*
pWrapper
;
STfs
*
pTfs
;
SQWorkerPool
queryPool
;
SQWorkerPool
fetchPool
;
SWWorkerPool
syncPool
;
SWWorkerPool
writePool
;
SWWorkerPool
mergePool
;
const
char
*
path
;
SDnode
*
pDnode
;
SMgmtWrapper
*
pWrapper
;
SSingleWorker
mgmtWorker
;
SSingleWorker
monitorWorker
;
}
SVnodesMgmt
;
...
...
@@ -95,9 +95,9 @@ int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq);
void
vmGetVnodeLoads
(
SMgmtWrapper
*
pWrapper
,
SMonVloadInfo
*
pInfo
);
// vmFile.c
int32_t
vmGetVnode
s
FromFile
(
SVnodesMgmt
*
pMgmt
,
SWrapperCfg
**
ppCfgs
,
int32_t
*
numOfVnodes
);
int32_t
vmWriteVnode
s
ToFile
(
SVnodesMgmt
*
pMgmt
);
SVnodeObj
**
vmGetVnode
s
FromHash
(
SVnodesMgmt
*
pMgmt
,
int32_t
*
numOfVnodes
);
int32_t
vmGetVnode
List
FromFile
(
SVnodesMgmt
*
pMgmt
,
SWrapperCfg
**
ppCfgs
,
int32_t
*
numOfVnodes
);
int32_t
vmWriteVnode
List
ToFile
(
SVnodesMgmt
*
pMgmt
);
SVnodeObj
**
vmGetVnode
List
FromHash
(
SVnodesMgmt
*
pMgmt
,
int32_t
*
numOfVnodes
);
// vmWorker.c
int32_t
vmStartWorker
(
SVnodesMgmt
*
pMgmt
);
...
...
@@ -105,11 +105,12 @@ void vmStopWorker(SVnodesMgmt *pMgmt);
int32_t
vmAllocQueue
(
SVnodesMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
);
void
vmFreeQueue
(
SVnodesMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
);
int32_t
vmPutMsgToSyncQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pRpc
);
// sync integration
int32_t
vmPutMsgToWriteQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pRpc
);
int32_t
vmPutMsgToWriteQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
);
int32_t
vmPutMsgToSyncQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
);
int32_t
vmPutMsgToApplyQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
);
int32_t
vmPutMsgToQueryQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
);
int32_t
vmPutMsgToFetchQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
);
int32_t
vmPutMsgTo
Apply
Queue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
);
int32_t
vmPutMsgTo
Merge
Queue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
);
int32_t
vmGetQueueSize
(
SMgmtWrapper
*
pWrapper
,
int32_t
vgId
,
EQueueType
qtype
);
int32_t
vmProcessWriteMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
);
...
...
source/dnode/mgmt/mgmt_vnode/src/vmFile.c
浏览文件 @
c4264da2
...
...
@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
SVnodeObj
**
vmGetVnode
s
FromHash
(
SVnodesMgmt
*
pMgmt
,
int32_t
*
numOfVnodes
)
{
SVnodeObj
**
vmGetVnode
List
FromHash
(
SVnodesMgmt
*
pMgmt
,
int32_t
*
numOfVnodes
)
{
taosRLockLatch
(
&
pMgmt
->
latch
);
int32_t
num
=
0
;
...
...
@@ -44,14 +44,14 @@ SVnodeObj **vmGetVnodesFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes) {
return
pVnodes
;
}
int32_t
vmGetVnode
s
FromFile
(
SVnodesMgmt
*
pMgmt
,
SWrapperCfg
**
ppCfgs
,
int32_t
*
numOfVnodes
)
{
int32_t
vmGetVnode
List
FromFile
(
SVnodesMgmt
*
pMgmt
,
SWrapperCfg
**
ppCfgs
,
int32_t
*
numOfVnodes
)
{
int32_t
code
=
TSDB_CODE_INVALID_JSON_FORMAT
;
int32_t
len
=
0
;
int32_t
maxLen
=
30000
;
char
*
content
=
taosMemoryCalloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
FILE
*
fp
=
NULL
;
char
file
[
PATH_MAX
];
char
file
[
PATH_MAX
]
=
{
0
}
;
SWrapperCfg
*
pCfgs
=
NULL
;
TdFilePtr
pFile
=
NULL
;
...
...
@@ -61,26 +61,26 @@ int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *n
if
(
pFile
==
NULL
)
{
dDebug
(
"file %s not exist"
,
file
);
code
=
0
;
goto
PRASE_VNODE
_OVER
;
goto
_OVER
;
}
len
=
(
int32_t
)
taosReadFile
(
pFile
,
content
,
maxLen
);
if
(
len
<=
0
)
{
dError
(
"failed to read %s since content is null"
,
file
);
goto
PRASE_VNODE
_OVER
;
goto
_OVER
;
}
content
[
len
]
=
0
;
root
=
cJSON_Parse
(
content
);
if
(
root
==
NULL
)
{
dError
(
"failed to read %s since invalid json format"
,
file
);
goto
PRASE_VNODE
_OVER
;
goto
_OVER
;
}
cJSON
*
vnodes
=
cJSON_GetObjectItem
(
root
,
"vnodes"
);
if
(
!
vnodes
||
vnodes
->
type
!=
cJSON_Array
)
{
dError
(
"failed to read %s since vnodes not found"
,
file
);
goto
PRASE_VNODE
_OVER
;
goto
_OVER
;
}
int32_t
vnodesNum
=
cJSON_GetArraySize
(
vnodes
);
...
...
@@ -88,7 +88,7 @@ int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *n
pCfgs
=
taosMemoryCalloc
(
vnodesNum
,
sizeof
(
SWrapperCfg
));
if
(
pCfgs
==
NULL
)
{
dError
(
"failed to read %s since out of memory"
,
file
);
goto
PRASE_VNODE
_OVER
;
goto
_OVER
;
}
for
(
int32_t
i
=
0
;
i
<
vnodesNum
;
++
i
)
{
...
...
@@ -98,7 +98,7 @@ int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *n
cJSON
*
vgId
=
cJSON_GetObjectItem
(
vnode
,
"vgId"
);
if
(
!
vgId
||
vgId
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since vgId not found"
,
file
);
goto
PRASE_VNODE
_OVER
;
goto
_OVER
;
}
pCfg
->
vgId
=
vgId
->
valueint
;
snprintf
(
pCfg
->
path
,
sizeof
(
pCfg
->
path
),
"%s%svnode%d"
,
pMgmt
->
path
,
TD_DIRSEP
,
pCfg
->
vgId
);
...
...
@@ -106,28 +106,28 @@ int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *n
cJSON
*
dropped
=
cJSON_GetObjectItem
(
vnode
,
"dropped"
);
if
(
!
dropped
||
dropped
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since dropped not found"
,
file
);
goto
PRASE_VNODE
_OVER
;
goto
_OVER
;
}
pCfg
->
dropped
=
dropped
->
valueint
;
cJSON
*
vgVersion
=
cJSON_GetObjectItem
(
vnode
,
"vgVersion"
);
if
(
!
vgVersion
||
vgVersion
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since vgVersion not found"
,
file
);
goto
PRASE_VNODE
_OVER
;
goto
_OVER
;
}
pCfg
->
vgVersion
=
vgVersion
->
valueint
;
cJSON
*
dbUid
=
cJSON_GetObjectItem
(
vnode
,
"dbUid"
);
if
(
!
dbUid
||
dbUid
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since dbUid not found"
,
file
);
goto
PRASE_VNODE
_OVER
;
goto
_OVER
;
}
pCfg
->
dbUid
=
atoll
(
dbUid
->
valuestring
);
cJSON
*
db
=
cJSON_GetObjectItem
(
vnode
,
"db"
);
if
(
!
db
||
db
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since db not found"
,
file
);
goto
PRASE_VNODE
_OVER
;
goto
_OVER
;
}
tstrncpy
(
pCfg
->
db
,
db
->
valuestring
,
TSDB_DB_FNAME_LEN
);
}
...
...
@@ -139,7 +139,7 @@ int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *n
code
=
0
;
dInfo
(
"succcessed to read file %s"
,
file
);
PRASE_VNODE
_OVER:
_OVER:
if
(
content
!=
NULL
)
taosMemoryFree
(
content
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
pFile
!=
NULL
)
taosCloseFile
(
&
pFile
);
...
...
@@ -148,7 +148,7 @@ PRASE_VNODE_OVER:
return
code
;
}
int32_t
vmWriteVnode
s
ToFile
(
SVnodesMgmt
*
pMgmt
)
{
int32_t
vmWriteVnode
List
ToFile
(
SVnodesMgmt
*
pMgmt
)
{
char
file
[
PATH_MAX
];
char
realfile
[
PATH_MAX
];
snprintf
(
file
,
sizeof
(
file
),
"%s%svnodes.json.bak"
,
pMgmt
->
path
,
TD_DIRSEP
);
...
...
@@ -162,7 +162,7 @@ int32_t vmWriteVnodesToFile(SVnodesMgmt *pMgmt) {
}
int32_t
numOfVnodes
=
0
;
SVnodeObj
**
pVnodes
=
vmGetVnode
s
FromHash
(
pMgmt
,
&
numOfVnodes
);
SVnodeObj
**
pVnodes
=
vmGetVnode
List
FromHash
(
pMgmt
,
&
numOfVnodes
);
int32_t
len
=
0
;
int32_t
maxLen
=
65536
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
c4264da2
...
...
@@ -16,12 +16,37 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
void
vmGetVnodeLoads
(
SMgmtWrapper
*
pWrapper
,
SMonVloadInfo
*
pInfo
)
{
SVnodesMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
pInfo
->
pVloads
=
taosArrayInit
(
pMgmt
->
state
.
totalVnodes
,
sizeof
(
SVnodeLoad
));
if
(
pInfo
->
pVloads
==
NULL
)
return
;
taosRLockLatch
(
&
pMgmt
->
latch
);
void
*
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
NULL
);
while
(
pIter
)
{
SVnodeObj
**
ppVnode
=
pIter
;
if
(
ppVnode
==
NULL
||
*
ppVnode
==
NULL
)
continue
;
SVnodeObj
*
pVnode
=
*
ppVnode
;
SVnodeLoad
vload
=
{
0
};
vnodeGetLoad
(
pVnode
->
pImpl
,
&
vload
);
taosArrayPush
(
pInfo
->
pVloads
,
&
vload
);
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
pIter
);
}
taosRUnLockLatch
(
&
pMgmt
->
latch
);
}
void
vmGetMonitorInfo
(
SMgmtWrapper
*
pWrapper
,
SMonVmInfo
*
pInfo
)
{
SVnodesMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SMonVloadInfo
vloads
=
{
0
};
vmGetVnodeLoads
(
pWrapper
,
&
vloads
);
if
(
vloads
.
pVloads
==
NULL
)
return
;
SArray
*
pVloads
=
vloads
.
pVloads
;
if
(
pVloads
==
NULL
)
return
;
int32_t
totalVnodes
=
0
;
int32_t
masterNum
=
0
;
...
...
@@ -31,8 +56,8 @@ void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *pInfo) {
int64_t
numOfBatchInsertReqs
=
0
;
int64_t
numOfBatchInsertSuccessReqs
=
0
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
vloads
.
pVloads
);
++
i
)
{
SVnodeLoad
*
pLoad
=
taosArrayGet
(
vloads
.
pVloads
,
i
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pVloads
);
++
i
)
{
SVnodeLoad
*
pLoad
=
taosArrayGet
(
pVloads
,
i
);
numOfSelectReqs
+=
pLoad
->
numOfSelectReqs
;
numOfInsertReqs
+=
pLoad
->
numOfInsertReqs
;
numOfInsertSuccessReqs
+=
pLoad
->
numOfInsertSuccessReqs
;
...
...
@@ -49,10 +74,16 @@ void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *pInfo) {
pInfo
->
vstat
.
numOfInsertSuccessReqs
=
numOfInsertSuccessReqs
-
pMgmt
->
state
.
numOfInsertSuccessReqs
;
pInfo
->
vstat
.
numOfBatchInsertReqs
=
numOfBatchInsertReqs
-
pMgmt
->
state
.
numOfBatchInsertReqs
;
pInfo
->
vstat
.
numOfBatchInsertSuccessReqs
=
numOfBatchInsertSuccessReqs
-
pMgmt
->
state
.
numOfBatchInsertSuccessReqs
;
pMgmt
->
state
=
pInfo
->
vstat
;
pMgmt
->
state
.
totalVnodes
=
totalVnodes
;
pMgmt
->
state
.
masterNum
=
masterNum
;
pMgmt
->
state
.
numOfSelectReqs
=
numOfSelectReqs
;
pMgmt
->
state
.
numOfInsertReqs
=
numOfInsertReqs
;
pMgmt
->
state
.
numOfInsertSuccessReqs
=
numOfInsertSuccessReqs
;
pMgmt
->
state
.
numOfBatchInsertReqs
=
numOfBatchInsertReqs
;
pMgmt
->
state
.
numOfBatchInsertSuccessReqs
=
numOfBatchInsertSuccessReqs
;
tfsGetMonitorInfo
(
pMgmt
->
pTfs
,
&
pInfo
->
tfs
);
taosArrayDestroy
(
vloads
.
pVloads
);
taosArrayDestroy
(
pVloads
);
}
int32_t
vmProcessGetMonVmInfoReq
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pReq
)
{
...
...
@@ -107,12 +138,13 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
memcpy
(
pCfg
,
&
vnodeCfgDefault
,
sizeof
(
SVnodeCfg
));
pCfg
->
vgId
=
pCreate
->
vgId
;
strcpy
(
pCfg
->
dbname
,
pCreate
->
db
);
tstrncpy
(
pCfg
->
dbname
,
pCreate
->
db
,
sizeof
(
pCfg
->
dbname
));
pCfg
->
dbId
=
pCreate
->
dbUid
;
pCfg
->
isWeak
=
true
;
pCfg
->
tsdbCfg
.
days
=
10
;
pCfg
->
tsdbCfg
.
keep2
=
3650
;
pCfg
->
tsdbCfg
.
keep0
=
3650
;
pCfg
->
tsdbCfg
.
keep1
=
3650
;
pCfg
->
tsdbCfg
.
keep2
=
3650
;
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
pCreate
->
pRetensions
);
++
i
)
{
memcpy
(
&
pCfg
->
tsdbCfg
.
retentions
[
i
],
taosArrayGet
(
pCreate
->
pRetensions
,
i
),
sizeof
(
SRetention
));
}
...
...
@@ -121,30 +153,30 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg
->
hashEnd
=
pCreate
->
hashEnd
;
pCfg
->
hashMethod
=
pCreate
->
hashMethod
;
// sync integration
pCfg
->
syncCfg
.
myIndex
=
pCreate
->
selfIndex
;
pCfg
->
syncCfg
.
replicaNum
=
pCreate
->
replica
;
memset
(
&
(
pCfg
->
syncCfg
.
nodeInfo
)
,
0
,
sizeof
(
pCfg
->
syncCfg
.
nodeInfo
));
memset
(
&
pCfg
->
syncCfg
.
nodeInfo
,
0
,
sizeof
(
pCfg
->
syncCfg
.
nodeInfo
));
for
(
int
i
=
0
;
i
<
pCreate
->
replica
;
++
i
)
{
(
pCfg
->
syncCfg
.
nodeInfo
)[
i
].
nodePort
=
(
pCreate
->
replicas
)
[
i
].
port
;
snprintf
(
(
pCfg
->
syncCfg
.
nodeInfo
)[
i
].
nodeFqdn
,
sizeof
((
pCfg
->
syncCfg
.
nodeInfo
)
[
i
].
nodeFqdn
),
"%s"
,
(
pCreate
->
replicas
)
[
i
].
fqdn
);
pCfg
->
syncCfg
.
nodeInfo
[
i
].
nodePort
=
pCreate
->
replicas
[
i
].
port
;
snprintf
(
pCfg
->
syncCfg
.
nodeInfo
[
i
].
nodeFqdn
,
sizeof
(
pCfg
->
syncCfg
.
nodeInfo
[
i
].
nodeFqdn
),
"%s"
,
pCreate
->
replicas
[
i
].
fqdn
);
}
}
static
void
vmGenerateWrapperCfg
(
SVnodesMgmt
*
pMgmt
,
SCreateVnodeReq
*
pCreate
,
SWrapperCfg
*
pCfg
)
{
memcpy
(
pCfg
->
db
,
pCreate
->
db
,
TSDB_DB_FNAME_LEN
);
pCfg
->
dbUid
=
pCreate
->
dbUid
;
pCfg
->
dropped
=
0
;
snprintf
(
pCfg
->
path
,
sizeof
(
pCfg
->
path
),
"%s%svnode%d"
,
pMgmt
->
path
,
TD_DIRSEP
,
pCreate
->
vgId
);
pCfg
->
vgId
=
pCreate
->
vgId
;
pCfg
->
vgVersion
=
pCreate
->
vgVersion
;
pCfg
->
dropped
=
0
;
pCfg
->
dbUid
=
pCreate
->
dbUid
;
tstrncpy
(
pCfg
->
db
,
pCreate
->
db
,
TSDB_DB_FNAME_LEN
);
snprintf
(
pCfg
->
path
,
sizeof
(
pCfg
->
path
),
"%s%svnode%d"
,
pMgmt
->
path
,
TD_DIRSEP
,
pCreate
->
vgId
);
}
int32_t
vmProcessCreateVnodeReq
(
SVnodesMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
)
{
SRpcMsg
*
pReq
=
&
pMsg
->
rpcMsg
;
SCreateVnodeReq
createReq
=
{
0
};
char
path
[
TSDB_FILENAME_LEN
];
int32_t
code
=
-
1
;
char
path
[
TSDB_FILENAME_LEN
]
=
{
0
};
if
(
tDeserializeSCreateVnodeReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
createReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
...
...
@@ -161,14 +193,13 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
createReq
.
vgId
);
if
(
pVnode
!=
NULL
)
{
tFreeSCreateVnodeReq
(
&
createReq
);
dDebug
(
"vgId:%d, already exist"
,
createReq
.
vgId
);
tFreeSCreateVnodeReq
(
&
createReq
);
vmReleaseVnode
(
pMgmt
,
pVnode
);
terrno
=
TSDB_CODE_NODE_ALREADY_DEPLOYED
;
return
-
1
;
}
// create vnode
snprintf
(
path
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
vnodeCfg
.
vgId
);
if
(
vnodeCreate
(
path
,
&
vnodeCfg
,
pMgmt
->
pTfs
)
<
0
)
{
tFreeSCreateVnodeReq
(
&
createReq
);
...
...
@@ -179,49 +210,43 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SMsgCb
msgCb
=
pMgmt
->
pDnode
->
data
.
msgCb
;
msgCb
.
pWrapper
=
pMgmt
->
pWrapper
;
msgCb
.
queueFps
[
WRITE_QUEUE
]
=
vmPutMsgToWriteQueue
;
msgCb
.
queueFps
[
SYNC_QUEUE
]
=
vmPutMsgToSyncQueue
;
msgCb
.
queueFps
[
APPLY_QUEUE
]
=
vmPutMsgToApplyQueue
;
msgCb
.
queueFps
[
QUERY_QUEUE
]
=
vmPutMsgToQueryQueue
;
msgCb
.
queueFps
[
FETCH_QUEUE
]
=
vmPutMsgToFetchQueue
;
msgCb
.
queueFps
[
APPLY_QUEUE
]
=
vmPutMsgToApplyQueue
;
msgCb
.
queueFps
[
SYNC_QUEUE
]
=
vmPutMsgToSyncQueue
;
// sync integration
msgCb
.
queueFps
[
MERGE_QUEUE
]
=
vmPutMsgToMergeQueue
;
msgCb
.
qsizeFp
=
vmGetQueueSize
;
SVnode
*
pImpl
=
vnodeOpen
(
path
,
pMgmt
->
pTfs
,
msgCb
);
if
(
pImpl
==
NULL
)
{
dError
(
"vgId:%d, failed to create vnode since %s"
,
createReq
.
vgId
,
terrstr
());
tFreeSCreateVnodeReq
(
&
createReq
);
return
-
1
;
goto
_OVER
;
}
int32_t
code
=
vmOpenVnode
(
pMgmt
,
&
wrapperCfg
,
pImpl
);
code
=
vmOpenVnode
(
pMgmt
,
&
wrapperCfg
,
pImpl
);
if
(
code
!=
0
)
{
tFreeSCreateVnodeReq
(
&
createReq
);
dError
(
"vgId:%d, failed to open vnode since %s"
,
createReq
.
vgId
,
terrstr
());
vnodeClose
(
pImpl
);
vnodeDestroy
(
path
,
pMgmt
->
pTfs
);
terrno
=
code
;
return
code
;
goto
_OVER
;
}
code
=
vnodeStart
(
pImpl
);
if
(
code
!=
0
)
{
tFreeSCreateVnodeReq
(
&
createReq
);
dError
(
"vgId:%d, failed to start sync since %s"
,
createReq
.
vgId
,
terrstr
());
vnodeClose
(
pImpl
);
vnodeDestroy
(
path
,
pMgmt
->
pTfs
);
terrno
=
code
;
return
code
;
goto
_OVER
;
}
code
=
vmWriteVnodesToFile
(
pMgmt
);
code
=
vmWriteVnodeListToFile
(
pMgmt
);
if
(
code
!=
0
)
goto
_OVER
;
_OVER:
if
(
code
!=
0
)
{
tFreeSCreateVnodeReq
(
&
createReq
);
vnodeClose
(
pImpl
);
vnodeDestroy
(
path
,
pMgmt
->
pTfs
);
terrno
=
code
;
return
code
;
}
return
0
;
tFreeSCreateVnodeReq
(
&
createReq
);
terrno
=
code
;
return
code
;
}
int32_t
vmProcessDropVnodeReq
(
SVnodesMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
)
{
...
...
@@ -243,14 +268,14 @@ int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
}
pVnode
->
dropped
=
1
;
if
(
vmWriteVnode
s
ToFile
(
pMgmt
)
!=
0
)
{
if
(
vmWriteVnode
List
ToFile
(
pMgmt
)
!=
0
)
{
pVnode
->
dropped
=
0
;
vmReleaseVnode
(
pMgmt
,
pVnode
);
return
-
1
;
}
vmCloseVnode
(
pMgmt
,
pVnode
);
vmWriteVnode
s
ToFile
(
pMgmt
);
vmWriteVnode
List
ToFile
(
pMgmt
);
return
0
;
}
...
...
@@ -287,7 +312,7 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_CANCEL_SMA
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_DROP_SMA
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SUBMIT_RSMA
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_VG_CHANGE
,
(
NodeMsgFp
)
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_VG_CHANGE
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_CONSUME
,
vmProcessFetchMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_TASK_DEPLOY
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_QUERY_HEARTBEAT
,
vmProcessFetchMsg
,
DEFAULT_HANDLE
);
...
...
@@ -300,14 +325,13 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle
(
pWrapper
,
TDMT_DND_CREATE_VNODE
,
vmProcessMgmtMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_DND_DROP_VNODE
,
vmProcessMgmtMsg
,
DEFAULT_HANDLE
);
// sync integration
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_TIMEOUT
,
(
NodeMsgFp
)
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_PING
,
(
NodeMsgFp
)
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_PING_REPLY
,
(
NodeMsgFp
)
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_CLIENT_REQUEST
,
(
NodeMsgFp
)
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_CLIENT_REQUEST_REPLY
,
(
NodeMsgFp
)
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_REQUEST_VOTE
,
(
NodeMsgFp
)
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_REQUEST_VOTE_REPLY
,
(
NodeMsgFp
)
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_APPEND_ENTRIES
,
(
NodeMsgFp
)
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_APPEND_ENTRIES_REPLY
,
(
NodeMsgFp
)
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_TIMEOUT
,
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_PING
,
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_PING_REPLY
,
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_CLIENT_REQUEST
,
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_CLIENT_REQUEST_REPLY
,
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_REQUEST_VOTE
,
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_REQUEST_VOTE_REPLY
,
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_APPEND_ENTRIES
,
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_APPEND_ENTRIES_REPLY
,
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
}
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
浏览文件 @
c4264da2
...
...
@@ -15,7 +15,6 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
#include "libs/function/function.h"
SVnodeObj
*
vmAcquireVnode
(
SVnodesMgmt
*
pMgmt
,
int32_t
vgId
)
{
SVnodeObj
*
pVnode
=
NULL
;
...
...
@@ -55,14 +54,14 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
pVnode
->
vgId
=
pCfg
->
vgId
;
pVnode
->
refCount
=
0
;
pVnode
->
vgVersion
=
pCfg
->
vgVersion
;
pVnode
->
dropped
=
0
;
pVnode
->
accessState
=
TSDB_VN_ALL_ACCCESS
;
pVnode
->
pWrapper
=
pMgmt
->
pWrapper
;
pVnode
->
pImpl
=
pImpl
;
pVnode
->
vgVersion
=
pCfg
->
vgVersion
;
pVnode
->
dbUid
=
pCfg
->
dbUid
;
pVnode
->
db
=
tstrdup
(
pCfg
->
db
);
pVnode
->
path
=
tstrdup
(
pCfg
->
path
);
pVnode
->
pImpl
=
pImpl
;
pVnode
->
pWrapper
=
pMgmt
->
pWrapper
;
if
(
pVnode
->
path
==
NULL
||
pVnode
->
db
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -78,14 +77,11 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
int32_t
code
=
taosHashPut
(
pMgmt
->
hash
,
&
pVnode
->
vgId
,
sizeof
(
int32_t
),
&
pVnode
,
sizeof
(
SVnodeObj
*
));
taosWUnLockLatch
(
&
pMgmt
->
latch
);
if
(
code
!=
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
}
return
code
;
}
void
vmCloseVnode
(
SVnodesMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
)
{
char
path
[
TSDB_FILENAME_LEN
];
char
path
[
TSDB_FILENAME_LEN
]
=
{
0
}
;
taosWLockLatch
(
&
pMgmt
->
latch
);
taosHashRemove
(
pMgmt
->
hash
,
&
pVnode
->
vgId
,
sizeof
(
int32_t
));
...
...
@@ -98,6 +94,7 @@ void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
while
(
!
taosQueueEmpty
(
pVnode
->
pApplyQ
))
taosMsleep
(
10
);
while
(
!
taosQueueEmpty
(
pVnode
->
pQueryQ
))
taosMsleep
(
10
);
while
(
!
taosQueueEmpty
(
pVnode
->
pFetchQ
))
taosMsleep
(
10
);
while
(
!
taosQueueEmpty
(
pVnode
->
pMergeQ
))
taosMsleep
(
10
);
vmFreeQueue
(
pMgmt
,
pVnode
);
vnodeClose
(
pVnode
->
pImpl
);
...
...
@@ -116,7 +113,7 @@ void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
taosMemoryFree
(
pVnode
);
}
static
void
*
vmOpenVnode
Func
(
void
*
param
)
{
static
void
*
vmOpenVnode
InThread
(
void
*
param
)
{
SVnodeThread
*
pThread
=
param
;
SVnodesMgmt
*
pMgmt
=
pThread
->
pMgmt
;
SDnode
*
pDnode
=
pMgmt
->
pDnode
;
...
...
@@ -136,10 +133,11 @@ static void *vmOpenVnodeFunc(void *param) {
SMsgCb
msgCb
=
pMgmt
->
pDnode
->
data
.
msgCb
;
msgCb
.
pWrapper
=
pMgmt
->
pWrapper
;
msgCb
.
queueFps
[
WRITE_QUEUE
]
=
vmPutMsgToWriteQueue
;
msgCb
.
queueFps
[
SYNC_QUEUE
]
=
vmPutMsgToSyncQueue
;
msgCb
.
queueFps
[
APPLY_QUEUE
]
=
vmPutMsgToApplyQueue
;
msgCb
.
queueFps
[
QUERY_QUEUE
]
=
vmPutMsgToQueryQueue
;
msgCb
.
queueFps
[
FETCH_QUEUE
]
=
vmPutMsgToFetchQueue
;
msgCb
.
queueFps
[
APPLY_QUEUE
]
=
vmPutMsgToApplyQueue
;
msgCb
.
queueFps
[
SYNC_QUEUE
]
=
vmPutMsgToSyncQueue
;
// sync integration
msgCb
.
queueFps
[
MERGE_QUEUE
]
=
vmPutMsgToMergeQueue
;
msgCb
.
qsizeFp
=
vmGetQueueSize
;
snprintf
(
path
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
pCfg
->
vgId
);
SVnode
*
pImpl
=
vnodeOpen
(
path
,
pMgmt
->
pTfs
,
msgCb
);
...
...
@@ -148,13 +146,11 @@ static void *vmOpenVnodeFunc(void *param) {
pThread
->
failed
++
;
}
else
{
vmOpenVnode
(
pMgmt
,
pCfg
,
pImpl
);
// vnodeStart(pImpl);
dDebug
(
"vgId:%d, is opened by thread:%d"
,
pCfg
->
vgId
,
pThread
->
threadIndex
);
pThread
->
opened
++
;
}
atomic_add_fetch_32
(
&
pMgmt
->
state
.
openVnodes
,
1
);
}
}
dDebug
(
"thread:%d, total vnodes:%d, opened:%d failed:%d"
,
pThread
->
threadIndex
,
pThread
->
vnodeNum
,
pThread
->
opened
,
pThread
->
failed
);
...
...
@@ -163,29 +159,24 @@ static void *vmOpenVnodeFunc(void *param) {
static
int32_t
vmOpenVnodes
(
SVnodesMgmt
*
pMgmt
)
{
SDnode
*
pDnode
=
pMgmt
->
pDnode
;
taosInitRWLatch
(
&
pMgmt
->
latch
);
pMgmt
->
hash
=
taosHashInit
(
TSDB_MIN_VNODES
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
if
(
pMgmt
->
hash
==
NULL
)
{
dError
(
"failed to init vnode hash"
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
dError
(
"failed to init vnode hash since %s"
,
terrstr
());
return
-
1
;
}
SWrapperCfg
*
pCfgs
=
NULL
;
int32_t
numOfVnodes
=
0
;
if
(
vmGetVnode
s
FromFile
(
pMgmt
,
&
pCfgs
,
&
numOfVnodes
)
!=
0
)
{
if
(
vmGetVnode
List
FromFile
(
pMgmt
,
&
pCfgs
,
&
numOfVnodes
)
!=
0
)
{
dInfo
(
"failed to get vnode list from disk since %s"
,
terrstr
());
return
-
1
;
}
pMgmt
->
state
.
totalVnodes
=
numOfVnodes
;
#if 0
int32_t threadNum = tsNumOfCores;
#else
int32_t
threadNum
=
1
;
#endif
int32_t
threadNum
=
1
;
// tsNumOfCores;
int32_t
vnodesPerThread
=
numOfVnodes
/
threadNum
+
1
;
SVnodeThread
*
threads
=
taosMemoryCalloc
(
threadNum
,
sizeof
(
SVnodeThread
));
...
...
@@ -210,7 +201,7 @@ static int32_t vmOpenVnodes(SVnodesMgmt *pMgmt) {
TdThreadAttr
thAttr
;
taosThreadAttrInit
(
&
thAttr
);
taosThreadAttrSetDetachState
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
taosThreadCreate
(
&
pThread
->
thread
,
&
thAttr
,
vmOpenVnode
Func
,
pThread
)
!=
0
)
{
if
(
taosThreadCreate
(
&
pThread
->
thread
,
&
thAttr
,
vmOpenVnode
InThread
,
pThread
)
!=
0
)
{
dError
(
"thread:%d, failed to create thread to open vnode, reason:%s"
,
pThread
->
threadIndex
,
strerror
(
errno
));
}
...
...
@@ -240,7 +231,7 @@ static void vmCloseVnodes(SVnodesMgmt *pMgmt) {
dInfo
(
"start to close all vnodes"
);
int32_t
numOfVnodes
=
0
;
SVnodeObj
**
pVnodes
=
vmGetVnode
s
FromHash
(
pMgmt
,
&
numOfVnodes
);
SVnodeObj
**
pVnodes
=
vmGetVnode
List
FromHash
(
pMgmt
,
&
numOfVnodes
);
for
(
int32_t
i
=
0
;
i
<
numOfVnodes
;
++
i
)
{
vmCloseVnode
(
pMgmt
,
pVnodes
[
i
]);
...
...
@@ -267,12 +258,9 @@ static void vmCleanup(SMgmtWrapper *pWrapper) {
vmStopWorker
(
pMgmt
);
vnodeCleanup
();
tfsClose
(
pMgmt
->
pTfs
);
// walCleanUp();
taosMemoryFree
(
pMgmt
);
pWrapper
->
pMgmt
=
NULL
;
// syncCleanUp();
udfcClose
();
dInfo
(
"vnode-mgmt is cleaned up"
);
}
...
...
@@ -313,7 +301,6 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
}
dmReportStartup
(
pDnode
,
"vnode-wal"
,
"initialized"
);
// sync integration
if
(
syncInit
()
!=
0
)
{
dError
(
"failed to open sync since %s"
,
terrstr
());
return
-
1
;
...
...
@@ -381,23 +368,7 @@ static int32_t vmStart(SMgmtWrapper *pWrapper) {
}
static
void
vmStop
(
SMgmtWrapper
*
pWrapper
)
{
#if 0
dDebug("vnode-mgmt start to stop");
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
taosRLockLatch(&pMgmt->latch);
void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
if (ppVnode == NULL || *ppVnode == NULL) continue;
SVnodeObj *pVnode = *ppVnode;
vnodeStop(pVnode->pImpl);
pIter = taosHashIterate(pMgmt->hash, pIter);
}
taosRUnLockLatch(&pMgmt->latch);
#endif
// process inside the vnode
}
void
vmSetMgmtFp
(
SMgmtWrapper
*
pWrapper
)
{
...
...
@@ -413,25 +384,3 @@ void vmSetMgmtFp(SMgmtWrapper *pWrapper) {
pWrapper
->
fp
=
mgmtFp
;
}
void
vmGetVnodeLoads
(
SMgmtWrapper
*
pWrapper
,
SMonVloadInfo
*
pInfo
)
{
SVnodesMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
pInfo
->
pVloads
=
taosArrayInit
(
pMgmt
->
state
.
totalVnodes
,
sizeof
(
SVnodeLoad
));
if
(
pInfo
->
pVloads
==
NULL
)
return
;
taosRLockLatch
(
&
pMgmt
->
latch
);
void
*
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
NULL
);
while
(
pIter
)
{
SVnodeObj
**
ppVnode
=
pIter
;
if
(
ppVnode
==
NULL
||
*
ppVnode
==
NULL
)
continue
;
SVnodeObj
*
pVnode
=
*
ppVnode
;
SVnodeLoad
vload
=
{
0
};
vnodeGetLoad
(
pVnode
->
pImpl
,
&
vload
);
taosArrayPush
(
pInfo
->
pVloads
,
&
vload
);
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
pIter
);
}
taosRUnLockLatch
(
&
pMgmt
->
latch
);
}
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
c4264da2
...
...
@@ -14,28 +14,29 @@
*/
#define _DEFAULT_SOURCE
#include "vmInt.h"
#include "sync.h"
#include "syncTools.h"
static
inline
void
vmSendRsp
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
,
int32_t
code
)
{
SRpcMsg
rsp
=
{.
handle
=
pMsg
->
rpcMsg
.
handle
,
static
inline
void
vmSendRsp
(
SNodeMsg
*
pMsg
,
int32_t
code
)
{
SRpcMsg
rsp
=
{
.
handle
=
pMsg
->
rpcMsg
.
handle
,
.
ahandle
=
pMsg
->
rpcMsg
.
ahandle
,
.
refId
=
pMsg
->
rpcMsg
.
refId
,
.
code
=
code
,
.
pCont
=
pMsg
->
pRsp
,
.
contLen
=
pMsg
->
rspLen
};
.
contLen
=
pMsg
->
rspLen
,
};
tmsgSendRsp
(
&
rsp
);
}
static
void
vmProcessMgmtQueue
(
SQueueInfo
*
pInfo
,
SNodeMsg
*
pMsg
)
{
static
void
vmProcessMgmt
Monitor
Queue
(
SQueueInfo
*
pInfo
,
SNodeMsg
*
pMsg
)
{
SVnodesMgmt
*
pMgmt
=
pInfo
->
ahandle
;
int32_t
code
=
-
1
;
tmsg_t
msgType
=
pMsg
->
rpcMsg
.
msgType
;
dTrace
(
"msg:%p, will be processed in vnode-m queue"
,
pMsg
);
dTrace
(
"msg:%p, will be processed in vnode-m
gmt/monitor
queue"
,
pMsg
);
switch
(
msgType
)
{
case
TDMT_MON_VM_INFO
:
...
...
@@ -52,12 +53,12 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
break
;
default:
terrno
=
TSDB_CODE_MSG_NOT_PROCESSED
;
dError
(
"msg:%p, not processed in vnode-mgmt queue"
,
pMsg
);
dError
(
"msg:%p, not processed in vnode-mgmt
/monitor
queue"
,
pMsg
);
}
if
(
msgType
&
1u
)
{
if
(
code
!=
0
&&
terrno
!=
0
)
code
=
terrno
;
vmSendRsp
(
pM
gmt
->
pWrapper
,
pM
sg
,
code
);
vmSendRsp
(
pMsg
,
code
);
}
dTrace
(
"msg:%p, is freed, result:0x%04x:%s"
,
pMsg
,
code
&
0XFFFF
,
tstrerror
(
code
));
...
...
@@ -71,7 +72,9 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
dTrace
(
"msg:%p, will be processed in vnode-query queue"
,
pMsg
);
int32_t
code
=
vnodeProcessQueryMsg
(
pVnode
->
pImpl
,
&
pMsg
->
rpcMsg
);
if
(
code
!=
0
)
{
vmSendRsp
(
pVnode
->
pWrapper
,
pMsg
,
code
);
if
(
terrno
!=
0
)
code
=
terrno
;
vmSendRsp
(
pMsg
,
code
);
dTrace
(
"msg:%p, is freed, result:0x%04x:%s"
,
pMsg
,
code
&
0XFFFF
,
tstrerror
(
code
));
rpcFreeCont
(
pMsg
->
rpcMsg
.
pCont
);
taosFreeQitem
(
pMsg
);
...
...
@@ -84,7 +87,9 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
dTrace
(
"msg:%p, will be processed in vnode-fetch queue"
,
pMsg
);
int32_t
code
=
vnodeProcessFetchMsg
(
pVnode
->
pImpl
,
&
pMsg
->
rpcMsg
,
pInfo
);
if
(
code
!=
0
)
{
vmSendRsp
(
pVnode
->
pWrapper
,
pMsg
,
code
);
if
(
terrno
!=
0
)
code
=
terrno
;
vmSendRsp
(
pMsg
,
code
);
dTrace
(
"msg:%p, is freed, result:0x%04x:%s"
,
pMsg
,
code
&
0XFFFF
,
tstrerror
(
code
));
rpcFreeCont
(
pMsg
->
rpcMsg
.
pCont
);
taosFreeQitem
(
pMsg
);
...
...
@@ -108,32 +113,10 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
dTrace
(
"msg:%p, will be processed in vnode-write queue"
,
pMsg
);
if
(
taosArrayPush
(
pArray
,
&
pMsg
)
==
NULL
)
{
dTrace
(
"msg:%p, failed to process since %s"
,
pMsg
,
terrstr
());
vmSendRsp
(
p
Vnode
->
pWrapper
,
p
Msg
,
TSDB_CODE_OUT_OF_MEMORY
);
vmSendRsp
(
pMsg
,
TSDB_CODE_OUT_OF_MEMORY
);
}
}
#if 0
int64_t version;
vnodePreprocessWriteReqs(pVnode->pImpl, pArray, &version);
numOfMsgs = taosArrayGetSize(pArray);
for (int32_t i = 0; i < numOfMsgs; i++) {
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
SRpcMsg *pRpc = &pMsg->rpcMsg;
rsp.pCont = NULL;
rsp.contLen = 0;
rsp.code = 0;
rsp.handle = pRpc->handle;
rsp.ahandle = pRpc->ahandle;
rsp.refId = pRpc->refId;
int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &rsp);
tmsgSendRsp(&rsp);
}
#else
// sync integration response
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pArray
);
i
++
)
{
SNodeMsg
*
pMsg
;
SRpcMsg
*
pRpc
;
...
...
@@ -168,7 +151,6 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
assert
(
0
);
}
}
#endif
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
i
++
)
{
SNodeMsg
*
pMsg
=
*
(
SNodeMsg
**
)
taosArrayGet
(
pArray
,
i
);
...
...
@@ -186,9 +168,6 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
SRpcMsg
rsp
;
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
#if 1
// sync integration
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
);
// init response rpc msg
...
...
@@ -219,7 +198,6 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
rsp
.
refId
=
pMsg
->
rpcMsg
.
refId
;
tmsgSendRsp
(
&
rsp
);
}
#endif
}
}
...
...
@@ -246,7 +224,9 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
dTrace
(
"msg:%p, will be processed in vnode-merge queue"
,
pMsg
);
int32_t
code
=
vnodeProcessFetchMsg
(
pVnode
->
pImpl
,
&
pMsg
->
rpcMsg
,
pInfo
);
if
(
code
!=
0
)
{
vmSendRsp
(
pVnode
->
pWrapper
,
pMsg
,
code
);
if
(
terrno
!=
0
)
code
=
terrno
;
vmSendRsp
(
pMsg
,
code
);
dTrace
(
"msg:%p, is freed, result:0x%04x:%s"
,
pMsg
,
code
&
0XFFFF
,
tstrerror
(
code
));
rpcFreeCont
(
pMsg
->
rpcMsg
.
pCont
);
taosFreeQitem
(
pMsg
);
...
...
@@ -257,16 +237,17 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
static
int32_t
vmPutNodeMsgToQueue
(
SVnodesMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
,
EQueueType
qtype
)
{
SRpcMsg
*
pRpc
=
&
pMsg
->
rpcMsg
;
SMsgHead
*
pHead
=
pRpc
->
pCont
;
int32_t
code
=
0
;
pHead
->
contLen
=
ntohl
(
pHead
->
contLen
);
pHead
->
vgId
=
ntohl
(
pHead
->
vgId
);
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
pHead
->
vgId
);
if
(
pVnode
==
NULL
)
{
dError
(
"vgId:%d, failed to write msg:%p to vnode-queue since %s"
,
pHead
->
vgId
,
pMsg
,
terrstr
());
return
terrno
;
return
terrno
!=
0
?
terrno
:
-
1
;
}
int32_t
code
=
0
;
switch
(
qtype
)
{
case
QUERY_QUEUE
:
dTrace
(
"msg:%p, type:%s will be written into vnode-query queue"
,
pMsg
,
TMSG_INFO
(
pRpc
->
msgType
));
...
...
@@ -326,7 +307,7 @@ int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
int32_t
vmProcessMgmtMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
)
{
SVnodesMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SSingleWorker
*
pWorker
=
&
pMgmt
->
mgmtWorker
;
dTrace
(
"msg:%p, will be
written
to vnode-mgmt queue, worker:%s"
,
pMsg
,
pWorker
->
name
);
dTrace
(
"msg:%p, will be
put in
to vnode-mgmt queue, worker:%s"
,
pMsg
,
pWorker
->
name
);
taosWriteQitem
(
pWorker
->
queue
,
pMsg
);
return
0
;
}
...
...
@@ -335,7 +316,7 @@ int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SVnodesMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SSingleWorker
*
pWorker
=
&
pMgmt
->
monitorWorker
;
dTrace
(
"msg:%p,
put into
worker:%s"
,
pMsg
,
pWorker
->
name
);
dTrace
(
"msg:%p,
will be put into vnode-monitor queue,
worker:%s"
,
pMsg
,
pWorker
->
name
);
taosWriteQitem
(
pWorker
->
queue
,
pMsg
);
return
0
;
}
...
...
@@ -350,9 +331,7 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT
SNodeMsg
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SNodeMsg
));
int32_t
code
=
0
;
if
(
pMsg
==
NULL
)
{
code
=
-
1
;
}
else
{
if
(
pMsg
!=
NULL
)
{
dTrace
(
"msg:%p, is created, type:%s"
,
pMsg
,
TMSG_INFO
(
pRpc
->
msgType
));
pMsg
->
rpcMsg
=
*
pRpc
;
// if (pMsg->rpcMsg.handle != NULL) assert(pMsg->rpcMsg.refId != 0);
...
...
@@ -377,7 +356,7 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT
dTrace
(
"msg:%p, will be put into vnode-merge queue"
,
pMsg
);
taosWriteQitem
(
pVnode
->
pMergeQ
,
pMsg
);
break
;
case
SYNC_QUEUE
:
// sync integration
case
SYNC_QUEUE
:
dTrace
(
"msg:%p, will be put into vnode-sync queue"
,
pMsg
);
taosWriteQitem
(
pVnode
->
pSyncQ
,
pMsg
);
break
;
...
...
@@ -387,6 +366,7 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT
break
;
}
}
vmReleaseVnode
(
pMgmt
,
pVnode
);
return
code
;
}
...
...
@@ -395,6 +375,14 @@ int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return
vmPutRpcMsgToQueue
(
pWrapper
,
pRpc
,
WRITE_QUEUE
);
}
int32_t
vmPutMsgToSyncQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pRpc
)
{
return
vmPutRpcMsgToQueue
(
pWrapper
,
pRpc
,
SYNC_QUEUE
);
}
int32_t
vmPutMsgToApplyQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pRpc
)
{
return
vmPutRpcMsgToQueue
(
pWrapper
,
pRpc
,
APPLY_QUEUE
);
}
int32_t
vmPutMsgToQueryQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pRpc
)
{
return
vmPutRpcMsgToQueue
(
pWrapper
,
pRpc
,
QUERY_QUEUE
);
}
...
...
@@ -403,30 +391,15 @@ int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return
vmPutRpcMsgToQueue
(
pWrapper
,
pRpc
,
FETCH_QUEUE
);
}
int32_t
vmPutMsgToApplyQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pRpc
)
{
return
vmPutRpcMsgToQueue
(
pWrapper
,
pRpc
,
APPLY_QUEUE
);
}
int32_t
vmPutMsgToMergeQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pRpc
)
{
return
vmPutRpcMsgToQueue
(
pWrapper
,
pRpc
,
MERGE_QUEUE
);
}
// sync integration
int32_t
vmPutMsgToSyncQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pRpc
)
{
return
vmPutRpcMsgToQueue
(
pWrapper
,
pRpc
,
SYNC_QUEUE
);
}
int32_t
vmGetQueueSize
(
SMgmtWrapper
*
pWrapper
,
int32_t
vgId
,
EQueueType
qtype
)
{
int32_t
size
=
-
1
;
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pWrapper
->
pMgmt
,
vgId
);
if
(
pVnode
!=
NULL
)
{
switch
(
qtype
)
{
case
QUERY_QUEUE
:
size
=
taosQueueSize
(
pVnode
->
pQueryQ
);
break
;
case
FETCH_QUEUE
:
size
=
taosQueueSize
(
pVnode
->
pFetchQ
);
break
;
case
WRITE_QUEUE
:
size
=
taosQueueSize
(
pVnode
->
pWriteQ
);
break
;
...
...
@@ -436,6 +409,12 @@ int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
case
APPLY_QUEUE
:
size
=
taosQueueSize
(
pVnode
->
pApplyQ
);
break
;
case
QUERY_QUEUE
:
size
=
taosQueueSize
(
pVnode
->
pQueryQ
);
break
;
case
FETCH_QUEUE
:
size
=
taosQueueSize
(
pVnode
->
pFetchQ
);
break
;
case
MERGE_QUEUE
:
size
=
taosQueueSize
(
pVnode
->
pMergeQ
);
break
;
...
...
@@ -449,14 +428,14 @@ int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
int32_t
vmAllocQueue
(
SVnodesMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
)
{
pVnode
->
pWriteQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
writePool
,
pVnode
,
(
FItems
)
vmProcessWriteQueue
);
pVnode
->
pApplyQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
writePool
,
pVnode
,
(
FItems
)
vmProcessApplyQueue
);
pVnode
->
pMergeQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
mergePool
,
pVnode
,
(
FItems
)
vmProcessMergeQueue
);
pVnode
->
pSyncQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
syncPool
,
pVnode
,
(
FItems
)
vmProcessSyncQueue
);
pVnode
->
p
FetchQ
=
tQWorkerAllocQueue
(
&
pMgmt
->
fetchPool
,
pVnode
,
(
FItem
)
vmProcessFetch
Queue
);
pVnode
->
p
ApplyQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
writePool
,
pVnode
,
(
FItems
)
vmProcessApply
Queue
);
pVnode
->
pQueryQ
=
tQWorkerAllocQueue
(
&
pMgmt
->
queryPool
,
pVnode
,
(
FItem
)
vmProcessQueryQueue
);
pVnode
->
pFetchQ
=
tQWorkerAllocQueue
(
&
pMgmt
->
fetchPool
,
pVnode
,
(
FItem
)
vmProcessFetchQueue
);
pVnode
->
pMergeQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
mergePool
,
pVnode
,
(
FItems
)
vmProcessMergeQueue
);
if
(
pVnode
->
p
ApplyQ
==
NULL
||
pVnode
->
pWriteQ
==
NULL
||
pVnode
->
pSyncQ
==
NULL
||
pVnode
->
pFetch
Q
==
NULL
||
pVnode
->
p
Query
Q
==
NULL
||
pVnode
->
pMergeQ
==
NULL
)
{
if
(
pVnode
->
p
WriteQ
==
NULL
||
pVnode
->
pSyncQ
==
NULL
||
pVnode
->
pApplyQ
==
NULL
||
pVnode
->
pQuery
Q
==
NULL
||
pVnode
->
p
Fetch
Q
==
NULL
||
pVnode
->
pMergeQ
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
...
...
@@ -466,17 +445,17 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
}
void
vmFreeQueue
(
SVnodesMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
)
{
tQWorkerFreeQueue
(
&
pMgmt
->
queryPool
,
pVnode
->
pQueryQ
);
tQWorkerFreeQueue
(
&
pMgmt
->
fetchPool
,
pVnode
->
pFetchQ
);
tWWorkerFreeQueue
(
&
pMgmt
->
writePool
,
pVnode
->
pWriteQ
);
tWWorkerFreeQueue
(
&
pMgmt
->
syncPool
,
pVnode
->
pSyncQ
);
tWWorkerFreeQueue
(
&
pMgmt
->
writePool
,
pVnode
->
pApplyQ
);
tQWorkerFreeQueue
(
&
pMgmt
->
queryPool
,
pVnode
->
pQueryQ
);
tQWorkerFreeQueue
(
&
pMgmt
->
fetchPool
,
pVnode
->
pFetchQ
);
tWWorkerFreeQueue
(
&
pMgmt
->
mergePool
,
pVnode
->
pMergeQ
);
tWWorkerFreeQueue
(
&
pMgmt
->
syncPool
,
pVnode
->
pSyncQ
);
pVnode
->
pWriteQ
=
NULL
;
pVnode
->
pApplyQ
=
NULL
;
pVnode
->
pSyncQ
=
NULL
;
pVnode
->
p
Fetch
Q
=
NULL
;
pVnode
->
p
Apply
Q
=
NULL
;
pVnode
->
pQueryQ
=
NULL
;
pVnode
->
pFetchQ
=
NULL
;
pVnode
->
pMergeQ
=
NULL
;
dDebug
(
"vgId:%d, vnode queue is freed"
,
pVnode
->
vgId
);
}
...
...
@@ -499,17 +478,23 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
pWPool
->
max
=
tsNumOfVnodeWriteThreads
;
if
(
tWWorkerInit
(
pWPool
)
!=
0
)
return
-
1
;
pWPool
=
&
pMgmt
->
syncPool
;
pWPool
->
name
=
"vnode-sync"
;
pWPool
->
max
=
tsNumOfVnodeSyncThreads
;
if
(
tWWorkerInit
(
pWPool
)
!=
0
)
return
-
1
;
pWPool
=
&
pMgmt
->
mergePool
;
pWPool
->
name
=
"vnode-merge"
;
pWPool
->
max
=
tsNumOfVnodeMergeThreads
;
if
(
tWWorkerInit
(
pWPool
)
!=
0
)
return
-
1
;
SSingleWorkerCfg
cfg
=
{.
min
=
1
,
.
max
=
1
,
.
name
=
"vnode-mgmt"
,
.
fp
=
(
FItem
)
vmProcessMgmtQueue
,
.
param
=
pMgmt
};
SWWorkerPool
*
pSPool
=
&
pMgmt
->
syncPool
;
pSPool
->
name
=
"vnode-sync"
;
pSPool
->
max
=
tsNumOfVnodeSyncThreads
;
if
(
tWWorkerInit
(
pSPool
)
!=
0
)
return
-
1
;
SWWorkerPool
*
pMPool
=
&
pMgmt
->
mergePool
;
pMPool
->
name
=
"vnode-merge"
;
pMPool
->
max
=
tsNumOfVnodeMergeThreads
;
if
(
tWWorkerInit
(
pMPool
)
!=
0
)
return
-
1
;
SSingleWorkerCfg
cfg
=
{
.
min
=
1
,
.
max
=
1
,
.
name
=
"vnode-mgmt"
,
.
fp
=
(
FItem
)
vmProcessMgmtMonitorQueue
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
mgmtWorker
,
&
cfg
)
!=
0
)
{
dError
(
"failed to start vnode-mgmt worker since %s"
,
terrstr
());
return
-
1
;
...
...
@@ -517,7 +502,12 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
if
(
tsMultiProcess
)
{
SSingleWorkerCfg
mCfg
=
{
.
min
=
1
,
.
max
=
1
,
.
name
=
"vnode-monitor"
,
.
fp
=
(
FItem
)
vmProcessMgmtQueue
,
.
param
=
pMgmt
};
.
min
=
1
,
.
max
=
1
,
.
name
=
"vnode-monitor"
,
.
fp
=
(
FItem
)
vmProcessMgmtMonitorQueue
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
monitorWorker
,
&
mCfg
)
!=
0
)
{
dError
(
"failed to start mnode vnode-monitor worker since %s"
,
terrstr
());
return
-
1
;
...
...
@@ -531,10 +521,10 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
void
vmStopWorker
(
SVnodesMgmt
*
pMgmt
)
{
tSingleWorkerCleanup
(
&
pMgmt
->
monitorWorker
);
tSingleWorkerCleanup
(
&
pMgmt
->
mgmtWorker
);
tQWorkerCleanup
(
&
pMgmt
->
fetchPool
);
tQWorkerCleanup
(
&
pMgmt
->
queryPool
);
tWWorkerCleanup
(
&
pMgmt
->
writePool
);
tWWorkerCleanup
(
&
pMgmt
->
syncPool
);
tQWorkerCleanup
(
&
pMgmt
->
queryPool
);
tQWorkerCleanup
(
&
pMgmt
->
fetchPool
);
tWWorkerCleanup
(
&
pMgmt
->
mergePool
);
dDebug
(
"vnode workers are closed"
);
}
source/dnode/vnode/inc/vnode.h
浏览文件 @
c4264da2
...
...
@@ -148,7 +148,7 @@ struct STsdbCfg {
struct
SVnodeCfg
{
int32_t
vgId
;
char
dbname
[
TSDB_DB_NAME_LEN
];
char
dbname
[
TSDB_DB_
F
NAME_LEN
];
uint64_t
dbId
;
int32_t
szPage
;
int32_t
szCache
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录