Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ce226517
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
ce226517
编写于
5月 25, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
5月 25, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12966 from taosdata/fix/mnode
refactor: let all operations of mnode into the sync log
上级
a4e3a953
1100ec3f
变更
41
隐藏空白更改
内联
并排
Showing
41 changed file
with
532 addition
and
302 deletion
+532
-302
include/common/tmsg.h
include/common/tmsg.h
+1
-0
include/dnode/mnode/mnode.h
include/dnode/mnode/mnode.h
+1
-9
include/libs/sync/sync.h
include/libs/sync/sync.h
+12
-2
include/util/tlog.h
include/util/tlog.h
+1
-0
source/common/src/systable.c
source/common/src/systable.c
+0
-1
source/common/src/tmsg.c
source/common/src/tmsg.c
+5
-0
source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
+2
-1
source/dnode/mgmt/mgmt_mnode/inc/mmInt.h
source/dnode/mgmt/mgmt_mnode/inc/mmInt.h
+0
-2
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+0
-16
source/dnode/mgmt/mgmt_mnode/src/mmInt.c
source/dnode/mgmt/mgmt_mnode/src/mmInt.c
+4
-39
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
+0
-3
source/dnode/mgmt/node_util/inc/dmUtil.h
source/dnode/mgmt/node_util/inc/dmUtil.h
+2
-2
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+29
-27
source/dnode/mnode/impl/inc/mndInt.h
source/dnode/mnode/impl/inc/mndInt.h
+2
-2
source/dnode/mnode/impl/inc/mndMnode.h
source/dnode/mnode/impl/inc/mndMnode.h
+0
-1
source/dnode/mnode/impl/src/mndAcct.c
source/dnode/mnode/impl/src/mndAcct.c
+50
-20
source/dnode/mnode/impl/src/mndCluster.c
source/dnode/mnode/impl/src/mndCluster.c
+26
-0
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+47
-9
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+87
-30
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+23
-30
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+3
-3
source/dnode/mnode/impl/src/mndUser.c
source/dnode/mnode/impl/src/mndUser.c
+26
-0
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+26
-19
source/dnode/mnode/sdb/CMakeLists.txt
source/dnode/mnode/sdb/CMakeLists.txt
+1
-2
source/dnode/mnode/sdb/inc/sdb.h
source/dnode/mnode/sdb/inc/sdb.h
+63
-35
source/dnode/mnode/sdb/src/sdb.c
source/dnode/mnode/sdb/src/sdb.c
+1
-1
source/dnode/mnode/sdb/src/sdbFile.c
source/dnode/mnode/sdb/src/sdbFile.c
+1
-1
source/dnode/mnode/sdb/src/sdbHash.c
source/dnode/mnode/sdb/src/sdbHash.c
+1
-1
source/dnode/mnode/sdb/src/sdbRaw.c
source/dnode/mnode/sdb/src/sdbRaw.c
+1
-1
source/dnode/mnode/sdb/src/sdbRow.c
source/dnode/mnode/sdb/src/sdbRow.c
+1
-1
source/libs/monitor/src/monMain.c
source/libs/monitor/src/monMain.c
+2
-1
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+10
-0
source/libs/sync/src/syncCommit.c
source/libs/sync/src/syncCommit.c
+10
-0
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+18
-2
source/libs/sync/test/syncTest.cpp
source/libs/sync/test/syncTest.cpp
+9
-1
source/os/src/osSocket.c
source/os/src/osSocket.c
+3
-3
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+3
-2
tests/script/tsim/dnode/basic1.sim
tests/script/tsim/dnode/basic1.sim
+2
-4
tests/script/tsim/mnode/basic2.sim
tests/script/tsim/mnode/basic2.sim
+55
-27
tests/script/tsim/trans/create_db.sim
tests/script/tsim/trans/create_db.sim
+3
-3
tests/test/c/sdbDump.c
tests/test/c/sdbDump.c
+1
-1
未找到文件。
include/common/tmsg.h
浏览文件 @
ce226517
...
...
@@ -949,6 +949,7 @@ typedef struct {
int32_t
numOfCores
;
int32_t
numOfSupportVnodes
;
char
dnodeEp
[
TSDB_EP_LEN
];
SMnodeLoad
mload
;
SClusterCfg
clusterCfg
;
SArray
*
pVloads
;
// array of SVnodeLoad
}
SStatusReq
;
...
...
include/dnode/mnode/mnode.h
浏览文件 @
ce226517
...
...
@@ -29,6 +29,7 @@ extern "C" {
typedef
struct
SMnode
SMnode
;
typedef
struct
{
int32_t
dnodeId
;
bool
standby
;
bool
deploy
;
int8_t
replica
;
...
...
@@ -54,15 +55,6 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption);
*/
void
mndClose
(
SMnode
*
pMnode
);
/**
* @brief Close a mnode.
*
* @param pMnode The mnode object to close.
* @param pOption Options of the mnode.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t
mndAlter
(
SMnode
*
pMnode
,
const
SMnodeOpt
*
pOption
);
/**
* @brief Start mnode
*
...
...
include/libs/sync/sync.h
浏览文件 @
ce226517
...
...
@@ -98,8 +98,18 @@ typedef struct SSyncFSM {
void
(
*
FpRestoreFinishCb
)(
struct
SSyncFSM
*
pFsm
);
int32_t
(
*
FpGetSnapshot
)(
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
);
void
*
(
*
FpSnapshotRead
)(
struct
SSyncFSM
*
pFsm
,
const
SSnapshot
*
snapshot
,
void
*
iter
,
char
**
ppBuf
,
int32_t
*
len
);
int32_t
(
*
FpSnapshotApply
)(
struct
SSyncFSM
*
pFsm
,
const
SSnapshot
*
snapshot
,
char
*
pBuf
,
int32_t
len
);
// if (*ppIter == NULL)
// *ppIter = new iter;
// else
// *ppIter.next();
//
// if success, return 0. else return error code
int32_t
(
*
FpSnapshotRead
)(
struct
SSyncFSM
*
pFsm
,
const
SSnapshot
*
pSnapshot
,
void
**
ppIter
,
char
**
ppBuf
,
int32_t
*
len
);
// apply data into fsm
int32_t
(
*
FpSnapshotApply
)(
struct
SSyncFSM
*
pFsm
,
const
SSnapshot
*
pSnapshot
,
char
*
pBuf
,
int32_t
len
);
void
(
*
FpReConfigCb
)(
struct
SSyncFSM
*
pFsm
,
SSyncCfg
newCfg
,
SReConfigCbMeta
cbMeta
);
...
...
include/util/tlog.h
浏览文件 @
ce226517
...
...
@@ -88,6 +88,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
#define uInfo(...) { if (uDebugFlag & DEBUG_INFO) { taosPrintLog("UTL ", DEBUG_INFO, tsLogEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }}
#define uDebug(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLog("UTL ", DEBUG_DEBUG, uDebugFlag, __VA_ARGS__); }}
#define uTrace(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLog("UTL ", DEBUG_TRACE, uDebugFlag, __VA_ARGS__); }}
#define uDebugL(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLongString("UTL ", DEBUG_DEBUG, uDebugFlag, __VA_ARGS__); }}
#define pError(...) { taosPrintLog("APP ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }
#define pPrint(...) { taosPrintLog("APP ", DEBUG_INFO, 255, __VA_ARGS__); }
...
...
source/common/src/systable.c
浏览文件 @
ce226517
...
...
@@ -36,7 +36,6 @@ static const SSysDbTableSchema mnodesSchema[] = {
{.
name
=
"id"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"endpoint"
,
.
bytes
=
TSDB_EP_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"role"
,
.
bytes
=
12
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"role_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
{.
name
=
"create_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
};
...
...
source/common/src/tmsg.c
浏览文件 @
ce226517
...
...
@@ -891,6 +891,9 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if
(
tEncodeI64
(
&
encoder
,
pload
->
pointsWritten
)
<
0
)
return
-
1
;
}
// mnode loads
if
(
tEncodeI32
(
&
encoder
,
pReq
->
mload
.
syncState
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
...
...
@@ -946,6 +949,8 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
}
}
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
mload
.
syncState
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
return
0
;
...
...
source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
浏览文件 @
ce226517
...
...
@@ -75,8 +75,9 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
(
*
pMgmt
->
getVnodeLoadsFp
)(
&
vinfo
);
req
.
pVloads
=
vinfo
.
pVloads
;
SMonMloadInfo
minfo
=
{
0
};
SMonMloadInfo
minfo
=
{
0
};
(
*
pMgmt
->
getMnodeLoadsFp
)(
&
minfo
);
req
.
mload
=
minfo
.
load
;
int32_t
contLen
=
tSerializeSStatusReq
(
NULL
,
0
,
&
req
);
void
*
pHead
=
rpcMallocCont
(
contLen
);
...
...
source/dnode/mgmt/mgmt_mnode/inc/mmInt.h
浏览文件 @
ce226517
...
...
@@ -36,7 +36,6 @@ typedef struct SMnodeMgmt {
SSingleWorker
monitorWorker
;
SReplica
replicas
[
TSDB_MAX_REPLICA
];
int8_t
replica
;
int8_t
selfIndex
;
bool
stopped
;
int32_t
refCount
;
TdThreadRwlock
lock
;
...
...
@@ -47,7 +46,6 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed);
int32_t
mmWriteFile
(
SMnodeMgmt
*
pMgmt
,
SDCreateMnodeReq
*
pMsg
,
bool
deployed
);
// mmInt.c
int32_t
mmAlter
(
SMnodeMgmt
*
pMgmt
,
SDAlterMnodeReq
*
pMsg
);
int32_t
mmAcquire
(
SMnodeMgmt
*
pMgmt
);
void
mmRelease
(
SMnodeMgmt
*
pMgmt
);
...
...
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
浏览文件 @
ce226517
...
...
@@ -124,22 +124,6 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
return
0
;
}
int32_t
mmProcessAlterReq
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
SDAlterMnodeReq
alterReq
=
{
0
};
if
(
tDeserializeSDCreateMnodeReq
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
alterReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
if
(
pMgmt
->
pData
->
dnodeId
!=
0
&&
alterReq
.
dnodeId
!=
pMgmt
->
pData
->
dnodeId
)
{
terrno
=
TSDB_CODE_INVALID_OPTION
;
dError
(
"failed to alter mnode since %s, input:%d cur:%d"
,
terrstr
(),
alterReq
.
dnodeId
,
pMgmt
->
pData
->
dnodeId
);
return
-
1
;
}
else
{
return
mmAlter
(
pMgmt
,
&
alterReq
);
}
}
SArray
*
mmGetMsgHandles
()
{
int32_t
code
=
-
1
;
SArray
*
pArray
=
taosArrayInit
(
64
,
sizeof
(
SMgmtHandle
));
...
...
source/dnode/mgmt/mgmt_mnode/src/mmInt.c
浏览文件 @
ce226517
...
...
@@ -42,6 +42,8 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu
pOption
->
standby
=
false
;
pOption
->
deploy
=
true
;
pOption
->
msgCb
=
pMgmt
->
msgCb
;
pOption
->
dnodeId
=
pMgmt
->
pData
->
dnodeId
;
pOption
->
replica
=
1
;
pOption
->
selfIndex
=
0
;
...
...
@@ -52,9 +54,10 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu
}
static
void
mmBuildOptionForOpen
(
SMnodeMgmt
*
pMgmt
,
SMnodeOpt
*
pOption
)
{
pOption
->
msgCb
=
pMgmt
->
msgCb
;
pOption
->
deploy
=
false
;
pOption
->
standby
=
false
;
pOption
->
msgCb
=
pMgmt
->
msgCb
;
pOption
->
dnodeId
=
pMgmt
->
pData
->
dnodeId
;
if
(
pMgmt
->
replica
>
0
)
{
pOption
->
standby
=
true
;
...
...
@@ -70,44 +73,6 @@ static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
}
}
static
int32_t
mmBuildOptionForAlter
(
SMnodeMgmt
*
pMgmt
,
SMnodeOpt
*
pOption
,
SDCreateMnodeReq
*
pCreate
)
{
pOption
->
msgCb
=
pMgmt
->
msgCb
;
pOption
->
standby
=
false
;
pOption
->
deploy
=
false
;
pOption
->
replica
=
pCreate
->
replica
;
pOption
->
selfIndex
=
-
1
;
for
(
int32_t
i
=
0
;
i
<
pCreate
->
replica
;
++
i
)
{
SReplica
*
pReplica
=
&
pOption
->
replicas
[
i
];
pReplica
->
id
=
pCreate
->
replicas
[
i
].
id
;
pReplica
->
port
=
pCreate
->
replicas
[
i
].
port
;
memcpy
(
pReplica
->
fqdn
,
pCreate
->
replicas
[
i
].
fqdn
,
TSDB_FQDN_LEN
);
if
(
pReplica
->
id
==
pMgmt
->
pData
->
dnodeId
)
{
pOption
->
selfIndex
=
i
;
}
}
if
(
pOption
->
selfIndex
==
-
1
)
{
dError
(
"failed to build mnode options since %s"
,
terrstr
());
return
-
1
;
}
return
0
;
}
int32_t
mmAlter
(
SMnodeMgmt
*
pMgmt
,
SDAlterMnodeReq
*
pMsg
)
{
SMnodeOpt
option
=
{
0
};
if
(
mmBuildOptionForAlter
(
pMgmt
,
&
option
,
pMsg
)
!=
0
)
{
return
-
1
;
}
if
(
mndAlter
(
pMgmt
->
pMnode
,
&
option
)
!=
0
)
{
return
-
1
;
}
return
0
;
}
static
void
mmClose
(
SMnodeMgmt
*
pMgmt
)
{
if
(
pMgmt
->
pMnode
!=
NULL
)
{
mmStopWorker
(
pMgmt
);
...
...
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
浏览文件 @
ce226517
...
...
@@ -32,9 +32,6 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
dTrace
(
"msg:%p, get from mnode queue"
,
pMsg
);
switch
(
pMsg
->
msgType
)
{
case
TDMT_DND_ALTER_MNODE
:
code
=
mmProcessAlterReq
(
pMgmt
,
pMsg
);
break
;
case
TDMT_MON_MM_INFO
:
code
=
mmProcessGetMonitorInfoReq
(
pMgmt
,
pMsg
);
break
;
...
...
source/dnode/mgmt/node_util/inc/dmUtil.h
浏览文件 @
ce226517
...
...
@@ -90,8 +90,8 @@ typedef enum {
typedef
int32_t
(
*
ProcessCreateNodeFp
)(
EDndNodeType
ntype
,
SRpcMsg
*
pMsg
);
typedef
int32_t
(
*
ProcessDropNodeFp
)(
EDndNodeType
ntype
,
SRpcMsg
*
pMsg
);
typedef
void
(
*
SendMonitorReportFp
)();
typedef
void
(
*
GetVnodeLoadsFp
)();
typedef
void
(
*
GetMnodeLoadsFp
)();
typedef
void
(
*
GetVnodeLoadsFp
)(
SMonVloadInfo
*
pInfo
);
typedef
void
(
*
GetMnodeLoadsFp
)(
SMonMloadInfo
*
pInfo
);
typedef
struct
{
int32_t
dnodeId
;
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
ce226517
...
...
@@ -67,30 +67,33 @@ typedef enum {
typedef
enum
{
TRN_TYPE_BASIC_SCOPE
=
1000
,
TRN_TYPE_CREATE_USER
=
1001
,
TRN_TYPE_ALTER_USER
=
1002
,
TRN_TYPE_DROP_USER
=
1003
,
TRN_TYPE_CREATE_FUNC
=
1004
,
TRN_TYPE_DROP_FUNC
=
1005
,
TRN_TYPE_CREATE_SNODE
=
1006
,
TRN_TYPE_DROP_SNODE
=
1007
,
TRN_TYPE_CREATE_QNODE
=
1008
,
TRN_TYPE_DROP_QNODE
=
1009
,
TRN_TYPE_CREATE_BNODE
=
1010
,
TRN_TYPE_DROP_BNODE
=
1011
,
TRN_TYPE_CREATE_MNODE
=
1012
,
TRN_TYPE_DROP_MNODE
=
1013
,
TRN_TYPE_CREATE_TOPIC
=
1014
,
TRN_TYPE_DROP_TOPIC
=
1015
,
TRN_TYPE_SUBSCRIBE
=
1016
,
TRN_TYPE_REBALANCE
=
1017
,
TRN_TYPE_COMMIT_OFFSET
=
1018
,
TRN_TYPE_CREATE_STREAM
=
1019
,
TRN_TYPE_DROP_STREAM
=
1020
,
TRN_TYPE_ALTER_STREAM
=
1021
,
TRN_TYPE_CONSUMER_LOST
=
1022
,
TRN_TYPE_CONSUMER_RECOVER
=
1023
,
TRN_TYPE_CREATE_ACCT
=
1001
,
TRN_TYPE_CREATE_CLUSTER
=
1002
,
TRN_TYPE_CREATE_USER
=
1003
,
TRN_TYPE_ALTER_USER
=
1004
,
TRN_TYPE_DROP_USER
=
1005
,
TRN_TYPE_CREATE_FUNC
=
1006
,
TRN_TYPE_DROP_FUNC
=
1007
,
TRN_TYPE_CREATE_SNODE
=
1010
,
TRN_TYPE_DROP_SNODE
=
1011
,
TRN_TYPE_CREATE_QNODE
=
1012
,
TRN_TYPE_DROP_QNODE
=
10013
,
TRN_TYPE_CREATE_BNODE
=
1014
,
TRN_TYPE_DROP_BNODE
=
1015
,
TRN_TYPE_CREATE_MNODE
=
1016
,
TRN_TYPE_DROP_MNODE
=
1017
,
TRN_TYPE_CREATE_TOPIC
=
1020
,
TRN_TYPE_DROP_TOPIC
=
1021
,
TRN_TYPE_SUBSCRIBE
=
1022
,
TRN_TYPE_REBALANCE
=
1023
,
TRN_TYPE_COMMIT_OFFSET
=
1024
,
TRN_TYPE_CREATE_STREAM
=
1025
,
TRN_TYPE_DROP_STREAM
=
1026
,
TRN_TYPE_ALTER_STREAM
=
1027
,
TRN_TYPE_CONSUMER_LOST
=
1028
,
TRN_TYPE_CONSUMER_RECOVER
=
1029
,
TRN_TYPE_BASIC_SCOPE_END
,
TRN_TYPE_GLOBAL_SCOPE
=
2000
,
...
...
@@ -196,9 +199,8 @@ typedef struct {
int32_t
id
;
int64_t
createdTime
;
int64_t
updateTime
;
ESyncState
role
;
int32_t
roleTerm
;
int64_t
roleTime
;
ESyncState
state
;
int64_t
stateStartTime
;
SDnodeObj
*
pDnode
;
}
SMnodeObj
;
...
...
source/dnode/mnode/impl/inc/mndInt.h
浏览文件 @
ce226517
...
...
@@ -78,7 +78,6 @@ typedef struct {
SWal
*
pWal
;
sem_t
syncSem
;
int64_t
sync
;
ESyncState
state
;
bool
standby
;
bool
restored
;
int32_t
errCode
;
...
...
@@ -90,9 +89,10 @@ typedef struct {
}
SGrantInfo
;
typedef
struct
SMnode
{
int32_t
selfId
;
int32_t
self
Dnode
Id
;
int64_t
clusterId
;
TdThread
thread
;
bool
deploy
;
bool
stopped
;
int8_t
replica
;
int8_t
selfIndex
;
...
...
source/dnode/mnode/impl/inc/mndMnode.h
浏览文件 @
ce226517
...
...
@@ -28,7 +28,6 @@ SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId);
void
mndReleaseMnode
(
SMnode
*
pMnode
,
SMnodeObj
*
pObj
);
bool
mndIsMnode
(
SMnode
*
pMnode
,
int32_t
dnodeId
);
void
mndGetMnodeEpSet
(
SMnode
*
pMnode
,
SEpSet
*
pEpSet
);
void
mndUpdateMnodeRole
(
SMnode
*
pMnode
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mndAcct.c
浏览文件 @
ce226517
...
...
@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "mndAcct.h"
#include "mndShow.h"
#include "mndTrans.h"
#define ACCT_VER_NUMBER 1
#define ACCT_RESERVE_SIZE 128
...
...
@@ -31,14 +32,16 @@ static int32_t mndProcessAlterAcctReq(SRpcMsg *pReq);
static
int32_t
mndProcessDropAcctReq
(
SRpcMsg
*
pReq
);
int32_t
mndInitAcct
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_ACCT
,
.
keyType
=
SDB_KEY_BINARY
,
.
deployFp
=
mndCreateDefaultAcct
,
.
encodeFp
=
(
SdbEncodeFp
)
mndAcctActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndAcctActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndAcctActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndAcctActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndAcctActionDelete
};
SSdbTable
table
=
{
.
sdbType
=
SDB_ACCT
,
.
keyType
=
SDB_KEY_BINARY
,
.
deployFp
=
mndCreateDefaultAcct
,
.
encodeFp
=
(
SdbEncodeFp
)
mndAcctActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndAcctActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndAcctActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndAcctActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndAcctActionDelete
,
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_ACCT
,
mndProcessCreateAcctReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_ALTER_ACCT
,
mndProcessAlterAcctReq
);
...
...
@@ -56,25 +59,52 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
acctObj
.
updateTime
=
acctObj
.
createdTime
;
acctObj
.
acctId
=
1
;
acctObj
.
status
=
0
;
acctObj
.
cfg
=
(
SAcctCfg
){.
maxUsers
=
INT32_MAX
,
.
maxDbs
=
INT32_MAX
,
.
maxStbs
=
INT32_MAX
,
.
maxTbs
=
INT32_MAX
,
.
maxTimeSeries
=
INT32_MAX
,
.
maxStreams
=
INT32_MAX
,
.
maxFuncs
=
INT32_MAX
,
.
maxConsumers
=
INT32_MAX
,
.
maxConns
=
INT32_MAX
,
.
maxTopics
=
INT32_MAX
,
.
maxStorage
=
INT64_MAX
,
.
accessState
=
TSDB_VN_ALL_ACCCESS
};
acctObj
.
cfg
=
(
SAcctCfg
){
.
maxUsers
=
INT32_MAX
,
.
maxDbs
=
INT32_MAX
,
.
maxStbs
=
INT32_MAX
,
.
maxTbs
=
INT32_MAX
,
.
maxTimeSeries
=
INT32_MAX
,
.
maxStreams
=
INT32_MAX
,
.
maxFuncs
=
INT32_MAX
,
.
maxConsumers
=
INT32_MAX
,
.
maxConns
=
INT32_MAX
,
.
maxTopics
=
INT32_MAX
,
.
maxStorage
=
INT64_MAX
,
.
accessState
=
TSDB_VN_ALL_ACCCESS
,
};
SSdbRaw
*
pRaw
=
mndAcctActionEncode
(
&
acctObj
);
if
(
pRaw
==
NULL
)
return
-
1
;
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
mDebug
(
"acct:%s, will be created while deploy sdb, raw:%p"
,
acctObj
.
acct
,
pRaw
);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_TYPE_CREATE_ACCT
,
NULL
);
if
(
pTrans
==
NULL
)
{
mError
(
"acct:%s, failed to create since %s"
,
acctObj
.
acct
,
terrstr
());
return
-
1
;
}
mDebug
(
"trans:%d, used to create acct:%s"
,
pTrans
->
id
,
acctObj
.
acct
);
if
(
mndTransAppendCommitlog
(
pTrans
,
pRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to commit redo log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
mndTransDrop
(
pTrans
);
return
0
;
#endif
}
static
SSdbRaw
*
mndAcctActionEncode
(
SAcctObj
*
pAcct
)
{
...
...
source/dnode/mnode/impl/src/mndCluster.c
浏览文件 @
ce226517
...
...
@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "mndCluster.h"
#include "mndShow.h"
#include "mndTrans.h"
#define CLUSTER_VER_NUMBE 1
#define CLUSTER_RESERVE_SIZE 64
...
...
@@ -177,7 +178,32 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
mDebug
(
"cluster:%"
PRId64
", will be created while deploy sdb, raw:%p"
,
clusterObj
.
id
,
pRaw
);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_TYPE_CREATE_CLUSTER
,
NULL
);
if
(
pTrans
==
NULL
)
{
mError
(
"cluster:%"
PRId64
", failed to create since %s"
,
clusterObj
.
id
,
terrstr
());
return
-
1
;
}
mDebug
(
"trans:%d, used to create cluster:%"
PRId64
,
pTrans
->
id
,
clusterObj
.
id
);
if
(
mndTransAppendCommitlog
(
pTrans
,
pRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to commit redo log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
mndTransDrop
(
pTrans
);
return
0
;
#endif
}
static
int32_t
mndRetrieveClusters
(
SRpcMsg
*
pMsg
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
)
{
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
ce226517
...
...
@@ -58,14 +58,16 @@ static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
static
void
mndCancelGetNextDnode
(
SMnode
*
pMnode
,
void
*
pIter
);
int32_t
mndInitDnode
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_DNODE
,
.
keyType
=
SDB_KEY_INT32
,
.
deployFp
=
(
SdbDeployFp
)
mndCreateDefaultDnode
,
.
encodeFp
=
(
SdbEncodeFp
)
mndDnodeActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndDnodeActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndDnodeActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndDnodeActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndDnodeActionDelete
};
SSdbTable
table
=
{
.
sdbType
=
SDB_DNODE
,
.
keyType
=
SDB_KEY_INT32
,
.
deployFp
=
(
SdbDeployFp
)
mndCreateDefaultDnode
,
.
encodeFp
=
(
SdbEncodeFp
)
mndDnodeActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndDnodeActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndDnodeActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndDnodeActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndDnodeActionDelete
,
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_DNODE
,
mndProcessCreateDnodeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_DNODE
,
mndProcessDropDnodeReq
);
...
...
@@ -90,13 +92,40 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
dnodeObj
.
updateTime
=
dnodeObj
.
createdTime
;
dnodeObj
.
port
=
pMnode
->
replicas
[
0
].
port
;
memcpy
(
&
dnodeObj
.
fqdn
,
pMnode
->
replicas
[
0
].
fqdn
,
TSDB_FQDN_LEN
);
snprintf
(
dnodeObj
.
ep
,
TSDB_EP_LEN
,
"%s:%u"
,
dnodeObj
.
fqdn
,
dnodeObj
.
port
);
SSdbRaw
*
pRaw
=
mndDnodeActionEncode
(
&
dnodeObj
);
if
(
pRaw
==
NULL
)
return
-
1
;
if
(
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
)
!=
0
)
return
-
1
;
mDebug
(
"dnode:%d, will be created while deploy sdb, raw:%p"
,
dnodeObj
.
id
,
pRaw
);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_TYPE_CREATE_DNODE
,
NULL
);
if
(
pTrans
==
NULL
)
{
mError
(
"dnode:%s, failed to create since %s"
,
dnodeObj
.
ep
,
terrstr
());
return
-
1
;
}
mDebug
(
"trans:%d, used to create dnode:%s"
,
pTrans
->
id
,
dnodeObj
.
ep
);
if
(
mndTransAppendCommitlog
(
pTrans
,
pRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to append commit log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
mndTransDrop
(
pTrans
);
return
0
;
#endif
}
static
SSdbRaw
*
mndDnodeActionEncode
(
SDnodeObj
*
pDnode
)
{
...
...
@@ -350,6 +379,15 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
mndReleaseVgroup
(
pMnode
,
pVgroup
);
}
SMnodeObj
*
pObj
=
mndAcquireMnode
(
pMnode
,
pDnode
->
id
);
if
(
pObj
!=
NULL
)
{
if
(
pObj
->
state
!=
statusReq
.
mload
.
syncState
)
{
pObj
->
state
=
statusReq
.
mload
.
syncState
;
pObj
->
stateStartTime
=
taosGetTimestampMs
();
}
mndReleaseMnode
(
pMnode
,
pObj
);
}
int64_t
curMs
=
taosGetTimestampMs
();
bool
online
=
mndIsDnodeOnline
(
pMnode
,
pDnode
,
curMs
);
bool
dnodeChanged
=
(
statusReq
.
dnodeVer
!=
sdbGetTableVer
(
pMnode
->
pSdb
,
SDB_DNODE
));
...
...
@@ -701,7 +739,7 @@ static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pDnode
->
id
,
false
);
char
buf
[
tListLen
(
pDnode
->
ep
)
+
VARSTR_HEADER_SIZE
]
=
{
0
};
STR_WITH_MAXSIZE_TO_VARSTR
(
buf
,
pDnode
->
ep
,
pShow
->
pMeta
->
pSchemas
[
cols
].
bytes
);
STR_WITH_MAXSIZE_TO_VARSTR
(
buf
,
pDnode
->
ep
,
pShow
->
pMeta
->
pSchemas
[
cols
].
bytes
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
buf
,
false
);
...
...
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
ce226517
...
...
@@ -31,6 +31,7 @@ static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj);
static
int32_t
mndMnodeActionDelete
(
SSdb
*
pSdb
,
SMnodeObj
*
pObj
);
static
int32_t
mndMnodeActionUpdate
(
SSdb
*
pSdb
,
SMnodeObj
*
pOld
,
SMnodeObj
*
pNew
);
static
int32_t
mndProcessCreateMnodeReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessAlterMnodeReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessDropMnodeReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessCreateMnodeRsp
(
SRpcMsg
*
pRsp
);
static
int32_t
mndProcessAlterMnodeRsp
(
SRpcMsg
*
pRsp
);
...
...
@@ -51,6 +52,7 @@ int32_t mndInitMnode(SMnode *pMnode) {
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_MNODE
,
mndProcessCreateMnodeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_ALTER_MNODE
,
mndProcessAlterMnodeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_MNODE
,
mndProcessDropMnodeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_CREATE_MNODE_RSP
,
mndProcessCreateMnodeRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_ALTER_MNODE_RSP
,
mndProcessAlterMnodeRsp
);
...
...
@@ -77,28 +79,6 @@ void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj) {
sdbRelease
(
pMnode
->
pSdb
,
pObj
);
}
void
mndUpdateMnodeRole
(
SMnode
*
pMnode
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
while
(
1
)
{
SMnodeObj
*
pObj
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_MNODE
,
pIter
,
(
void
**
)
&
pObj
);
if
(
pIter
==
NULL
)
break
;
ESyncState
lastRole
=
pObj
->
role
;
if
(
pObj
->
id
==
1
)
{
pObj
->
role
=
TAOS_SYNC_STATE_LEADER
;
}
else
{
pObj
->
role
=
TAOS_SYNC_STATE_CANDIDATE
;
}
if
(
pObj
->
role
!=
lastRole
)
{
pObj
->
roleTime
=
taosGetTimestampMs
();
}
sdbRelease
(
pSdb
,
pObj
);
}
}
static
int32_t
mndCreateDefaultMnode
(
SMnode
*
pMnode
)
{
SMnodeObj
mnodeObj
=
{
0
};
mnodeObj
.
id
=
1
;
...
...
@@ -110,7 +90,33 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
mDebug
(
"mnode:%d, will be created while deploy sdb, raw:%p"
,
mnodeObj
.
id
,
pRaw
);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_TYPE_CREATE_DNODE
,
NULL
);
if
(
pTrans
==
NULL
)
{
mError
(
"mnode:%d, failed to create since %s"
,
mnodeObj
.
id
,
terrstr
());
return
-
1
;
}
mDebug
(
"trans:%d, used to create mnode:%d"
,
pTrans
->
id
,
mnodeObj
.
id
);
if
(
mndTransAppendCommitlog
(
pTrans
,
pRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to append commit log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
mndTransDrop
(
pTrans
);
return
0
;
#endif
}
static
SSdbRaw
*
mndMnodeActionEncode
(
SMnodeObj
*
pObj
)
{
...
...
@@ -183,7 +189,7 @@ static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
return
-
1
;
}
pObj
->
role
=
TAOS_SYNC_STATE_FOLLOWE
R
;
pObj
->
state
=
TAOS_SYNC_STATE_ERRO
R
;
return
0
;
}
...
...
@@ -227,7 +233,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
if
(
pObj
->
pDnode
==
NULL
)
{
mError
(
"mnode:%d, no corresponding dnode exists"
,
pObj
->
id
);
}
else
{
if
(
pObj
->
rol
e
==
TAOS_SYNC_STATE_LEADER
)
{
if
(
pObj
->
stat
e
==
TAOS_SYNC_STATE_LEADER
)
{
pEpSet
->
inUse
=
pEpSet
->
numOfEps
;
}
addEpIntoEpSet
(
pEpSet
,
pObj
->
pDnode
->
fqdn
,
pObj
->
pDnode
->
port
);
...
...
@@ -555,7 +561,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
goto
_OVER
;
}
if
(
pMnode
->
selfId
==
dropReq
.
dnodeId
)
{
if
(
pMnode
->
self
Dnode
Id
==
dropReq
.
dnodeId
)
{
terrno
=
TSDB_CODE_MND_CANT_DROP_MASTER
;
goto
_OVER
;
}
...
...
@@ -626,16 +632,18 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
b1
,
false
);
const
char
*
roles
=
syncStr
(
pObj
->
role
);
char
*
b2
=
taosMemoryCalloc
(
1
,
12
+
VARSTR_HEADER_SIZE
);
const
char
*
roles
=
NULL
;
if
(
pObj
->
id
==
pMnode
->
selfDnodeId
)
{
roles
=
syncStr
(
TAOS_SYNC_STATE_LEADER
);
}
else
{
roles
=
syncStr
(
pObj
->
state
);
}
char
*
b2
=
taosMemoryCalloc
(
1
,
12
+
VARSTR_HEADER_SIZE
);
STR_WITH_MAXSIZE_TO_VARSTR
(
b2
,
roles
,
pShow
->
pMeta
->
pSchemas
[
cols
].
bytes
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
b2
,
false
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pObj
->
roleTime
,
false
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pObj
->
createdTime
,
false
);
...
...
@@ -652,3 +660,52 @@ static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter) {
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
}
static
int32_t
mndProcessAlterMnodeReq
(
SRpcMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SDAlterMnodeReq
alterReq
=
{
0
};
if
(
tDeserializeSDCreateMnodeReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
alterReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
if
(
alterReq
.
dnodeId
!=
pMnode
->
selfDnodeId
)
{
terrno
=
TSDB_CODE_INVALID_OPTION
;
mError
(
"failed to alter mnode since %s, input:%d cur:%d"
,
terrstr
(),
alterReq
.
dnodeId
,
pMnode
->
selfDnodeId
);
return
-
1
;
}
SSyncCfg
cfg
=
{.
replicaNum
=
alterReq
.
replica
,
.
myIndex
=
-
1
};
for
(
int32_t
i
=
0
;
i
<
alterReq
.
replica
;
++
i
)
{
SNodeInfo
*
pNode
=
&
cfg
.
nodeInfo
[
i
];
tstrncpy
(
pNode
->
nodeFqdn
,
alterReq
.
replicas
[
i
].
fqdn
,
sizeof
(
pNode
->
nodeFqdn
));
pNode
->
nodePort
=
alterReq
.
replicas
[
i
].
port
;
if
(
alterReq
.
replicas
[
i
].
id
==
pMnode
->
selfDnodeId
)
cfg
.
myIndex
=
i
;
}
if
(
cfg
.
myIndex
==
-
1
)
{
mError
(
"failed to alter mnode since myindex is -1"
);
return
-
1
;
}
else
{
mInfo
(
"start to alter mnode sync, replica:%d myindex:%d"
,
cfg
.
replicaNum
,
cfg
.
myIndex
);
for
(
int32_t
i
=
0
;
i
<
alterReq
.
replica
;
++
i
)
{
SNodeInfo
*
pNode
=
&
cfg
.
nodeInfo
[
i
];
mInfo
(
"index:%d, fqdn:%s port:%d"
,
i
,
pNode
->
nodeFqdn
,
pNode
->
nodePort
);
}
}
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
pMgmt
->
standby
=
0
;
int32_t
code
=
syncReconfig
(
pMgmt
->
sync
,
&
cfg
);
if
(
code
!=
0
)
{
mError
(
"failed to alter mnode sync since %s"
,
terrstr
());
return
code
;
}
else
{
pMgmt
->
errCode
=
0
;
tsem_wait
(
&
pMgmt
->
syncSem
);
mInfo
(
"alter mnode sync result:%s"
,
tstrerror
(
pMgmt
->
errCode
));
terrno
=
pMgmt
->
errCode
;
return
pMgmt
->
errCode
;
}
}
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
ce226517
...
...
@@ -49,29 +49,38 @@ int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
void
mndRestoreFinish
(
struct
SSyncFSM
*
pFsm
)
{
SMnode
*
pMnode
=
pFsm
->
data
;
mndTransPullup
(
pMnode
);
pMnode
->
syncMgmt
.
restored
=
true
;
if
(
!
pMnode
->
deploy
)
{
mndTransPullup
(
pMnode
);
pMnode
->
syncMgmt
.
restored
=
true
;
}
}
void
*
mndSnapshotRead
(
struct
SSyncFSM
*
pFsm
,
const
SSnapshot
*
snapshot
,
void
*
iter
,
char
**
ppBuf
,
int32_t
*
len
)
{
SMnode
*
pMnode
=
pFsm
->
data
;
SSdbIter
*
pIter
=
iter
;
if
(
iter
==
NULL
)
{
pIter
=
sdbIterInit
(
pMnode
->
pSdb
);
int32_t
mndSnapshotRead
(
struct
SSyncFSM
*
pFsm
,
const
SSnapshot
*
pSnapshot
,
void
**
ppIter
,
char
**
ppBuf
,
int32_t
*
len
)
{
/*
SMnode *pMnode = pFsm->data;
SSdbIter *pIter;
if (iter == NULL) {
pIter = sdbIterInit(pMnode->sdb)
} else {
pIter = iter;
}
*/
return
sdbIterRead
(
pMnode
->
pSdb
,
pIter
,
ppBuf
,
len
)
;
return
0
;
}
int32_t
mndSnapshotApply
(
struct
SSyncFSM
*
pFsm
,
const
SSnapshot
*
s
napshot
,
char
*
pBuf
,
int32_t
len
)
{
int32_t
mndSnapshotApply
(
struct
SSyncFSM
*
pFsm
,
const
SSnapshot
*
pS
napshot
,
char
*
pBuf
,
int32_t
len
)
{
SMnode
*
pMnode
=
pFsm
->
data
;
sdbWrite
(
pMnode
->
pSdb
,
(
SSdbRaw
*
)
pBuf
);
return
0
;
}
void
mndReConfig
(
struct
SSyncFSM
*
pFsm
,
SSyncCfg
newCfg
,
SReConfigCbMeta
cbMeta
)
{
void
mndReConfig
(
struct
SSyncFSM
*
pFsm
,
SSyncCfg
newCfg
,
SReConfigCbMeta
cbMeta
)
{
mInfo
(
"mndReConfig cbMeta.code:%d, cbMeta.currentTerm:%"
PRId64
", cbMeta.term:%"
PRId64
", cbMeta.index:%"
PRId64
,
cbMeta
.
code
,
cbMeta
.
currentTerm
,
cbMeta
.
term
,
cbMeta
.
index
);
SMnode
*
pMnode
=
pFsm
->
data
;
pMnode
->
syncMgmt
.
errCode
=
cbMeta
.
code
;
tsem_post
(
&
pMnode
->
syncMgmt
.
syncSem
);
}
SSyncFSM
*
mndSyncMakeFsm
(
SMnode
*
pMnode
)
{
...
...
@@ -194,22 +203,6 @@ void mndSyncStop(SMnode *pMnode) {}
bool
mndIsMaster
(
SMnode
*
pMnode
)
{
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
pMgmt
->
state
=
syncGetMyRole
(
pMgmt
->
sync
);
return
(
pMgmt
->
state
==
TAOS_SYNC_STATE_LEADER
)
&&
(
pMnode
->
syncMgmt
.
restored
);
ESyncState
state
=
syncGetMyRole
(
pMgmt
->
sync
);
return
(
state
==
TAOS_SYNC_STATE_LEADER
)
&&
(
pMnode
->
syncMgmt
.
restored
);
}
int32_t
mndAlter
(
SMnode
*
pMnode
,
const
SMnodeOpt
*
pOption
)
{
SSyncCfg
cfg
=
{.
replicaNum
=
pOption
->
replica
,
.
myIndex
=
pOption
->
selfIndex
};
mInfo
(
"start to alter mnode sync, replica:%d myindex:%d standby:%d"
,
cfg
.
replicaNum
,
cfg
.
myIndex
,
pOption
->
standby
);
for
(
int32_t
i
=
0
;
i
<
pOption
->
replica
;
++
i
)
{
SNodeInfo
*
pNode
=
&
cfg
.
nodeInfo
[
i
];
tstrncpy
(
pNode
->
nodeFqdn
,
pOption
->
replicas
[
i
].
fqdn
,
sizeof
(
pNode
->
nodeFqdn
));
pNode
->
nodePort
=
pOption
->
replicas
[
i
].
port
;
mInfo
(
"index:%d, fqdn:%s port:%d"
,
i
,
pNode
->
nodeFqdn
,
pNode
->
nodePort
);
}
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
pMgmt
->
standby
=
pOption
->
standby
;
return
syncReconfig
(
pMgmt
->
sync
,
&
cfg
);
}
\ No newline at end of file
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
ce226517
...
...
@@ -563,7 +563,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S
pTrans
->
policy
=
policy
;
pTrans
->
type
=
type
;
pTrans
->
createdTime
=
taosGetTimestampMs
();
pTrans
->
rpcInfo
=
pReq
->
info
;
if
(
pReq
!=
NULL
)
pTrans
->
rpcInfo
=
pReq
->
info
;
pTrans
->
redoLogs
=
taosArrayInit
(
TRANS_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoLogs
=
taosArrayInit
(
TRANS_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
commitLogs
=
taosArrayInit
(
TRANS_ARRAY_SIZE
,
sizeof
(
void
*
));
...
...
@@ -1080,7 +1080,7 @@ static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans) {
}
static
bool
mndTransPerformRedoActionStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
if
(
!
mndIsMaster
(
pMnode
))
return
false
;
if
(
!
pMnode
->
deploy
&&
!
mndIsMaster
(
pMnode
))
return
false
;
bool
continueExec
=
true
;
int32_t
code
=
mndTransExecuteRedoActions
(
pMnode
,
pTrans
);
...
...
@@ -1171,7 +1171,7 @@ static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) {
}
static
bool
mndTransPerformUndoActionStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
if
(
!
mndIsMaster
(
pMnode
))
return
false
;
if
(
!
pMnode
->
deploy
&&
!
mndIsMaster
(
pMnode
))
return
false
;
bool
continueExec
=
true
;
int32_t
code
=
mndTransExecuteUndoActions
(
pMnode
,
pTrans
);
...
...
source/dnode/mnode/impl/src/mndUser.c
浏览文件 @
ce226517
...
...
@@ -78,7 +78,33 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
mDebug
(
"user:%s, will be created while deploy sdb, raw:%p"
,
userObj
.
user
,
pRaw
);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_TYPE_CREATE_USER
,
NULL
);
if
(
pTrans
==
NULL
)
{
mError
(
"user:%s, failed to create since %s"
,
userObj
.
user
,
terrstr
());
return
-
1
;
}
mDebug
(
"trans:%d, used to create user:%s"
,
pTrans
->
id
,
userObj
.
user
);
if
(
mndTransAppendCommitlog
(
pTrans
,
pRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to commit redo log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
mndTransDrop
(
pTrans
);
return
0
;
#endif
}
static
int32_t
mndCreateDefaultUsers
(
SMnode
*
pMnode
)
{
...
...
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
ce226517
...
...
@@ -153,8 +153,14 @@ static int32_t mndInitSdb(SMnode *pMnode) {
return
0
;
}
static
int32_t
mndDeploySdb
(
SMnode
*
pMnode
)
{
return
sdbDeploy
(
pMnode
->
pSdb
);
}
static
int32_t
mndReadSdb
(
SMnode
*
pMnode
)
{
return
sdbReadFile
(
pMnode
->
pSdb
);
}
static
int32_t
mndOpenSdb
(
SMnode
*
pMnode
)
{
if
(
!
pMnode
->
deploy
)
{
return
sdbReadFile
(
pMnode
->
pSdb
);
}
else
{
// return sdbDeploy(pMnode->pSdb);;
return
0
;
}
}
static
void
mndCleanupSdb
(
SMnode
*
pMnode
)
{
if
(
pMnode
->
pSdb
)
{
...
...
@@ -176,7 +182,7 @@ static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCle
return
0
;
}
static
int32_t
mndInitSteps
(
SMnode
*
pMnode
,
bool
deploy
)
{
static
int32_t
mndInitSteps
(
SMnode
*
pMnode
)
{
if
(
mndAllocStep
(
pMnode
,
"mnode-sdb"
,
mndInitSdb
,
mndCleanupSdb
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-trans"
,
mndInitTrans
,
mndCleanupTrans
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-cluster"
,
mndInitCluster
,
mndCleanupCluster
)
!=
0
)
return
-
1
;
...
...
@@ -201,11 +207,7 @@ static int32_t mndInitSteps(SMnode *pMnode, bool deploy) {
if
(
mndAllocStep
(
pMnode
,
"mnode-perfs"
,
mndInitPerfs
,
mndCleanupPerfs
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-db"
,
mndInitDb
,
mndCleanupDb
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-func"
,
mndInitFunc
,
mndCleanupFunc
)
!=
0
)
return
-
1
;
if
(
deploy
)
{
if
(
mndAllocStep
(
pMnode
,
"mnode-sdb-deploy"
,
mndDeploySdb
,
NULL
)
!=
0
)
return
-
1
;
}
else
{
if
(
mndAllocStep
(
pMnode
,
"mnode-sdb-read"
,
mndReadSdb
,
NULL
)
!=
0
)
return
-
1
;
}
if
(
mndAllocStep
(
pMnode
,
"mnode-sdb"
,
mndOpenSdb
,
NULL
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-profile"
,
mndInitProfile
,
mndCleanupProfile
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-show"
,
mndInitShow
,
mndCleanupShow
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-query"
,
mndInitQuery
,
mndCleanupQuery
)
!=
0
)
return
-
1
;
...
...
@@ -262,7 +264,7 @@ static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode
->
selfIndex
=
pOption
->
selfIndex
;
memcpy
(
&
pMnode
->
replicas
,
pOption
->
replicas
,
sizeof
(
SReplica
)
*
TSDB_MAX_REPLICA
);
pMnode
->
msgCb
=
pOption
->
msgCb
;
pMnode
->
self
Id
=
pOption
->
replicas
[
pOption
->
selfIndex
].
i
d
;
pMnode
->
self
DnodeId
=
pOption
->
dnodeI
d
;
pMnode
->
syncMgmt
.
standby
=
pOption
->
standby
;
}
...
...
@@ -280,6 +282,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
(
void
)
taosParseTime
(
timestr
,
&
pMnode
->
checkTime
,
(
int32_t
)
strlen
(
timestr
),
TSDB_TIME_PRECISION_MILLI
,
0
);
mndSetOptions
(
pMnode
,
pOption
);
pMnode
->
deploy
=
pOption
->
deploy
;
pMnode
->
pSteps
=
taosArrayInit
(
24
,
sizeof
(
SMnodeStep
));
if
(
pMnode
->
pSteps
==
NULL
)
{
taosMemoryFree
(
pMnode
);
...
...
@@ -297,7 +300,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
return
NULL
;
}
code
=
mndInitSteps
(
pMnode
,
pOption
->
deploy
);
code
=
mndInitSteps
(
pMnode
);
if
(
code
!=
0
)
{
code
=
terrno
;
mError
(
"failed to open mnode since %s"
,
terrstr
());
...
...
@@ -315,7 +318,6 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
return
NULL
;
}
mndUpdateMnodeRole
(
pMnode
);
mDebug
(
"mnode open successfully "
);
return
pMnode
;
}
...
...
@@ -332,6 +334,10 @@ void mndClose(SMnode *pMnode) {
int32_t
mndStart
(
SMnode
*
pMnode
)
{
mndSyncStart
(
pMnode
);
if
(
pMnode
->
deploy
)
{
if
(
sdbDeploy
(
pMnode
->
pSdb
)
!=
0
)
return
-
1
;
pMnode
->
syncMgmt
.
restored
=
true
;
}
return
mndInitTimer
(
pMnode
);
}
...
...
@@ -408,8 +414,7 @@ int32_t mndProcessMsg(SRpcMsg *pMsg) {
mTrace
(
"msg:%p, will be processed, type:%s app:%p"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
),
ahandle
);
if
(
IsReq
(
pMsg
))
{
if
(
!
mndIsMaster
(
pMnode
)
&&
pMsg
->
msgType
!=
TDMT_MND_TRANS_TIMER
&&
pMsg
->
msgType
!=
TDMT_MND_MQ_TIMER
&&
pMsg
->
msgType
!=
TDMT_MND_TELEM_TIMER
)
{
if
(
!
mndIsMaster
(
pMnode
))
{
terrno
=
TSDB_CODE_APP_NOT_READY
;
mDebug
(
"msg:%p, failed to process since %s, app:%p"
,
pMsg
,
terrstr
(),
ahandle
);
return
-
1
;
...
...
@@ -513,15 +518,17 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
SMonMnodeDesc
desc
=
{
0
};
desc
.
mnode_id
=
pObj
->
id
;
tstrncpy
(
desc
.
mnode_ep
,
pObj
->
pDnode
->
ep
,
sizeof
(
desc
.
mnode_ep
));
tstrncpy
(
desc
.
role
,
syncStr
(
pObj
->
role
),
sizeof
(
desc
.
role
));
taosArrayPush
(
pClusterInfo
->
mnodes
,
&
desc
);
sdbRelease
(
pSdb
,
pObj
);
if
(
pObj
->
role
==
TAOS_SYNC_STATE_LEADER
)
{
if
(
pObj
->
id
==
pMnode
->
selfDnodeId
)
{
pClusterInfo
->
first_ep_dnode_id
=
pObj
->
id
;
tstrncpy
(
pClusterInfo
->
first_ep
,
pObj
->
pDnode
->
ep
,
sizeof
(
pClusterInfo
->
first_ep
));
pClusterInfo
->
master_uptime
=
(
ms
-
pObj
->
roleTime
)
/
(
86400000
.
0
f
);
pClusterInfo
->
master_uptime
=
(
ms
-
pObj
->
stateStartTime
)
/
(
86400000
.
0
f
);
tstrncpy
(
desc
.
role
,
syncStr
(
TAOS_SYNC_STATE_LEADER
),
sizeof
(
desc
.
role
));
}
else
{
tstrncpy
(
desc
.
role
,
syncStr
(
pObj
->
state
),
sizeof
(
desc
.
role
));
}
taosArrayPush
(
pClusterInfo
->
mnodes
,
&
desc
);
sdbRelease
(
pSdb
,
pObj
);
}
// vgroup info
...
...
@@ -574,6 +581,6 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
}
int32_t
mndGetLoad
(
SMnode
*
pMnode
,
SMnodeLoad
*
pLoad
)
{
pLoad
->
syncState
=
pMnode
->
syncMgmt
.
state
;
pLoad
->
syncState
=
syncGetMyRole
(
pMnode
->
syncMgmt
.
sync
)
;
return
0
;
}
source/dnode/mnode/sdb/CMakeLists.txt
浏览文件 @
ce226517
...
...
@@ -2,8 +2,7 @@ aux_source_directory(src MNODE_SRC)
add_library
(
sdb STATIC
${
MNODE_SRC
}
)
target_include_directories
(
sdb
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/dnode/mnode/sdb"
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
PUBLIC
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
target_link_libraries
(
sdb os common util wal
...
...
include/dnode/mnode/sdb
/sdb.h
→
source/dnode/mnode/sdb/inc
/sdb.h
浏览文件 @
ce226517
...
...
@@ -27,6 +27,15 @@
extern
"C"
{
#endif
// clang-format off
#define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
#define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
#define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }}
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }}
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
// clang-format on
#define SDB_GET_VAL(pData, dataPos, val, pos, func, type) \
{ \
if (func(pRaw, dataPos, val) != 0) { \
...
...
@@ -65,7 +74,7 @@ extern "C" {
#define SDB_SET_INT64(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt64, int64_t)
#define SDB_SET_INT32(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt32, int32_t)
#define SDB_SET_INT16(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt16, int16_t)
#define SDB_SET_INT8(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt8, int8_t)
#define SDB_SET_INT8(pRaw, dataPos, val, pos)
SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt8, int8_t)
#define SDB_SET_BINARY(pRaw, dataPos, val, valLen, pos) \
{ \
...
...
@@ -89,8 +98,16 @@ extern "C" {
}
typedef
struct
SMnode
SMnode
;
typedef
struct
SSdb
SSdb
;
typedef
struct
SSdbRaw
SSdbRaw
;
typedef
struct
SSdbRow
SSdbRow
;
typedef
int32_t
(
*
SdbInsertFp
)(
SSdb
*
pSdb
,
void
*
pObj
);
typedef
int32_t
(
*
SdbUpdateFp
)(
SSdb
*
pSdb
,
void
*
pSrcObj
,
void
*
pDstObj
);
typedef
int32_t
(
*
SdbDeleteFp
)(
SSdb
*
pSdb
,
void
*
pObj
,
bool
callFunc
);
typedef
int32_t
(
*
SdbDeployFp
)(
SMnode
*
pMnode
);
typedef
SSdbRow
*
(
*
SdbDecodeFp
)(
SSdbRaw
*
pRaw
);
typedef
SSdbRaw
*
(
*
SdbEncodeFp
)(
void
*
pObj
);
typedef
bool
(
*
sdbTraverseFp
)(
SMnode
*
pMnode
,
void
*
pObj
,
void
*
p1
,
void
*
p2
,
void
*
p3
);
typedef
enum
{
SDB_KEY_BINARY
=
1
,
...
...
@@ -130,14 +147,47 @@ typedef enum {
SDB_MAX
=
20
}
ESdbType
;
typedef
struct
SSdb
SSdb
;
typedef
int32_t
(
*
SdbInsertFp
)(
SSdb
*
pSdb
,
void
*
pObj
);
typedef
int32_t
(
*
SdbUpdateFp
)(
SSdb
*
pSdb
,
void
*
pSrcObj
,
void
*
pDstObj
);
typedef
int32_t
(
*
SdbDeleteFp
)(
SSdb
*
pSdb
,
void
*
pObj
,
bool
callFunc
);
typedef
int32_t
(
*
SdbDeployFp
)(
SMnode
*
pMnode
);
typedef
SSdbRow
*
(
*
SdbDecodeFp
)(
SSdbRaw
*
pRaw
);
typedef
SSdbRaw
*
(
*
SdbEncodeFp
)(
void
*
pObj
);
typedef
bool
(
*
sdbTraverseFp
)(
SMnode
*
pMnode
,
void
*
pObj
,
void
*
p1
,
void
*
p2
,
void
*
p3
);
typedef
struct
SSdbRaw
{
int8_t
type
;
int8_t
status
;
int8_t
sver
;
int8_t
reserved
;
int32_t
dataLen
;
char
pData
[];
}
SSdbRaw
;
typedef
struct
SSdbRow
{
ESdbType
type
;
ESdbStatus
status
;
int32_t
refCount
;
char
pObj
[];
}
SSdbRow
;
typedef
struct
SSdb
{
SMnode
*
pMnode
;
char
*
currDir
;
char
*
syncDir
;
char
*
tmpDir
;
int64_t
lastCommitVer
;
int64_t
curVer
;
int64_t
curTerm
;
int64_t
tableVer
[
SDB_MAX
];
int64_t
maxId
[
SDB_MAX
];
EKeyType
keyTypes
[
SDB_MAX
];
SHashObj
*
hashObjs
[
SDB_MAX
];
TdThreadRwlock
locks
[
SDB_MAX
];
SdbInsertFp
insertFps
[
SDB_MAX
];
SdbUpdateFp
updateFps
[
SDB_MAX
];
SdbDeleteFp
deleteFps
[
SDB_MAX
];
SdbDeployFp
deployFps
[
SDB_MAX
];
SdbEncodeFp
encodeFps
[
SDB_MAX
];
SdbDecodeFp
decodeFps
[
SDB_MAX
];
}
SSdb
;
typedef
struct
SSdbIter
{
TdFilePtr
file
;
int64_t
readlen
;
}
SSdbIter
;
typedef
struct
{
ESdbType
sdbType
;
...
...
@@ -328,36 +378,14 @@ int32_t sdbGetRawTotalSize(SSdbRaw *pRaw);
SSdbRow
*
sdbAllocRow
(
int32_t
objSize
);
void
*
sdbGetRowObj
(
SSdbRow
*
pRow
);
typedef
struct
SSdb
{
SMnode
*
pMnode
;
char
*
currDir
;
char
*
syncDir
;
char
*
tmpDir
;
int64_t
lastCommitVer
;
int64_t
curVer
;
int64_t
curTerm
;
int64_t
tableVer
[
SDB_MAX
];
int64_t
maxId
[
SDB_MAX
];
EKeyType
keyTypes
[
SDB_MAX
];
SHashObj
*
hashObjs
[
SDB_MAX
];
TdThreadRwlock
locks
[
SDB_MAX
];
SdbInsertFp
insertFps
[
SDB_MAX
];
SdbUpdateFp
updateFps
[
SDB_MAX
];
SdbDeleteFp
deleteFps
[
SDB_MAX
];
SdbDeployFp
deployFps
[
SDB_MAX
];
SdbEncodeFp
encodeFps
[
SDB_MAX
];
SdbDecodeFp
decodeFps
[
SDB_MAX
];
}
SSdb
;
typedef
struct
SSdbIter
{
TdFilePtr
file
;
int64_t
readlen
;
}
SSdbIter
;
void
sdbFreeRow
(
SSdb
*
pSdb
,
SSdbRow
*
pRow
,
bool
callFunc
);
SSdbIter
*
sdbIterInit
(
SSdb
*
pSdb
);
SSdbIter
*
sdbIterRead
(
SSdb
*
pSdb
,
SSdbIter
*
iter
,
char
**
ppBuf
,
int32_t
*
len
);
const
char
*
sdbTableName
(
ESdbType
type
);
void
sdbPrintOper
(
SSdb
*
pSdb
,
SSdbRow
*
pRow
,
const
char
*
oper
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/mnode/sdb/src/sdb.c
浏览文件 @
ce226517
...
...
@@ -14,7 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "sdb
Int
.h"
#include "sdb.h"
static
int32_t
sdbCreateDir
(
SSdb
*
pSdb
);
...
...
source/dnode/mnode/sdb/src/sdbFile.c
浏览文件 @
ce226517
...
...
@@ -14,7 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "sdb
Int
.h"
#include "sdb.h"
#include "tchecksum.h"
#include "wal.h"
...
...
source/dnode/mnode/sdb/src/sdbHash.c
浏览文件 @
ce226517
...
...
@@ -14,7 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "sdb
Int
.h"
#include "sdb.h"
static
void
sdbCheckRow
(
SSdb
*
pSdb
,
SSdbRow
*
pRow
);
...
...
source/dnode/mnode/sdb/src/sdbRaw.c
浏览文件 @
ce226517
...
...
@@ -14,7 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "sdb
Int
.h"
#include "sdb.h"
SSdbRaw
*
sdbAllocRaw
(
ESdbType
type
,
int8_t
sver
,
int32_t
dataLen
)
{
SSdbRaw
*
pRaw
=
taosMemoryCalloc
(
1
,
dataLen
+
sizeof
(
SSdbRaw
));
...
...
source/dnode/mnode/sdb/src/sdbRow.c
浏览文件 @
ce226517
...
...
@@ -14,7 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "sdb
Int
.h"
#include "sdb.h"
SSdbRow
*
sdbAllocRow
(
int32_t
objSize
)
{
SSdbRow
*
pRow
=
taosMemoryCalloc
(
1
,
objSize
+
sizeof
(
SSdbRow
));
...
...
source/libs/monitor/src/monMain.c
浏览文件 @
ce226517
...
...
@@ -530,7 +530,8 @@ void monSendReport() {
monGenLogJson
(
pMonitor
);
char
*
pCont
=
tjsonToString
(
pMonitor
->
pJson
);
if
(
pCont
!=
NULL
)
{
// uDebugL("report cont:%s\n", pCont);
if
(
pCont
!=
NULL
)
{
EHttpCompFlag
flag
=
tsMonitor
.
cfg
.
comp
?
HTTP_GZIP
:
HTTP_FLAT
;
if
(
taosSendHttpReport
(
tsMonitor
.
cfg
.
server
,
tsMonitor
.
cfg
.
port
,
pCont
,
strlen
(
pCont
),
flag
)
!=
0
)
{
uError
(
"failed to send monitor msg"
);
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
ce226517
...
...
@@ -357,6 +357,16 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
}
else
{
syncNodeBecomeFollower
(
ths
);
}
// maybe newSyncCfg.myIndex is updated in syncNodeUpdateConfig
if
(
ths
->
pFsm
->
FpReConfigCb
!=
NULL
)
{
SReConfigCbMeta
cbMeta
=
{
0
};
cbMeta
.
code
=
0
;
cbMeta
.
currentTerm
=
ths
->
pRaftStore
->
currentTerm
;
cbMeta
.
index
=
pEntry
->
index
;
cbMeta
.
term
=
pEntry
->
term
;
ths
->
pFsm
->
FpReConfigCb
(
ths
->
pFsm
,
newSyncCfg
,
cbMeta
);
}
}
// restore finish
...
...
source/libs/sync/src/syncCommit.c
浏览文件 @
ce226517
...
...
@@ -134,6 +134,16 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
}
else
{
syncNodeBecomeFollower
(
pSyncNode
);
}
// maybe newSyncCfg.myIndex is updated in syncNodeUpdateConfig
if
(
pSyncNode
->
pFsm
->
FpReConfigCb
!=
NULL
)
{
SReConfigCbMeta
cbMeta
=
{
0
};
cbMeta
.
code
=
0
;
cbMeta
.
currentTerm
=
pSyncNode
->
pRaftStore
->
currentTerm
;
cbMeta
.
index
=
pEntry
->
index
;
cbMeta
.
term
=
pEntry
->
term
;
pSyncNode
->
pFsm
->
FpReConfigCb
(
pSyncNode
->
pFsm
,
newSyncCfg
,
cbMeta
);
}
}
// restore finish
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
ce226517
...
...
@@ -349,7 +349,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
}
// open/close --------------
SSyncNode
*
syncNodeOpen
(
const
SSyncInfo
*
pSyncInfo
)
{
SSyncNode
*
syncNodeOpen
(
const
SSyncInfo
*
pOldSyncInfo
)
{
SSyncInfo
*
pSyncInfo
=
(
SSyncInfo
*
)
pOldSyncInfo
;
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosMemoryMalloc
(
sizeof
(
SSyncNode
));
assert
(
pSyncNode
!=
NULL
);
memset
(
pSyncNode
,
0
,
sizeof
(
SSyncNode
));
...
...
@@ -361,11 +363,25 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
sError
(
"failed to create dir:%s since %s"
,
pSyncInfo
->
path
,
terrstr
());
return
NULL
;
}
}
snprintf
(
pSyncNode
->
configPath
,
sizeof
(
pSyncNode
->
configPath
),
"%s/raft_config.json"
,
pSyncInfo
->
path
);
if
(
!
taosCheckExistFile
(
pSyncNode
->
configPath
))
{
// create raft config file
snprintf
(
pSyncNode
->
configPath
,
sizeof
(
pSyncNode
->
configPath
),
"%s/raft_config.json"
,
pSyncInfo
->
path
);
ret
=
syncCfgCreateFile
((
SSyncCfg
*
)
&
(
pSyncInfo
->
syncCfg
),
pSyncNode
->
configPath
);
assert
(
ret
==
0
);
}
else
{
// update syncCfg by raft_config.json
pSyncNode
->
pRaftCfg
=
raftCfgOpen
(
pSyncNode
->
configPath
);
assert
(
pSyncNode
->
pRaftCfg
!=
NULL
);
pSyncInfo
->
syncCfg
=
pSyncNode
->
pRaftCfg
->
cfg
;
char
*
seralized
=
raftCfg2Str
(
pSyncNode
->
pRaftCfg
);
sInfo
(
"syncNodeOpen update config :%s"
,
seralized
);
taosMemoryFree
(
seralized
);
raftCfgClose
(
pSyncNode
->
pRaftCfg
);
}
// init by SSyncInfo
...
...
source/libs/sync/test/syncTest.cpp
浏览文件 @
ce226517
...
...
@@ -49,7 +49,7 @@ void test4() {
logTest
((
char
*
)
__FUNCTION__
);
}
int
main
()
{
int
main
(
int
argc
,
char
**
argv
)
{
// taosInitLog("tmp/syncTest.log", 100);
tsAsyncLog
=
0
;
...
...
@@ -58,6 +58,14 @@ int main() {
test3
();
test4
();
if
(
argc
==
2
)
{
bool
bTaosDirExist
=
taosDirExist
(
argv
[
1
]);
printf
(
"%s bTaosDirExist:%d
\n
"
,
argv
[
1
],
bTaosDirExist
);
bool
bTaosCheckExistFile
=
taosCheckExistFile
(
argv
[
1
]);
printf
(
"%s bTaosCheckExistFile:%d
\n
"
,
argv
[
1
],
bTaosCheckExistFile
);
}
// taosCloseLog();
return
0
;
}
source/os/src/osSocket.c
浏览文件 @
ce226517
...
...
@@ -913,12 +913,12 @@ uint32_t taosGetIpv4FromFqdn(const char *fqdn) {
}
else
{
#ifdef EAI_SYSTEM
if
(
ret
==
EAI_SYSTEM
)
{
printf
(
"failed to get the ip address, fqdn:%s, errno:%d, since:%s"
,
fqdn
,
errno
,
strerror
(
errno
));
//
printf("failed to get the ip address, fqdn:%s, errno:%d, since:%s", fqdn, errno, strerror(errno));
}
else
{
printf
(
"failed to get the ip address, fqdn:%s, ret:%d, since:%s"
,
fqdn
,
ret
,
gai_strerror
(
ret
));
//
printf("failed to get the ip address, fqdn:%s, ret:%d, since:%s", fqdn, ret, gai_strerror(ret));
}
#else
printf
(
"failed to get the ip address, fqdn:%s, ret:%d, since:%s"
,
fqdn
,
ret
,
gai_strerror
(
ret
));
//
printf("failed to get the ip address, fqdn:%s, ret:%d, since:%s", fqdn, ret, gai_strerror(ret));
#endif
return
0xFFFFFFFF
;
}
...
...
tests/script/jenkins/basic.txt
浏览文件 @
ce226517
...
...
@@ -56,7 +56,7 @@
# ---- mnode
#./test.sh -f tsim/mnode/basic1.sim
./test.sh -f tsim/mnode/basic2.sim
#
./test.sh -f tsim/mnode/basic2.sim
# ---- show
./test.sh -f tsim/show/basic.sim
...
...
@@ -97,7 +97,8 @@
./test.sh -f tsim/stable/values.sim
./test.sh -f tsim/stable/vnode3.sim
./test.sh -f tsim/stable/column_add.sim
./test.sh -f tsim/stable/column_drop.sim
#./test.sh -f tsim/stable/column_drop.sim
#./test.sh -f tsim/stable/column_modify.sim
# --- for multi process mode
...
...
tests/script/tsim/dnode/basic1.sim
浏览文件 @
ce226517
...
...
@@ -7,6 +7,7 @@ sql connect
print =============== show dnodes
sql show dnodes;
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
return -1
endi
...
...
@@ -15,12 +16,9 @@ if $data00 != 1 then
return -1
endi
# check 'vnodes' feild ?
#if $data02 != 0 then
# return -1
#endi
sql show mnodes;
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
return -1
endi
...
...
tests/script/tsim/mnode/basic2.sim
浏览文件 @
ce226517
...
...
@@ -6,15 +6,6 @@ system sh/exec.sh -n dnode2 -s start
sql connect
print =============== show dnodes
sql show dnodes;
if $rows != 1 then
return -1
endi
if $data00 != 1 then
return -1
endi
sql show mnodes;
if $rows != 1 then
return -1
...
...
@@ -30,55 +21,92 @@ endi
print =============== create dnodes
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sleep 2000
sql show dnodes;
if $rows !=
2
then
if $rows !=
3
then
return -1
endi
if $data00 != 1 then
sql show mnodes;
if $rows != 1 then
return -1
endi
if $data
10 != 2
then
if $data
00 != 1
then
return -1
endi
print $data02
if $data02 != 0 then
if $data02 != LEADER then
return -1
endi
if $data12 != 0 then
print =============== create mnode 2
sql create mnode on dnode 2
sql show mnodes
print $data(1)[0] $data(1)[1] $data(1)[2]
print $data(2)[0] $data(2)[1] $data(2)[2]
if $rows != 2 then
return -1
endi
if $data04 != ready then
if $data(1)[0] != 1 then
return -1
endi
if $data14 != ready then
if $data(1)[2] != LEADER then
return -1
endi
sql show mnodes;
if $rows != 1 then
if $data(2)[0] != 2 then
return -1
endi
if $data00 != 1 then
if $data(2)[2] == LEADER then
return -1
endi
if $data02 != LEADER then
print =============== create user
sql create user user1 PASS 'user1'
sql show users
if $rows != 2 then
return -1
endi
print =============== create mnode 2
sql create mnode on dnode 2
#sql create database db
#sql show databases
#if $rows != 3 then
# return -1
#endi
system sh/exec.sh -n dnode1 -s stop
system sh/exec.sh -n dnode2 -s stop
sleep 100
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
sql connect
sql show mnodes
if $rows != 2 then
return -1
endi
if $data(1)[0] != 1 then
return -1
endi
if $data(1)[2] != LEADER then
return -1
endi
sql show users
if $rows != 2 then
return -1
endi
#sql show databases
#if $rows != 3 then
# return -1
#endi
return
system sh/exec.sh -n dnode1 -s stop
system sh/exec.sh -n dnode2 -s stop
\ No newline at end of file
tests/script/tsim/trans/create_db.sim
浏览文件 @
ce226517
...
...
@@ -64,7 +64,7 @@ if $rows != 1 then
return -1
endi
if $data[0][0] !=
2
then
if $data[0][0] !=
7
then
return -1
endi
...
...
@@ -114,7 +114,7 @@ if $rows != 1 then
return -1
endi
if $data[0][0] !=
4
then
if $data[0][0] !=
9
then
return -1
endi
...
...
@@ -137,7 +137,7 @@ endi
sql_error create database d2 vgroups 2;
print =============== kill transaction
sql kill transaction
4
;
sql kill transaction
9
;
sleep 2000
sql show transactions
...
...
tests/test/c/sdbDump.c
浏览文件 @
ce226517
...
...
@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "dmMgmt.h"
#include "mndInt.h"
#include "sdb
Int
.h"
#include "sdb.h"
#include "tconfig.h"
#include "tjson.h"
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录