Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
bf2226f0
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
bf2226f0
编写于
3月 16, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
shm
上级
332da703
变更
12
展开全部
隐藏空白更改
内联
并排
Showing
12 changed file
with
1120 addition
and
1177 deletion
+1120
-1177
source/dnode/mgmt/mnode/src/mmInt.c
source/dnode/mgmt/mnode/src/mmInt.c
+3
-1
source/dnode/mgmt/mnode/src/mmWorker.c
source/dnode/mgmt/mnode/src/mmWorker.c
+1
-1
source/dnode/mgmt/vnode/inc/vmFile.h
source/dnode/mgmt/vnode/inc/vmFile.h
+32
-0
source/dnode/mgmt/vnode/inc/vmInt.h
source/dnode/mgmt/vnode/inc/vmInt.h
+60
-16
source/dnode/mgmt/vnode/inc/vmMgmt.h
source/dnode/mgmt/vnode/inc/vmMgmt.h
+0
-32
source/dnode/mgmt/vnode/inc/vmMsg.h
source/dnode/mgmt/vnode/inc/vmMsg.h
+6
-6
source/dnode/mgmt/vnode/inc/vmWorker.h
source/dnode/mgmt/vnode/inc/vmWorker.h
+3
-2
source/dnode/mgmt/vnode/src/vmFile.c
source/dnode/mgmt/vnode/src/vmFile.c
+207
-0
source/dnode/mgmt/vnode/src/vmInt.c
source/dnode/mgmt/vnode/src/vmInt.c
+362
-26
source/dnode/mgmt/vnode/src/vmMgmt.c
source/dnode/mgmt/vnode/src/vmMgmt.c
+0
-1083
source/dnode/mgmt/vnode/src/vmMsg.c
source/dnode/mgmt/vnode/src/vmMsg.c
+210
-6
source/dnode/mgmt/vnode/src/vmWorker.c
source/dnode/mgmt/vnode/src/vmWorker.c
+236
-4
未找到文件。
source/dnode/mgmt/mnode/src/mmInt.c
浏览文件 @
bf2226f0
...
...
@@ -224,6 +224,8 @@ static int32_t mmInit(SMgmtWrapper *pWrapper) {
SMnodeOpt
option
=
{
0
};
dInfo
(
"mnode-mgmt start to init"
);
if
(
pMgmt
==
NULL
)
goto
_OVER
;
pMgmt
->
path
=
pWrapper
->
path
;
pMgmt
->
pDnode
=
pWrapper
->
pDnode
;
pMgmt
->
pWrapper
=
pWrapper
;
...
...
@@ -249,7 +251,7 @@ _OVER:
pWrapper
->
pMgmt
=
pMgmt
;
dInfo
(
"mnode-mgmt is initialized"
);
}
else
{
dError
(
"failed to init mnode-mgmtsince %s"
,
terrstr
());
dError
(
"failed to init mnode-mgmt
since %s"
,
terrstr
());
mmCleanup
(
pWrapper
);
}
...
...
source/dnode/mgmt/mnode/src/mmWorker.c
浏览文件 @
bf2226f0
...
...
@@ -29,7 +29,7 @@ static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
if
(
pMnode
!=
NULL
)
{
pMsg
->
pNode
=
pMnode
;
code
=
mndProcessMsg
(
(
SNodeMsg
*
)
pMsg
);
code
=
mndProcessMsg
(
pMsg
);
mmRelease
(
pMgmt
,
pMnode
);
}
...
...
source/dnode/mgmt/vnode/inc/vmFile.h
浏览文件 @
bf2226f0
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_VNODES_FILE_H_
#define _TD_DND_VNODES_FILE_H_
#include "vmInt.h"
#ifdef __cplusplus
extern
"C"
{
#endif
int32_t
vmGetVnodesFromFile
(
SVnodesMgmt
*
pMgmt
,
SWrapperCfg
**
ppCfgs
,
int32_t
*
numOfVnodes
);
int32_t
vmWriteVnodesToFile
(
SVnodesMgmt
*
pMgmt
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_DND_VNODES_FILE_H_*/
\ No newline at end of file
source/dnode/mgmt/vnode/inc/vmInt.h
浏览文件 @
bf2226f0
...
...
@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_VNODE_INT_H_
#define _TD_DND_VNODE_INT_H_
#ifndef _TD_DND_VNODE
S
_INT_H_
#define _TD_DND_VNODE
S
_INT_H_
#include "dndInt.h"
...
...
@@ -22,7 +22,6 @@
extern
"C"
{
#endif
typedef
struct
{
int32_t
openVnodes
;
int32_t
totalVnodes
;
...
...
@@ -35,19 +34,64 @@ typedef struct {
}
SVnodesStat
;
typedef
struct
SVnodesMgmt
{
SVnodesStat
stat
;
SHashObj
*
hash
;
SRWLatch
latch
;
SQWorkerPool
queryPool
;
SFWorkerPool
fetchPool
;
SWWorkerPool
syncPool
;
SWWorkerPool
writePool
;
STfs
*
pTfs
;
SProcObj
*
pProcess
;
bool
singleProc
;
SHashObj
*
hash
;
SRWLatch
latch
;
SVnodesStat
state
;
STfs
*
pTfs
;
SQWorkerPool
queryPool
;
SFWorkerPool
fetchPool
;
SWWorkerPool
syncPool
;
SWWorkerPool
writePool
;
const
char
*
path
;
SMnode
*
pMnode
;
SDnode
*
pDnode
;
SMgmtWrapper
*
pWrapper
;
}
SVnodesMgmt
;
void
vmGetMgmtFp
(
SMgmtWrapper
*
pMgmt
)
;
typedef
struct
{
int32_t
vgId
;
int32_t
vgVersion
;
int8_t
dropped
;
uint64_t
dbUid
;
char
db
[
TSDB_DB_FNAME_LEN
];
char
path
[
PATH_MAX
+
20
];
}
SWrapperCfg
;
typedef
struct
{
int32_t
vgId
;
int32_t
refCount
;
int32_t
vgVersion
;
int8_t
dropped
;
int8_t
accessState
;
uint64_t
dbUid
;
char
*
db
;
char
*
path
;
SVnode
*
pImpl
;
STaosQueue
*
pWriteQ
;
STaosQueue
*
pSyncQ
;
STaosQueue
*
pApplyQ
;
STaosQueue
*
pQueryQ
;
STaosQueue
*
pFetchQ
;
}
SVnodeObj
;
typedef
struct
{
int32_t
vnodeNum
;
int32_t
opened
;
int32_t
failed
;
int32_t
threadIndex
;
pthread_t
thread
;
SVnodesMgmt
*
pMgmt
;
SWrapperCfg
*
pCfgs
;
}
SVnodeThread
;
// interface
void
vmGetMgmtFp
(
SMgmtWrapper
*
pWrapper
);
// vmInt.h
SVnodeObj
*
vmAcquireVnode
(
SVnodesMgmt
*
pMgmt
,
int32_t
vgId
);
void
vmReleaseVnode
(
SVnodesMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
);
int32_t
vmOpenVnode
(
SVnodesMgmt
*
pMgmt
,
SWrapperCfg
*
pCfg
,
SVnode
*
pImpl
);
void
vmCloseVnode
(
SVnodesMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
);
int32_t
dndInitVnodes
(
SDnode
*
pDnode
);
void
dndCleanupVnodes
(
SDnode
*
pDnode
);
...
...
@@ -65,10 +109,10 @@ int32_t vmProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t
vmProcessCompactVnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
);
int32_t
vmGetTfsMonitorInfo
(
SMgmtWrapper
*
pWrapper
,
SMonDiskInfo
*
pInfo
);
void
vmGetVndMonitorInfo
(
SMgmtWrapper
*
pWrapper
,
SMonDnodeInfo
*
pInfo
);
void
vmGetVndMonitorInfo
(
SMgmtWrapper
*
pWrapper
,
SMonDnodeInfo
*
pInfo
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_DND_VNODE_INT_H_*/
\ No newline at end of file
#endif
/*_TD_DND_VNODES_INT_H_*/
\ No newline at end of file
source/dnode/mgmt/vnode/inc/vmMgmt.h
已删除
100644 → 0
浏览文件 @
332da703
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_VNODE_MGMT_H_
#define _TD_DND_VNODE_MGMT_H_
#include "vmInt.h"
#ifdef __cplusplus
extern
"C"
{
#endif
int32_t
dndPutReqToVQueryQ
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_DND_VNODE_MGMT_H_*/
source/dnode/mgmt/vnode/inc/vmMsg.h
浏览文件 @
bf2226f0
...
...
@@ -23,12 +23,12 @@ extern "C" {
#endif
void
vmInitMsgHandles
(
SMgmtWrapper
*
pWrapper
);
int32_t
vmProcessCreateVnodeReq
(
S
Dnode
*
pDnode
,
SRpcMsg
*
pReq
);
int32_t
vmProcessAlterVnodeReq
(
S
Dnode
*
pDnode
,
SRpcMsg
*
pReq
);
int32_t
vmProcessDropVnodeReq
(
S
Dnode
*
pDnode
,
SRpcMsg
*
pReq
);
int32_t
dndProcessAuthVnodeReq
(
S
Dnode
*
pDnode
,
SRpcMsg
*
pReq
);
int32_t
vmProcessSyncVnodeReq
(
S
Dnode
*
pDnode
,
SRpcMsg
*
pReq
);
int32_t
vmProcessCompactVnodeReq
(
S
Dnode
*
pDnode
,
SRpcMsg
*
pReq
);
int32_t
vmProcessCreateVnodeReq
(
S
VnodesMgmt
*
pMgmt
,
SRpcMsg
*
pReq
);
int32_t
vmProcessAlterVnodeReq
(
S
VnodesMgmt
*
pMgmt
,
SRpcMsg
*
pReq
);
int32_t
vmProcessDropVnodeReq
(
S
VnodesMgmt
*
pMgmt
,
SRpcMsg
*
pReq
);
int32_t
dndProcessAuthVnodeReq
(
S
VnodesMgmt
*
pMgmt
,
SRpcMsg
*
pReq
);
int32_t
vmProcessSyncVnodeReq
(
S
VnodesMgmt
*
pMgmt
,
SRpcMsg
*
pReq
);
int32_t
vmProcessCompactVnodeReq
(
S
VnodesMgmt
*
pMgmt
,
SRpcMsg
*
pReq
);
#ifdef __cplusplus
}
...
...
source/dnode/mgmt/vnode/inc/vmWorker.h
浏览文件 @
bf2226f0
...
...
@@ -22,8 +22,9 @@
extern
"C"
{
#endif
int32_t
vmStartWorker
(
SDnode
*
pDnode
);
void
vmStopWorker
(
SDnode
*
pDnode
);
int32_t
vmStartWorker
(
SVnodesMgmt
*
pMgmt
);
void
vmStopWorker
(
SVnodesMgmt
*
pMgmt
);
void
vmInitMsgFp
(
SMnodeMgmt
*
pMgmt
);
void
vmProcessRpcMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
int32_t
vmPutMsgToWriteQueue
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
);
...
...
source/dnode/mgmt/vnode/src/vmFile.c
浏览文件 @
bf2226f0
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http:www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "vmFile.h"
static
SVnodeObj
**
vmGetVnodesFromHash
(
SVnodesMgmt
*
pMgmt
,
int32_t
*
numOfVnodes
)
{
taosRLockLatch
(
&
pMgmt
->
latch
);
int32_t
num
=
0
;
int32_t
size
=
taosHashGetSize
(
pMgmt
->
hash
);
SVnodeObj
**
pVnodes
=
calloc
(
size
,
sizeof
(
SVnodeObj
*
));
void
*
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
NULL
);
while
(
pIter
)
{
SVnodeObj
**
ppVnode
=
pIter
;
SVnodeObj
*
pVnode
=
*
ppVnode
;
if
(
pVnode
&&
num
<
size
)
{
int32_t
refCount
=
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
dTrace
(
"vgId:%d, acquire vnode, refCount:%d"
,
pVnode
->
vgId
,
refCount
);
pVnodes
[
num
]
=
(
*
ppVnode
);
num
++
;
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
pIter
);
}
else
{
taosHashCancelIterate
(
pMgmt
->
hash
,
pIter
);
}
}
taosRUnLockLatch
(
&
pMgmt
->
latch
);
*
numOfVnodes
=
num
;
return
pVnodes
;
}
int32_t
vmGetVnodesFromFile
(
SVnodesMgmt
*
pMgmt
,
SWrapperCfg
**
ppCfgs
,
int32_t
*
numOfVnodes
)
{
int32_t
code
=
TSDB_CODE_DND_VNODE_READ_FILE_ERROR
;
int32_t
len
=
0
;
int32_t
maxLen
=
30000
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
FILE
*
fp
=
NULL
;
char
file
[
PATH_MAX
];
SWrapperCfg
*
pCfgs
=
NULL
;
TdFilePtr
pFile
=
NULL
;
snprintf
(
file
,
sizeof
(
file
),
"%s%svnodes.json"
,
pMgmt
->
path
,
TD_DIRSEP
);
pFile
=
taosOpenFile
(
file
,
TD_FILE_READ
);
if
(
pFile
==
NULL
)
{
dDebug
(
"file %s not exist"
,
file
);
code
=
0
;
goto
PRASE_VNODE_OVER
;
}
len
=
(
int32_t
)
taosReadFile
(
pFile
,
content
,
maxLen
);
if
(
len
<=
0
)
{
dError
(
"failed to read %s since content is null"
,
file
);
goto
PRASE_VNODE_OVER
;
}
content
[
len
]
=
0
;
root
=
cJSON_Parse
(
content
);
if
(
root
==
NULL
)
{
dError
(
"failed to read %s since invalid json format"
,
file
);
goto
PRASE_VNODE_OVER
;
}
cJSON
*
vnodes
=
cJSON_GetObjectItem
(
root
,
"vnodes"
);
if
(
!
vnodes
||
vnodes
->
type
!=
cJSON_Array
)
{
dError
(
"failed to read %s since vnodes not found"
,
file
);
goto
PRASE_VNODE_OVER
;
}
int32_t
vnodesNum
=
cJSON_GetArraySize
(
vnodes
);
if
(
vnodesNum
>
0
)
{
pCfgs
=
calloc
(
vnodesNum
,
sizeof
(
SWrapperCfg
));
if
(
pCfgs
==
NULL
)
{
dError
(
"failed to read %s since out of memory"
,
file
);
goto
PRASE_VNODE_OVER
;
}
for
(
int32_t
i
=
0
;
i
<
vnodesNum
;
++
i
)
{
cJSON
*
vnode
=
cJSON_GetArrayItem
(
vnodes
,
i
);
SWrapperCfg
*
pCfg
=
&
pCfgs
[
i
];
cJSON
*
vgId
=
cJSON_GetObjectItem
(
vnode
,
"vgId"
);
if
(
!
vgId
||
vgId
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since vgId not found"
,
file
);
goto
PRASE_VNODE_OVER
;
}
pCfg
->
vgId
=
vgId
->
valueint
;
snprintf
(
pCfg
->
path
,
sizeof
(
pCfg
->
path
),
"%s%svnode%d"
,
pMgmt
->
path
,
TD_DIRSEP
,
pCfg
->
vgId
);
cJSON
*
dropped
=
cJSON_GetObjectItem
(
vnode
,
"dropped"
);
if
(
!
dropped
||
dropped
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since dropped not found"
,
file
);
goto
PRASE_VNODE_OVER
;
}
pCfg
->
dropped
=
dropped
->
valueint
;
cJSON
*
vgVersion
=
cJSON_GetObjectItem
(
vnode
,
"vgVersion"
);
if
(
!
vgVersion
||
vgVersion
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since vgVersion not found"
,
file
);
goto
PRASE_VNODE_OVER
;
}
pCfg
->
vgVersion
=
vgVersion
->
valueint
;
cJSON
*
dbUid
=
cJSON_GetObjectItem
(
vnode
,
"dbUid"
);
if
(
!
dbUid
||
dbUid
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since dbUid not found"
,
file
);
goto
PRASE_VNODE_OVER
;
}
pCfg
->
dbUid
=
atoll
(
dbUid
->
valuestring
);
cJSON
*
db
=
cJSON_GetObjectItem
(
vnode
,
"db"
);
if
(
!
db
||
db
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since db not found"
,
file
);
goto
PRASE_VNODE_OVER
;
}
tstrncpy
(
pCfg
->
db
,
db
->
valuestring
,
TSDB_DB_FNAME_LEN
);
}
*
ppCfgs
=
pCfgs
;
}
*
numOfVnodes
=
vnodesNum
;
code
=
0
;
dInfo
(
"succcessed to read file %s"
,
file
);
PRASE_VNODE_OVER:
if
(
content
!=
NULL
)
free
(
content
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
pFile
!=
NULL
)
taosCloseFile
(
&
pFile
);
terrno
=
code
;
return
code
;
}
int32_t
vmWriteVnodesToFile
(
SVnodesMgmt
*
pMgmt
)
{
char
file
[
PATH_MAX
];
char
realfile
[
PATH_MAX
];
snprintf
(
file
,
sizeof
(
file
),
"%s%svnodes.json.bak"
,
pMgmt
->
path
,
TD_DIRSEP
);
snprintf
(
realfile
,
sizeof
(
file
),
"%s%svnodes.json"
,
pMgmt
->
path
,
TD_DIRSEP
);
TdFilePtr
pFile
=
taosOpenFile
(
file
,
TD_FILE_CTEATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
if
(
pFile
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
dError
(
"failed to write %s since %s"
,
file
,
terrstr
());
return
-
1
;
}
int32_t
numOfVnodes
=
0
;
SVnodeObj
**
pVnodes
=
vmGetVnodesFromHash
(
pMgmt
,
&
numOfVnodes
);
int32_t
len
=
0
;
int32_t
maxLen
=
65536
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
vnodes
\"
: [
\n
"
);
for
(
int32_t
i
=
0
;
i
<
numOfVnodes
;
++
i
)
{
SVnodeObj
*
pVnode
=
pVnodes
[
i
];
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" {
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
vgId
\"
: %d,
\n
"
,
pVnode
->
vgId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dropped
\"
: %d,
\n
"
,
pVnode
->
dropped
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
vgVersion
\"
: %d,
\n
"
,
pVnode
->
vgVersion
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dbUid
\"
:
\"
%"
PRIu64
"
\"
,
\n
"
,
pVnode
->
dbUid
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
db
\"
:
\"
%s
\"\n
"
,
pVnode
->
db
);
if
(
i
<
numOfVnodes
-
1
)
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" },
\n
"
);
}
else
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" }
\n
"
);
}
}
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" ]
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
taosWriteFile
(
pFile
,
content
,
len
);
taosFsyncFile
(
pFile
);
taosCloseFile
(
&
pFile
);
free
(
content
);
terrno
=
0
;
for
(
int32_t
i
=
0
;
i
<
numOfVnodes
;
++
i
)
{
SVnodeObj
*
pVnode
=
pVnodes
[
i
];
vmReleaseVnode
(
pMgmt
,
pVnode
);
}
if
(
pVnodes
!=
NULL
)
{
free
(
pVnodes
);
}
dDebug
(
"successed to write %s"
,
realfile
);
return
taosRenameFile
(
file
,
realfile
);
}
\ No newline at end of file
source/dnode/mgmt/vnode/src/vmInt.c
浏览文件 @
bf2226f0
...
...
@@ -14,48 +14,316 @@
*/
#define _DEFAULT_SOURCE
#include "vm
Int
.h"
#include "vm
File
.h"
#include "vmMsg.h"
#include "vmMgmt.h"
#include "vmWorker.h"
SVnodeObj
*
vmAcquireVnode
(
SVnodesMgmt
*
pMgmt
,
int32_t
vgId
)
{
SVnodeObj
*
pVnode
=
NULL
;
int32_t
refCount
=
0
;
taosRLockLatch
(
&
pMgmt
->
latch
);
taosHashGetDup
(
pMgmt
->
hash
,
&
vgId
,
sizeof
(
int32_t
),
(
void
*
)
&
pVnode
);
if
(
pVnode
==
NULL
)
{
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
}
else
{
refCount
=
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
}
taosRUnLockLatch
(
&
pMgmt
->
latch
);
if
(
pVnode
!=
NULL
)
{
dTrace
(
"vgId:%d, acquire vnode, refCount:%d"
,
pVnode
->
vgId
,
refCount
);
}
return
pVnode
;
}
void
vmReleaseVnode
(
SVnodesMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
)
{
if
(
pVnode
==
NULL
)
return
;
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
taosRLockLatch
(
&
pMgmt
->
latch
);
int32_t
refCount
=
atomic_sub_fetch_32
(
&
pVnode
->
refCount
,
1
);
taosRUnLockLatch
(
&
pMgmt
->
latch
);
dTrace
(
"vgId:%d, release vnode, refCount:%d"
,
pVnode
->
vgId
,
refCount
);
}
int32_t
vmOpenVnode
(
SVnodesMgmt
*
pMgmt
,
SWrapperCfg
*
pCfg
,
SVnode
*
pImpl
)
{
SVnodeObj
*
pVnode
=
calloc
(
1
,
sizeof
(
SVnodeObj
));
if
(
pVnode
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pVnode
->
vgId
=
pCfg
->
vgId
;
pVnode
->
refCount
=
0
;
pVnode
->
dropped
=
0
;
pVnode
->
accessState
=
TSDB_VN_ALL_ACCCESS
;
pVnode
->
pImpl
=
pImpl
;
pVnode
->
vgVersion
=
pCfg
->
vgVersion
;
pVnode
->
dbUid
=
pCfg
->
dbUid
;
pVnode
->
db
=
tstrdup
(
pCfg
->
db
);
pVnode
->
path
=
tstrdup
(
pCfg
->
path
);
if
(
pVnode
->
path
==
NULL
||
pVnode
->
db
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
if
(
dndAllocVnodeQueue
(
pDnode
,
pVnode
)
!=
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
taosWLockLatch
(
&
pMgmt
->
latch
);
int32_t
code
=
taosHashPut
(
pMgmt
->
hash
,
&
pVnode
->
vgId
,
sizeof
(
int32_t
),
&
pVnode
,
sizeof
(
SVnodeObj
*
));
taosWUnLockLatch
(
&
pMgmt
->
latch
);
if
(
code
!=
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
}
return
code
;
}
void
vmCloseVnode
(
SVnodesMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
)
{
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
taosWLockLatch
(
&
pMgmt
->
latch
);
taosHashRemove
(
pMgmt
->
hash
,
&
pVnode
->
vgId
,
sizeof
(
int32_t
));
taosWUnLockLatch
(
&
pMgmt
->
latch
);
vmReleaseVnode
(
pMgmt
,
pVnode
);
while
(
pVnode
->
refCount
>
0
)
taosMsleep
(
10
);
while
(
!
taosQueueEmpty
(
pVnode
->
pWriteQ
))
taosMsleep
(
10
);
while
(
!
taosQueueEmpty
(
pVnode
->
pSyncQ
))
taosMsleep
(
10
);
while
(
!
taosQueueEmpty
(
pVnode
->
pApplyQ
))
taosMsleep
(
10
);
while
(
!
taosQueueEmpty
(
pVnode
->
pQueryQ
))
taosMsleep
(
10
);
while
(
!
taosQueueEmpty
(
pVnode
->
pFetchQ
))
taosMsleep
(
10
);
dndFreeVnodeQueue
(
pDnode
,
pVnode
);
vnodeClose
(
pVnode
->
pImpl
);
pVnode
->
pImpl
=
NULL
;
dDebug
(
"vgId:%d, vnode is closed"
,
pVnode
->
vgId
);
if
(
pVnode
->
dropped
)
{
dDebug
(
"vgId:%d, vnode is destroyed for dropped:%d"
,
pVnode
->
vgId
,
pVnode
->
dropped
);
vnodeDestroy
(
pVnode
->
path
);
}
free
(
pVnode
->
path
);
free
(
pVnode
->
db
);
free
(
pVnode
);
}
static
void
*
vmOpenVnodeFunc
(
void
*
param
)
{
SVnodeThread
*
pThread
=
param
;
SVnodesMgmt
*
pMgmt
=
pThread
->
pMgmt
;
SDnode
*
pDnode
=
pMgmt
->
pDnode
;
dDebug
(
"thread:%d, start to open %d vnodes"
,
pThread
->
threadIndex
,
pThread
->
vnodeNum
);
setThreadName
(
"open-vnodes"
);
for
(
int32_t
v
=
0
;
v
<
pThread
->
vnodeNum
;
++
v
)
{
SWrapperCfg
*
pCfg
=
&
pThread
->
pCfgs
[
v
];
char
stepDesc
[
TSDB_STEP_DESC_LEN
]
=
{
0
};
snprintf
(
stepDesc
,
TSDB_STEP_DESC_LEN
,
"vgId:%d, start to restore, %d of %d have been opened"
,
pCfg
->
vgId
,
pMgmt
->
state
.
openVnodes
,
pMgmt
->
state
.
totalVnodes
);
dndReportStartup
(
pDnode
,
"open-vnodes"
,
stepDesc
);
SVnodeCfg
cfg
=
{.
pDnode
=
pDnode
,
.
pTfs
=
pMgmt
->
pTfs
,
.
vgId
=
pCfg
->
vgId
,
.
dbId
=
pCfg
->
dbUid
};
SVnode
*
pImpl
=
vnodeOpen
(
pCfg
->
path
,
&
cfg
);
if
(
pImpl
==
NULL
)
{
dError
(
"vgId:%d, failed to open vnode by thread:%d"
,
pCfg
->
vgId
,
pThread
->
threadIndex
);
pThread
->
failed
++
;
}
else
{
vmOpenVnode
(
pDnode
,
pCfg
,
pImpl
);
dDebug
(
"vgId:%d, is opened by thread:%d"
,
pCfg
->
vgId
,
pThread
->
threadIndex
);
pThread
->
opened
++
;
}
atomic_add_fetch_32
(
&
pMgmt
->
state
.
openVnodes
,
1
);
}
dDebug
(
"thread:%d, total vnodes:%d, opened:%d failed:%d"
,
pThread
->
threadIndex
,
pThread
->
vnodeNum
,
pThread
->
opened
,
pThread
->
failed
);
return
NULL
;
}
static
int32_t
vmOpenVnodes
(
SVnodesMgmt
*
pMgmt
)
{
SDnode
*
pDnode
=
pMgmt
->
pDnode
;
taosInitRWLatch
(
&
pMgmt
->
latch
);
pMgmt
->
hash
=
taosHashInit
(
TSDB_MIN_VNODES
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
if
(
pMgmt
->
hash
==
NULL
)
{
dError
(
"failed to init vnode hash"
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
SWrapperCfg
*
pCfgs
=
NULL
;
int32_t
numOfVnodes
=
0
;
if
(
vmGetVnodesFromFile
(
pDnode
,
&
pCfgs
,
&
numOfVnodes
)
!=
0
)
{
dInfo
(
"failed to get vnode list from disk since %s"
,
terrstr
());
return
-
1
;
}
pMgmt
->
state
.
totalVnodes
=
numOfVnodes
;
#if 0
int32_t threadNum = tsNumOfCores;
#else
int32_t
threadNum
=
1
;
#endif
int32_t
vnodesPerThread
=
numOfVnodes
/
threadNum
+
1
;
SVnodeThread
*
threads
=
calloc
(
threadNum
,
sizeof
(
SVnodeThread
));
for
(
int32_t
t
=
0
;
t
<
threadNum
;
++
t
)
{
threads
[
t
].
threadIndex
=
t
;
threads
[
t
].
pMgmt
=
pMgmt
;
threads
[
t
].
pCfgs
=
calloc
(
vnodesPerThread
,
sizeof
(
SWrapperCfg
));
}
for
(
int32_t
v
=
0
;
v
<
numOfVnodes
;
++
v
)
{
int32_t
t
=
v
%
threadNum
;
SVnodeThread
*
pThread
=
&
threads
[
t
];
pThread
->
pCfgs
[
pThread
->
vnodeNum
++
]
=
pCfgs
[
v
];
}
dInfo
(
"start %d threads to open %d vnodes"
,
threadNum
,
numOfVnodes
);
for
(
int32_t
t
=
0
;
t
<
threadNum
;
++
t
)
{
SVnodeThread
*
pThread
=
&
threads
[
t
];
if
(
pThread
->
vnodeNum
==
0
)
continue
;
pthread_attr_t
thAttr
;
pthread_attr_init
(
&
thAttr
);
pthread_attr_setdetachstate
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
pThread
->
thread
,
&
thAttr
,
vmOpenVnodeFunc
,
pThread
)
!=
0
)
{
dError
(
"thread:%d, failed to create thread to open vnode, reason:%s"
,
pThread
->
threadIndex
,
strerror
(
errno
));
}
pthread_attr_destroy
(
&
thAttr
);
}
for
(
int32_t
t
=
0
;
t
<
threadNum
;
++
t
)
{
SVnodeThread
*
pThread
=
&
threads
[
t
];
if
(
pThread
->
vnodeNum
>
0
&&
taosCheckPthreadValid
(
pThread
->
thread
))
{
pthread_join
(
pThread
->
thread
,
NULL
);
}
free
(
pThread
->
pCfgs
);
}
free
(
threads
);
free
(
pCfgs
);
if
(
pMgmt
->
state
.
openVnodes
!=
pMgmt
->
state
.
totalVnodes
)
{
dError
(
"there are total vnodes:%d, opened:%d"
,
pMgmt
->
state
.
totalVnodes
,
pMgmt
->
state
.
openVnodes
);
return
-
1
;
}
else
{
dInfo
(
"total vnodes:%d open successfully"
,
pMgmt
->
state
.
totalVnodes
);
return
0
;
}
}
static
void
vmCloseVnodes
(
SVnodesMgmt
*
pMgmt
)
{
dInfo
(
"start to close all vnodes"
);
int32_t
numOfVnodes
=
0
;
SVnodeObj
**
pVnodes
=
dndGetVnodesFromHash
(
pDnode
,
&
numOfVnodes
);
for
(
int32_t
i
=
0
;
i
<
numOfVnodes
;
++
i
)
{
vmCloseVnode
(
pDnode
,
pVnodes
[
i
]);
}
if
(
pVnodes
!=
NULL
)
{
free
(
pVnodes
);
}
if
(
pMgmt
->
hash
!=
NULL
)
{
taosHashCleanup
(
pMgmt
->
hash
);
pMgmt
->
hash
=
NULL
;
}
dInfo
(
"total vnodes:%d are all closed"
,
numOfVnodes
);
}
static
void
vmCleanup
(
SMgmtWrapper
*
pWrapper
)
{
SVnodesMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
if
(
pMgmt
==
NULL
)
return
;
dInfo
(
"vnodes-mgmt start to cleanup"
);
vmCloseVnodes
(
pMgmt
);
vmStopWorker
(
pMgmt
);
vnodeCleanup
();
// walCleanUp();
free
(
pMgmt
);
pWrapper
->
pMgmt
=
NULL
;
dInfo
(
"vnodes-mgmt is cleaned up"
);
}
static
int32_t
vmInit
(
SMgmtWrapper
*
pWrapper
)
{
// SDiskCfg dCfg = {0};
// tstrncpy(dCfg.dir, pDnode->cfg.dataDir, TSDB_FILENAME_LEN);
// dCfg.level = 0;
// dCfg.primary = 1;
// SDiskCfg *pDisks = pDnode->cfg.pDisks;
// int32_t numOfDisks = pDnode->cfg.numOfDisks;
// if (numOfDisks <= 0 || pDisks == NULL) {
// pDisks = &dCfg;
// numOfDisks = 1;
// }
// pDnode->pTfs = tfsOpen(pDisks, numOfDisks);
// if (pDnode->pTfs == NULL) {
// dError("failed to init tfs since %s", terrstr());
// return -1;
// }
SDnode
*
pDnode
=
pWrapper
->
pDnode
;
SVnodesMgmt
*
pMgmt
=
calloc
(
1
,
sizeof
(
SVnodesMgmt
));
int32_t
code
=
-
1
;
SVnodeOpt
vnodeOpt
=
{
0
};
dInfo
(
"vnodes-mgmt start to init"
);
if
(
pMgmt
==
NULL
)
goto
_OVER
;
pMgmt
->
path
=
pWrapper
->
path
;
pMgmt
->
pDnode
=
pWrapper
->
pDnode
;
pMgmt
->
pWrapper
=
pWrapper
;
taosInitRWLatch
(
&
pMgmt
->
latch
);
SDiskCfg
dCfg
=
{
0
};
tstrncpy
(
dCfg
.
dir
,
pDnode
->
cfg
.
dataDir
,
TSDB_FILENAME_LEN
);
dCfg
.
level
=
0
;
dCfg
.
primary
=
1
;
SDiskCfg
*
pDisks
=
pDnode
->
cfg
.
pDisks
;
int32_t
numOfDisks
=
pDnode
->
cfg
.
numOfDisks
;
if
(
numOfDisks
<=
0
||
pDisks
==
NULL
)
{
pDisks
=
&
dCfg
;
numOfDisks
=
1
;
}
pMgmt
->
pTfs
=
tfsOpen
(
pDisks
,
numOfDisks
);
if
(
pMgmt
->
pTfs
==
NULL
)
{
dError
(
"failed to init tfs since %s"
,
terrstr
());
goto
_OVER
;
}
if
(
walInit
()
!=
0
)
{
dError
(
"failed to init wal since %s"
,
terrstr
());
dndCleanup
();
return
-
1
;
goto
_OVER
;
}
SVnodeOpt
vnodeOpt
=
{
0
};
vnodeOpt
.
nthreads
=
tsNumOfCommitThreads
;
vnodeOpt
.
putReqToVQueryQFp
=
dndPutReqToVQueryQ
;
vnodeOpt
.
sendReqToDnodeFp
=
dndSendReqToDnode
;
if
(
vnodeInit
(
&
vnodeOpt
)
!=
0
)
{
dError
(
"failed to init vnode since %s"
,
terrstr
());
dndCleanup
();
goto
_OVER
;
}
if
(
vmStartWorker
(
pMgmt
)
!=
0
)
{
dError
(
"failed to init workers since %s"
,
terrstr
())
goto
_OVER
;
}
if
(
vmOpenVnodes
(
pMgmt
)
!=
0
)
{
dError
(
"failed to open vnodes since %s"
,
terrstr
());
return
-
1
;
}
return
0
;
}
_OVER:
if
(
code
==
0
)
{
pWrapper
->
pMgmt
=
pMgmt
;
dInfo
(
"vnodes-mgmt is initialized"
);
}
else
{
dError
(
"failed to init vnodes-mgmt since %s"
,
terrstr
());
vmCleanup
(
pWrapper
);
}
static
void
vmCleanup
(
SMgmtWrapper
*
pWrapper
)
{
vnodeCleanup
();
return
0
;
}
static
bool
vmRequire
(
SMgmtWrapper
*
pWrapper
)
{
return
false
;
}
...
...
@@ -70,3 +338,71 @@ void vmGetMgmtFp(SMgmtWrapper *pWrapper) {
pWrapper
->
name
=
"vnodes"
;
pWrapper
->
fp
=
mgmtFp
;
}
int32_t
vmGetTfsMonitorInfo
(
SMgmtWrapper
*
pWrapper
,
SMonDiskInfo
*
pInfo
)
{
SVnodesMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
if
(
pMgmt
==
NULL
)
return
-
1
;
return
tfsGetMonitorInfo
(
pMgmt
->
pTfs
,
pInfo
);
;
}
void
vmGetVndMonitorInfo
(
SMgmtWrapper
*
pWrapper
,
SMonDnodeInfo
*
pInfo
)
{
SVnodesMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
if
(
pMgmt
==
NULL
)
return
;
SVnodesStat
*
pStat
=
&
pMgmt
->
state
;
pInfo
->
req_select
=
pStat
->
numOfSelectReqs
;
pInfo
->
req_insert
=
pStat
->
numOfInsertReqs
;
pInfo
->
req_insert_success
=
pStat
->
numOfInsertSuccessReqs
;
pInfo
->
req_insert_batch
=
pStat
->
numOfBatchInsertReqs
;
pInfo
->
req_insert_batch_success
=
pStat
->
numOfBatchInsertSuccessReqs
;
pInfo
->
errors
=
tsNumOfErrorLogs
;
pInfo
->
vnodes_num
=
pStat
->
totalVnodes
;
pInfo
->
masters
=
pStat
->
masterNum
;
}
void
vmGetVnodeLoads
(
SDnode
*
pDnode
,
SArray
*
pLoads
)
{
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
SVnodesStat
*
pStat
=
&
pMgmt
->
stat
;
int32_t
totalVnodes
=
0
;
int32_t
masterNum
=
0
;
int64_t
numOfSelectReqs
=
0
;
int64_t
numOfInsertReqs
=
0
;
int64_t
numOfInsertSuccessReqs
=
0
;
int64_t
numOfBatchInsertReqs
=
0
;
int64_t
numOfBatchInsertSuccessReqs
=
0
;
taosRLockLatch
(
&
pMgmt
->
latch
);
void
*
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
NULL
);
while
(
pIter
)
{
SVnodeObj
**
ppVnode
=
pIter
;
if
(
ppVnode
==
NULL
||
*
ppVnode
==
NULL
)
continue
;
SVnodeObj
*
pVnode
=
*
ppVnode
;
SVnodeLoad
vload
=
{
0
};
vnodeGetLoad
(
pVnode
->
pImpl
,
&
vload
);
taosArrayPush
(
pLoads
,
&
vload
);
numOfSelectReqs
+=
vload
.
numOfSelectReqs
;
numOfInsertReqs
+=
vload
.
numOfInsertReqs
;
numOfInsertSuccessReqs
+=
vload
.
numOfInsertSuccessReqs
;
numOfBatchInsertReqs
+=
vload
.
numOfBatchInsertReqs
;
numOfBatchInsertSuccessReqs
+=
vload
.
numOfBatchInsertSuccessReqs
;
totalVnodes
++
;
if
(
vload
.
role
==
TAOS_SYNC_STATE_LEADER
)
masterNum
++
;
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
pIter
);
}
taosRUnLockLatch
(
&
pMgmt
->
latch
);
pStat
->
totalVnodes
=
totalVnodes
;
pStat
->
masterNum
=
masterNum
;
pStat
->
numOfSelectReqs
=
numOfSelectReqs
;
pStat
->
numOfInsertReqs
=
numOfInsertReqs
;
pStat
->
numOfInsertSuccessReqs
=
numOfInsertSuccessReqs
;
pStat
->
numOfBatchInsertReqs
=
numOfBatchInsertReqs
;
pStat
->
numOfBatchInsertSuccessReqs
=
numOfBatchInsertSuccessReqs
;
}
\ No newline at end of file
source/dnode/mgmt/vnode/src/vmMgmt.c
已删除
100644 → 0
浏览文件 @
332da703
此差异已折叠。
点击以展开。
source/dnode/mgmt/vnode/src/vmMsg.c
浏览文件 @
bf2226f0
...
...
@@ -15,15 +15,219 @@
#define _DEFAULT_SOURCE
#include "vmMsg.h"
#include "vmFile.h"
#include "vmWorker.h"
static
void
vmGenerateVnodeCfg
(
SCreateVnodeReq
*
pCreate
,
SVnodeCfg
*
pCfg
)
{
pCfg
->
vgId
=
pCreate
->
vgId
;
pCfg
->
wsize
=
pCreate
->
cacheBlockSize
;
pCfg
->
ssize
=
pCreate
->
cacheBlockSize
;
pCfg
->
lsize
=
pCreate
->
cacheBlockSize
;
pCfg
->
isHeapAllocator
=
true
;
pCfg
->
ttl
=
4
;
pCfg
->
keep
=
pCreate
->
daysToKeep0
;
pCfg
->
streamMode
=
pCreate
->
streamMode
;
pCfg
->
isWeak
=
true
;
pCfg
->
tsdbCfg
.
keep
=
pCreate
->
daysToKeep0
;
pCfg
->
tsdbCfg
.
keep1
=
pCreate
->
daysToKeep2
;
pCfg
->
tsdbCfg
.
keep2
=
pCreate
->
daysToKeep0
;
pCfg
->
tsdbCfg
.
lruCacheSize
=
pCreate
->
cacheBlockSize
;
pCfg
->
metaCfg
.
lruSize
=
pCreate
->
cacheBlockSize
;
pCfg
->
walCfg
.
fsyncPeriod
=
pCreate
->
fsyncPeriod
;
pCfg
->
walCfg
.
level
=
pCreate
->
walLevel
;
pCfg
->
walCfg
.
retentionPeriod
=
10
;
pCfg
->
walCfg
.
retentionSize
=
128
;
pCfg
->
walCfg
.
rollPeriod
=
128
;
pCfg
->
walCfg
.
segSize
=
128
;
pCfg
->
walCfg
.
vgId
=
pCreate
->
vgId
;
pCfg
->
hashBegin
=
pCreate
->
hashBegin
;
pCfg
->
hashEnd
=
pCreate
->
hashEnd
;
pCfg
->
hashMethod
=
pCreate
->
hashMethod
;
}
static
void
vmGenerateWrapperCfg
(
SVnodesMgmt
*
pMgmt
,
SCreateVnodeReq
*
pCreate
,
SWrapperCfg
*
pCfg
)
{
memcpy
(
pCfg
->
db
,
pCreate
->
db
,
TSDB_DB_FNAME_LEN
);
pCfg
->
dbUid
=
pCreate
->
dbUid
;
pCfg
->
dropped
=
0
;
snprintf
(
pCfg
->
path
,
sizeof
(
pCfg
->
path
),
"%s%svnode%d"
,
pMgmt
->
path
,
TD_DIRSEP
,
pCreate
->
vgId
);
pCfg
->
vgId
=
pCreate
->
vgId
;
pCfg
->
vgVersion
=
pCreate
->
vgVersion
;
}
int32_t
vmProcessCreateVnodeReq
(
SVnodesMgmt
*
pMgmt
,
SRpcMsg
*
pReq
)
{
SCreateVnodeReq
createReq
=
{
0
};
if
(
tDeserializeSCreateVnodeReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
createReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
dDebug
(
"vgId:%d, create vnode req is received"
,
createReq
.
vgId
);
SVnodeCfg
vnodeCfg
=
{
0
};
vmGenerateVnodeCfg
(
&
createReq
,
&
vnodeCfg
);
SWrapperCfg
wrapperCfg
=
{
0
};
vmGenerateWrapperCfg
(
pMgmt
,
&
createReq
,
&
wrapperCfg
);
if
(
createReq
.
dnodeId
!=
dmGetDnodeId
(
pMgmt
->
pDnode
))
{
terrno
=
TSDB_CODE_DND_VNODE_INVALID_OPTION
;
dDebug
(
"vgId:%d, failed to create vnode since %s"
,
createReq
.
vgId
,
terrstr
());
return
-
1
;
}
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
createReq
.
vgId
);
if
(
pVnode
!=
NULL
)
{
dDebug
(
"vgId:%d, already exist"
,
createReq
.
vgId
);
vmReleaseVnode
(
pMgmt
,
pVnode
);
terrno
=
TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED
;
return
-
1
;
}
vnodeCfg
.
pDnode
=
pMgmt
->
pDnode
;
vnodeCfg
.
pTfs
=
pMgmt
->
pTfs
;
vnodeCfg
.
dbId
=
wrapperCfg
.
dbUid
;
SVnode
*
pImpl
=
vnodeOpen
(
wrapperCfg
.
path
,
&
vnodeCfg
);
if
(
pImpl
==
NULL
)
{
dError
(
"vgId:%d, failed to create vnode since %s"
,
createReq
.
vgId
,
terrstr
());
return
-
1
;
}
int32_t
code
=
vmOpenVnode
(
pMgmt
,
&
wrapperCfg
,
pImpl
);
if
(
code
!=
0
)
{
dError
(
"vgId:%d, failed to open vnode since %s"
,
createReq
.
vgId
,
terrstr
());
vnodeClose
(
pImpl
);
vnodeDestroy
(
wrapperCfg
.
path
);
terrno
=
code
;
return
code
;
}
code
=
vmWriteVnodesToFile
(
pMgmt
);
if
(
code
!=
0
)
{
vnodeClose
(
pImpl
);
vnodeDestroy
(
wrapperCfg
.
path
);
terrno
=
code
;
return
code
;
}
return
0
;
}
int32_t
vmProcessAlterVnodeReq
(
SVnodesMgmt
*
pMgmt
,
SRpcMsg
*
pReq
)
{
SAlterVnodeReq
alterReq
=
{
0
};
if
(
tDeserializeSCreateVnodeReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
alterReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
dDebug
(
"vgId:%d, alter vnode req is received"
,
alterReq
.
vgId
);
SVnodeCfg
vnodeCfg
=
{
0
};
vmGenerateVnodeCfg
(
&
alterReq
,
&
vnodeCfg
);
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
alterReq
.
vgId
);
if
(
pVnode
==
NULL
)
{
dDebug
(
"vgId:%d, failed to alter vnode since %s"
,
alterReq
.
vgId
,
terrstr
());
return
-
1
;
}
if
(
alterReq
.
vgVersion
==
pVnode
->
vgVersion
)
{
vmReleaseVnode
(
pMgmt
,
pVnode
);
dDebug
(
"vgId:%d, no need to alter vnode cfg for version unchanged "
,
alterReq
.
vgId
);
return
0
;
}
if
(
vnodeAlter
(
pVnode
->
pImpl
,
&
vnodeCfg
)
!=
0
)
{
dError
(
"vgId:%d, failed to alter vnode since %s"
,
alterReq
.
vgId
,
terrstr
());
vmReleaseVnode
(
pMgmt
,
pVnode
);
return
-
1
;
}
int32_t
oldVersion
=
pVnode
->
vgVersion
;
pVnode
->
vgVersion
=
alterReq
.
vgVersion
;
int32_t
code
=
vmWriteVnodesToFile
(
pMgmt
);
if
(
code
!=
0
)
{
pVnode
->
vgVersion
=
oldVersion
;
}
int32_t
vmProcessCreateVnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
){
return
0
;}
int32_t
vmProcessAlterVnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
){
return
0
;}
int32_t
vmProcessDropVnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
){
return
0
;}
int32_t
dndProcessAuthVnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
){
return
0
;}
int32_t
vmProcessSyncVnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
){
return
0
;}
int32_t
vmProcessCompactVnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
){
return
0
;}
vmReleaseVnode
(
pMgmt
,
pVnode
);
return
code
;
}
int32_t
vmProcessDropVnodeReq
(
SVnodesMgmt
*
pMgmt
,
SRpcMsg
*
pReq
)
{
SDropVnodeReq
dropReq
=
{
0
};
if
(
tDeserializeSDropVnodeReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
dropReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
int32_t
vgId
=
dropReq
.
vgId
;
dDebug
(
"vgId:%d, drop vnode req is received"
,
vgId
);
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
vgId
);
if
(
pVnode
==
NULL
)
{
dDebug
(
"vgId:%d, failed to drop since %s"
,
vgId
,
terrstr
());
terrno
=
TSDB_CODE_DND_VNODE_NOT_DEPLOYED
;
return
-
1
;
}
pVnode
->
dropped
=
1
;
if
(
vmWriteVnodesToFile
(
pMgmt
)
!=
0
)
{
pVnode
->
dropped
=
0
;
vmReleaseVnode
(
pMgmt
,
pVnode
);
return
-
1
;
}
vmCloseVnode
(
pMgmt
,
pVnode
);
vmWriteVnodesToFile
(
pMgmt
);
return
0
;
}
int32_t
vmProcessSyncVnodeReq
(
SVnodesMgmt
*
pMgmt
,
SRpcMsg
*
pReq
)
{
SSyncVnodeReq
syncReq
=
{
0
};
tDeserializeSDropVnodeReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
syncReq
);
int32_t
vgId
=
syncReq
.
vgId
;
dDebug
(
"vgId:%d, sync vnode req is received"
,
vgId
);
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
vgId
);
if
(
pVnode
==
NULL
)
{
dDebug
(
"vgId:%d, failed to sync since %s"
,
vgId
,
terrstr
());
return
-
1
;
}
if
(
vnodeSync
(
pVnode
->
pImpl
)
!=
0
)
{
dError
(
"vgId:%d, failed to sync vnode since %s"
,
vgId
,
terrstr
());
vmReleaseVnode
(
pMgmt
,
pVnode
);
return
-
1
;
}
vmReleaseVnode
(
pMgmt
,
pVnode
);
return
0
;
}
int32_t
vmProcessCompactVnodeReq
(
SVnodesMgmt
*
pMgmt
,
SRpcMsg
*
pReq
)
{
SCompactVnodeReq
compatcReq
=
{
0
};
tDeserializeSDropVnodeReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
compatcReq
);
int32_t
vgId
=
compatcReq
.
vgId
;
dDebug
(
"vgId:%d, compact vnode req is received"
,
vgId
);
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
vgId
);
if
(
pVnode
==
NULL
)
{
dDebug
(
"vgId:%d, failed to compact since %s"
,
vgId
,
terrstr
());
return
-
1
;
}
if
(
vnodeCompact
(
pVnode
->
pImpl
)
!=
0
)
{
dError
(
"vgId:%d, failed to compact vnode since %s"
,
vgId
,
terrstr
());
vmReleaseVnode
(
pMgmt
,
pVnode
);
return
-
1
;
}
vmReleaseVnode
(
pMgmt
,
pVnode
);
return
0
;
}
void
vmInitMsgHandles
(
SMgmtWrapper
*
pWrapper
)
{
// Requests handled by VNODE
...
...
source/dnode/mgmt/vnode/src/vmWorker.c
浏览文件 @
bf2226f0
...
...
@@ -16,7 +16,239 @@
#define _DEFAULT_SOURCE
#include "vmWorker.h"
int32_t
vmProcessWriteMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
){
return
0
;}
int32_t
vmProcessSyncMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
){
return
0
;}
int32_t
vmProcessQueryMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
){
return
0
;}
int32_t
vmProcessFetchMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
){
return
0
;}
\ No newline at end of file
static
void
dndProcessVnodeQueryQueue
(
SVnodeObj
*
pVnode
,
SRpcMsg
*
pMsg
)
{
vnodeProcessQueryMsg
(
pVnode
->
pImpl
,
pMsg
);
}
static
void
dndProcessVnodeFetchQueue
(
SVnodeObj
*
pVnode
,
SRpcMsg
*
pMsg
)
{
vnodeProcessFetchMsg
(
pVnode
->
pImpl
,
pMsg
);
}
static
void
dndProcessVnodeWriteQueue
(
SVnodeObj
*
pVnode
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
SArray
*
pArray
=
taosArrayInit
(
numOfMsgs
,
sizeof
(
SRpcMsg
*
));
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
SRpcMsg
*
pMsg
=
NULL
;
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
);
void
*
ptr
=
taosArrayPush
(
pArray
,
&
pMsg
);
assert
(
ptr
!=
NULL
);
}
vnodeProcessWMsgs
(
pVnode
->
pImpl
,
pArray
);
for
(
size_t
i
=
0
;
i
<
numOfMsgs
;
i
++
)
{
SRpcMsg
*
pRsp
=
NULL
;
SRpcMsg
*
pMsg
=
*
(
SRpcMsg
**
)
taosArrayGet
(
pArray
,
i
);
int32_t
code
=
vnodeApplyWMsg
(
pVnode
->
pImpl
,
pMsg
,
&
pRsp
);
if
(
pRsp
!=
NULL
)
{
pRsp
->
ahandle
=
pMsg
->
ahandle
;
rpcSendResponse
(
pRsp
);
free
(
pRsp
);
}
else
{
if
(
code
!=
0
)
code
=
terrno
;
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
code
=
code
};
rpcSendResponse
(
&
rpcRsp
);
}
}
for
(
size_t
i
=
0
;
i
<
numOfMsgs
;
i
++
)
{
SRpcMsg
*
pMsg
=
*
(
SRpcMsg
**
)
taosArrayGet
(
pArray
,
i
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
taosArrayDestroy
(
pArray
);
}
static
void
dndProcessVnodeApplyQueue
(
SVnodeObj
*
pVnode
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
SRpcMsg
*
pMsg
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
);
// todo
SRpcMsg
*
pRsp
=
NULL
;
(
void
)
vnodeApplyWMsg
(
pVnode
->
pImpl
,
pMsg
,
&
pRsp
);
}
}
static
void
dndProcessVnodeSyncQueue
(
SVnodeObj
*
pVnode
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
SRpcMsg
*
pMsg
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
);
// todo
SRpcMsg
*
pRsp
=
NULL
;
(
void
)
vnodeProcessSyncReq
(
pVnode
->
pImpl
,
pMsg
,
&
pRsp
);
}
}
static
int32_t
dndWriteRpcMsgToVnodeQueue
(
STaosQueue
*
pQueue
,
SRpcMsg
*
pRpcMsg
,
bool
sendRsp
)
{
int32_t
code
=
0
;
if
(
pQueue
==
NULL
)
{
code
=
TSDB_CODE_MSG_NOT_PROCESSED
;
}
else
{
SRpcMsg
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
));
if
(
pMsg
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
else
{
*
pMsg
=
*
pRpcMsg
;
if
(
taosWriteQitem
(
pQueue
,
pMsg
)
!=
0
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
}
}
if
(
code
!=
TSDB_CODE_SUCCESS
&&
sendRsp
)
{
if
(
pRpcMsg
->
msgType
&
1u
)
{
SRpcMsg
rsp
=
{.
handle
=
pRpcMsg
->
handle
,
.
code
=
code
};
rpcSendResponse
(
&
rsp
);
}
rpcFreeCont
(
pRpcMsg
->
pCont
);
}
return
code
;
}
static
SVnodeObj
*
dndAcquireVnodeFromMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
)
{
SMsgHead
*
pHead
=
pMsg
->
pCont
;
pHead
->
contLen
=
htonl
(
pHead
->
contLen
);
pHead
->
vgId
=
htonl
(
pHead
->
vgId
);
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
pHead
->
vgId
);
if
(
pVnode
==
NULL
)
{
dError
(
"vgId:%d, failed to acquire vnode while process req"
,
pHead
->
vgId
);
if
(
pMsg
->
msgType
&
1u
)
{
SRpcMsg
rsp
=
{.
handle
=
pMsg
->
handle
,
.
code
=
TSDB_CODE_VND_INVALID_VGROUP_ID
};
rpcSendResponse
(
&
rsp
);
}
rpcFreeCont
(
pMsg
->
pCont
);
}
return
pVnode
;
}
void
dndProcessVnodeWriteMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SVnodeObj
*
pVnode
=
dndAcquireVnodeFromMsg
(
pDnode
,
pMsg
);
if
(
pVnode
!=
NULL
)
{
(
void
)
dndWriteRpcMsgToVnodeQueue
(
pVnode
->
pWriteQ
,
pMsg
,
true
);
vmReleaseVnode
(
pMgmt
,
pVnode
);
}
}
void
dndProcessVnodeSyncMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SVnodeObj
*
pVnode
=
dndAcquireVnodeFromMsg
(
pDnode
,
pMsg
);
if
(
pVnode
!=
NULL
)
{
(
void
)
dndWriteRpcMsgToVnodeQueue
(
pVnode
->
pSyncQ
,
pMsg
,
true
);
vmReleaseVnode
(
pMgmt
,
pVnode
);
}
}
void
dndProcessVnodeQueryMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SVnodeObj
*
pVnode
=
dndAcquireVnodeFromMsg
(
pDnode
,
pMsg
);
if
(
pVnode
!=
NULL
)
{
(
void
)
dndWriteRpcMsgToVnodeQueue
(
pVnode
->
pQueryQ
,
pMsg
,
true
);
vmReleaseVnode
(
pMgmt
,
pVnode
);
}
}
void
dndProcessVnodeFetchMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SVnodeObj
*
pVnode
=
dndAcquireVnodeFromMsg
(
pDnode
,
pMsg
);
if
(
pVnode
!=
NULL
)
{
(
void
)
dndWriteRpcMsgToVnodeQueue
(
pVnode
->
pFetchQ
,
pMsg
,
true
);
vmReleaseVnode
(
pMgmt
,
pVnode
);
}
}
int32_t
dndPutReqToVQueryQ
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
)
{
SMsgHead
*
pHead
=
pMsg
->
pCont
;
// pHead->vgId = htonl(pHead->vgId);
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
pHead
->
vgId
);
if
(
pVnode
==
NULL
)
return
-
1
;
int32_t
code
=
dndWriteRpcMsgToVnodeQueue
(
pVnode
->
pQueryQ
,
pMsg
,
false
);
vmReleaseVnode
(
pMgmt
,
pVnode
);
return
code
;
}
static
int32_t
dndPutMsgIntoVnodeApplyQueue
(
SDnode
*
pDnode
,
int32_t
vgId
,
SRpcMsg
*
pMsg
)
{
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
vgId
);
if
(
pVnode
==
NULL
)
return
-
1
;
int32_t
code
=
taosWriteQitem
(
pVnode
->
pApplyQ
,
pMsg
);
vmReleaseVnode
(
pMgmt
,
pVnode
);
return
code
;
}
static
int32_t
dndAllocVnodeQueue
(
SDnode
*
pDnode
,
SVnodeObj
*
pVnode
)
{
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
pVnode
->
pWriteQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
writePool
,
pVnode
,
(
FItems
)
dndProcessVnodeWriteQueue
);
pVnode
->
pApplyQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
writePool
,
pVnode
,
(
FItems
)
dndProcessVnodeApplyQueue
);
pVnode
->
pSyncQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
syncPool
,
pVnode
,
(
FItems
)
dndProcessVnodeSyncQueue
);
pVnode
->
pFetchQ
=
tFWorkerAllocQueue
(
&
pMgmt
->
fetchPool
,
pVnode
,
(
FItem
)
dndProcessVnodeFetchQueue
);
pVnode
->
pQueryQ
=
tQWorkerAllocQueue
(
&
pMgmt
->
queryPool
,
pVnode
,
(
FItem
)
dndProcessVnodeQueryQueue
);
if
(
pVnode
->
pApplyQ
==
NULL
||
pVnode
->
pWriteQ
==
NULL
||
pVnode
->
pSyncQ
==
NULL
||
pVnode
->
pFetchQ
==
NULL
||
pVnode
->
pQueryQ
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
return
0
;
}
static
void
dndFreeVnodeQueue
(
SDnode
*
pDnode
,
SVnodeObj
*
pVnode
)
{
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
tQWorkerFreeQueue
(
&
pMgmt
->
queryPool
,
pVnode
->
pQueryQ
);
tFWorkerFreeQueue
(
&
pMgmt
->
fetchPool
,
pVnode
->
pFetchQ
);
tWWorkerFreeQueue
(
&
pMgmt
->
writePool
,
pVnode
->
pWriteQ
);
tWWorkerFreeQueue
(
&
pMgmt
->
writePool
,
pVnode
->
pApplyQ
);
tWWorkerFreeQueue
(
&
pMgmt
->
syncPool
,
pVnode
->
pSyncQ
);
pVnode
->
pWriteQ
=
NULL
;
pVnode
->
pApplyQ
=
NULL
;
pVnode
->
pSyncQ
=
NULL
;
pVnode
->
pFetchQ
=
NULL
;
pVnode
->
pQueryQ
=
NULL
;
}
int32_t
vmStartWorker
(
SVnodesMgmt
*
pMgmt
)
{
int32_t
maxFetchThreads
=
4
;
int32_t
minFetchThreads
=
TMIN
(
maxFetchThreads
,
tsNumOfCores
);
int32_t
minQueryThreads
=
TMAX
((
int32_t
)(
tsNumOfCores
*
tsRatioOfQueryCores
),
1
);
int32_t
maxQueryThreads
=
minQueryThreads
;
int32_t
maxWriteThreads
=
TMAX
(
tsNumOfCores
,
1
);
int32_t
maxSyncThreads
=
TMAX
(
tsNumOfCores
/
2
,
1
);
SQWorkerPool
*
pQPool
=
&
pMgmt
->
queryPool
;
pQPool
->
name
=
"vnode-query"
;
pQPool
->
min
=
minQueryThreads
;
pQPool
->
max
=
maxQueryThreads
;
if
(
tQWorkerInit
(
pQPool
)
!=
0
)
return
-
1
;
SFWorkerPool
*
pFPool
=
&
pMgmt
->
fetchPool
;
pFPool
->
name
=
"vnode-fetch"
;
pFPool
->
min
=
minFetchThreads
;
pFPool
->
max
=
maxFetchThreads
;
if
(
tFWorkerInit
(
pFPool
)
!=
0
)
return
-
1
;
SWWorkerPool
*
pWPool
=
&
pMgmt
->
writePool
;
pWPool
->
name
=
"vnode-write"
;
pWPool
->
max
=
maxWriteThreads
;
if
(
tWWorkerInit
(
pWPool
)
!=
0
)
return
-
1
;
pWPool
=
&
pMgmt
->
syncPool
;
pWPool
->
name
=
"vnode-sync"
;
pWPool
->
max
=
maxSyncThreads
;
if
(
tWWorkerInit
(
pWPool
)
!=
0
)
return
-
1
;
dDebug
(
"vnode workers is initialized"
);
return
0
;
}
void
vmStopWorker
(
SVnodesMgmt
*
pMgmt
)
{
tFWorkerCleanup
(
&
pMgmt
->
fetchPool
);
tQWorkerCleanup
(
&
pMgmt
->
queryPool
);
tWWorkerCleanup
(
&
pMgmt
->
writePool
);
tWWorkerCleanup
(
&
pMgmt
->
syncPool
);
dDebug
(
"vnode workers is closed"
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录