Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7ce10a86
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
7ce10a86
编写于
2月 16, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
serialize create vnode msg
上级
2b73e4f4
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
284 addition
and
200 deletion
+284
-200
source/common/src/tmsg.c
source/common/src/tmsg.c
+82
-1
source/dnode/mgmt/impl/src/dndVnodes.c
source/dnode/mgmt/impl/src/dndVnodes.c
+50
-62
source/dnode/mgmt/impl/test/vnode/vnode.cpp
source/dnode/mgmt/impl/test/vnode/vnode.cpp
+96
-90
source/dnode/mnode/impl/inc/mndVgroup.h
source/dnode/mnode/impl/inc/mndVgroup.h
+2
-2
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+10
-8
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+44
-37
未找到文件。
source/common/src/tmsg.c
浏览文件 @
7ce10a86
...
...
@@ -2020,6 +2020,87 @@ int32_t tDeserializeSMTimerMsg(void *buf, int32_t bufLen, SMTimerReq *pReq) {
return
0
;
}
int32_t
tSerializeSCreateVnodeReq
(
void
*
buf
,
int32_t
bufLen
,
SCreateVnodeReq
*
pReq
)
{
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
vgId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
dnodeId
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
db
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
pReq
->
dbUid
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
vgVersion
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
cacheBlockSize
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
totalBlocks
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
daysPerFile
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
daysToKeep0
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
daysToKeep1
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
daysToKeep2
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
minRows
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
maxRows
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
commitTime
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
fsyncPeriod
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
walLevel
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
precision
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
compression
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
quorum
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
update
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
cacheLastRow
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
replica
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
selfIndex
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
SReplica
*
pReplica
=
&
pReq
->
replicas
[
i
];
if
(
tEncodeI32
(
&
encoder
,
pReplica
->
id
)
<
0
)
return
-
1
;
if
(
tEncodeU16
(
&
encoder
,
pReplica
->
port
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReplica
->
fqdn
)
<
0
)
return
-
1
;
}
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tCoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSCreateVnodeReq
(
void
*
buf
,
int32_t
bufLen
,
SCreateVnodeReq
*
pReq
)
{
SCoder
decoder
=
{
0
};
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_DECODER
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
vgId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
dnodeId
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
db
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
&
decoder
,
&
pReq
->
dbUid
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
vgVersion
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
cacheBlockSize
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
totalBlocks
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
daysPerFile
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
daysToKeep0
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
daysToKeep1
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
daysToKeep2
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
minRows
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
maxRows
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
commitTime
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
fsyncPeriod
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
walLevel
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
precision
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
compression
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
quorum
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
update
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
cacheLastRow
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
replica
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
selfIndex
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
SReplica
*
pReplica
=
&
pReq
->
replicas
[
i
];
if
(
tDecodeI32
(
&
decoder
,
&
pReplica
->
id
)
<
0
)
return
-
1
;
if
(
tDecodeU16
(
&
decoder
,
&
pReplica
->
port
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReplica
->
fqdn
)
<
0
)
return
-
1
;
}
tEndDecode
(
&
decoder
);
tCoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSDropVnodeReq
(
void
*
buf
,
int32_t
bufLen
,
SDropVnodeReq
*
pReq
)
{
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
...
...
@@ -2043,7 +2124,7 @@ int32_t tDeserializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
vgId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
dnodeId
)
<
0
)
return
-
1
;
if
(
tDecode
I
64
(
&
decoder
,
&
pReq
->
dbUid
)
<
0
)
return
-
1
;
if
(
tDecode
U
64
(
&
decoder
,
&
pReq
->
dbUid
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
db
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
...
...
source/dnode/mgmt/impl/src/dndVnodes.c
浏览文件 @
7ce10a86
...
...
@@ -15,8 +15,8 @@
#define _DEFAULT_SOURCE
#include "dndVnodes.h"
#include "dndTransport.h"
#include "dndMgmt.h"
#include "dndTransport.h"
typedef
struct
{
int32_t
vgId
;
...
...
@@ -34,9 +34,9 @@ typedef struct {
int8_t
dropped
;
int8_t
accessState
;
uint64_t
dbUid
;
char
*
db
;
char
*
path
;
SVnode
*
pImpl
;
char
*
db
;
char
*
path
;
SVnode
*
pImpl
;
STaosQueue
*
pWriteQ
;
STaosQueue
*
pSyncQ
;
STaosQueue
*
pApplyQ
;
...
...
@@ -50,7 +50,7 @@ typedef struct {
int32_t
failed
;
int32_t
threadIndex
;
pthread_t
thread
;
SDnode
*
pDnode
;
SDnode
*
pDnode
;
SWrapperCfg
*
pCfgs
;
}
SVnodeThread
;
...
...
@@ -68,7 +68,7 @@ void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pE
void
dndProcessVnodeSyncMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
int32_t
dndPutMsgIntoVnodeApplyQueue
(
SDnode
*
pDnode
,
int32_t
vgId
,
SRpcMsg
*
pMsg
);
static
SVnodeObj
*
dndAcquireVnode
(
SDnode
*
pDnode
,
int32_t
vgId
);
static
SVnodeObj
*
dndAcquireVnode
(
SDnode
*
pDnode
,
int32_t
vgId
);
static
void
dndReleaseVnode
(
SDnode
*
pDnode
,
SVnodeObj
*
pVnode
);
static
int32_t
dndOpenVnode
(
SDnode
*
pDnode
,
SWrapperCfg
*
pCfg
,
SVnode
*
pImpl
);
static
void
dndCloseVnode
(
SDnode
*
pDnode
,
SVnodeObj
*
pVnode
);
...
...
@@ -81,7 +81,7 @@ static void dndCloseVnodes(SDnode *pDnode);
static
SVnodeObj
*
dndAcquireVnode
(
SDnode
*
pDnode
,
int32_t
vgId
)
{
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
SVnodeObj
*
pVnode
=
NULL
;
SVnodeObj
*
pVnode
=
NULL
;
int32_t
refCount
=
0
;
taosRLockLatch
(
&
pMgmt
->
latch
);
...
...
@@ -112,7 +112,7 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
static
int32_t
dndOpenVnode
(
SDnode
*
pDnode
,
SWrapperCfg
*
pCfg
,
SVnode
*
pImpl
)
{
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
SVnodeObj
*
pVnode
=
calloc
(
1
,
sizeof
(
SVnodeObj
));
SVnodeObj
*
pVnode
=
calloc
(
1
,
sizeof
(
SVnodeObj
));
if
(
pVnode
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
...
...
@@ -189,7 +189,7 @@ static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) {
void
*
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
NULL
);
while
(
pIter
)
{
SVnodeObj
**
ppVnode
=
pIter
;
SVnodeObj
*
pVnode
=
*
ppVnode
;
SVnodeObj
*
pVnode
=
*
ppVnode
;
if
(
pVnode
&&
num
<
size
)
{
int32_t
refCount
=
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
dTrace
(
"vgId:%d, acquire vnode, refCount:%d"
,
pVnode
->
vgId
,
refCount
);
...
...
@@ -211,9 +211,9 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_
int32_t
code
=
TSDB_CODE_DND_VNODE_READ_FILE_ERROR
;
int32_t
len
=
0
;
int32_t
maxLen
=
30000
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
FILE
*
fp
=
NULL
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
FILE
*
fp
=
NULL
;
char
file
[
PATH_MAX
+
20
]
=
{
0
};
SWrapperCfg
*
pCfgs
=
NULL
;
...
...
@@ -254,7 +254,7 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_
}
for
(
int32_t
i
=
0
;
i
<
vnodesNum
;
++
i
)
{
cJSON
*
vnode
=
cJSON_GetArrayItem
(
vnodes
,
i
);
cJSON
*
vnode
=
cJSON_GetArrayItem
(
vnodes
,
i
);
SWrapperCfg
*
pCfg
=
&
pCfgs
[
i
];
cJSON
*
vgId
=
cJSON_GetObjectItem
(
vnode
,
"vgId"
);
...
...
@@ -326,7 +326,7 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
int32_t
len
=
0
;
int32_t
maxLen
=
65536
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
vnodes
\"
: [
\n
"
);
...
...
@@ -368,8 +368,8 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
static
void
*
dnodeOpenVnodeFunc
(
void
*
param
)
{
SVnodeThread
*
pThread
=
param
;
SDnode
*
pDnode
=
pThread
->
pDnode
;
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
SDnode
*
pDnode
=
pThread
->
pDnode
;
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
dDebug
(
"thread:%d, start to open %d vnodes"
,
pThread
->
threadIndex
,
pThread
->
vnodeNum
);
setThreadName
(
"open-vnodes"
);
...
...
@@ -383,7 +383,7 @@ static void *dnodeOpenVnodeFunc(void *param) {
dndReportStartup
(
pDnode
,
"open-vnodes"
,
stepDesc
);
SVnodeCfg
cfg
=
{.
pDnode
=
pDnode
,
.
pTfs
=
pDnode
->
pTfs
,
.
vgId
=
pCfg
->
vgId
,
.
dbId
=
pCfg
->
dbUid
};
SVnode
*
pImpl
=
vnodeOpen
(
pCfg
->
path
,
&
cfg
);
SVnode
*
pImpl
=
vnodeOpen
(
pCfg
->
path
,
&
cfg
);
if
(
pImpl
==
NULL
)
{
dError
(
"vgId:%d, failed to open vnode by thread:%d"
,
pCfg
->
vgId
,
pThread
->
threadIndex
);
pThread
->
failed
++
;
...
...
@@ -499,31 +499,6 @@ static void dndCloseVnodes(SDnode *pDnode) {
dInfo
(
"total vnodes:%d are all closed"
,
numOfVnodes
);
}
static
SCreateVnodeReq
*
dndParseCreateVnodeReq
(
SRpcMsg
*
pReq
)
{
SCreateVnodeReq
*
pCreate
=
pReq
->
pCont
;
pCreate
->
vgId
=
htonl
(
pCreate
->
vgId
);
pCreate
->
dnodeId
=
htonl
(
pCreate
->
dnodeId
);
pCreate
->
dbUid
=
htobe64
(
pCreate
->
dbUid
);
pCreate
->
vgVersion
=
htonl
(
pCreate
->
vgVersion
);
pCreate
->
cacheBlockSize
=
htonl
(
pCreate
->
cacheBlockSize
);
pCreate
->
totalBlocks
=
htonl
(
pCreate
->
totalBlocks
);
pCreate
->
daysPerFile
=
htonl
(
pCreate
->
daysPerFile
);
pCreate
->
daysToKeep0
=
htonl
(
pCreate
->
daysToKeep0
);
pCreate
->
daysToKeep1
=
htonl
(
pCreate
->
daysToKeep1
);
pCreate
->
daysToKeep2
=
htonl
(
pCreate
->
daysToKeep2
);
pCreate
->
minRows
=
htonl
(
pCreate
->
minRows
);
pCreate
->
maxRows
=
htonl
(
pCreate
->
maxRows
);
pCreate
->
commitTime
=
htonl
(
pCreate
->
commitTime
);
pCreate
->
fsyncPeriod
=
htonl
(
pCreate
->
fsyncPeriod
);
for
(
int
r
=
0
;
r
<
pCreate
->
replica
;
++
r
)
{
SReplica
*
pReplica
=
&
pCreate
->
replicas
[
r
];
pReplica
->
id
=
htonl
(
pReplica
->
id
);
pReplica
->
port
=
htons
(
pReplica
->
port
);
}
return
pCreate
;
}
static
void
dndGenerateVnodeCfg
(
SCreateVnodeReq
*
pCreate
,
SVnodeCfg
*
pCfg
)
{
pCfg
->
vgId
=
pCreate
->
vgId
;
pCfg
->
wsize
=
pCreate
->
cacheBlockSize
;
...
...
@@ -557,24 +532,29 @@ static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWra
}
int32_t
dndProcessCreateVnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
)
{
SCreateVnodeReq
*
pCreate
=
dndParseCreateVnodeReq
(
pReq
);
dDebug
(
"vgId:%d, create vnode req is received"
,
pCreate
->
vgId
);
SCreateVnodeReq
createReq
=
{
0
};
if
(
tDeserializeSCreateVnodeReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
createReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
dDebug
(
"vgId:%d, create vnode req is received"
,
createReq
.
vgId
);
SVnodeCfg
vnodeCfg
=
{
0
};
dndGenerateVnodeCfg
(
pCreate
,
&
vnodeCfg
);
dndGenerateVnodeCfg
(
&
createReq
,
&
vnodeCfg
);
SWrapperCfg
wrapperCfg
=
{
0
};
dndGenerateWrapperCfg
(
pDnode
,
pCreate
,
&
wrapperCfg
);
dndGenerateWrapperCfg
(
pDnode
,
&
createReq
,
&
wrapperCfg
);
if
(
pCreate
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
if
(
createReq
.
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
terrno
=
TSDB_CODE_DND_VNODE_INVALID_OPTION
;
dDebug
(
"vgId:%d, failed to create vnode since %s"
,
pCreate
->
vgId
,
terrstr
());
dDebug
(
"vgId:%d, failed to create vnode since %s"
,
createReq
.
vgId
,
terrstr
());
return
-
1
;
}
SVnodeObj
*
pVnode
=
dndAcquireVnode
(
pDnode
,
pCreate
->
vgId
);
SVnodeObj
*
pVnode
=
dndAcquireVnode
(
pDnode
,
createReq
.
vgId
);
if
(
pVnode
!=
NULL
)
{
dDebug
(
"vgId:%d, already exist"
,
pCreate
->
vgId
);
dDebug
(
"vgId:%d, already exist"
,
createReq
.
vgId
);
dndReleaseVnode
(
pDnode
,
pVnode
);
terrno
=
TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED
;
return
-
1
;
...
...
@@ -585,13 +565,13 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
vnodeCfg
.
dbId
=
wrapperCfg
.
dbUid
;
SVnode
*
pImpl
=
vnodeOpen
(
wrapperCfg
.
path
,
&
vnodeCfg
);
if
(
pImpl
==
NULL
)
{
dError
(
"vgId:%d, failed to create vnode since %s"
,
pCreate
->
vgId
,
terrstr
());
dError
(
"vgId:%d, failed to create vnode since %s"
,
createReq
.
vgId
,
terrstr
());
return
-
1
;
}
int32_t
code
=
dndOpenVnode
(
pDnode
,
&
wrapperCfg
,
pImpl
);
if
(
code
!=
0
)
{
dError
(
"vgId:%d, failed to open vnode since %s"
,
pCreate
->
vgId
,
terrstr
());
dError
(
"vgId:%d, failed to open vnode since %s"
,
createReq
.
vgId
,
terrstr
());
vnodeClose
(
pImpl
);
vnodeDestroy
(
wrapperCfg
.
path
);
terrno
=
code
;
...
...
@@ -610,32 +590,37 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
}
int32_t
dndProcessAlterVnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
)
{
SAlterVnodeReq
*
pAlter
=
(
SAlterVnodeReq
*
)
dndParseCreateVnodeReq
(
pReq
);
dDebug
(
"vgId:%d, alter vnode req is received"
,
pAlter
->
vgId
);
SAlterVnodeReq
alterReq
=
{
0
};
if
(
tDeserializeSCreateVnodeReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
alterReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
dDebug
(
"vgId:%d, alter vnode req is received"
,
alterReq
.
vgId
);
SVnodeCfg
vnodeCfg
=
{
0
};
dndGenerateVnodeCfg
(
pAlter
,
&
vnodeCfg
);
dndGenerateVnodeCfg
(
&
alterReq
,
&
vnodeCfg
);
SVnodeObj
*
pVnode
=
dndAcquireVnode
(
pDnode
,
pAlter
->
vgId
);
SVnodeObj
*
pVnode
=
dndAcquireVnode
(
pDnode
,
alterReq
.
vgId
);
if
(
pVnode
==
NULL
)
{
dDebug
(
"vgId:%d, failed to alter vnode since %s"
,
pAlter
->
vgId
,
terrstr
());
dDebug
(
"vgId:%d, failed to alter vnode since %s"
,
alterReq
.
vgId
,
terrstr
());
return
-
1
;
}
if
(
pAlter
->
vgVersion
==
pVnode
->
vgVersion
)
{
if
(
alterReq
.
vgVersion
==
pVnode
->
vgVersion
)
{
dndReleaseVnode
(
pDnode
,
pVnode
);
dDebug
(
"vgId:%d, no need to alter vnode cfg for version unchanged "
,
pAlter
->
vgId
);
dDebug
(
"vgId:%d, no need to alter vnode cfg for version unchanged "
,
alterReq
.
vgId
);
return
0
;
}
if
(
vnodeAlter
(
pVnode
->
pImpl
,
&
vnodeCfg
)
!=
0
)
{
dError
(
"vgId:%d, failed to alter vnode since %s"
,
pAlter
->
vgId
,
terrstr
());
dError
(
"vgId:%d, failed to alter vnode since %s"
,
alterReq
.
vgId
,
terrstr
());
dndReleaseVnode
(
pDnode
,
pVnode
);
return
-
1
;
}
int32_t
oldVersion
=
pVnode
->
vgVersion
;
pVnode
->
vgVersion
=
pAlter
->
vgVersion
;
pVnode
->
vgVersion
=
alterReq
.
vgVersion
;
int32_t
code
=
dndWriteVnodesToFile
(
pDnode
);
if
(
code
!=
0
)
{
pVnode
->
vgVersion
=
oldVersion
;
...
...
@@ -647,7 +632,10 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
int32_t
dndProcessDropVnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
)
{
SDropVnodeReq
dropReq
=
{
0
};
tDeserializeSDropVnodeReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
dropReq
);
if
(
tDeserializeSDropVnodeReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
dropReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
int32_t
vgId
=
dropReq
.
vgId
;
dDebug
(
"vgId:%d, drop vnode req is received"
,
vgId
);
...
...
source/dnode/mgmt/impl/test/vnode/vnode.cpp
浏览文件 @
7ce10a86
...
...
@@ -27,38 +27,40 @@ Testbase DndTestVnode::test;
TEST_F
(
DndTestVnode
,
01
_Create_Vnode
)
{
for
(
int
i
=
0
;
i
<
3
;
++
i
)
{
int32_t
contLen
=
sizeof
(
SCreateVnodeReq
);
SCreateVnodeReq
*
pReq
=
(
SCreateVnodeReq
*
)
rpcMallocCont
(
contLen
);
pReq
->
vgId
=
htonl
(
2
);
pReq
->
dnodeId
=
htonl
(
1
);
strcpy
(
pReq
->
db
,
"1.d1"
);
pReq
->
dbUid
=
htobe64
(
9527
);
pReq
->
vgVersion
=
htonl
(
1
);
pReq
->
cacheBlockSize
=
htonl
(
16
);
pReq
->
totalBlocks
=
htonl
(
10
);
pReq
->
daysPerFile
=
htonl
(
10
);
pReq
->
daysToKeep0
=
htonl
(
3650
);
pReq
->
daysToKeep1
=
htonl
(
3650
);
pReq
->
daysToKeep2
=
htonl
(
3650
);
pReq
->
minRows
=
htonl
(
100
);
pReq
->
minRows
=
htonl
(
4096
);
pReq
->
commitTime
=
htonl
(
3600
);
pReq
->
fsyncPeriod
=
htonl
(
3000
);
pReq
->
walLevel
=
1
;
pReq
->
precision
=
0
;
pReq
->
compression
=
2
;
pReq
->
replica
=
1
;
pReq
->
quorum
=
1
;
pReq
->
update
=
0
;
pReq
->
cacheLastRow
=
0
;
pReq
->
selfIndex
=
0
;
for
(
int
r
=
0
;
r
<
pReq
->
replica
;
++
r
)
{
SReplica
*
pReplica
=
&
pReq
->
replicas
[
r
];
pReplica
->
id
=
htonl
(
1
);
pReplica
->
port
=
htons
(
9527
);
SCreateVnodeReq
createReq
=
{
0
};
createReq
.
vgId
=
2
;
createReq
.
dnodeId
=
1
;
strcpy
(
createReq
.
db
,
"1.d1"
);
createReq
.
dbUid
=
9527
;
createReq
.
vgVersion
=
1
;
createReq
.
cacheBlockSize
=
16
;
createReq
.
totalBlocks
=
10
;
createReq
.
daysPerFile
=
10
;
createReq
.
daysToKeep0
=
3650
;
createReq
.
daysToKeep1
=
3650
;
createReq
.
daysToKeep2
=
3650
;
createReq
.
minRows
=
100
;
createReq
.
minRows
=
4096
;
createReq
.
commitTime
=
3600
;
createReq
.
fsyncPeriod
=
3000
;
createReq
.
walLevel
=
1
;
createReq
.
precision
=
0
;
createReq
.
compression
=
2
;
createReq
.
replica
=
1
;
createReq
.
quorum
=
1
;
createReq
.
update
=
0
;
createReq
.
cacheLastRow
=
0
;
createReq
.
selfIndex
=
0
;
for
(
int
r
=
0
;
r
<
createReq
.
replica
;
++
r
)
{
SReplica
*
pReplica
=
&
createReq
.
replicas
[
r
];
pReplica
->
id
=
1
;
pReplica
->
port
=
9527
;
}
int32_t
contLen
=
tSerializeSCreateVnodeReq
(
NULL
,
0
,
&
createReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSCreateVnodeReq
(
pReq
,
contLen
,
&
createReq
);
SRpcMsg
*
pRsp
=
test
.
SendReq
(
TDMT_DND_CREATE_VNODE
,
pReq
,
contLen
);
ASSERT_NE
(
pRsp
,
nullptr
);
if
(
i
==
0
)
{
...
...
@@ -70,38 +72,40 @@ TEST_F(DndTestVnode, 01_Create_Vnode) {
}
{
int32_t
contLen
=
sizeof
(
SCreateVnodeReq
);
SCreateVnodeReq
*
pReq
=
(
SCreateVnodeReq
*
)
rpcMallocCont
(
contLen
);
pReq
->
vgId
=
htonl
(
2
);
pReq
->
dnodeId
=
htonl
(
3
);
strcpy
(
pReq
->
db
,
"1.d1"
);
pReq
->
dbUid
=
htobe64
(
9527
);
pReq
->
vgVersion
=
htonl
(
1
);
pReq
->
cacheBlockSize
=
htonl
(
16
);
pReq
->
totalBlocks
=
htonl
(
10
);
pReq
->
daysPerFile
=
htonl
(
10
);
pReq
->
daysToKeep0
=
htonl
(
3650
);
pReq
->
daysToKeep1
=
htonl
(
3650
);
pReq
->
daysToKeep2
=
htonl
(
3650
);
pReq
->
minRows
=
htonl
(
100
);
pReq
->
minRows
=
htonl
(
4096
);
pReq
->
commitTime
=
htonl
(
3600
);
pReq
->
fsyncPeriod
=
htonl
(
3000
);
pReq
->
walLevel
=
1
;
pReq
->
precision
=
0
;
pReq
->
compression
=
2
;
pReq
->
replica
=
1
;
pReq
->
quorum
=
1
;
pReq
->
update
=
0
;
pReq
->
cacheLastRow
=
0
;
pReq
->
selfIndex
=
0
;
for
(
int
r
=
0
;
r
<
pReq
->
replica
;
++
r
)
{
SReplica
*
pReplica
=
&
pReq
->
replicas
[
r
];
pReplica
->
id
=
htonl
(
1
);
pReplica
->
port
=
htons
(
9527
);
SCreateVnodeReq
createReq
=
{
0
};
createReq
.
vgId
=
2
;
createReq
.
dnodeId
=
3
;
strcpy
(
createReq
.
db
,
"1.d1"
);
createReq
.
dbUid
=
9527
;
createReq
.
vgVersion
=
1
;
createReq
.
cacheBlockSize
=
16
;
createReq
.
totalBlocks
=
10
;
createReq
.
daysPerFile
=
10
;
createReq
.
daysToKeep0
=
3650
;
createReq
.
daysToKeep1
=
3650
;
createReq
.
daysToKeep2
=
3650
;
createReq
.
minRows
=
100
;
createReq
.
minRows
=
4096
;
createReq
.
commitTime
=
3600
;
createReq
.
fsyncPeriod
=
3000
;
createReq
.
walLevel
=
1
;
createReq
.
precision
=
0
;
createReq
.
compression
=
2
;
createReq
.
replica
=
1
;
createReq
.
quorum
=
1
;
createReq
.
update
=
0
;
createReq
.
cacheLastRow
=
0
;
createReq
.
selfIndex
=
0
;
for
(
int
r
=
0
;
r
<
createReq
.
replica
;
++
r
)
{
SReplica
*
pReplica
=
&
createReq
.
replicas
[
r
];
pReplica
->
id
=
1
;
pReplica
->
port
=
9527
;
}
int32_t
contLen
=
tSerializeSCreateVnodeReq
(
NULL
,
0
,
&
createReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSCreateVnodeReq
(
pReq
,
contLen
,
&
createReq
);
SRpcMsg
*
pRsp
=
test
.
SendReq
(
TDMT_DND_CREATE_VNODE
,
pReq
,
contLen
);
ASSERT_NE
(
pRsp
,
nullptr
);
ASSERT_EQ
(
pRsp
->
code
,
TSDB_CODE_DND_VNODE_INVALID_OPTION
);
...
...
@@ -110,38 +114,40 @@ TEST_F(DndTestVnode, 01_Create_Vnode) {
TEST_F
(
DndTestVnode
,
02
_Alter_Vnode
)
{
for
(
int
i
=
0
;
i
<
3
;
++
i
)
{
int32_t
contLen
=
sizeof
(
SAlterVnodeReq
);
SAlterVnodeReq
*
pReq
=
(
SAlterVnodeReq
*
)
rpcMallocCont
(
contLen
);
pReq
->
vgId
=
htonl
(
2
);
pReq
->
dnodeId
=
htonl
(
1
);
strcpy
(
pReq
->
db
,
"1.d1"
);
pReq
->
dbUid
=
htobe64
(
9527
);
pReq
->
vgVersion
=
htonl
(
2
);
pReq
->
cacheBlockSize
=
htonl
(
16
);
pReq
->
totalBlocks
=
htonl
(
10
);
pReq
->
daysPerFile
=
htonl
(
10
);
pReq
->
daysToKeep0
=
htonl
(
3650
);
pReq
->
daysToKeep1
=
htonl
(
3650
);
pReq
->
daysToKeep2
=
htonl
(
3650
);
pReq
->
minRows
=
htonl
(
100
);
pReq
->
minRows
=
htonl
(
4096
);
pReq
->
commitTime
=
htonl
(
3600
);
pReq
->
fsyncPeriod
=
htonl
(
3000
);
pReq
->
walLevel
=
1
;
pReq
->
precision
=
0
;
pReq
->
compression
=
2
;
pReq
->
replica
=
1
;
pReq
->
quorum
=
1
;
pReq
->
update
=
0
;
pReq
->
cacheLastRow
=
0
;
pReq
->
selfIndex
=
0
;
for
(
int
r
=
0
;
r
<
pReq
->
replica
;
++
r
)
{
SReplica
*
pReplica
=
&
pReq
->
replicas
[
r
];
pReplica
->
id
=
htonl
(
1
);
pReplica
->
port
=
htons
(
9527
);
SAlterVnodeReq
alterReq
=
{
0
};
alterReq
.
vgId
=
2
;
alterReq
.
dnodeId
=
1
;
strcpy
(
alterReq
.
db
,
"1.d1"
);
alterReq
.
dbUid
=
9527
;
alterReq
.
vgVersion
=
2
;
alterReq
.
cacheBlockSize
=
16
;
alterReq
.
totalBlocks
=
10
;
alterReq
.
daysPerFile
=
10
;
alterReq
.
daysToKeep0
=
3650
;
alterReq
.
daysToKeep1
=
3650
;
alterReq
.
daysToKeep2
=
3650
;
alterReq
.
minRows
=
100
;
alterReq
.
minRows
=
4096
;
alterReq
.
commitTime
=
3600
;
alterReq
.
fsyncPeriod
=
3000
;
alterReq
.
walLevel
=
1
;
alterReq
.
precision
=
0
;
alterReq
.
compression
=
2
;
alterReq
.
replica
=
1
;
alterReq
.
quorum
=
1
;
alterReq
.
update
=
0
;
alterReq
.
cacheLastRow
=
0
;
alterReq
.
selfIndex
=
0
;
for
(
int
r
=
0
;
r
<
alterReq
.
replica
;
++
r
)
{
SReplica
*
pReplica
=
&
alterReq
.
replicas
[
r
];
pReplica
->
id
=
1
;
pReplica
->
port
=
9527
;
}
int32_t
contLen
=
tSerializeSCreateVnodeReq
(
NULL
,
0
,
&
alterReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSCreateVnodeReq
(
pReq
,
contLen
,
&
alterReq
);
SRpcMsg
*
pRsp
=
test
.
SendReq
(
TDMT_DND_ALTER_VNODE
,
pReq
,
contLen
);
ASSERT_NE
(
pRsp
,
nullptr
);
ASSERT_EQ
(
pRsp
->
code
,
0
);
...
...
source/dnode/mnode/impl/inc/mndVgroup.h
浏览文件 @
7ce10a86
...
...
@@ -31,8 +31,8 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
SEpSet
mndGetVgroupEpset
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
);
int32_t
mndGetVnodesNum
(
SMnode
*
pMnode
,
int32_t
dnodeId
);
SCreateVnodeReq
*
mndBuildCreateVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
);
SDropVnodeReq
*
mndBuildDropVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
);
void
*
mndBuildCreateVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
);
void
*
mndBuildDropVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
7ce10a86
...
...
@@ -335,11 +335,12 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
action
.
epSet
=
mndGetDnodeEpset
(
pDnode
);
mndReleaseDnode
(
pMnode
,
pDnode
);
SCreateVnodeReq
*
pReq
=
mndBuildCreateVnodeReq
(
pMnode
,
pDnode
,
pDb
,
pVgroup
);
int32_t
contLen
=
0
;
void
*
pReq
=
mndBuildCreateVnodeReq
(
pMnode
,
pDnode
,
pDb
,
pVgroup
,
&
contLen
);
if
(
pReq
==
NULL
)
return
-
1
;
action
.
pCont
=
pReq
;
action
.
contLen
=
sizeof
(
SCreateVnodeReq
)
;
action
.
contLen
=
contLen
;
action
.
msgType
=
TDMT_DND_CREATE_VNODE
;
action
.
acceptableCode
=
TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
...
...
@@ -365,8 +366,8 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
action
.
epSet
=
mndGetDnodeEpset
(
pDnode
);
mndReleaseDnode
(
pMnode
,
pDnode
);
int32_t
contLen
=
0
;
SDropVnodeReq
*
pReq
=
mndBuildDropVnodeReq
(
pMnode
,
pDnode
,
pDb
,
pVgroup
,
&
contLen
);
int32_t
contLen
=
0
;
void
*
pReq
=
mndBuildDropVnodeReq
(
pMnode
,
pDnode
,
pDb
,
pVgroup
,
&
contLen
);
if
(
pReq
==
NULL
)
return
-
1
;
action
.
pCont
=
pReq
;
...
...
@@ -578,11 +579,12 @@ static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
action
.
epSet
=
mndGetDnodeEpset
(
pDnode
);
mndReleaseDnode
(
pMnode
,
pDnode
);
SAlterVnodeReq
*
pReq
=
(
SAlterVnodeReq
*
)
mndBuildCreateVnodeReq
(
pMnode
,
pDnode
,
pDb
,
pVgroup
);
int32_t
contLen
=
0
;
void
*
pReq
=
mndBuildCreateVnodeReq
(
pMnode
,
pDnode
,
pDb
,
pVgroup
,
&
contLen
);
if
(
pReq
==
NULL
)
return
-
1
;
action
.
pCont
=
pReq
;
action
.
contLen
=
sizeof
(
SAlterVnodeReq
)
;
action
.
contLen
=
contLen
;
action
.
msgType
=
TDMT_DND_ALTER_VNODE
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
free
(
pReq
);
...
...
@@ -755,8 +757,8 @@ static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *
action
.
epSet
=
mndGetDnodeEpset
(
pDnode
);
mndReleaseDnode
(
pMnode
,
pDnode
);
int32_t
contLen
=
0
;
SDropVnodeReq
*
pReq
=
mndBuildDropVnodeReq
(
pMnode
,
pDnode
,
pDb
,
pVgroup
,
&
contLen
);
int32_t
contLen
=
0
;
void
*
pReq
=
mndBuildDropVnodeReq
(
pMnode
,
pDnode
,
pDb
,
pVgroup
,
&
contLen
);
if
(
pReq
==
NULL
)
return
-
1
;
action
.
pCont
=
pReq
;
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
7ce10a86
...
...
@@ -189,43 +189,37 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
sdbRelease
(
pSdb
,
pVgroup
);
}
SCreateVnodeReq
*
mndBuildCreateVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
)
{
SCreateVnodeReq
*
pCreate
=
calloc
(
1
,
sizeof
(
SCreateVnodeReq
));
if
(
pCreate
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pCreate
->
vgId
=
htonl
(
pVgroup
->
vgId
);
pCreate
->
dnodeId
=
htonl
(
pDnode
->
id
);
memcpy
(
pCreate
->
db
,
pDb
->
name
,
TSDB_DB_FNAME_LEN
);
pCreate
->
dbUid
=
htobe64
(
pDb
->
uid
);
pCreate
->
vgVersion
=
htonl
(
pVgroup
->
version
);
pCreate
->
cacheBlockSize
=
htonl
(
pDb
->
cfg
.
cacheBlockSize
);
pCreate
->
totalBlocks
=
htonl
(
pDb
->
cfg
.
totalBlocks
);
pCreate
->
daysPerFile
=
htonl
(
pDb
->
cfg
.
daysPerFile
);
pCreate
->
daysToKeep0
=
htonl
(
pDb
->
cfg
.
daysToKeep0
);
pCreate
->
daysToKeep1
=
htonl
(
pDb
->
cfg
.
daysToKeep1
);
pCreate
->
daysToKeep2
=
htonl
(
pDb
->
cfg
.
daysToKeep2
);
pCreate
->
minRows
=
htonl
(
pDb
->
cfg
.
minRows
);
pCreate
->
maxRows
=
htonl
(
pDb
->
cfg
.
maxRows
);
pCreate
->
commitTime
=
htonl
(
pDb
->
cfg
.
commitTime
);
pCreate
->
fsyncPeriod
=
htonl
(
pDb
->
cfg
.
fsyncPeriod
);
pCreate
->
walLevel
=
pDb
->
cfg
.
walLevel
;
pCreate
->
precision
=
pDb
->
cfg
.
precision
;
pCreate
->
compression
=
pDb
->
cfg
.
compression
;
pCreate
->
quorum
=
pDb
->
cfg
.
quorum
;
pCreate
->
update
=
pDb
->
cfg
.
update
;
pCreate
->
cacheLastRow
=
pDb
->
cfg
.
cacheLastRow
;
pCreate
->
replica
=
pVgroup
->
replica
;
pCreate
->
selfIndex
=
-
1
;
void
*
mndBuildCreateVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
)
{
SCreateVnodeReq
createReq
=
{
0
};
createReq
.
vgId
=
pVgroup
->
vgId
;
createReq
.
dnodeId
=
pDnode
->
id
;
memcpy
(
createReq
.
db
,
pDb
->
name
,
TSDB_DB_FNAME_LEN
);
createReq
.
dbUid
=
pDb
->
uid
;
createReq
.
vgVersion
=
pVgroup
->
version
;
createReq
.
cacheBlockSize
=
pDb
->
cfg
.
cacheBlockSize
;
createReq
.
totalBlocks
=
pDb
->
cfg
.
totalBlocks
;
createReq
.
daysPerFile
=
pDb
->
cfg
.
daysPerFile
;
createReq
.
daysToKeep0
=
pDb
->
cfg
.
daysToKeep0
;
createReq
.
daysToKeep1
=
pDb
->
cfg
.
daysToKeep1
;
createReq
.
daysToKeep2
=
pDb
->
cfg
.
daysToKeep2
;
createReq
.
minRows
=
pDb
->
cfg
.
minRows
;
createReq
.
maxRows
=
pDb
->
cfg
.
maxRows
;
createReq
.
commitTime
=
pDb
->
cfg
.
commitTime
;
createReq
.
fsyncPeriod
=
pDb
->
cfg
.
fsyncPeriod
;
createReq
.
walLevel
=
pDb
->
cfg
.
walLevel
;
createReq
.
precision
=
pDb
->
cfg
.
precision
;
createReq
.
compression
=
pDb
->
cfg
.
compression
;
createReq
.
quorum
=
pDb
->
cfg
.
quorum
;
createReq
.
update
=
pDb
->
cfg
.
update
;
createReq
.
cacheLastRow
=
pDb
->
cfg
.
cacheLastRow
;
createReq
.
replica
=
pVgroup
->
replica
;
createReq
.
selfIndex
=
-
1
;
for
(
int32_t
v
=
0
;
v
<
pVgroup
->
replica
;
++
v
)
{
SReplica
*
pReplica
=
&
pCreate
->
replicas
[
v
];
SReplica
*
pReplica
=
&
createReq
.
replicas
[
v
];
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
v
];
SDnodeObj
*
pVgidDnode
=
mndAcquireDnode
(
pMnode
,
pVgid
->
dnodeId
);
if
(
pVgidDnode
==
NULL
)
{
free
(
pCreate
);
return
NULL
;
}
...
...
@@ -235,20 +229,33 @@ SCreateVnodeReq *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbOb
mndReleaseDnode
(
pMnode
,
pVgidDnode
);
if
(
pDnode
->
id
==
pVgid
->
dnodeId
)
{
pCreate
->
selfIndex
=
v
;
createReq
.
selfIndex
=
v
;
}
}
if
(
pCreate
->
selfIndex
==
-
1
)
{
free
(
pCreate
);
if
(
createReq
.
selfIndex
==
-
1
)
{
terrno
=
TSDB_CODE_MND_APP_ERROR
;
return
NULL
;
}
return
pCreate
;
int32_t
contLen
=
tSerializeSCreateVnodeReq
(
NULL
,
0
,
&
createReq
);
if
(
contLen
<
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
void
*
pReq
=
malloc
(
contLen
);
if
(
pReq
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
tSerializeSCreateVnodeReq
(
pReq
,
contLen
,
&
createReq
);
*
pContLen
=
contLen
;
return
pReq
;
}
SDropVnodeReq
*
mndBuildDropVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
void
*
mndBuildDropVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
)
{
SDropVnodeReq
dropReq
=
{
0
};
dropReq
.
dnodeId
=
pDnode
->
id
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录