Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
80170d3e
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
80170d3e
编写于
10月 15, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
10月 15, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #17367 from taosdata/enh/TD-19463_2
enh: refactor the code to create and delete mnodes
上级
86ba7746
af070e6c
变更
25
隐藏空白更改
内联
并排
Showing
25 changed file
with
444 addition
and
394 deletion
+444
-394
include/dnode/mnode/mnode.h
include/dnode/mnode/mnode.h
+3
-2
include/libs/sync/sync.h
include/libs/sync/sync.h
+2
-2
source/dnode/mgmt/mgmt_mnode/inc/mmInt.h
source/dnode/mgmt/mgmt_mnode/inc/mmInt.h
+2
-2
source/dnode/mgmt/mgmt_mnode/src/mmFile.c
source/dnode/mgmt/mgmt_mnode/src/mmFile.c
+78
-34
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+13
-15
source/dnode/mgmt/mgmt_mnode/src/mmInt.c
source/dnode/mgmt/mgmt_mnode/src/mmInt.c
+19
-23
source/dnode/mnode/impl/inc/mndInt.h
source/dnode/mnode/impl/inc/mndInt.h
+7
-7
source/dnode/mnode/impl/inc/mndSync.h
source/dnode/mnode/impl/inc/mndSync.h
+1
-1
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+1
-1
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+2
-1
source/dnode/mnode/impl/src/mndMain.c
source/dnode/mnode/impl/src/mndMain.c
+50
-39
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+183
-177
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+13
-18
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+1
-1
source/dnode/mnode/sdb/inc/sdb.h
source/dnode/mnode/sdb/inc/sdb.h
+2
-1
source/dnode/mnode/sdb/src/sdbHash.c
source/dnode/mnode/sdb/src/sdbHash.c
+16
-6
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+1
-1
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+14
-7
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+2
-2
tests/script/tsim/mnode/basic1.sim
tests/script/tsim/mnode/basic1.sim
+5
-1
tests/script/tsim/mnode/basic4.sim
tests/script/tsim/mnode/basic4.sim
+6
-44
tests/script/tsim/mnode/basic5.sim
tests/script/tsim/mnode/basic5.sim
+2
-3
tests/script/tsim/valgrind/checkError3.sim
tests/script/tsim/valgrind/checkError3.sim
+10
-1
tests/script/tsim/valgrind/checkError6.sim
tests/script/tsim/valgrind/checkError6.sim
+8
-2
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+3
-3
未找到文件。
include/dnode/mnode/mnode.h
浏览文件 @
80170d3e
...
...
@@ -30,9 +30,10 @@ typedef struct SMnode SMnode;
typedef
struct
{
int32_t
dnodeId
;
bool
standby
;
bool
deploy
;
SReplica
replica
;
int8_t
selfIndex
;
int8_t
numOfReplicas
;
SReplica
replicas
[
TSDB_MAX_REPLICA
];
SMsgCb
msgCb
;
}
SMnodeOpt
;
...
...
include/libs/sync/sync.h
浏览文件 @
80170d3e
...
...
@@ -211,7 +211,7 @@ typedef struct SSyncInfo {
int32_t
syncInit
();
void
syncCleanUp
();
int64_t
syncOpen
(
const
SSyncInfo
*
pSyncInfo
);
int64_t
syncOpen
(
SSyncInfo
*
pSyncInfo
);
void
syncStart
(
int64_t
rid
);
void
syncStop
(
int64_t
rid
);
int32_t
syncSetStandby
(
int64_t
rid
);
...
...
@@ -233,7 +233,7 @@ const char* syncStr(ESyncState state);
bool
syncIsRestoreFinish
(
int64_t
rid
);
int32_t
syncGetSnapshotByIndex
(
int64_t
rid
,
SyncIndex
index
,
SSnapshot
*
pSnapshot
);
int32_t
syncReconfig
(
int64_t
rid
,
const
SSyncCfg
*
pNewCfg
);
int32_t
syncReconfig
(
int64_t
rid
,
SSyncCfg
*
pNewCfg
);
// build SRpcMsg, need to call syncPropose with SRpcMsg
int32_t
syncReconfigBuild
(
int64_t
rid
,
const
SSyncCfg
*
pNewCfg
,
SRpcMsg
*
pRpcMsg
);
...
...
source/dnode/mgmt/mgmt_mnode/inc/mmInt.h
浏览文件 @
80170d3e
...
...
@@ -41,8 +41,8 @@ typedef struct SMnodeMgmt {
}
SMnodeMgmt
;
// mmFile.c
int32_t
mmReadFile
(
SMnodeMgmt
*
pMgmt
,
SReplica
*
pReplica
,
bool
*
pDeployed
);
int32_t
mmWriteFile
(
SMnodeMgmt
*
pMgmt
,
const
SReplica
*
pReplica
,
bool
deployed
);
int32_t
mmReadFile
(
const
char
*
path
,
SMnodeOpt
*
pOption
);
int32_t
mmWriteFile
(
const
char
*
path
,
const
SMnodeOpt
*
pOption
);
// mmHandle.c
SArray
*
mmGetMsgHandles
();
...
...
source/dnode/mgmt/mgmt_mnode/src/mmFile.c
浏览文件 @
80170d3e
...
...
@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "mmInt.h"
int32_t
mmReadFile
(
SMnodeMgmt
*
pMgmt
,
SReplica
*
pReplica
,
bool
*
pDeployed
)
{
int32_t
mmReadFile
(
const
char
*
path
,
SMnodeOpt
*
pOption
)
{
int32_t
code
=
TSDB_CODE_INVALID_JSON_FORMAT
;
int32_t
len
=
0
;
int32_t
maxLen
=
4096
;
...
...
@@ -25,7 +25,7 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, SReplica *pReplica, bool *pDeployed) {
char
file
[
PATH_MAX
]
=
{
0
};
TdFilePtr
pFile
=
NULL
;
snprintf
(
file
,
sizeof
(
file
),
"%s%smnode.json"
,
p
Mgmt
->
p
ath
,
TD_DIRSEP
);
snprintf
(
file
,
sizeof
(
file
),
"%s%smnode.json"
,
path
,
TD_DIRSEP
);
pFile
=
taosOpenFile
(
file
,
TD_FILE_READ
);
if
(
pFile
==
NULL
)
{
code
=
0
;
...
...
@@ -50,38 +50,69 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, SReplica *pReplica, bool *pDeployed) {
dError
(
"failed to read %s since deployed not found"
,
file
);
goto
_OVER
;
}
*
pDeployed
=
deployed
->
valueint
;
pOption
->
deploy
=
deployed
->
valueint
;
cJSON
*
id
=
cJSON_GetObjectItem
(
root
,
"id
"
);
if
(
id
)
{
if
(
id
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since
id
not found"
,
file
);
cJSON
*
selfIndex
=
cJSON_GetObjectItem
(
root
,
"selfIndex
"
);
if
(
selfIndex
)
{
if
(
selfIndex
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since
selfIndex
not found"
,
file
);
goto
_OVER
;
}
if
(
pReplica
)
{
pReplica
->
id
=
id
->
valueint
;
}
pOption
->
selfIndex
=
selfIndex
->
valueint
;
}
cJSON
*
fqdn
=
cJSON_GetObjectItem
(
root
,
"fqdn
"
);
if
(
fqdn
)
{
if
(
fqdn
->
type
!=
cJSON_String
||
fqdn
->
valuestring
==
NULL
)
{
dError
(
"failed to read %s since
fqdn
not found"
,
file
);
cJSON
*
replicas
=
cJSON_GetObjectItem
(
root
,
"replicas
"
);
if
(
replicas
)
{
if
(
replicas
->
type
!=
cJSON_Array
)
{
dError
(
"failed to read %s since
replicas
not found"
,
file
);
goto
_OVER
;
}
if
(
pReplica
)
{
tstrncpy
(
pReplica
->
fqdn
,
fqdn
->
valuestring
,
TSDB_FQDN_LEN
);
}
}
cJSON
*
port
=
cJSON_GetObjectItem
(
root
,
"port"
);
if
(
port
)
{
if
(
port
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since port not found"
,
file
);
int32_t
numOfReplicas
=
cJSON_GetArraySize
(
replicas
);
if
(
numOfReplicas
<=
0
)
{
dError
(
"failed to read %s since numOfReplicas:%d invalid"
,
file
,
numOfReplicas
);
goto
_OVER
;
}
if
(
pReplica
)
{
pReplica
->
port
=
(
uint16_t
)
port
->
valueint
;
pOption
->
numOfReplicas
=
numOfReplicas
;
for
(
int32_t
i
=
0
;
i
<
numOfReplicas
;
++
i
)
{
SReplica
*
pReplica
=
pOption
->
replicas
+
i
;
cJSON
*
replica
=
cJSON_GetArrayItem
(
replicas
,
i
);
if
(
replica
==
NULL
)
break
;
cJSON
*
id
=
cJSON_GetObjectItem
(
replica
,
"id"
);
if
(
id
)
{
if
(
id
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since id not found"
,
file
);
goto
_OVER
;
}
if
(
pReplica
)
{
pReplica
->
id
=
id
->
valueint
;
}
}
cJSON
*
fqdn
=
cJSON_GetObjectItem
(
replica
,
"fqdn"
);
if
(
fqdn
)
{
if
(
fqdn
->
type
!=
cJSON_String
||
fqdn
->
valuestring
==
NULL
)
{
dError
(
"failed to read %s since fqdn not found"
,
file
);
goto
_OVER
;
}
if
(
pReplica
)
{
tstrncpy
(
pReplica
->
fqdn
,
fqdn
->
valuestring
,
TSDB_FQDN_LEN
);
}
}
cJSON
*
port
=
cJSON_GetObjectItem
(
replica
,
"port"
);
if
(
port
)
{
if
(
port
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since port not found"
,
file
);
goto
_OVER
;
}
if
(
pReplica
)
{
pReplica
->
port
=
(
uint16_t
)
port
->
valueint
;
}
}
}
}
...
...
@@ -92,18 +123,18 @@ _OVER:
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
pFile
!=
NULL
)
taosCloseFile
(
&
pFile
);
if
(
code
==
0
)
{
dDebug
(
"succcessed to read file %s, deployed:%d"
,
file
,
*
pDeployed
);
dDebug
(
"succcessed to read file %s, deployed:%d"
,
file
,
pOption
->
deploy
);
}
terrno
=
code
;
return
code
;
}
int32_t
mmWriteFile
(
SMnodeMgmt
*
pMgmt
,
const
SReplica
*
pReplica
,
bool
deployed
)
{
int32_t
mmWriteFile
(
const
char
*
path
,
const
SMnodeOpt
*
pOption
)
{
char
file
[
PATH_MAX
]
=
{
0
};
char
realfile
[
PATH_MAX
]
=
{
0
};
snprintf
(
file
,
sizeof
(
file
),
"%s%smnode.json.bak"
,
p
Mgmt
->
p
ath
,
TD_DIRSEP
);
snprintf
(
realfile
,
sizeof
(
realfile
),
"%s%smnode.json"
,
p
Mgmt
->
p
ath
,
TD_DIRSEP
);
snprintf
(
file
,
sizeof
(
file
),
"%s%smnode.json.bak"
,
path
,
TD_DIRSEP
);
snprintf
(
realfile
,
sizeof
(
realfile
),
"%s%smnode.json"
,
path
,
TD_DIRSEP
);
TdFilePtr
pFile
=
taosOpenFile
(
file
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
if
(
pFile
==
NULL
)
{
...
...
@@ -117,12 +148,25 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, const SReplica *pReplica, bool deployed)
char
*
content
=
taosMemoryCalloc
(
1
,
maxLen
+
1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
if
(
pReplica
!=
NULL
&&
pReplica
->
id
>
0
)
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
id
\"
: %d,
\n
"
,
pReplica
->
id
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
fqdn
\"
:
\"
%s
\"
,
\n
"
,
pReplica
->
fqdn
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
port
\"
: %u
\n
,"
,
pReplica
->
port
);
if
(
pOption
->
deploy
&&
pOption
->
numOfReplicas
>
0
)
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
selfIndex
\"
: %d,
\n
"
,
pOption
->
selfIndex
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
replicas
\"
: [{
\n
"
);
for
(
int32_t
i
=
0
;
i
<
pOption
->
numOfReplicas
;
++
i
)
{
const
SReplica
*
pReplica
=
pOption
->
replicas
+
i
;
if
(
pReplica
!=
NULL
&&
pReplica
->
id
>
0
)
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
id
\"
: %d,
\n
"
,
pReplica
->
id
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
fqdn
\"
:
\"
%s
\"
,
\n
"
,
pReplica
->
fqdn
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
port
\"
: %u
\n
"
,
pReplica
->
port
);
}
if
(
i
<
pOption
->
numOfReplicas
-
1
)
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" },{
\n
"
);
}
else
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" }],
\n
"
);
}
}
}
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
deployed
\"
: %d
\n
"
,
deployed
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
deployed
\"
: %d
\n
"
,
pOption
->
deploy
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
taosWriteFile
(
pFile
,
content
,
len
);
...
...
@@ -136,6 +180,6 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, const SReplica *pReplica, bool deployed)
return
-
1
;
}
dDebug
(
"successed to write %s, deployed:%d"
,
realfile
,
deployed
);
dDebug
(
"successed to write %s, deployed:%d"
,
realfile
,
pOption
->
deploy
);
return
0
;
}
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
浏览文件 @
80170d3e
...
...
@@ -80,18 +80,21 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
return
-
1
;
}
if
(
createReq
.
replica
!=
1
)
{
SMnodeOpt
option
=
{.
deploy
=
true
,
.
numOfReplicas
=
createReq
.
replica
,
.
selfIndex
=
-
1
};
memcpy
(
option
.
replicas
,
createReq
.
replicas
,
sizeof
(
createReq
.
replicas
));
for
(
int32_t
i
=
0
;
i
<
option
.
numOfReplicas
;
++
i
)
{
if
(
createReq
.
replicas
[
i
].
id
==
pInput
->
pData
->
dnodeId
)
{
option
.
selfIndex
=
i
;
}
}
if
(
option
.
selfIndex
==
-
1
)
{
terrno
=
TSDB_CODE_INVALID_OPTION
;
dGError
(
"failed to create mnode since %s"
,
terrstr
());
dGError
(
"failed to create mnode since %s
, selfIndex is -1
"
,
terrstr
());
return
-
1
;
}
bool
deployed
=
true
;
SMnodeMgmt
mgmt
=
{
0
};
mgmt
.
path
=
pInput
->
path
;
mgmt
.
name
=
pInput
->
name
;
if
(
mmWriteFile
(
&
mgmt
,
&
createReq
.
replicas
[
0
],
deployed
)
!=
0
)
{
if
(
mmWriteFile
(
pInput
->
path
,
&
option
)
!=
0
)
{
dGError
(
"failed to write mnode file since %s"
,
terrstr
());
return
-
1
;
}
...
...
@@ -113,12 +116,8 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
return
-
1
;
}
bool
deployed
=
false
;
SMnodeMgmt
mgmt
=
{
0
};
mgmt
.
path
=
pInput
->
path
;
mgmt
.
name
=
pInput
->
name
;
if
(
mmWriteFile
(
&
mgmt
,
NULL
,
deployed
)
!=
0
)
{
SMnodeOpt
option
=
{.
deploy
=
false
};
if
(
mmWriteFile
(
pInput
->
path
,
&
option
)
!=
0
)
{
dGError
(
"failed to write mnode file since %s"
,
terrstr
());
return
-
1
;
}
...
...
@@ -207,7 +206,6 @@ SArray *mmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_HEARTBEAT
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_STATUS
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_SYSTABLE_RETRIEVE
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
// if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_AUTH
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_SHOW_VARIABLES
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_SERVER_VERSION
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/mgmt/mgmt_mnode/src/mmInt.c
浏览文件 @
80170d3e
...
...
@@ -25,38 +25,35 @@ static bool mmDeployRequired(const SMgmtInputOpt *pInput) {
}
static
int32_t
mmRequire
(
const
SMgmtInputOpt
*
pInput
,
bool
*
required
)
{
SMnodeMgmt
mgmt
=
{
0
};
mgmt
.
path
=
pInput
->
path
;
if
(
mmReadFile
(
&
mgmt
,
NULL
,
required
)
!=
0
)
{
SMnodeOpt
option
=
{
0
};
if
(
mmReadFile
(
pInput
->
path
,
&
option
)
!=
0
)
{
return
-
1
;
}
if
(
!
(
*
required
)
)
{
if
(
!
option
.
deploy
)
{
*
required
=
mmDeployRequired
(
pInput
);
}
else
{
*
required
=
true
;
}
return
0
;
}
static
void
mmBuildOptionForDeploy
(
SMnodeMgmt
*
pMgmt
,
const
SMgmtInputOpt
*
pInput
,
SMnodeOpt
*
pOption
)
{
pOption
->
standby
=
false
;
pOption
->
deploy
=
true
;
pOption
->
msgCb
=
pMgmt
->
msgCb
;
pOption
->
dnodeId
=
pMgmt
->
pData
->
dnodeId
;
pOption
->
replica
.
id
=
1
;
pOption
->
replica
.
port
=
tsServerPort
;
tstrncpy
(
pOption
->
replica
.
fqdn
,
tsLocalFqdn
,
TSDB_FQDN_LEN
);
pOption
->
selfIndex
=
0
;
pOption
->
numOfReplicas
=
1
;
pOption
->
replicas
[
0
].
id
=
1
;
pOption
->
replicas
[
0
].
port
=
tsServerPort
;
tstrncpy
(
pOption
->
replicas
[
0
].
fqdn
,
tsLocalFqdn
,
TSDB_FQDN_LEN
);
}
static
void
mmBuildOptionForOpen
(
SMnodeMgmt
*
pMgmt
,
const
SReplica
*
pReplica
,
SMnodeOpt
*
pOption
)
{
pOption
->
standby
=
false
;
static
void
mmBuildOptionForOpen
(
SMnodeMgmt
*
pMgmt
,
SMnodeOpt
*
pOption
)
{
pOption
->
deploy
=
false
;
pOption
->
msgCb
=
pMgmt
->
msgCb
;
pOption
->
dnodeId
=
pMgmt
->
pData
->
dnodeId
;
if
(
pReplica
->
id
>
0
)
{
pOption
->
standby
=
true
;
pOption
->
replica
=
*
pReplica
;
}
}
static
void
mmClose
(
SMnodeMgmt
*
pMgmt
)
{
...
...
@@ -95,22 +92,20 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt
->
msgCb
.
mgmt
=
pMgmt
;
taosThreadRwlockInit
(
&
pMgmt
->
lock
,
NULL
);
bool
deployed
=
false
;
SReplica
replica
=
{
0
};
if
(
mmReadFile
(
pMgmt
,
&
replica
,
&
deployed
)
!=
0
)
{
SMnodeOpt
option
=
{
0
};
if
(
mmReadFile
(
pMgmt
->
path
,
&
option
)
!=
0
)
{
dError
(
"failed to read file since %s"
,
terrstr
());
mmClose
(
pMgmt
);
return
-
1
;
}
SMnodeOpt
option
=
{
0
};
if
(
!
deployed
)
{
if
(
!
option
.
deploy
)
{
dInfo
(
"mnode start to deploy"
);
pMgmt
->
pData
->
dnodeId
=
1
;
mmBuildOptionForDeploy
(
pMgmt
,
pInput
,
&
option
);
}
else
{
dInfo
(
"mnode start to open"
);
mmBuildOptionForOpen
(
pMgmt
,
&
replica
,
&
option
);
mmBuildOptionForOpen
(
pMgmt
,
&
option
);
}
pMgmt
->
pMnode
=
mndOpen
(
pMgmt
->
path
,
&
option
);
...
...
@@ -128,9 +123,10 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
}
tmsgReportStartup
(
"mnode-worker"
,
"initialized"
);
if
(
!
deployed
||
replica
.
id
>
0
)
{
deployed
=
true
;
if
(
mmWriteFile
(
pMgmt
,
NULL
,
deployed
)
!=
0
)
{
if
(
option
.
numOfReplicas
>
0
)
{
option
.
deploy
=
true
;
option
.
numOfReplicas
=
0
;
if
(
mmWriteFile
(
pMgmt
->
path
,
&
option
)
!=
0
)
{
dError
(
"failed to write mnode file since %s"
,
terrstr
());
return
-
1
;
}
...
...
source/dnode/mnode/impl/inc/mndInt.h
浏览文件 @
80170d3e
...
...
@@ -87,12 +87,13 @@ typedef struct {
typedef
struct
{
tsem_t
syncSem
;
int64_t
sync
;
SReplica
replica
;
int32_t
errCode
;
int32_t
transId
;
SRWLatch
lock
;
int8_t
standby
;
int8_t
leaderTransferFinish
;
int8_t
selfIndex
;
int8_t
numOfReplicas
;
SReplica
replicas
[
TSDB_MAX_REPLICA
];
}
SSyncMgmt
;
typedef
struct
{
...
...
@@ -130,11 +131,10 @@ typedef struct SMnode {
void
mndSetMsgHandle
(
SMnode
*
pMnode
,
tmsg_t
msgType
,
MndMsgFp
fp
);
int64_t
mndGenerateUid
(
const
char
*
name
,
int32_t
len
);
int32_t
mndAcquireRpcRef
(
SMnode
*
pMnode
);
void
mndReleaseRpcRef
(
SMnode
*
pMnode
);
void
mndSetRestore
(
SMnode
*
pMnode
,
bool
restored
);
void
mndSetStop
(
SMnode
*
pMnode
);
bool
mndGetStop
(
SMnode
*
pMnode
);
void
mndSetRestored
(
SMnode
*
pMnode
,
bool
restored
);
bool
mndGetRestored
(
SMnode
*
pMnode
);
void
mndSetStop
(
SMnode
*
pMnode
);
bool
mndGetStop
(
SMnode
*
pMnode
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/inc/mndSync.h
浏览文件 @
80170d3e
...
...
@@ -24,7 +24,7 @@ extern "C" {
int32_t
mndInitSync
(
SMnode
*
pMnode
);
void
mndCleanupSync
(
SMnode
*
pMnode
);
bool
mndIs
Mast
er
(
SMnode
*
pMnode
);
bool
mndIs
Lead
er
(
SMnode
*
pMnode
);
int32_t
mndSyncPropose
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
,
int32_t
transId
);
void
mndSyncStart
(
SMnode
*
pMnode
);
void
mndSyncStop
(
SMnode
*
pMnode
);
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
80170d3e
...
...
@@ -1809,7 +1809,7 @@ static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
}
while
(
numOfRows
<
rowsCapacity
)
{
pShow
->
pIter
=
sdbFetchAll
(
pSdb
,
SDB_DB
,
pShow
->
pIter
,
(
void
**
)
&
pDb
,
&
objStatus
);
pShow
->
pIter
=
sdbFetchAll
(
pSdb
,
SDB_DB
,
pShow
->
pIter
,
(
void
**
)
&
pDb
,
&
objStatus
,
true
);
if
(
pShow
->
pIter
==
NULL
)
break
;
if
(
mndCheckDbPrivilege
(
pMnode
,
pReq
->
info
.
conn
.
user
,
MND_OPER_READ_OR_WRITE_DB
,
pDb
)
==
0
)
{
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
80170d3e
...
...
@@ -423,7 +423,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
goto
_OVER
;
}
else
{
pDnode
->
accessTimes
++
;
m
Trace
(
"dnode:%d, status received, access times %d"
,
pDnode
->
id
,
pDnode
->
accessTimes
);
m
Debug
(
"dnode:%d, status received, access times %d"
,
pDnode
->
id
,
pDnode
->
accessTimes
);
}
}
...
...
@@ -471,6 +471,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
}
pDnode
->
lastAccessTime
=
curMs
;
pDnode
->
accessTimes
++
;
code
=
0
;
_OVER:
...
...
source/dnode/mnode/impl/src/mndMain.c
浏览文件 @
80170d3e
...
...
@@ -43,6 +43,37 @@
#include "mndUser.h"
#include "mndVgroup.h"
static
inline
int32_t
mndAcquireRpc
(
SMnode
*
pMnode
)
{
int32_t
code
=
0
;
taosThreadRwlockRdlock
(
&
pMnode
->
lock
);
if
(
pMnode
->
stopped
)
{
terrno
=
TSDB_CODE_APP_NOT_READY
;
code
=
-
1
;
}
else
if
(
!
mndIsLeader
(
pMnode
))
{
code
=
-
1
;
}
else
{
#if 1
atomic_add_fetch_32
(
&
pMnode
->
rpcRef
,
1
);
#else
int32_t
ref
=
atomic_add_fetch_32
(
&
pMnode
->
rpcRef
,
1
);
mTrace
(
"mnode rpc is acquired, ref:%d"
,
ref
);
#endif
}
taosThreadRwlockUnlock
(
&
pMnode
->
lock
);
return
code
;
}
static
inline
void
mndReleaseRpc
(
SMnode
*
pMnode
)
{
taosThreadRwlockRdlock
(
&
pMnode
->
lock
);
#if 1
atomic_sub_fetch_32
(
&
pMnode
->
rpcRef
,
1
);
#else
int32_t
ref
=
atomic_sub_fetch_32
(
&
pMnode
->
rpcRef
,
1
);
mTrace
(
"mnode rpc is released, ref:%d"
,
ref
);
#endif
taosThreadRwlockUnlock
(
&
pMnode
->
lock
);
}
static
void
*
mndBuildTimerMsg
(
int32_t
*
pContLen
)
{
SMTimerReq
timerReq
=
{
0
};
...
...
@@ -338,8 +369,9 @@ static int32_t mndExecSteps(SMnode *pMnode) {
static
void
mndSetOptions
(
SMnode
*
pMnode
,
const
SMnodeOpt
*
pOption
)
{
pMnode
->
msgCb
=
pOption
->
msgCb
;
pMnode
->
selfDnodeId
=
pOption
->
dnodeId
;
pMnode
->
syncMgmt
.
replica
=
pOption
->
replica
;
pMnode
->
syncMgmt
.
standby
=
pOption
->
standby
;
pMnode
->
syncMgmt
.
selfIndex
=
pOption
->
selfIndex
;
pMnode
->
syncMgmt
.
numOfReplicas
=
pOption
->
numOfReplicas
;
memcpy
(
pMnode
->
syncMgmt
.
replicas
,
pOption
->
replicas
,
sizeof
(
pOption
->
replicas
));
}
SMnode
*
mndOpen
(
const
char
*
path
,
const
SMnodeOpt
*
pOption
)
{
...
...
@@ -430,7 +462,7 @@ int32_t mndStart(SMnode *pMnode) {
mError
(
"failed to deploy sdb while start mnode"
);
return
-
1
;
}
mndSetRestore
(
pMnode
,
true
);
mndSetRestore
d
(
pMnode
,
true
);
}
grantReset
(
pMnode
,
TSDB_GRANT_ALL
,
0
);
...
...
@@ -570,23 +602,27 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
pMsg
->
msgType
==
TDMT_SCH_FETCH
||
pMsg
->
msgType
==
TDMT_SCH_MERGE_FETCH
||
pMsg
->
msgType
==
TDMT_SCH_DROP_TASK
)
{
return
0
;
}
if
(
mndAcquireRpc
Ref
(
pMsg
->
info
.
node
)
==
0
)
return
0
;
if
(
mndAcquireRpc
(
pMsg
->
info
.
node
)
==
0
)
return
0
;
if
(
pMsg
->
msgType
==
TDMT_MND_MQ_TIMER
||
pMsg
->
msgType
==
TDMT_MND_TELEM_TIMER
||
pMsg
->
msgType
==
TDMT_MND_TRANS_TIMER
||
pMsg
->
msgType
==
TDMT_MND_TTL_TIMER
||
pMsg
->
msgType
==
TDMT_MND_UPTIME_TIMER
)
{
return
-
1
;
}
SEpSet
epSet
=
{
0
};
mndGetMnodeEpSet
(
pMsg
->
info
.
node
,
&
epSet
);
SEpSet
epSet
=
{
0
};
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
mndGetMnodeEpSet
(
pMnode
,
&
epSet
);
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
mError
(
"msg:%p, failed to check mnode state since %s, type:%s, numOfMnodes:%d inUse:%d"
,
pMsg
,
terrstr
(),
TMSG_INFO
(
pMsg
->
msgType
),
epSet
.
numOfEps
,
epSet
.
inUse
);
mDebug
(
"msg:%p, failed to check mnode state since %s, mnode restored:%d stopped:%d, sync restored:%d role:%s type:%s "
"numOfEps:%d inUse:%d"
,
pMsg
,
terrstr
(),
pMnode
->
restored
,
pMnode
->
stopped
,
syncIsRestoreFinish
(
pMnode
->
syncMgmt
.
sync
),
syncGetMyRoleStr
(
pMnode
->
syncMgmt
.
sync
),
TMSG_INFO
(
pMsg
->
msgType
),
epSet
.
numOfEps
,
epSet
.
inUse
);
if
(
epSet
.
numOfEps
>
0
)
{
for
(
int32_t
i
=
0
;
i
<
epSet
.
numOfEps
;
++
i
)
{
m
Info
(
"mnode index:%d, ep:%s:%u"
,
i
,
epSet
.
eps
[
i
].
fqdn
,
epSet
.
eps
[
i
].
port
);
m
Debug
(
"mnode index:%d, ep:%s:%u"
,
i
,
epSet
.
eps
[
i
].
fqdn
,
epSet
.
eps
[
i
].
port
);
}
int32_t
contLen
=
tSerializeSEpSet
(
NULL
,
0
,
&
epSet
);
...
...
@@ -633,7 +669,7 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
mGTrace
(
"msg:%p, start to process in mnode, app:%p type:%s"
,
pMsg
,
pMsg
->
info
.
ahandle
,
TMSG_INFO
(
pMsg
->
msgType
));
int32_t
code
=
(
*
fp
)(
pMsg
);
mndReleaseRpc
Ref
(
pMnode
);
mndReleaseRpc
(
pMnode
);
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mGTrace
(
"msg:%p, won't response immediately since in progress"
,
pMsg
);
...
...
@@ -669,7 +705,7 @@ int64_t mndGenerateUid(const char *name, int32_t len) {
int32_t
mndGetMonitorInfo
(
SMnode
*
pMnode
,
SMonClusterInfo
*
pClusterInfo
,
SMonVgroupInfo
*
pVgroupInfo
,
SMonStbInfo
*
pStbInfo
,
SMonGrantInfo
*
pGrantInfo
)
{
if
(
mndAcquireRpc
Ref
(
pMnode
)
!=
0
)
return
-
1
;
if
(
mndAcquireRpc
(
pMnode
)
!=
0
)
return
-
1
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int64_t
ms
=
taosGetTimestampMs
();
...
...
@@ -680,7 +716,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
pStbInfo
->
stbs
=
taosArrayInit
(
sdbGetSize
(
pSdb
,
SDB_STB
),
sizeof
(
SMonStbDesc
));
if
(
pClusterInfo
->
dnodes
==
NULL
||
pClusterInfo
->
mnodes
==
NULL
||
pVgroupInfo
->
vgroups
==
NULL
||
pStbInfo
->
stbs
==
NULL
)
{
mndReleaseRpc
Ref
(
pMnode
);
mndReleaseRpc
(
pMnode
);
return
-
1
;
}
...
...
@@ -800,7 +836,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
pGrantInfo
->
timeseries_total
=
INT32_MAX
;
}
mndReleaseRpc
Ref
(
pMnode
);
mndReleaseRpc
(
pMnode
);
return
0
;
}
...
...
@@ -810,32 +846,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
return
0
;
}
int32_t
mndAcquireRpcRef
(
SMnode
*
pMnode
)
{
int32_t
code
=
0
;
taosThreadRwlockRdlock
(
&
pMnode
->
lock
);
if
(
pMnode
->
stopped
)
{
mTrace
(
"mnode not running"
);
terrno
=
TSDB_CODE_APP_NOT_READY
;
code
=
-
1
;
}
else
if
(
!
mndIsMaster
(
pMnode
))
{
mTrace
(
"mnode not ready, role:%s restored:%d"
,
syncGetMyRoleStr
(
pMnode
->
syncMgmt
.
sync
),
pMnode
->
restored
);
code
=
-
1
;
}
else
{
int32_t
ref
=
atomic_add_fetch_32
(
&
pMnode
->
rpcRef
,
1
);
// mTrace("mnode rpc is acquired, ref:%d", ref);
}
taosThreadRwlockUnlock
(
&
pMnode
->
lock
);
return
code
;
}
void
mndReleaseRpcRef
(
SMnode
*
pMnode
)
{
taosThreadRwlockRdlock
(
&
pMnode
->
lock
);
int32_t
ref
=
atomic_sub_fetch_32
(
&
pMnode
->
rpcRef
,
1
);
// mTrace("mnode rpc is released, ref:%d", ref);
taosThreadRwlockUnlock
(
&
pMnode
->
lock
);
}
void
mndSetRestore
(
SMnode
*
pMnode
,
bool
restored
)
{
void
mndSetRestored
(
SMnode
*
pMnode
,
bool
restored
)
{
if
(
restored
)
{
taosThreadRwlockWrlock
(
&
pMnode
->
lock
);
pMnode
->
restored
=
true
;
...
...
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
80170d3e
...
...
@@ -36,6 +36,7 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq);
static
int32_t
mndProcessDropMnodeReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndRetrieveMnodes
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextMnode
(
SMnode
*
pMnode
,
void
*
pIter
);
static
void
mndReloadSyncConfig
(
SMnode
*
pMnode
);
int32_t
mndInitMnode
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{
...
...
@@ -187,6 +188,7 @@ static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
}
pObj
->
state
=
TAOS_SYNC_STATE_ERROR
;
mndReloadSyncConfig
(
pSdb
->
pMnode
);
return
0
;
}
...
...
@@ -203,6 +205,8 @@ static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) {
static
int32_t
mndMnodeActionUpdate
(
SSdb
*
pSdb
,
SMnodeObj
*
pOld
,
SMnodeObj
*
pNew
)
{
mTrace
(
"mnode:%d, perform update action, old row:%p new row:%p"
,
pOld
->
id
,
pOld
,
pNew
);
pOld
->
updateTime
=
pNew
->
updateTime
;
mndReloadSyncConfig
(
pSdb
->
pMnode
);
return
0
;
}
...
...
@@ -233,7 +237,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
if
(
pIter
==
NULL
)
break
;
if
(
pObj
->
id
==
pMnode
->
selfDnodeId
)
{
if
(
mndIs
Mast
er
(
pMnode
))
{
if
(
mndIs
Lead
er
(
pMnode
))
{
pEpSet
->
inUse
=
pEpSet
->
numOfEps
;
}
else
{
pEpSet
->
inUse
=
(
pEpSet
->
numOfEps
+
1
)
%
totalMnodes
;
...
...
@@ -248,6 +252,10 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
if
(
pEpSet
->
numOfEps
==
0
)
{
syncGetRetryEpSet
(
pMnode
->
syncMgmt
.
sync
,
pEpSet
);
}
if
(
pEpSet
->
inUse
>=
pEpSet
->
numOfEps
)
{
pEpSet
->
inUse
=
0
;
}
}
static
int32_t
mndSetCreateMnodeRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SMnodeObj
*
pObj
)
{
...
...
@@ -274,13 +282,72 @@ static int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnod
return
0
;
}
static
int32_t
mndBuildCreateMnodeRedoAction
(
STrans
*
pTrans
,
SDCreateMnodeReq
*
pCreateReq
,
SEpSet
*
pCreateEpSet
)
{
int32_t
contLen
=
tSerializeSDCreateMnodeReq
(
NULL
,
0
,
pCreateReq
);
void
*
pReq
=
taosMemoryMalloc
(
contLen
);
tSerializeSDCreateMnodeReq
(
pReq
,
contLen
,
pCreateReq
);
STransAction
action
=
{
.
epSet
=
*
pCreateEpSet
,
.
pCont
=
pReq
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_DND_CREATE_MNODE
,
.
acceptableCode
=
TSDB_CODE_NODE_ALREADY_DEPLOYED
,
};
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
return
-
1
;
}
return
0
;
}
static
int32_t
mndBuildAlterMnodeRedoAction
(
STrans
*
pTrans
,
SDCreateMnodeReq
*
pAlterReq
,
SEpSet
*
pAlterEpSet
)
{
int32_t
contLen
=
tSerializeSDCreateMnodeReq
(
NULL
,
0
,
pAlterReq
);
void
*
pReq
=
taosMemoryMalloc
(
contLen
);
tSerializeSDCreateMnodeReq
(
pReq
,
contLen
,
pAlterReq
);
STransAction
action
=
{
.
epSet
=
*
pAlterEpSet
,
.
pCont
=
pReq
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_MND_ALTER_MNODE
,
.
acceptableCode
=
0
,
};
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
return
-
1
;
}
return
0
;
}
static
int32_t
mndBuildDropMnodeRedoAction
(
STrans
*
pTrans
,
SDDropMnodeReq
*
pDropReq
,
SEpSet
*
pDroprEpSet
)
{
int32_t
contLen
=
tSerializeSCreateDropMQSBNodeReq
(
NULL
,
0
,
pDropReq
);
void
*
pReq
=
taosMemoryMalloc
(
contLen
);
tSerializeSCreateDropMQSBNodeReq
(
pReq
,
contLen
,
pDropReq
);
STransAction
action
=
{
.
epSet
=
*
pDroprEpSet
,
.
pCont
=
pReq
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_DND_DROP_MNODE
,
.
acceptableCode
=
TSDB_CODE_NODE_NOT_DEPLOYED
,
};
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
return
-
1
;
}
return
0
;
}
static
int32_t
mndSetCreateMnodeRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDnodeObj
*
pDnode
,
SMnodeObj
*
pObj
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
int32_t
numOfReplicas
=
0
;
SDAlterMnodeReq
alterReq
=
{
0
};
SDCreateMnodeReq
createReq
=
{
0
};
SEpSet
alterEpset
=
{
0
};
SEpSet
createEpset
=
{
0
};
while
(
1
)
{
...
...
@@ -288,75 +355,25 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
pIter
=
sdbFetch
(
pSdb
,
SDB_MNODE
,
pIter
,
(
void
**
)
&
pMObj
);
if
(
pIter
==
NULL
)
break
;
alterReq
.
replicas
[
numOfReplicas
].
id
=
pMObj
->
id
;
alterReq
.
replicas
[
numOfReplicas
].
port
=
pMObj
->
pDnode
->
port
;
memcpy
(
alterReq
.
replicas
[
numOfReplicas
].
fqdn
,
pMObj
->
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
alterEpset
.
eps
[
numOfReplicas
].
port
=
pMObj
->
pDnode
->
port
;
memcpy
(
alterEpset
.
eps
[
numOfReplicas
].
fqdn
,
pMObj
->
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
if
(
pMObj
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
alterEpset
.
inUse
=
numOfReplicas
;
}
createReq
.
replicas
[
numOfReplicas
].
id
=
pMObj
->
id
;
createReq
.
replicas
[
numOfReplicas
].
port
=
pMObj
->
pDnode
->
port
;
memcpy
(
createReq
.
replicas
[
numOfReplicas
].
fqdn
,
pMObj
->
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
numOfReplicas
++
;
sdbRelease
(
pSdb
,
pMObj
);
}
alterReq
.
replica
=
numOfReplicas
+
1
;
alterReq
.
replicas
[
numOfReplicas
].
id
=
pDnode
->
id
;
alterReq
.
replicas
[
numOfReplicas
].
port
=
pDnode
->
port
;
memcpy
(
alterReq
.
replicas
[
numOfReplicas
].
fqdn
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
alterEpset
.
numOfEps
=
numOfReplicas
+
1
;
alterEpset
.
eps
[
numOfReplicas
].
port
=
pDnode
->
port
;
memcpy
(
alterEpset
.
eps
[
numOfReplicas
].
fqdn
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
createReq
.
replica
=
1
;
createReq
.
replicas
[
0
].
id
=
pDnode
->
id
;
createReq
.
replicas
[
0
].
port
=
pDnode
->
port
;
memcpy
(
createReq
.
replicas
[
0
].
fqdn
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
createReq
.
replica
=
numOfReplicas
+
1
;
createReq
.
replicas
[
numOfReplicas
].
id
=
pDnode
->
id
;
createReq
.
replicas
[
numOfReplicas
].
port
=
pDnode
->
port
;
memcpy
(
createReq
.
replicas
[
numOfReplicas
].
fqdn
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
createEpset
.
inUse
=
0
;
createEpset
.
numOfEps
=
1
;
createEpset
.
eps
[
0
].
port
=
pDnode
->
port
;
memcpy
(
createEpset
.
eps
[
0
].
fqdn
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
{
int32_t
contLen
=
tSerializeSDCreateMnodeReq
(
NULL
,
0
,
&
createReq
);
void
*
pReq
=
taosMemoryMalloc
(
contLen
);
tSerializeSDCreateMnodeReq
(
pReq
,
contLen
,
&
createReq
);
STransAction
action
=
{
.
epSet
=
createEpset
,
.
pCont
=
pReq
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_DND_CREATE_MNODE
,
.
acceptableCode
=
TSDB_CODE_NODE_ALREADY_DEPLOYED
,
};
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
return
-
1
;
}
}
{
int32_t
contLen
=
tSerializeSDCreateMnodeReq
(
NULL
,
0
,
&
alterReq
);
void
*
pReq
=
taosMemoryMalloc
(
contLen
);
tSerializeSDCreateMnodeReq
(
pReq
,
contLen
,
&
alterReq
);
STransAction
action
=
{
.
epSet
=
alterEpset
,
.
pCont
=
pReq
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_MND_ALTER_MNODE
,
.
acceptableCode
=
0
,
};
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
return
-
1
;
}
}
if
(
mndBuildCreateMnodeRedoAction
(
pTrans
,
&
createReq
,
&
createEpset
)
!=
0
)
return
-
1
;
return
0
;
}
...
...
@@ -374,9 +391,9 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
mndTransSetSerial
(
pTrans
);
mInfo
(
"trans:%d, used to create mnode:%d"
,
pTrans
->
id
,
pCreate
->
dnodeId
);
if
(
mndSetCreateMnodeRedoActions
(
pMnode
,
pTrans
,
pDnode
,
&
mnodeObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateMnodeRedoLogs
(
pMnode
,
pTrans
,
&
mnodeObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateMnodeCommitLogs
(
pMnode
,
pTrans
,
&
mnodeObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateMnodeRedoActions
(
pMnode
,
pTrans
,
pDnode
,
&
mnodeObj
)
!=
0
)
goto
_OVER
;
if
(
mndTransAppendNullLog
(
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
...
...
@@ -459,107 +476,28 @@ static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeO
}
static
int32_t
mndSetDropMnodeRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDnodeObj
*
pDnode
,
SMnodeObj
*
pObj
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
int32_t
numOfReplicas
=
0
;
SDAlterMnodeReq
alterReq
=
{
0
};
SDDropMnodeReq
dropReq
=
{
0
};
SSetStandbyReq
standbyReq
=
{
0
};
SEpSet
alterEpset
=
{
0
};
SEpSet
dropEpSet
=
{
0
};
while
(
1
)
{
SMnodeObj
*
pMObj
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_MNODE
,
pIter
,
(
void
**
)
&
pMObj
);
if
(
pIter
==
NULL
)
break
;
if
(
pMObj
->
id
==
pObj
->
id
)
{
sdbRelease
(
pSdb
,
pMObj
);
continue
;
}
alterReq
.
replicas
[
numOfReplicas
].
id
=
pMObj
->
id
;
alterReq
.
replicas
[
numOfReplicas
].
port
=
pMObj
->
pDnode
->
port
;
memcpy
(
alterReq
.
replicas
[
numOfReplicas
].
fqdn
,
pMObj
->
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
alterEpset
.
eps
[
numOfReplicas
].
port
=
pMObj
->
pDnode
->
port
;
memcpy
(
alterEpset
.
eps
[
numOfReplicas
].
fqdn
,
pMObj
->
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
if
(
pMObj
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
alterEpset
.
inUse
=
numOfReplicas
;
}
numOfReplicas
++
;
sdbRelease
(
pSdb
,
pMObj
);
}
alterReq
.
replica
=
numOfReplicas
;
alterEpset
.
numOfEps
=
numOfReplicas
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
int32_t
numOfReplicas
=
0
;
SDDropMnodeReq
dropReq
=
{
0
};
SEpSet
dropEpSet
=
{
0
};
dropReq
.
dnodeId
=
pDnode
->
id
;
dropEpSet
.
numOfEps
=
1
;
dropEpSet
.
eps
[
0
].
port
=
pDnode
->
port
;
memcpy
(
dropEpSet
.
eps
[
0
].
fqdn
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
standbyReq
.
dnodeId
=
pDnode
->
id
;
standbyReq
.
standby
=
1
;
{
int32_t
contLen
=
tSerializeSSetStandbyReq
(
NULL
,
0
,
&
standbyReq
)
+
sizeof
(
SMsgHead
);
void
*
pReq
=
taosMemoryMalloc
(
contLen
);
tSerializeSSetStandbyReq
((
char
*
)
pReq
+
sizeof
(
SMsgHead
),
contLen
,
&
standbyReq
);
SMsgHead
*
pHead
=
pReq
;
pHead
->
contLen
=
htonl
(
contLen
);
pHead
->
vgId
=
htonl
(
MNODE_HANDLE
);
STransAction
action
=
{
.
epSet
=
dropEpSet
,
.
pCont
=
pReq
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_SYNC_SET_MNODE_STANDBY
,
.
acceptableCode
=
TSDB_CODE_NODE_NOT_DEPLOYED
,
};
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
return
-
1
;
}
}
{
int32_t
contLen
=
tSerializeSDCreateMnodeReq
(
NULL
,
0
,
&
alterReq
);
void
*
pReq
=
taosMemoryMalloc
(
contLen
);
tSerializeSDCreateMnodeReq
(
pReq
,
contLen
,
&
alterReq
);
STransAction
action
=
{
.
epSet
=
alterEpset
,
.
pCont
=
pReq
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_MND_ALTER_MNODE
,
.
acceptableCode
=
0
,
};
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
return
-
1
;
}
}
{
int32_t
contLen
=
tSerializeSCreateDropMQSBNodeReq
(
NULL
,
0
,
&
dropReq
);
void
*
pReq
=
taosMemoryMalloc
(
contLen
);
tSerializeSCreateDropMQSBNodeReq
(
pReq
,
contLen
,
&
dropReq
);
STransAction
action
=
{
.
epSet
=
dropEpSet
,
.
pCont
=
pReq
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_DND_DROP_MNODE
,
.
acceptableCode
=
TSDB_CODE_NODE_NOT_DEPLOYED
,
};
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
return
-
1
;
}
int32_t
totalMnodes
=
sdbGetSize
(
pSdb
,
SDB_MNODE
);
if
(
totalMnodes
==
2
)
{
mInfo
(
"vgId:1, has %d mnodes, exec redo log first"
,
totalMnodes
);
if
(
mndSetDropMnodeRedoLogs
(
pMnode
,
pTrans
,
pObj
)
!=
0
)
return
-
1
;
if
(
mndBuildDropMnodeRedoAction
(
pTrans
,
&
dropReq
,
&
dropEpSet
)
!=
0
)
return
-
1
;
}
else
if
(
totalMnodes
==
3
)
{
mInfo
(
"vgId:1, has %d mnodes, exec redo action first"
,
totalMnodes
);
if
(
mndBuildDropMnodeRedoAction
(
pTrans
,
&
dropReq
,
&
dropEpSet
)
!=
0
)
return
-
1
;
if
(
mndSetDropMnodeRedoLogs
(
pMnode
,
pTrans
,
pObj
)
!=
0
)
return
-
1
;
}
else
{
return
-
1
;
}
return
0
;
...
...
@@ -567,7 +505,6 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
int32_t
mndSetDropMnodeInfoToTrans
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SMnodeObj
*
pObj
)
{
if
(
pObj
==
NULL
)
return
0
;
if
(
mndSetDropMnodeRedoLogs
(
pMnode
,
pTrans
,
pObj
)
!=
0
)
return
-
1
;
if
(
mndSetDropMnodeCommitLogs
(
pMnode
,
pTrans
,
pObj
)
!=
0
)
return
-
1
;
if
(
mndSetDropMnodeRedoActions
(
pMnode
,
pTrans
,
pObj
->
pDnode
,
pObj
)
!=
0
)
return
-
1
;
if
(
mndTransAppendNullLog
(
pTrans
)
!=
0
)
return
-
1
;
...
...
@@ -657,7 +594,7 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
int64_t
curMs
=
taosGetTimestampMs
();
while
(
numOfRows
<
rows
)
{
pShow
->
pIter
=
sdbFetchAll
(
pSdb
,
SDB_MNODE
,
pShow
->
pIter
,
(
void
**
)
&
pObj
,
&
objStatus
);
pShow
->
pIter
=
sdbFetchAll
(
pSdb
,
SDB_MNODE
,
pShow
->
pIter
,
(
void
**
)
&
pObj
,
&
objStatus
,
true
);
if
(
pShow
->
pIter
==
NULL
)
break
;
cols
=
0
;
...
...
@@ -712,6 +649,9 @@ static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter) {
}
static
int32_t
mndProcessAlterMnodeReq
(
SRpcMsg
*
pReq
)
{
#if 1
return
0
;
#else
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SDAlterMnodeReq
alterReq
=
{
0
};
...
...
@@ -720,41 +660,107 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
return
-
1
;
}
SMnodeOpt
option
=
{.
deploy
=
true
,
.
numOfReplicas
=
alterReq
.
replica
,
.
selfIndex
=
-
1
};
memcpy
(
option
.
replicas
,
alterReq
.
replicas
,
sizeof
(
alterReq
.
replicas
));
for
(
int32_t
i
=
0
;
i
<
option
.
numOfReplicas
;
++
i
)
{
if
(
alterReq
.
replicas
[
i
].
id
==
pMnode
->
selfDnodeId
)
{
option
.
selfIndex
=
i
;
}
}
if
(
option
.
selfIndex
==
-
1
)
{
mInfo
(
"alter mnode not processed since selfIndex is -1"
,
terrstr
());
return
0
;
}
if
(
mndWriteFile
(
pMnode
->
path
,
&
option
)
!=
0
)
{
mError
(
"failed to write mnode file since %s"
,
terrstr
());
return
-
1
;
}
SSyncCfg
cfg
=
{.
replicaNum
=
alterReq
.
replica
,
.
myIndex
=
-
1
};
for
(
int32_t
i
=
0
;
i
<
alterReq
.
replica
;
++
i
)
{
SNodeInfo
*
pNode
=
&
cfg
.
nodeInfo
[
i
];
tstrncpy
(
pNode
->
nodeFqdn
,
alterReq
.
replicas
[
i
].
fqdn
,
sizeof
(
pNode
->
nodeFqdn
));
pNode
->
nodePort
=
alterReq
.
replicas
[
i
].
port
;
if
(
alterReq
.
replicas
[
i
].
id
==
pMnode
->
selfDnodeId
)
cfg
.
myIndex
=
i
;
if
(
alterReq
.
replicas
[
i
].
id
==
pMnode
->
selfDnodeId
)
{
cfg
.
myIndex
=
i
;
}
}
if
(
cfg
.
myIndex
==
-
1
)
{
mError
(
"failed to alter mnode since myindex is -1"
);
return
-
1
;
}
else
{
mInfo
(
"start to alter mnode sync, replica:%d my
i
ndex:%d"
,
cfg
.
replicaNum
,
cfg
.
myIndex
);
mInfo
(
"start to alter mnode sync, replica:%d my
I
ndex:%d"
,
cfg
.
replicaNum
,
cfg
.
myIndex
);
for
(
int32_t
i
=
0
;
i
<
alterReq
.
replica
;
++
i
)
{
SNodeInfo
*
pNode
=
&
cfg
.
nodeInfo
[
i
];
mInfo
(
"index:%d, fqdn:%s port:%d"
,
i
,
pNode
->
nodeFqdn
,
pNode
->
nodePort
);
}
}
mInfo
(
"trans:-1, sync reconfig will be proposed"
);
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
pMgmt
->
standby
=
0
;
int32_t
code
=
syncReconfig
(
pMgmt
->
sync
,
&
cfg
);
int32_t
code
=
syncReconfig
(
pMnode
->
syncMgmt
.
sync
,
&
cfg
);
if
(
code
!=
0
)
{
mError
(
"trans:-1, failed to propose sync reconfig since %s"
,
terrstr
());
return
code
;
mError
(
"failed to sync reconfig since %s"
,
terrstr
());
}
else
{
pMgmt
->
errCode
=
0
;
taosWLockLatch
(
&
pMgmt
->
lock
);
pMgmt
->
transId
=
-
1
;
taosWUnLockLatch
(
&
pMgmt
->
lock
);
tsem_wait
(
&
pMgmt
->
syncSem
);
mInfo
(
"alter mnode sync result:0x%x %s"
,
pMgmt
->
errCode
,
tstrerror
(
pMgmt
->
errCode
));
terrno
=
pMgmt
->
errCode
;
return
pMgmt
->
errCode
;
mInfo
(
"alter mnode sync success"
);
}
return
code
;
#endif
}
static
void
mndReloadSyncConfig
(
SMnode
*
pMnode
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SMnodeObj
*
pObj
=
NULL
;
ESdbStatus
objStatus
=
0
;
void
*
pIter
=
NULL
;
bool
hasUpdatingMnode
=
false
;
SSyncCfg
cfg
=
{.
myIndex
=
-
1
};
while
(
1
)
{
pIter
=
sdbFetchAll
(
pSdb
,
SDB_MNODE
,
pIter
,
(
void
**
)
&
pObj
,
&
objStatus
,
false
);
if
(
pIter
==
NULL
)
break
;
if
(
objStatus
==
SDB_STATUS_CREATING
||
objStatus
==
SDB_STATUS_DROPPING
)
{
mInfo
(
"vgId:1, has updating mnode:%d, status:%s"
,
pObj
->
id
,
sdbStatusName
(
objStatus
));
hasUpdatingMnode
=
true
;
}
if
(
objStatus
==
SDB_STATUS_READY
||
objStatus
==
SDB_STATUS_CREATING
)
{
SNodeInfo
*
pNode
=
&
cfg
.
nodeInfo
[
cfg
.
replicaNum
];
tstrncpy
(
pNode
->
nodeFqdn
,
pObj
->
pDnode
->
fqdn
,
sizeof
(
pNode
->
nodeFqdn
));
pNode
->
nodePort
=
pObj
->
pDnode
->
port
;
if
(
pObj
->
pDnode
->
id
==
pMnode
->
selfDnodeId
)
{
cfg
.
myIndex
=
cfg
.
replicaNum
;
}
cfg
.
replicaNum
++
;
}
sdbReleaseLock
(
pSdb
,
pObj
,
false
);
}
if
(
cfg
.
myIndex
==
-
1
)
{
mInfo
(
"vgId:1, mnode not reload since selfIndex is -1"
);
return
;
}
if
(
!
mndGetRestored
(
pMnode
))
{
mInfo
(
"vgId:1, mnode not reload since restore not finished"
);
return
;
}
if
(
hasUpdatingMnode
)
{
mInfo
(
"vgId:1, start to reload mnode sync, replica:%d myIndex:%d"
,
cfg
.
replicaNum
,
cfg
.
myIndex
);
for
(
int32_t
i
=
0
;
i
<
cfg
.
replicaNum
;
++
i
)
{
SNodeInfo
*
pNode
=
&
cfg
.
nodeInfo
[
i
];
mInfo
(
"vgId:1, index:%d, fqdn:%s port:%d"
,
i
,
pNode
->
nodeFqdn
,
pNode
->
nodePort
);
}
int32_t
code
=
syncReconfig
(
pMnode
->
syncMgmt
.
sync
,
&
cfg
);
if
(
code
!=
0
)
{
mError
(
"vgId:1, failed to reconfig mnode sync since %s"
,
terrstr
());
}
else
{
mInfo
(
"vgId:1, reconfig mnode sync success"
);
}
}
}
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
80170d3e
...
...
@@ -107,7 +107,7 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
if
(
!
pMnode
->
deploy
)
{
mInfo
(
"vgId:1, sync restore finished, and will handle outstanding transactions"
);
mndTransPullup
(
pMnode
);
mndSetRestore
(
pMnode
,
true
);
mndSetRestore
d
(
pMnode
,
true
);
}
else
{
mInfo
(
"vgId:1, sync restore finished"
);
}
...
...
@@ -225,18 +225,17 @@ int32_t mndInitSync(SMnode *pMnode) {
snprintf
(
syncInfo
.
path
,
sizeof
(
syncInfo
.
path
),
"%s%ssync"
,
pMnode
->
path
,
TD_DIRSEP
);
syncInfo
.
pWal
=
pMnode
->
pWal
;
syncInfo
.
pFsm
=
mndSyncMakeFsm
(
pMnode
);
syncInfo
.
isStandBy
=
pMgmt
->
standby
;
syncInfo
.
snapshotStrategy
=
SYNC_STRATEGY_STANDARD_SNAPSHOT
;
mInfo
(
"vgId:1, start to open sync, s
tandby:%d"
,
pMgmt
->
standby
);
if
(
pMgmt
->
standby
||
pMgmt
->
replica
.
id
>
0
)
{
SSyncCfg
*
pCfg
=
&
syncInfo
.
syncCfg
;
pCfg
->
replicaNum
=
1
;
pCfg
->
myIndex
=
0
;
SNodeInfo
*
pNode
=
&
pCfg
->
nodeInfo
[
0
];
tstrncpy
(
pNode
->
nodeFqdn
,
pMgmt
->
replica
.
fqdn
,
sizeof
(
pNode
->
nodeFqdn
));
pNode
->
nodePort
=
pMgmt
->
replica
.
port
;
mInfo
(
"vgId:1,
ep:%s:%u"
,
pNode
->
nodeFqdn
,
pNode
->
nodePort
);
mInfo
(
"vgId:1, start to open sync, s
elfIndex:%d replica:%d"
,
pMgmt
->
selfIndex
,
pMgmt
->
numOfReplicas
);
SSyncCfg
*
pCfg
=
&
syncInfo
.
syncCfg
;
pCfg
->
replicaNum
=
pMgmt
->
numOfReplicas
;
pCfg
->
myIndex
=
pMgmt
->
selfIndex
;
for
(
int32_t
i
=
0
;
i
<
pMgmt
->
numOfReplicas
;
++
i
)
{
SNodeInfo
*
pNode
=
&
pCfg
->
nodeInfo
[
i
];
tstrncpy
(
pNode
->
nodeFqdn
,
pMgmt
->
replica
s
[
i
]
.
fqdn
,
sizeof
(
pNode
->
nodeFqdn
));
pNode
->
nodePort
=
pMgmt
->
replica
s
[
i
]
.
port
;
mInfo
(
"vgId:1,
index:%d ep:%s:%u"
,
i
,
pNode
->
nodeFqdn
,
pNode
->
nodePort
);
}
tsem_init
(
&
pMgmt
->
syncSem
,
0
,
0
);
...
...
@@ -250,10 +249,6 @@ int32_t mndInitSync(SMnode *pMnode) {
setPingTimerMS
(
pMgmt
->
sync
,
5000
);
setElectTimerMS
(
pMgmt
->
sync
,
3000
);
setHeartbeatTimerMS
(
pMgmt
->
sync
,
500
);
/*
setElectTimerMS(pMgmt->sync, 600);
setHeartbeatTimerMS(pMgmt->sync, 300);
*/
mInfo
(
"mnode-sync is opened, id:%"
PRId64
,
pMgmt
->
sync
);
return
0
;
...
...
@@ -319,7 +314,7 @@ void mndSyncStart(SMnode *pMnode) {
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
syncSetMsgCb
(
pMgmt
->
sync
,
&
pMnode
->
msgCb
);
syncStart
(
pMgmt
->
sync
);
mInfo
(
"vgId:1, sync started, id:%"
PRId64
" standby:%d"
,
pMgmt
->
sync
,
pMgmt
->
standby
);
mInfo
(
"vgId:1, sync started, id:%"
PRId64
,
pMgmt
->
sync
);
}
void
mndSyncStop
(
SMnode
*
pMnode
)
{
...
...
@@ -331,7 +326,7 @@ void mndSyncStop(SMnode *pMnode) {
taosWUnLockLatch
(
&
pMnode
->
syncMgmt
.
lock
);
}
bool
mndIs
Mast
er
(
SMnode
*
pMnode
)
{
bool
mndIs
Lead
er
(
SMnode
*
pMnode
)
{
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
if
(
!
syncIsReady
(
pMgmt
->
sync
))
{
...
...
@@ -340,7 +335,7 @@ bool mndIsMaster(SMnode *pMnode) {
return
false
;
}
if
(
!
pMnode
->
restored
)
{
if
(
!
mndGetRestored
(
pMnode
)
)
{
terrno
=
TSDB_CODE_APP_NOT_READY
;
return
false
;
}
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
80170d3e
...
...
@@ -53,7 +53,7 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans);
static
bool
mndTransPerformCommitStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
bool
mndTransPerformRollbackStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
bool
mndTransPerfromFinishedStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
bool
mndCannotExecuteTransAction
(
SMnode
*
pMnode
)
{
return
!
pMnode
->
deploy
&&
!
mndIs
Mast
er
(
pMnode
);
}
static
bool
mndCannotExecuteTransAction
(
SMnode
*
pMnode
)
{
return
!
pMnode
->
deploy
&&
!
mndIs
Lead
er
(
pMnode
);
}
static
void
mndTransSendRpcRsp
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndProcessTransTimer
(
SRpcMsg
*
pReq
);
...
...
source/dnode/mnode/sdb/inc/sdb.h
浏览文件 @
80170d3e
...
...
@@ -298,6 +298,7 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey);
* @param pObj The object of the row.
*/
void
sdbRelease
(
SSdb
*
pSdb
,
void
*
pObj
);
void
sdbReleaseLock
(
SSdb
*
pSdb
,
void
*
pObj
,
bool
lock
);
/**
* @brief Traverse a sdb table
...
...
@@ -309,7 +310,7 @@ void sdbRelease(SSdb *pSdb, void *pObj);
* @return void* The next iterator of the table.
*/
void
*
sdbFetch
(
SSdb
*
pSdb
,
ESdbType
type
,
void
*
pIter
,
void
**
ppObj
);
void
*
sdbFetchAll
(
SSdb
*
pSdb
,
ESdbType
type
,
void
*
pIter
,
void
**
ppObj
,
ESdbStatus
*
status
);
void
*
sdbFetchAll
(
SSdb
*
pSdb
,
ESdbType
type
,
void
*
pIter
,
void
**
ppObj
,
ESdbStatus
*
status
,
bool
lock
);
/**
* @brief Cancel a traversal
...
...
source/dnode/mnode/sdb/src/sdbHash.c
浏览文件 @
80170d3e
...
...
@@ -327,14 +327,16 @@ static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) {
taosThreadRwlockUnlock
(
pLock
);
}
void
sdbRelease
(
SSdb
*
pSdb
,
void
*
pObj
)
{
void
sdbRelease
Lock
(
SSdb
*
pSdb
,
void
*
pObj
,
bool
lock
)
{
if
(
pObj
==
NULL
)
return
;
SSdbRow
*
pRow
=
(
SSdbRow
*
)((
char
*
)
pObj
-
sizeof
(
SSdbRow
));
if
(
pRow
->
type
>=
SDB_MAX
)
return
;
TdThreadRwlock
*
pLock
=
&
pSdb
->
locks
[
pRow
->
type
];
taosThreadRwlockWrlock
(
pLock
);
if
(
lock
)
{
taosThreadRwlockWrlock
(
pLock
);
}
int32_t
ref
=
atomic_sub_fetch_32
(
&
pRow
->
refCount
,
1
);
sdbPrintOper
(
pSdb
,
pRow
,
"release"
);
...
...
@@ -342,9 +344,13 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
sdbFreeRow
(
pSdb
,
pRow
,
true
);
}
taosThreadRwlockUnlock
(
pLock
);
if
(
lock
)
{
taosThreadRwlockUnlock
(
pLock
);
}
}
void
sdbRelease
(
SSdb
*
pSdb
,
void
*
pObj
)
{
sdbReleaseLock
(
pSdb
,
pObj
,
true
);
}
void
*
sdbFetch
(
SSdb
*
pSdb
,
ESdbType
type
,
void
*
pIter
,
void
**
ppObj
)
{
*
ppObj
=
NULL
;
...
...
@@ -372,14 +378,16 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
return
ppRow
;
}
void
*
sdbFetchAll
(
SSdb
*
pSdb
,
ESdbType
type
,
void
*
pIter
,
void
**
ppObj
,
ESdbStatus
*
status
)
{
void
*
sdbFetchAll
(
SSdb
*
pSdb
,
ESdbType
type
,
void
*
pIter
,
void
**
ppObj
,
ESdbStatus
*
status
,
bool
lock
)
{
*
ppObj
=
NULL
;
SHashObj
*
hash
=
sdbGetHash
(
pSdb
,
type
);
if
(
hash
==
NULL
)
return
NULL
;
TdThreadRwlock
*
pLock
=
&
pSdb
->
locks
[
type
];
taosThreadRwlockRdlock
(
pLock
);
if
(
lock
)
{
taosThreadRwlockRdlock
(
pLock
);
}
SSdbRow
**
ppRow
=
taosHashIterate
(
hash
,
pIter
);
while
(
ppRow
!=
NULL
)
{
...
...
@@ -395,7 +403,9 @@ void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStat
*
status
=
pRow
->
status
;
break
;
}
taosThreadRwlockUnlock
(
pLock
);
if
(
lock
)
{
taosThreadRwlockUnlock
(
pLock
);
}
return
ppRow
;
}
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
80170d3e
...
...
@@ -169,7 +169,7 @@ typedef struct SSyncNode {
}
SSyncNode
;
// open/close --------------
SSyncNode
*
syncNodeOpen
(
const
SSyncInfo
*
pSyncInfo
);
SSyncNode
*
syncNodeOpen
(
SSyncInfo
*
pSyncInfo
);
void
syncNodeStart
(
SSyncNode
*
pSyncNode
);
void
syncNodeStartStandBy
(
SSyncNode
*
pSyncNode
);
void
syncNodeClose
(
SSyncNode
*
pSyncNode
);
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
80170d3e
...
...
@@ -86,10 +86,10 @@ void syncCleanUp() {
}
}
int64_t
syncOpen
(
const
SSyncInfo
*
pSyncInfo
)
{
int64_t
syncOpen
(
SSyncInfo
*
pSyncInfo
)
{
SSyncNode
*
pSyncNode
=
syncNodeOpen
(
pSyncInfo
);
if
(
pSyncNode
==
NULL
)
{
sError
(
"
failed to open sync node. vgId:%d"
,
pSyncInfo
->
vgId
);
sError
(
"
vgId:%d, failed to open sync node since %s"
,
pSyncInfo
->
vgId
,
terrstr
()
);
return
-
1
;
}
...
...
@@ -230,7 +230,7 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
return
ret
;
}
int32_t
syncReconfig
(
int64_t
rid
,
const
SSyncCfg
*
pNewCfg
)
{
int32_t
syncReconfig
(
int64_t
rid
,
SSyncCfg
*
pNewCfg
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosAcquireRef
(
tsNodeRefId
,
rid
);
if
(
pSyncNode
==
NULL
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
...
...
@@ -238,6 +238,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
#if 0
if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
...
...
@@ -259,6 +260,12 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return ret;
#else
syncNodeUpdateNewConfigIndex
(
pSyncNode
,
pNewCfg
);
syncNodeDoConfigChange
(
pSyncNode
,
pNewCfg
,
SYNC_INDEX_INVALID
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
return
0
;
#endif
}
int32_t
syncLeaderTransfer
(
int64_t
rid
)
{
...
...
@@ -919,9 +926,7 @@ _END:
}
// open/close --------------
SSyncNode
*
syncNodeOpen
(
const
SSyncInfo
*
pOldSyncInfo
)
{
SSyncInfo
*
pSyncInfo
=
(
SSyncInfo
*
)
pOldSyncInfo
;
SSyncNode
*
syncNodeOpen
(
SSyncInfo
*
pSyncInfo
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SSyncNode
));
if
(
pSyncNode
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -957,7 +962,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
sError
(
"failed to open raft cfg file. path:%s"
,
pSyncNode
->
configPath
);
goto
_error
;
}
pSyncInfo
->
syncCfg
=
pSyncNode
->
pRaftCfg
->
cfg
;
if
(
pSyncInfo
->
syncCfg
.
replicaNum
==
0
)
{
pSyncInfo
->
syncCfg
=
pSyncNode
->
pRaftCfg
->
cfg
;
}
raftCfgClose
(
pSyncNode
->
pRaftCfg
);
pSyncNode
->
pRaftCfg
=
NULL
;
...
...
tests/script/jenkins/basic.txt
浏览文件 @
80170d3e
...
...
@@ -155,7 +155,7 @@
./test.sh -f tsim/parser/select_with_tags.sim
./test.sh -f tsim/parser/selectResNum.sim
./test.sh -f tsim/parser/set_tag_vals.sim
./test.sh -f tsim/parser/single_row_in_tb.sim
# TD-19572
./test.sh -f tsim/parser/single_row_in_tb.sim
./test.sh -f tsim/parser/sliding.sim
./test.sh -f tsim/parser/slimit_alter_tags.sim
./test.sh -f tsim/parser/slimit.sim
...
...
@@ -194,7 +194,7 @@
./test.sh -f tsim/mnode/basic1.sim
./test.sh -f tsim/mnode/basic2.sim
./test.sh -f tsim/mnode/basic3.sim
# TD-17919
./test.sh -f tsim/mnode/basic4.sim
./test.sh -f tsim/mnode/basic4.sim
./test.sh -f tsim/mnode/basic5.sim
# ---- show ----
...
...
tests/script/tsim/mnode/basic1.sim
浏览文件 @
80170d3e
...
...
@@ -42,6 +42,7 @@ sql_error drop mnode on dnode 1
print =============== create mnode 2
sql create mnode on dnode 2
print =============== create mnode 2 finished
$x = 0
step2:
$x = $x + 1
...
...
@@ -69,9 +70,10 @@ if $data(2)[2] != follower then
goto step2
endi
sleep 2000
print ============ drop mnode 2
sql drop mnode on dnode 2
print ============ drop mnode 2 finished
sql select * from information_schema.ins_mnodes
if $rows != 1 then
return -1
...
...
@@ -109,6 +111,8 @@ sleep 2000
print =============== create mnodes
sql create mnode on dnode 2
print =============== create mnode 2 finished
sql select * from information_schema.ins_mnodes
if $rows != 2 then
return -1
...
...
tests/script/tsim/mnode/basic4.sim
浏览文件 @
80170d3e
...
...
@@ -45,49 +45,12 @@ endi
if $data(2)[4] != ready then
goto step2
endi
system sh/exec.sh -n dnode3 -s stop
sql_error create mnode on dnode 3
print =============== step3: select * from information_schema.ins_mnodes
$x = 0
step3:
$x = $x + 1
sleep 1000
if $x == 10 then
return -1
endi
sql select * from information_schema.ins_mnodes -x step3
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4]
print $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4]
print $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4]
$leaderNum = 0
if $data(1)[2] == leader then
$leaderNum = 1
endi
if $data(2)[2] == leader then
$leaderNum = 1
endi
if $leaderNum == 0 then
goto step3
endi
if $data(3)[2] != offline then
goto step3
endi
if $data(1)[3] != ready then
goto step3
endi
if $data(2)[3] != ready then
goto step3
endi
if $data(3)[3] != creating then
goto step3
if $data(3)[4] != ready then
goto step2
endi
print =============== step4:
start dnode
3
s
ystem sh/exec.sh -n dnode3 -s start
print =============== step4:
create mnode
3
s
ql create mnode on dnode 3
$x = 0
step4:
...
...
@@ -159,12 +122,11 @@ endi
if $data(2)[3] != ready then
goto step5
endi
if $data(3)[3] != dropping then
goto step5
endi
print =============== step6: start dnode3
system sh/exec.sh -n dnode3 -s start
sql drop mnode on dnode 1 -x step60
step60:
$x = 0
step6:
...
...
tests/script/tsim/mnode/basic5.sim
浏览文件 @
80170d3e
...
...
@@ -232,12 +232,11 @@ endi
if $leaderNum != 1 then
goto step81
endi
if $data(1)[3] != dropping then
goto step81
endi
print =============== step9: start mnode1 and wait it dropped
system sh/exec.sh -n dnode1 -s start
sql drop mnode on dnode 1 -x step90
step90:
$x = 0
step91:
...
...
tests/script/tsim/valgrind/checkError3.sim
浏览文件 @
80170d3e
...
...
@@ -62,7 +62,16 @@ system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start -v
print =============== stepa: query data
sql select * from c1
$x = 0
steps:
$x = $x + 1
sleep 500
if $x == 50 then
return -1
endi
sql select * from c1 -x steps
sql select * from stb
sql select * from stb_1
sql select ts, c1, c2, c3 from c1
...
...
tests/script/tsim/valgrind/checkError6.sim
浏览文件 @
80170d3e
...
...
@@ -158,9 +158,15 @@ print =============== restart
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start -v
sleep 1000
$x = 0
steps:
$x = $x + 1
sleep 500
if $x == 50 then
return -1
endi
sql select avg(tbcol) as c from stb -x steps
sql select avg(tbcol) as c from stb
sql select avg(tbcol) as c from stb where ts <= 1601481840000
sql select avg(tbcol) as c from stb where tgcol < 5 and ts <= 1601481840000
sql select avg(tbcol) as c from stb interval(1m)
...
...
tests/system-test/fulltest.sh
浏览文件 @
80170d3e
...
...
@@ -232,7 +232,7 @@ python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3
python3 ./test.py
-f
6-cluster/5dnode3mnodeStop.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeStop2Follower.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeStopLoop.py
-N
5
-M
3
# unstable
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDbRep3.py
-N
5
-M
3
...
...
@@ -248,8 +248,8 @@ python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 5 -
python3 ./test.py
-f
6-cluster/5dnode3mnodeAdd1Ddnoe.py
-N
6
-M
3
-C
5
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
#
python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
#
python3 test.py -f 6-cluster/5dnode3mnodeStopConnect.py -N 5 -M 3
python3 ./test.py
-f
6-cluster/5dnode3mnodeDrop.py
-N
5
python3 test.py
-f
6-cluster/5dnode3mnodeStopConnect.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeRecreateMnode.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeStopFollowerLeader.py
-N
5
-M
3
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录