Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
0af376f7
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1185
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
0af376f7
编写于
4月 15, 2020
作者:
S
slguan
提交者:
GitHub
4月 15, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #1614 from taosdata/feature/mpeer
Feature/mpeer
上级
fecbe801
98286acc
变更
24
隐藏空白更改
内联
并排
Showing
24 changed file
with
314 addition
and
325 deletion
+314
-325
src/dnode/CMakeLists.txt
src/dnode/CMakeLists.txt
+4
-0
src/dnode/inc/dnodeMClient.h
src/dnode/inc/dnodeMClient.h
+1
-0
src/dnode/inc/dnodeMgmt.h
src/dnode/inc/dnodeMgmt.h
+0
-1
src/dnode/src/dnodeMClient.c
src/dnode/src/dnodeMClient.c
+143
-14
src/dnode/src/dnodeMain.c
src/dnode/src/dnodeMain.c
+1
-2
src/dnode/src/dnodeMgmt.c
src/dnode/src/dnodeMgmt.c
+0
-117
src/inc/mnode.h
src/inc/mnode.h
+2
-4
src/inc/mpeer.h
src/inc/mpeer.h
+11
-4
src/inc/tbalance.h
src/inc/tbalance.h
+1
-0
src/inc/tcluster.h
src/inc/tcluster.h
+1
-0
src/mnode/inc/mgmtSdb.h
src/mnode/inc/mgmtSdb.h
+13
-4
src/mnode/src/mgmtDb.c
src/mnode/src/mgmtDb.c
+2
-2
src/mnode/src/mgmtDnode.c
src/mnode/src/mgmtDnode.c
+2
-0
src/mnode/src/mgmtMain.c
src/mnode/src/mgmtMain.c
+5
-5
src/mnode/src/mgmtMnode.c
src/mnode/src/mgmtMnode.c
+19
-39
src/mnode/src/mgmtSdb.c
src/mnode/src/mgmtSdb.c
+76
-93
src/mnode/src/mgmtShell.c
src/mnode/src/mgmtShell.c
+1
-20
src/mnode/src/mgmtTable.c
src/mnode/src/mgmtTable.c
+4
-4
src/mnode/src/mgmtUser.c
src/mnode/src/mgmtUser.c
+9
-7
src/mnode/src/mgmtVgroup.c
src/mnode/src/mgmtVgroup.c
+2
-2
src/plugins/http/src/httpServer.c
src/plugins/http/src/httpServer.c
+8
-5
src/plugins/http/src/httpSystem.c
src/plugins/http/src/httpSystem.c
+1
-1
src/vnode/main/src/vnodeMain.c
src/vnode/main/src/vnodeMain.c
+1
-1
tests/script/tmp/mnodes.sim
tests/script/tmp/mnodes.sim
+7
-0
未找到文件。
src/dnode/CMakeLists.txt
浏览文件 @
0af376f7
...
...
@@ -31,6 +31,10 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
TARGET_LINK_LIBRARIES
(
taosd balance sync
)
ENDIF
()
IF
(
TD_MPEER
)
TARGET_LINK_LIBRARIES
(
taosd mpeer sync
)
ENDIF
()
SET
(
PREPARE_ENV_CMD
"prepare_env_cmd"
)
SET
(
PREPARE_ENV_TARGET
"prepare_env_target"
)
ADD_CUSTOM_COMMAND
(
OUTPUT
${
PREPARE_ENV_CMD
}
...
...
src/dnode/inc/dnodeMClient.h
浏览文件 @
0af376f7
...
...
@@ -25,6 +25,7 @@ void dnodeCleanupMClient();
void
dnodeSendMsgToMnode
(
SRpcMsg
*
rpcMsg
);
uint32_t
dnodeGetMnodeMasteIp
();
void
*
dnodeGetMpeerInfos
();
int32_t
dnodeGetDnodeId
();
#ifdef __cplusplus
}
...
...
src/dnode/inc/dnodeMgmt.h
浏览文件 @
0af376f7
...
...
@@ -23,7 +23,6 @@ extern "C" {
int32_t
dnodeInitMgmt
();
void
dnodeCleanupMgmt
();
void
dnodeMgmt
(
SRpcMsg
*
rpcMsg
);
void
dnodeUpdateDnodeId
(
int32_t
dnodeId
);
void
*
dnodeGetVnode
(
int32_t
vgId
);
int32_t
dnodeGetVnodeStatus
(
void
*
pVnode
);
...
...
src/dnode/src/dnodeMClient.c
浏览文件 @
0af376f7
...
...
@@ -21,30 +21,52 @@
#include "trpc.h"
#include "tutil.h"
#include "tsync.h"
#include "ttime.h"
#include "ttimer.h"
#include "dnode.h"
#include "dnodeMClient.h"
#include "dnodeModule.h"
#include "dnodeMgmt.h"
#include "vnode.h"
#include "mpeer.h"
#define MPEER_CONTENT_LEN 2000
static
bool
dnodeReadMnodeIpList
();
static
void
dnodeSaveMnodeIpList
();
static
void
dnodeReadDnodeInfo
();
static
void
dnodeUpdateDnodeInfo
(
int32_t
dnodeId
);
static
void
dnodeProcessRspFromMnode
(
SRpcMsg
*
pMsg
);
static
void
dnodeProcessStatusRsp
(
SRpcMsg
*
pMsg
);
static
void
dnodeSendStatusMsg
(
void
*
handle
,
void
*
tmrId
);
static
void
(
*
tsDnodeProcessMgmtRspFp
[
TSDB_MSG_TYPE_MAX
])(
SRpcMsg
*
);
static
void
*
tsDnodeMClientRpc
=
NULL
;
static
SRpcIpSet
tsMnodeIpList
=
{
0
};
static
SDMNodeInfos
tsMnodeInfos
=
{
0
};
static
void
*
tsDnodeTmr
=
NULL
;
static
void
*
tsStatusTimer
=
NULL
;
static
uint32_t
tsRebootTime
;
static
int32_t
tsDnodeId
=
0
;
static
char
tsDnodeName
[
TSDB_NODE_NAME_LEN
];
int32_t
dnodeInitMClient
()
{
dnodeReadDnodeInfo
();
tsRebootTime
=
taosGetTimestampSec
();
tsDnodeTmr
=
taosTmrInit
(
100
,
200
,
60000
,
"DND-DM"
);
if
(
tsDnodeTmr
==
NULL
)
{
dError
(
"failed to init dnode timer"
);
return
-
1
;
}
if
(
!
dnodeReadMnodeIpList
())
{
memset
(
&
tsMnodeIpList
,
0
,
sizeof
(
SRpcIpSet
));
memset
(
&
tsMnodeInfos
,
0
,
sizeof
(
SDMNodeInfos
));
tsMnodeIpList
.
port
=
tsMnodeDnodePort
;
tsMnodeIpList
.
numOfIps
=
1
;
tsMnodeIpList
.
ip
[
0
]
=
inet_addr
(
tsMasterIp
);
if
(
tsSecondIp
[
0
]
)
{
if
(
strcmp
(
tsSecondIp
,
tsMasterIp
)
!=
0
)
{
tsMnodeIpList
.
numOfIps
=
2
;
tsMnodeIpList
.
ip
[
1
]
=
inet_addr
(
tsSecondIp
);
}
...
...
@@ -57,8 +79,6 @@ int32_t dnodeInitMClient() {
}
}
tsDnodeProcessMgmtRspFp
[
TSDB_MSG_TYPE_DM_STATUS_RSP
]
=
dnodeProcessStatusRsp
;
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localIp
=
tsAnyIp
?
"0.0.0.0"
:
tsPrivateIp
;
...
...
@@ -79,11 +99,24 @@ int32_t dnodeInitMClient() {
return
-
1
;
}
tsDnodeProcessMgmtRspFp
[
TSDB_MSG_TYPE_DM_STATUS_RSP
]
=
dnodeProcessStatusRsp
;
taosTmrReset
(
dnodeSendStatusMsg
,
500
,
NULL
,
tsDnodeTmr
,
&
tsStatusTimer
);
dPrint
(
"mnode rpc client is opened"
);
return
0
;
}
void
dnodeCleanupMClient
()
{
if
(
tsStatusTimer
!=
NULL
)
{
taosTmrStopA
(
&
tsStatusTimer
);
tsStatusTimer
=
NULL
;
}
if
(
tsDnodeTmr
!=
NULL
)
{
taosTmrCleanUp
(
tsDnodeTmr
);
tsDnodeTmr
=
NULL
;
}
if
(
tsDnodeMClientRpc
)
{
rpcClose
(
tsDnodeMClientRpc
);
tsDnodeMClientRpc
=
NULL
;
...
...
@@ -104,6 +137,7 @@ static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) {
static
void
dnodeProcessStatusRsp
(
SRpcMsg
*
pMsg
)
{
if
(
pMsg
->
code
!=
TSDB_CODE_SUCCESS
)
{
dError
(
"status rsp is received, error:%s"
,
tstrerror
(
pMsg
->
code
));
taosTmrReset
(
dnodeSendStatusMsg
,
tsStatusInterval
*
1000
,
NULL
,
tsDnodeTmr
,
&
tsStatusTimer
);
return
;
}
...
...
@@ -111,9 +145,19 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
SDMNodeInfos
*
mpeers
=
&
pStatusRsp
->
mpeers
;
if
(
mpeers
->
nodeNum
<=
0
)
{
dError
(
"status msg is invalid, num of ips is %d"
,
mpeers
->
nodeNum
);
taosTmrReset
(
dnodeSendStatusMsg
,
tsStatusInterval
*
1000
,
NULL
,
tsDnodeTmr
,
&
tsStatusTimer
);
return
;
}
SDnodeState
*
pState
=
&
pStatusRsp
->
dnodeState
;
pState
->
numOfVnodes
=
htonl
(
pState
->
numOfVnodes
);
pState
->
moduleStatus
=
htonl
(
pState
->
moduleStatus
);
pState
->
createdTime
=
htonl
(
pState
->
createdTime
);
pState
->
dnodeId
=
htonl
(
pState
->
dnodeId
);
dnodeProcessModuleStatus
(
pState
->
moduleStatus
);
dnodeUpdateDnodeInfo
(
pState
->
dnodeId
);
SRpcIpSet
mgmtIpSet
=
{
0
};
mgmtIpSet
.
inUse
=
mpeers
->
inUse
;
mgmtIpSet
.
numOfIps
=
mpeers
->
nodeNum
;
...
...
@@ -122,29 +166,25 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
mgmtIpSet
.
ip
[
i
]
=
htonl
(
mpeers
->
nodeInfos
[
i
].
nodeIp
);
}
if
(
memcmp
(
&
mgmtIpSet
,
&
tsMnodeIpList
,
sizeof
(
SRpcIpSet
))
!=
0
)
{
if
(
memcmp
(
&
mgmtIpSet
,
&
tsMnodeIpList
,
sizeof
(
SRpcIpSet
))
!=
0
||
tsMnodeInfos
.
nodeNum
==
0
)
{
memcpy
(
&
tsMnodeIpList
,
&
mgmtIpSet
,
sizeof
(
SRpcIpSet
));
memcpy
(
&
tsMnodeInfos
,
mpeers
,
sizeof
(
SDMNodeInfos
));
tsMnodeInfos
.
inUse
=
mpeers
->
inUse
;
tsMnodeInfos
.
nodeNum
=
mpeers
->
nodeNum
;
dPrint
(
"mnode ip list is changed, numOfIps:%d inUse:%d"
,
tsMnodeInfos
.
nodeNum
,
tsMnodeInfos
.
inUse
);
for
(
int32_t
i
=
0
;
i
<
mpeers
->
nodeNum
;
i
++
)
{
tsMnodeInfos
.
nodeInfos
[
i
].
nodeId
=
htonl
(
mpeers
->
nodeInfos
[
i
].
nodeId
);
tsMnodeInfos
.
nodeInfos
[
i
].
nodeIp
=
htonl
(
mpeers
->
nodeInfos
[
i
].
nodeIp
);
tsMnodeInfos
.
nodeInfos
[
i
].
nodePort
=
htons
(
mpeers
->
nodeInfos
[
i
].
nodePort
);
strcpy
(
tsMnodeInfos
.
nodeInfos
[
i
].
nodeName
,
mpeers
->
nodeInfos
[
i
].
nodeName
);
dPrint
(
"mnode:%d, ip:%s:%u name:%s"
,
tsMnodeInfos
.
nodeInfos
[
i
].
nodeId
,
taosIpStr
(
tsMnodeInfos
.
nodeInfos
[
i
].
nodeI
d
),
tsMnodeInfos
.
nodeInfos
[
i
].
nodePort
,
taosIpStr
(
tsMnodeInfos
.
nodeInfos
[
i
].
nodeI
p
),
tsMnodeInfos
.
nodeInfos
[
i
].
nodePort
,
tsMnodeInfos
.
nodeInfos
[
i
].
nodeName
);
}
dnodeSaveMnodeIpList
();
mpeerUpdateSync
();
}
SDnodeState
*
pState
=
&
pStatusRsp
->
dnodeState
;
pState
->
numOfVnodes
=
htonl
(
pState
->
numOfVnodes
);
pState
->
moduleStatus
=
htonl
(
pState
->
moduleStatus
);
pState
->
createdTime
=
htonl
(
pState
->
createdTime
);
pState
->
dnodeId
=
htonl
(
pState
->
dnodeId
);
dnodeProcessModuleStatus
(
pState
->
moduleStatus
);
dnodeUpdateDnodeId
(
pState
->
dnodeId
);
taosTmrReset
(
dnodeSendStatusMsg
,
tsStatusInterval
*
1000
,
NULL
,
tsDnodeTmr
,
&
tsStatusTimer
);
}
void
dnodeSendMsgToMnode
(
SRpcMsg
*
rpcMsg
)
{
...
...
@@ -294,4 +334,93 @@ uint32_t dnodeGetMnodeMasteIp() {
void
*
dnodeGetMpeerInfos
()
{
return
&
tsMnodeInfos
;
}
static
void
dnodeSendStatusMsg
(
void
*
handle
,
void
*
tmrId
)
{
if
(
tsDnodeTmr
==
NULL
)
{
dError
(
"dnode timer is already released"
);
return
;
}
if
(
tsStatusTimer
==
NULL
)
{
taosTmrReset
(
dnodeSendStatusMsg
,
tsStatusInterval
*
1000
,
NULL
,
tsDnodeTmr
,
&
tsStatusTimer
);
dError
(
"failed to start status timer"
);
return
;
}
int32_t
contLen
=
sizeof
(
SDMStatusMsg
)
+
TSDB_MAX_VNODES
*
sizeof
(
SVnodeLoad
);
SDMStatusMsg
*
pStatus
=
rpcMallocCont
(
contLen
);
if
(
pStatus
==
NULL
)
{
taosTmrReset
(
dnodeSendStatusMsg
,
tsStatusInterval
*
1000
,
NULL
,
tsDnodeTmr
,
&
tsStatusTimer
);
dError
(
"failed to malloc status message"
);
return
;
}
strcpy
(
pStatus
->
dnodeName
,
tsDnodeName
);
pStatus
->
version
=
htonl
(
tsVersion
);
pStatus
->
dnodeId
=
htonl
(
tsDnodeId
);
pStatus
->
privateIp
=
htonl
(
inet_addr
(
tsPrivateIp
));
pStatus
->
publicIp
=
htonl
(
inet_addr
(
tsPublicIp
));
pStatus
->
lastReboot
=
htonl
(
tsRebootTime
);
pStatus
->
numOfTotalVnodes
=
htons
((
uint16_t
)
tsNumOfTotalVnodes
);
pStatus
->
numOfCores
=
htons
((
uint16_t
)
tsNumOfCores
);
pStatus
->
diskAvailable
=
tsAvailDataDirGB
;
pStatus
->
alternativeRole
=
(
uint8_t
)
tsAlternativeRole
;
vnodeBuildStatusMsg
(
pStatus
);
contLen
=
sizeof
(
SDMStatusMsg
)
+
pStatus
->
openVnodes
*
sizeof
(
SVnodeLoad
);
pStatus
->
openVnodes
=
htons
(
pStatus
->
openVnodes
);
SRpcMsg
rpcMsg
=
{
.
pCont
=
pStatus
,
.
contLen
=
contLen
,
.
msgType
=
TSDB_MSG_TYPE_DM_STATUS
};
dnodeSendMsgToMnode
(
&
rpcMsg
);
}
static
void
dnodeReadDnodeInfo
()
{
char
dnodeIdFile
[
TSDB_FILENAME_LEN
]
=
{
0
};
sprintf
(
dnodeIdFile
,
"%s/dnodeId"
,
tsDnodeDir
);
FILE
*
fp
=
fopen
(
dnodeIdFile
,
"r"
);
if
(
!
fp
)
return
;
char
option
[
32
]
=
{
0
};
int32_t
value
=
0
;
int32_t
num
=
0
;
num
=
fscanf
(
fp
,
"%s %d"
,
option
,
&
value
);
if
(
num
!=
2
)
return
;
if
(
strcmp
(
option
,
"dnodeId"
)
!=
0
)
return
;
tsDnodeId
=
value
;;
fclose
(
fp
);
dPrint
(
"read dnodeId:%d successed"
,
tsDnodeId
);
}
static
void
dnodeSaveDnodeInfo
()
{
char
dnodeIdFile
[
TSDB_FILENAME_LEN
]
=
{
0
};
sprintf
(
dnodeIdFile
,
"%s/dnodeId"
,
tsDnodeDir
);
FILE
*
fp
=
fopen
(
dnodeIdFile
,
"w"
);
if
(
!
fp
)
return
;
fprintf
(
fp
,
"dnodeId %d
\n
"
,
tsDnodeId
);
fclose
(
fp
);
dPrint
(
"save dnodeId successed"
);
}
void
dnodeUpdateDnodeInfo
(
int32_t
dnodeId
)
{
if
(
tsDnodeId
==
0
)
{
dPrint
(
"dnodeId is set to %d"
,
dnodeId
);
tsDnodeId
=
dnodeId
;
dnodeSaveDnodeInfo
();
}
}
int32_t
dnodeGetDnodeId
()
{
return
tsDnodeId
;
}
\ No newline at end of file
src/dnode/src/dnodeMain.c
浏览文件 @
0af376f7
...
...
@@ -159,10 +159,10 @@ static int32_t dnodeInitSystem() {
dPrint
(
"starting to initialize TDengine ..."
);
if
(
dnodeInitStorage
()
!=
0
)
return
-
1
;
if
(
dnodeInitModules
()
!=
0
)
return
-
1
;
if
(
dnodeInitRead
()
!=
0
)
return
-
1
;
if
(
dnodeInitWrite
()
!=
0
)
return
-
1
;
if
(
dnodeInitMClient
()
!=
0
)
return
-
1
;
if
(
dnodeInitModules
()
!=
0
)
return
-
1
;
if
(
dnodeInitMnode
()
!=
0
)
return
-
1
;
if
(
dnodeInitMgmt
()
!=
0
)
return
-
1
;
if
(
dnodeInitShell
()
!=
0
)
return
-
1
;
...
...
@@ -177,7 +177,6 @@ static int32_t dnodeInitSystem() {
static
void
dnodeCleanUpSystem
()
{
if
(
dnodeGetRunStatus
()
!=
TSDB_DNODE_RUN_STATUS_STOPPED
)
{
tclearModuleStatus
(
TSDB_MOD_MGMT
);
dnodeSetRunStatus
(
TSDB_DNODE_RUN_STATUS_STOPPED
);
dnodeCleanupShell
();
dnodeCleanupMnode
();
...
...
src/dnode/src/dnodeMgmt.c
浏览文件 @
0af376f7
...
...
@@ -21,8 +21,6 @@
#include "tlog.h"
#include "trpc.h"
#include "tsdb.h"
#include "ttime.h"
#include "ttimer.h"
#include "twal.h"
#include "dnodeMClient.h"
#include "dnodeMgmt.h"
...
...
@@ -38,52 +36,23 @@ static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static
int32_t
dnodeProcessAlterStreamMsg
(
SRpcMsg
*
pMsg
);
static
int32_t
dnodeProcessConfigDnodeMsg
(
SRpcMsg
*
pMsg
);
static
int32_t
(
*
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MAX
])(
SRpcMsg
*
pMsg
);
static
void
dnodeSendStatusMsg
(
void
*
handle
,
void
*
tmrId
);
static
void
dnodeReadDnodeId
();
static
void
*
tsDnodeTmr
=
NULL
;
static
void
*
tsStatusTimer
=
NULL
;
static
uint32_t
tsRebootTime
;
static
int32_t
tsDnodeId
=
0
;
static
char
tsDnodeName
[
TSDB_NODE_NAME_LEN
];
int32_t
dnodeInitMgmt
()
{
dnodeReadDnodeId
();
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_CREATE_VNODE
]
=
dnodeProcessCreateVnodeMsg
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_DROP_VNODE
]
=
dnodeProcessDropVnodeMsg
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_ALTER_VNODE
]
=
dnodeProcessAlterVnodeMsg
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_ALTER_STREAM
]
=
dnodeProcessAlterStreamMsg
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_CONFIG_DNODE
]
=
dnodeProcessConfigDnodeMsg
;
tsRebootTime
=
taosGetTimestampSec
();
tsDnodeTmr
=
taosTmrInit
(
100
,
200
,
60000
,
"DND-DM"
);
if
(
tsDnodeTmr
==
NULL
)
{
dError
(
"failed to init dnode timer"
);
return
-
1
;
}
int32_t
code
=
dnodeOpenVnodes
();
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
-
1
;
}
taosTmrReset
(
dnodeSendStatusMsg
,
500
,
NULL
,
tsDnodeTmr
,
&
tsStatusTimer
);
return
TSDB_CODE_SUCCESS
;
}
void
dnodeCleanupMgmt
()
{
if
(
tsStatusTimer
!=
NULL
)
{
taosTmrStopA
(
&
tsStatusTimer
);
tsStatusTimer
=
NULL
;
}
if
(
tsDnodeTmr
!=
NULL
)
{
taosTmrCleanUp
(
tsDnodeTmr
);
tsDnodeTmr
=
NULL
;
}
dnodeCloseVnodes
();
}
...
...
@@ -213,89 +182,3 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
SMDCfgDnodeMsg
*
pCfg
=
(
SMDCfgDnodeMsg
*
)
pMsg
->
pCont
;
return
tsCfgDynamicOptions
(
pCfg
->
config
);
}
static
void
dnodeSendStatusMsg
(
void
*
handle
,
void
*
tmrId
)
{
if
(
tsDnodeTmr
==
NULL
)
{
dError
(
"dnode timer is already released"
);
return
;
}
if
(
tsStatusTimer
==
NULL
)
{
taosTmrReset
(
dnodeSendStatusMsg
,
tsStatusInterval
*
1000
,
NULL
,
tsDnodeTmr
,
&
tsStatusTimer
);
dError
(
"failed to start status timer"
);
return
;
}
int32_t
contLen
=
sizeof
(
SDMStatusMsg
)
+
TSDB_MAX_VNODES
*
sizeof
(
SVnodeLoad
);
SDMStatusMsg
*
pStatus
=
rpcMallocCont
(
contLen
);
if
(
pStatus
==
NULL
)
{
taosTmrReset
(
dnodeSendStatusMsg
,
tsStatusInterval
*
1000
,
NULL
,
tsDnodeTmr
,
&
tsStatusTimer
);
dError
(
"failed to malloc status message"
);
return
;
}
strcpy
(
pStatus
->
dnodeName
,
tsDnodeName
);
pStatus
->
version
=
htonl
(
tsVersion
);
pStatus
->
dnodeId
=
htonl
(
tsDnodeId
);
pStatus
->
privateIp
=
htonl
(
inet_addr
(
tsPrivateIp
));
pStatus
->
publicIp
=
htonl
(
inet_addr
(
tsPublicIp
));
pStatus
->
lastReboot
=
htonl
(
tsRebootTime
);
pStatus
->
numOfTotalVnodes
=
htons
((
uint16_t
)
tsNumOfTotalVnodes
);
pStatus
->
numOfCores
=
htons
((
uint16_t
)
tsNumOfCores
);
pStatus
->
diskAvailable
=
tsAvailDataDirGB
;
pStatus
->
alternativeRole
=
(
uint8_t
)
tsAlternativeRole
;
vnodeBuildStatusMsg
(
pStatus
);
contLen
=
sizeof
(
SDMStatusMsg
)
+
pStatus
->
openVnodes
*
sizeof
(
SVnodeLoad
);
pStatus
->
openVnodes
=
htons
(
pStatus
->
openVnodes
);
SRpcMsg
rpcMsg
=
{
.
pCont
=
pStatus
,
.
contLen
=
contLen
,
.
msgType
=
TSDB_MSG_TYPE_DM_STATUS
};
dnodeSendMsgToMnode
(
&
rpcMsg
);
taosTmrReset
(
dnodeSendStatusMsg
,
tsStatusInterval
*
1000
,
NULL
,
tsDnodeTmr
,
&
tsStatusTimer
);
}
static
void
dnodeReadDnodeId
()
{
char
dnodeIdFile
[
TSDB_FILENAME_LEN
]
=
{
0
};
sprintf
(
dnodeIdFile
,
"%s/dnodeId"
,
tsDnodeDir
);
FILE
*
fp
=
fopen
(
dnodeIdFile
,
"r"
);
if
(
!
fp
)
return
;
char
option
[
32
]
=
{
0
};
int32_t
value
=
0
;
int32_t
num
=
0
;
num
=
fscanf
(
fp
,
"%s %d"
,
option
,
&
value
);
if
(
num
!=
2
)
return
;
if
(
strcmp
(
option
,
"dnodeId"
)
!=
0
)
return
;
tsDnodeId
=
value
;;
fclose
(
fp
);
dPrint
(
"read dnodeId:%d successed"
,
tsDnodeId
);
}
static
void
dnodeSaveDnodeId
()
{
char
dnodeIdFile
[
TSDB_FILENAME_LEN
]
=
{
0
};
sprintf
(
dnodeIdFile
,
"%s/dnodeId"
,
tsDnodeDir
);
FILE
*
fp
=
fopen
(
dnodeIdFile
,
"w"
);
if
(
!
fp
)
return
;
fprintf
(
fp
,
"dnodeId %d
\n
"
,
tsDnodeId
);
fclose
(
fp
);
dPrint
(
"save dnodeId successed"
);
}
void
dnodeUpdateDnodeId
(
int32_t
dnodeId
)
{
if
(
tsDnodeId
==
0
)
{
dPrint
(
"dnodeId is set to %d"
,
dnodeId
);
tsDnodeId
=
dnodeId
;
dnodeSaveDnodeId
();
}
}
src/inc/mnode.h
浏览文件 @
0af376f7
...
...
@@ -45,16 +45,14 @@ struct _mnode_obj;
typedef
struct
_mnode_obj
{
int32_t
mnodeId
;
int32_t
dnodeId
;
int64_t
createdTime
;
int8_t
reserved
[
14
];
int8_t
updateEnd
[
1
];
int32_t
refCount
;
int8_t
role
;
int8_t
status
;
uint16_t
port
;
uint32_t
privateIp
;
uint32_t
publicIp
;
uint16_t
port
;
int8_t
role
;
char
mnodeName
[
TSDB_NODE_NAME_LEN
+
1
];
}
SMnodeObj
;
...
...
src/inc/mpeer.h
浏览文件 @
0af376f7
...
...
@@ -28,22 +28,29 @@ enum _TAOS_MN_STATUS {
TAOS_MN_STATUS_READY
};
// general implementation
int32_t
mpeerInit
();
void
mpeerCleanup
();
// special implementation
int32_t
mpeerInitMnodes
();
void
mpeerCleanupMnodes
();
int32_t
mpeerAddMnode
(
int32_t
dnodeId
);
int32_t
mpeerRemoveMnode
(
int32_t
dnodeId
);
void
*
mpeerGetMnode
(
int32_t
mnodeId
);
int32_t
mpeerGetMnodesNum
();
void
*
mpeerGetNextMnode
(
void
*
pNode
,
struct
_mnode_obj
**
pMnode
);
void
mpeerReleaseMnode
(
struct
_mnode_obj
*
pMnode
);
bool
mpeerInServerStatus
();
bool
mpeerIsMaster
();
bool
mpeerCheckRedirect
();
void
mpeerGetPrivateIpList
(
SRpcIpSet
*
ipSet
);
void
mpeerGetPublicIpList
(
SRpcIpSet
*
ipSet
);
void
mpeerGetMpeerInfos
(
void
*
mpeers
);
char
*
mpeerGetMnodeStatusStr
(
int32_t
status
);
char
*
mpeerGetMnodeRoleStr
(
int32_t
role
);
int32_t
mpeerForwardReqToPeer
(
void
*
pHead
);
void
mpeerUpdateSync
(
);
#ifdef __cplusplus
}
...
...
src/inc/tbalance.h
浏览文件 @
0af376f7
...
...
@@ -31,6 +31,7 @@ struct _dnode_obj;
int32_t
balanceInit
();
void
balanceCleanUp
();
void
balanceNotify
();
void
balanceReset
();
int32_t
balanceAllocVnodes
(
struct
_vg_obj
*
pVgroup
);
int32_t
balanceDropDnode
(
struct
_dnode_obj
*
pDnode
);
...
...
src/inc/tcluster.h
浏览文件 @
0af376f7
...
...
@@ -37,6 +37,7 @@ int32_t clusterInit();
void
clusterCleanUp
();
char
*
clusterGetDnodeStatusStr
(
int32_t
dnodeStatus
);
bool
clusterCheckModuleInDnode
(
struct
_dnode_obj
*
pDnode
,
int
moduleType
);
void
clusterMonitorDnodeModule
();
int32_t
clusterInitDnodes
();
void
clusterCleanupDnodes
();
...
...
src/mnode/inc/mgmtSdb.h
浏览文件 @
0af376f7
...
...
@@ -21,8 +21,8 @@ extern "C" {
#endif
typedef
enum
{
SDB_TABLE_
M
NODE
=
0
,
SDB_TABLE_
D
NODE
=
1
,
SDB_TABLE_
D
NODE
=
0
,
SDB_TABLE_
M
NODE
=
1
,
SDB_TABLE_ACCOUNT
=
2
,
SDB_TABLE_USER
=
3
,
SDB_TABLE_DB
=
4
,
...
...
@@ -34,6 +34,7 @@ typedef enum {
typedef
enum
{
SDB_KEY_STRING
,
SDB_KEY_INT
,
SDB_KEY_AUTO
}
ESdbKeyType
;
...
...
@@ -63,14 +64,22 @@ typedef struct {
int32_t
(
*
encodeFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
decodeFp
)(
SSdbOperDesc
*
pDesc
);
int32_t
(
*
destroyFp
)(
SSdbOperDesc
*
pDesc
);
int32_t
(
*
updateAll
Fp
)();
int32_t
(
*
restored
Fp
)();
}
SSdbTableDesc
;
typedef
struct
{
int64_t
version
;
void
*
wal
;
pthread_mutex_t
mutex
;
}
SSdbObject
;
int32_t
sdbInit
();
void
sdbCleanUp
();
SSdbObject
*
sdbGetObj
();
void
*
sdbOpenTable
(
SSdbTableDesc
*
desc
);
void
sdbCloseTable
(
void
*
handle
);
int
sdbProcessWrite
(
void
*
param
,
void
*
data
,
int
type
);
int32_t
sdbInsertRow
(
SSdbOperDesc
*
pOper
);
int32_t
sdbDeleteRow
(
SSdbOperDesc
*
pOper
);
...
...
@@ -81,7 +90,7 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow);
void
sdbIncRef
(
void
*
thandle
,
void
*
pRow
);
void
sdbDecRef
(
void
*
thandle
,
void
*
pRow
);
int64_t
sdbGetNumOfRows
(
void
*
handle
);
int
64
_t
sdbGetId
(
void
*
handle
);
int
32
_t
sdbGetId
(
void
*
handle
);
uint64_t
sdbGetVersion
();
#ifdef __cplusplus
...
...
src/mnode/src/mgmtDb.c
浏览文件 @
0af376f7
...
...
@@ -102,7 +102,7 @@ static int32_t mgmtDbActionDecode(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtDbAction
UpdateAll
()
{
static
int32_t
mgmtDbAction
Restored
()
{
return
0
;
}
...
...
@@ -123,7 +123,7 @@ int32_t mgmtInitDbs() {
.
encodeFp
=
mgmtDbActionEncode
,
.
decodeFp
=
mgmtDbActionDecode
,
.
destroyFp
=
mgmtDbActionDestroy
,
.
updateAllFp
=
mgmtDbActionUpdateAll
.
restoredFp
=
mgmtDbActionRestored
};
tsDbSdb
=
sdbOpenTable
(
&
tableDesc
);
...
...
src/mnode/src/mgmtDnode.c
浏览文件 @
0af376f7
...
...
@@ -77,6 +77,7 @@ void * clusterGetDnode(int32_t dnodeId) { return dnodeId == 1 ? &tsDnodeObj : N
void
*
clusterGetDnodeByIp
(
uint32_t
ip
)
{
return
&
tsDnodeObj
;
}
void
clusterReleaseDnode
(
struct
_dnode_obj
*
pDnode
)
{}
void
clusterUpdateDnode
(
struct
_dnode_obj
*
pDnode
)
{}
void
clusterMonitorDnodeModule
()
{}
#endif
...
...
@@ -208,6 +209,7 @@ void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
mTrace
(
"dnode:%d, from offline to online"
,
pDnode
->
dnodeId
);
pDnode
->
status
=
TAOS_DN_STATUS_READY
;
balanceNotify
();
clusterMonitorDnodeModule
();
}
clusterReleaseDnode
(
pDnode
);
...
...
src/mnode/src/mgmtMain.c
浏览文件 @
0af376f7
...
...
@@ -109,6 +109,11 @@ int32_t mgmtStartSystem() {
return
-
1
;
}
if
(
mpeerInit
()
<
0
)
{
mError
(
"failed to init mpeers"
);
return
-
1
;
}
if
(
sdbInit
()
<
0
)
{
mError
(
"failed to init sdb"
);
return
-
1
;
...
...
@@ -122,11 +127,6 @@ int32_t mgmtStartSystem() {
return
-
1
;
}
if
(
mpeerInit
()
<
0
)
{
mError
(
"failed to init mpeers"
);
return
-
1
;
}
if
(
balanceInit
()
<
0
)
{
mError
(
"failed to init dnode balance"
)
}
...
...
src/mnode/src/mgmtMnode.c
浏览文件 @
0af376f7
...
...
@@ -19,12 +19,9 @@
#include "trpc.h"
#include "tsync.h"
#include "mpeer.h"
#include "mgmtSdb.h"
#include "mgmtShell.h"
#include "mgmtUser.h"
extern
int32_t
mpeerInitMnodes
();
extern
void
mpeerCleanupMnodes
();
static
int32_t
mgmtGetMnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveMnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
...
...
@@ -34,18 +31,25 @@ static SMnodeObj tsMnodeObj = {0};
int32_t
mpeerInitMnodes
()
{
tsMnodeObj
.
mnodeId
=
1
;
tsMnodeObj
.
dnodeId
=
1
;
tsMnodeObj
.
privateIp
=
inet_addr
(
tsPrivateIp
);
tsMnodeObj
.
publicIp
=
inet_addr
(
tsPublicIp
);
tsMnodeObj
.
createdTime
=
taosGetTimestampMs
();
tsMnodeObj
.
role
=
TAOS_SYNC_ROLE_MASTER
;
tsMnodeObj
.
status
=
TAOS_MN_STATUS_READY
;
tsMnodeObj
.
port
=
tsMnodeDnodePort
;
sprintf
(
tsMnodeObj
.
mnodeName
,
"m%d"
,
tsMnodeObj
.
mnodeId
);
return
TSDB_CODE_SUCCESS
;
}
void
mpeerCleanupMnodes
()
{}
int32_t
mpeerAddMnode
(
int32_t
dnodeId
)
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
mpeerRemoveMnode
(
int32_t
dnodeId
)
{
return
TSDB_CODE_SUCCESS
;
}
void
*
mpeerGetMnode
(
int32_t
mnodeId
)
{
return
&
tsMnodeObj
;
}
int32_t
mpeerGetMnodesNum
()
{
return
1
;
}
void
mpeerReleaseMnode
(
struct
_mnode_obj
*
pMnode
)
{}
bool
mpeerIsMaster
()
{
return
tsMnodeObj
.
role
==
TAOS_SYNC_ROLE_MASTER
;
}
void
mpeerUpdateSync
()
{}
void
*
mpeerGetNextMnode
(
void
*
pNode
,
SMnodeObj
**
pMnode
)
{
if
(
*
pMnode
==
NULL
)
{
*
pMnode
=
&
tsMnodeObj
;
...
...
@@ -58,20 +62,21 @@ void *mpeerGetNextMnode(void *pNode, SMnodeObj **pMnode) {
void
mpeerGetPrivateIpList
(
SRpcIpSet
*
ipSet
)
{
ipSet
->
inUse
=
0
;
ipSet
->
port
=
htons
(
tsMnodeDnodePort
);
ipSet
->
numOfIps
=
1
;
ipSet
->
port
=
htons
(
tsMnodeObj
.
port
);
ipSet
->
ip
[
0
]
=
htonl
(
tsMnodeObj
.
privateIp
);
}
void
mpeerGetPublicIpList
(
SRpcIpSet
*
ipSet
)
{
ipSet
->
inUse
=
0
;
ipSet
->
port
=
htons
(
tsMnodeDnodePort
);
ipSet
->
numOfIps
=
1
;
ipSet
->
port
=
htons
(
tsMnodeObj
.
port
);
ipSet
->
ip
[
0
]
=
htonl
(
tsMnodeObj
.
publicIp
);
}
void
mpeerGetMpeerInfos
(
void
*
param
)
{
SDMNodeInfos
*
mpeers
=
param
;
mpeers
->
inUse
=
0
;
mpeers
->
nodeNum
=
1
;
mpeers
->
nodeInfos
[
0
].
nodeId
=
htonl
(
tsMnodeObj
.
mnodeId
);
mpeers
->
nodeInfos
[
0
].
nodeIp
=
htonl
(
tsMnodeObj
.
privateIp
);
...
...
@@ -79,40 +84,23 @@ void mpeerGetMpeerInfos(void *param) {
strcpy
(
mpeers
->
nodeInfos
[
0
].
nodeName
,
tsMnodeObj
.
mnodeName
);
}
void
mpeerCleanupDnodes
()
{}
int32_t
mpeerGetMnodesNum
()
{
return
1
;
}
void
mpeerReleaseMnode
(
struct
_mnode_obj
*
pMnode
)
{}
bool
mpeerInServerStatus
()
{
return
tsMnodeObj
.
status
==
TAOS_MN_STATUS_READY
;
}
bool
mpeerIsMaster
()
{
return
tsMnodeObj
.
role
==
TAOS_SYNC_ROLE_MASTER
;
}
bool
mpeerCheckRedirect
()
{
return
false
;
}
int32_t
mpeerForwardReqToPeer
(
void
*
pHead
)
{
return
TSDB_CODE_SUCCESS
;
}
#endif
int32_t
mpeerInit
()
{
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_MNODE
,
mgmtGetMnodeMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_MNODE
,
mgmtRetrieveMnodes
);
return
mpeerInitMnodes
();
}
void
mpeerCleanup
()
{
mpeerCleanupDnodes
();
}
char
*
mpeerGetMnodeStatusStr
(
int32_t
status
)
{
switch
(
status
)
{
case
TAOS_MN_STATUS_OFFLINE
:
return
"offline"
;
case
TAOS_MN_STATUS_DROPPING
:
return
"dropping"
;
case
TAOS_MN_STATUS_READY
:
return
"ready"
;
default:
return
"undefined"
;
}
mpeerCleanupMnodes
();
}
char
*
mpeerGetMnodeRoleStr
(
int32_t
role
)
{
static
char
*
mpeerGetMnodeRoleStr
(
int32_t
role
)
{
switch
(
role
)
{
case
TAOS_SYNC_ROLE_OFFLINE
:
return
"offline"
;
...
...
@@ -160,12 +148,6 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
10
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"status"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
10
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"role"
);
...
...
@@ -220,14 +202,12 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
mpeerGetMnodeStatusStr
(
pMnode
->
status
));
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
mpeerGetMnodeRoleStr
(
pMnode
->
role
));
cols
++
;
numOfRows
++
;
mpeerReleaseMnode
(
pMnode
);
}
pShow
->
numOfReads
+=
numOfRows
;
...
...
src/mnode/src/mgmtSdb.c
浏览文件 @
0af376f7
...
...
@@ -15,28 +15,16 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosdef.h"
#include "taoserror.h"
#include "tchecksum.h"
#include "tglobalcfg.h"
#include "tlog.h"
#include "trpc.h"
#include "t
util
.h"
#include "t
queue
.h"
#include "twal.h"
#include "tsync.h"
#include "hashint.h"
#include "hashstr.h"
#include "mpeer.h"
#include "mgmtSdb.h"
typedef
struct
{
int32_t
code
;
int64_t
version
;
void
*
sync
;
void
*
wal
;
sem_t
sem
;
pthread_mutex_t
mutex
;
}
SSdbSync
;
typedef
struct
_SSdbTable
{
char
tableName
[
TSDB_DB_NAME_LEN
+
1
];
ESdbTable
tableId
;
...
...
@@ -47,13 +35,13 @@ typedef struct _SSdbTable {
int32_t
autoIndex
;
int64_t
numOfRows
;
void
*
iHandle
;
int32_t
(
*
insertFp
)(
SSdbOperDesc
*
pDesc
);
int32_t
(
*
deleteFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
updateFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
decodeFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
encodeFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
destroyFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
updateAll
Fp
)();
int32_t
(
*
insertFp
)(
SSdbOperDesc
*
pDesc
);
int32_t
(
*
deleteFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
updateFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
decodeFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
encodeFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
destroyFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
restored
Fp
)();
pthread_mutex_t
mutex
;
}
SSdbTable
;
...
...
@@ -70,18 +58,17 @@ typedef enum {
static
SSdbTable
*
tsSdbTableList
[
SDB_TABLE_MAX
]
=
{
0
};
static
int32_t
tsSdbNumOfTables
=
0
;
static
SSdbSync
*
tsSdbSync
;
static
void
*
(
*
sdbInitIndexFp
[])(
int32_t
maxRows
,
int32_t
dataSize
)
=
{
sdbOpenStrHash
,
sdbOpenIntHash
};
static
void
*
(
*
sdbAddIndexFp
[])(
void
*
handle
,
void
*
key
,
void
*
data
)
=
{
sdbAddStrHash
,
sdbAddIntHash
};
static
void
(
*
sdbDeleteIndexFp
[])(
void
*
handle
,
void
*
key
)
=
{
sdbDeleteStrHash
,
sdbDeleteIntHash
};
static
void
*
(
*
sdbGetIndexFp
[])(
void
*
handle
,
void
*
key
)
=
{
sdbGetStrHashData
,
sdbGetIntHashData
};
static
void
(
*
sdbCleanUpIndexFp
[])(
void
*
handle
)
=
{
sdbCloseStrHash
,
sdbCloseIntHash
};
static
void
*
(
*
sdbFetchRowFp
[])(
void
*
handle
,
void
*
ptr
,
void
**
ppRow
)
=
{
sdbFetchStrHashData
,
sdbFetchIntHashData
};
static
int
sdbProcessWrite
(
void
*
param
,
void
*
data
,
int
type
);
uint64_t
sdbGetVersion
()
{
return
tsSdbSync
->
version
;
}
int64_t
sdbGetId
(
void
*
handle
)
{
return
((
SSdbTable
*
)
handle
)
->
autoIndex
;
}
static
SSdbObject
*
tsSdbObj
;
static
void
*
(
*
sdbInitIndexFp
[])(
int32_t
maxRows
,
int32_t
dataSize
)
=
{
sdbOpenStrHash
,
sdbOpenIntHash
,
sdbOpenIntHash
};
static
void
*
(
*
sdbAddIndexFp
[])(
void
*
handle
,
void
*
key
,
void
*
data
)
=
{
sdbAddStrHash
,
sdbAddIntHash
,
sdbAddIntHash
};
static
void
(
*
sdbDeleteIndexFp
[])(
void
*
handle
,
void
*
key
)
=
{
sdbDeleteStrHash
,
sdbDeleteIntHash
,
sdbDeleteIntHash
};
static
void
*
(
*
sdbGetIndexFp
[])(
void
*
handle
,
void
*
key
)
=
{
sdbGetStrHashData
,
sdbGetIntHashData
,
sdbGetIntHashData
};
static
void
(
*
sdbCleanUpIndexFp
[])(
void
*
handle
)
=
{
sdbCloseStrHash
,
sdbCloseIntHash
,
sdbCloseIntHash
};
static
void
*
(
*
sdbFetchRowFp
[])(
void
*
handle
,
void
*
ptr
,
void
**
ppRow
)
=
{
sdbFetchStrHashData
,
sdbFetchIntHashData
,
sdbFetchIntHashData
};
uint64_t
sdbGetVersion
()
{
return
tsSdbObj
->
version
;
}
int32_t
sdbGetId
(
void
*
handle
)
{
return
((
SSdbTable
*
)
handle
)
->
autoIndex
;
}
int64_t
sdbGetNumOfRows
(
void
*
handle
)
{
return
((
SSdbTable
*
)
handle
)
->
numOfRows
;
}
static
char
*
sdbGetActionStr
(
int32_t
action
)
{
...
...
@@ -101,6 +88,7 @@ static char *sdbGetkeyStr(SSdbTable *pTable, void *row) {
switch
(
pTable
->
keyType
)
{
case
SDB_KEY_STRING
:
return
(
char
*
)
row
;
case
SDB_KEY_INT
:
case
SDB_KEY_AUTO
:
sprintf
(
str
,
"%d"
,
*
(
int32_t
*
)
row
);
return
str
;
...
...
@@ -113,44 +101,27 @@ static void *sdbGetTableFromId(int32_t tableId) {
return
tsSdbTableList
[
tableId
];
}
// static void mpeerConfirmForward(void *ahandle, void *param, int32_t code) {
// sem_post(&tsSdbSync->sem);
// mPrint("mpeerConfirmForward");
// }
static
int32_t
sdbForwardDbReqToPeer
(
SWalHead
*
pHead
)
{
// int32_t code = syncForwardToPeer(NULL, pHead, NULL);
// if (code < 0) {
// return code;
// }
// sem_wait(&tsSdbSync->sem);
// return tsSdbSync->code;
return
TSDB_CODE_SUCCESS
;
}
int32_t
sdbInit
()
{
tsSdbSync
=
calloc
(
1
,
sizeof
(
SSdbSync
));
sem_init
(
&
tsSdbSync
->
sem
,
0
,
0
);
pthread_mutex_init
(
&
tsSdbSync
->
mutex
,
NULL
);
tsSdbObj
=
calloc
(
1
,
sizeof
(
SSdbObject
));
pthread_mutex_init
(
&
tsSdbObj
->
mutex
,
NULL
);
SWalCfg
walCfg
=
{.
commitLog
=
2
,
.
wals
=
2
,
.
keep
=
1
};
tsSdb
Sync
->
wal
=
walOpen
(
tsMnodeDir
,
&
walCfg
);
if
(
tsSdb
Sync
->
wal
==
NULL
)
{
tsSdb
Obj
->
wal
=
walOpen
(
tsMnodeDir
,
&
walCfg
);
if
(
tsSdb
Obj
->
wal
==
NULL
)
{
sdbError
(
"failed to open sdb in %s"
,
tsMnodeDir
);
return
-
1
;
}
sdbTrace
(
"open sdb file for read"
);
walRestore
(
tsSdb
Sync
->
wal
,
tsSdbSync
,
sdbProcessWrite
);
walRestore
(
tsSdb
Obj
->
wal
,
tsSdbObj
,
sdbProcessWrite
);
int32_t
totalRows
=
0
;
int32_t
numOfTables
=
0
;
for
(
int32_t
tableId
=
SDB_TABLE_DNODE
;
tableId
<
SDB_TABLE_MAX
;
++
tableId
)
{
SSdbTable
*
pTable
=
sdbGetTableFromId
(
tableId
);
if
(
pTable
==
NULL
)
continue
;
if
(
pTable
->
updateAll
Fp
)
{
(
*
pTable
->
updateAll
Fp
)();
if
(
pTable
->
restored
Fp
)
{
(
*
pTable
->
restored
Fp
)();
}
totalRows
+=
pTable
->
numOfRows
;
...
...
@@ -158,20 +129,26 @@ int32_t sdbInit() {
sdbTrace
(
"table:%s, is initialized, numOfRows:%d"
,
pTable
->
tableName
,
pTable
->
numOfRows
);
}
sdbTrace
(
"sdb is initialized, version:%d totalRows:%d numOfTables:%d"
,
tsSdbSync
->
version
,
totalRows
,
numOfTables
);
sdbTrace
(
"sdb is initialized, version:%d totalRows:%d numOfTables:%d"
,
tsSdbObj
->
version
,
totalRows
,
numOfTables
);
mpeerUpdateSync
();
return
TSDB_CODE_SUCCESS
;
}
void
sdbCleanUp
()
{
if
(
tsSdbSync
)
{
sem_destroy
(
&
tsSdbSync
->
sem
);
pthread_mutex_destroy
(
&
tsSdbSync
->
mutex
);
walClose
(
tsSdbSync
->
wal
);
free
(
tsSdbSync
);
tsSdbSync
=
NULL
;
if
(
tsSdbObj
)
{
pthread_mutex_destroy
(
&
tsSdbObj
->
mutex
);
walClose
(
tsSdbObj
->
wal
);
free
(
tsSdbObj
);
tsSdbObj
=
NULL
;
}
}
SSdbObject
*
sdbGetObj
()
{
return
tsSdbObj
;
}
void
sdbIncRef
(
void
*
handle
,
void
*
pRow
)
{
if
(
pRow
)
{
SSdbTable
*
pTable
=
handle
;
...
...
@@ -241,6 +218,11 @@ static int32_t sdbInsertLocal(SSdbTable *pTable, SSdbOperDesc *pOper) {
(
*
sdbAddIndexFp
[
pTable
->
keyType
])(
pTable
->
iHandle
,
pOper
->
pObj
,
&
rowMeta
);
sdbIncRef
(
pTable
,
pOper
->
pObj
);
pTable
->
numOfRows
++
;
if
(
pTable
->
keyType
==
SDB_KEY_AUTO
)
{
pTable
->
autoIndex
=
MAX
(
pTable
->
autoIndex
,
*
((
uint32_t
*
)
pOper
->
pObj
));
}
pthread_mutex_unlock
(
&
pTable
->
mutex
);
sdbTrace
(
"table:%s, insert record:%s, numOfRows:%d"
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pOper
->
pObj
),
...
...
@@ -278,20 +260,20 @@ static int32_t sdbUpdateLocal(SSdbTable *pTable, SSdbOperDesc *pOper) {
static
int32_t
sdbProcessWriteFromApp
(
SSdbTable
*
pTable
,
SWalHead
*
pHead
,
int32_t
action
)
{
int32_t
code
=
0
;
pthread_mutex_lock
(
&
tsSdb
Sync
->
mutex
);
tsSdb
Sync
->
version
++
;
pHead
->
version
=
tsSdb
Sync
->
version
;
pthread_mutex_lock
(
&
tsSdb
Obj
->
mutex
);
tsSdb
Obj
->
version
++
;
pHead
->
version
=
tsSdb
Obj
->
version
;
code
=
sdbForwardDb
ReqToPeer
(
pHead
);
code
=
mpeerForward
ReqToPeer
(
pHead
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pthread_mutex_unlock
(
&
tsSdb
Sync
->
mutex
);
pthread_mutex_unlock
(
&
tsSdb
Obj
->
mutex
);
sdbError
(
"table:%s, failed to forward %s record:%s from file, version:%"
PRId64
", reason:%s"
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetkeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
,
tstrerror
(
code
));
return
code
;
}
code
=
walWrite
(
tsSdb
Sync
->
wal
,
pHead
);
pthread_mutex_unlock
(
&
tsSdb
Sync
->
mutex
);
code
=
walWrite
(
tsSdb
Obj
->
wal
,
pHead
);
pthread_mutex_unlock
(
&
tsSdb
Obj
->
mutex
);
if
(
code
<
0
)
{
sdbError
(
"table:%s, failed to %s record:%s to file, version:%"
PRId64
", reason:%s"
,
pTable
->
tableName
,
...
...
@@ -301,26 +283,25 @@ static int32_t sdbProcessWriteFromApp(SSdbTable *pTable, SWalHead *pHead, int32_
sdbGetkeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
);
}
walFsync
(
tsSdbSync
->
wal
);
free
(
pHead
);
walFsync
(
tsSdbObj
->
wal
);
taosFreeQitem
(
pHead
);
return
code
;
}
static
int32_t
sdbProcessWriteFromWal
(
SSdbTable
*
pTable
,
SWalHead
*
pHead
,
int32_t
action
)
{
pthread_mutex_lock
(
&
tsSdb
Sync
->
mutex
);
if
(
pHead
->
version
<=
tsSdb
Sync
->
version
)
{
pthread_mutex_unlock
(
&
tsSdb
Sync
->
mutex
);
pthread_mutex_lock
(
&
tsSdb
Obj
->
mutex
);
if
(
pHead
->
version
<=
tsSdb
Obj
->
version
)
{
pthread_mutex_unlock
(
&
tsSdb
Obj
->
mutex
);
return
TSDB_CODE_SUCCESS
;
}
else
if
(
pHead
->
version
!=
tsSdb
Sync
->
version
+
1
)
{
pthread_mutex_unlock
(
&
tsSdb
Sync
->
mutex
);
}
else
if
(
pHead
->
version
!=
tsSdb
Obj
->
version
+
1
)
{
pthread_mutex_unlock
(
&
tsSdb
Obj
->
mutex
);
sdbError
(
"table:%s, failed to restore %s record:%s from file, version:%"
PRId64
" too large, sdb version:%"
PRId64
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetkeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
,
tsSdb
Sync
->
version
);
tsSdb
Obj
->
version
);
return
TSDB_CODE_OTHERS
;
}
tsSdb
Sync
->
version
=
pHead
->
version
;
tsSdb
Obj
->
version
=
pHead
->
version
;
sdbTrace
(
"table:%s, success to restore %s record:%s from file, version:%"
PRId64
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetkeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
);
...
...
@@ -335,7 +316,7 @@ static int32_t sdbProcessWriteFromWal(SSdbTable *pTable, SWalHead *pHead, int32_
if
(
code
<
0
)
{
sdbTrace
(
"table:%s, failed to decode %s record:%s from file, version:%"
PRId64
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetkeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
);
pthread_mutex_unlock
(
&
tsSdb
Sync
->
mutex
);
pthread_mutex_unlock
(
&
tsSdb
Obj
->
mutex
);
return
code
;
}
...
...
@@ -369,17 +350,17 @@ static int32_t sdbProcessWriteFromWal(SSdbTable *pTable, SWalHead *pHead, int32_
if
(
code
<
0
)
{
sdbTrace
(
"table:%s, failed to decode %s record:%s from file, version:%"
PRId64
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetkeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
);
pthread_mutex_unlock
(
&
tsSdb
Sync
->
mutex
);
pthread_mutex_unlock
(
&
tsSdb
Obj
->
mutex
);
return
code
;
}
code
=
sdbInsertLocal
(
pTable
,
&
oper2
);
}
pthread_mutex_unlock
(
&
tsSdb
Sync
->
mutex
);
pthread_mutex_unlock
(
&
tsSdb
Obj
->
mutex
);
return
code
;
}
static
int
sdbProcessWrite
(
void
*
param
,
void
*
data
,
int
type
)
{
int
sdbProcessWrite
(
void
*
param
,
void
*
data
,
int
type
)
{
SWalHead
*
pHead
=
data
;
int32_t
tableId
=
pHead
->
msgType
/
10
;
int32_t
action
=
pHead
->
msgType
%
10
;
...
...
@@ -417,7 +398,7 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) {
if
(
pOper
->
type
==
SDB_OPER_GLOBAL
)
{
int32_t
size
=
sizeof
(
SWalHead
)
+
pTable
->
maxRowSize
;
SWalHead
*
pHead
=
calloc
(
1
,
size
);
SWalHead
*
pHead
=
taosAllocateQitem
(
size
);
pHead
->
version
=
0
;
pHead
->
len
=
pOper
->
rowSize
;
pHead
->
msgType
=
pTable
->
tableId
*
10
+
SDB_ACTION_INSERT
;
...
...
@@ -426,7 +407,7 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) {
(
*
pTable
->
encodeFp
)(
pOper
);
pHead
->
len
=
pOper
->
rowSize
;
int32_t
code
=
sdbProcessWrite
(
tsSdb
Sync
,
pHead
,
pHead
->
msgType
);
int32_t
code
=
sdbProcessWrite
(
tsSdb
Obj
,
pHead
,
pHead
->
msgType
);
if
(
code
<
0
)
return
code
;
}
...
...
@@ -453,6 +434,7 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) {
case
SDB_KEY_STRING
:
rowSize
=
strlen
((
char
*
)
pOper
->
pObj
)
+
1
;
break
;
case
SDB_KEY_INT
:
case
SDB_KEY_AUTO
:
rowSize
=
sizeof
(
uint64_t
);
break
;
...
...
@@ -461,13 +443,13 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) {
}
int32_t
size
=
sizeof
(
SWalHead
)
+
rowSize
;
SWalHead
*
pHead
=
calloc
(
1
,
size
);
SWalHead
*
pHead
=
taosAllocateQitem
(
size
);
pHead
->
version
=
0
;
pHead
->
len
=
rowSize
;
pHead
->
msgType
=
pTable
->
tableId
*
10
+
SDB_ACTION_DELETE
;
memcpy
(
pHead
->
cont
,
pOper
->
pObj
,
rowSize
);
int32_t
code
=
sdbProcessWrite
(
tsSdb
Sync
,
pHead
,
pHead
->
msgType
);
int32_t
code
=
sdbProcessWrite
(
tsSdb
Obj
,
pHead
,
pHead
->
msgType
);
if
(
code
<
0
)
return
code
;
}
...
...
@@ -489,7 +471,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) {
if
(
pOper
->
type
==
SDB_OPER_GLOBAL
)
{
int32_t
size
=
sizeof
(
SWalHead
)
+
pTable
->
maxRowSize
;
SWalHead
*
pHead
=
calloc
(
1
,
size
);
SWalHead
*
pHead
=
taosAllocateQitem
(
size
);
pHead
->
version
=
0
;
pHead
->
msgType
=
pTable
->
tableId
*
10
+
SDB_ACTION_UPDATE
;
...
...
@@ -497,7 +479,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) {
(
*
pTable
->
encodeFp
)(
pOper
);
pHead
->
len
=
pOper
->
rowSize
;
int32_t
code
=
sdbProcessWrite
(
tsSdb
Sync
,
pHead
,
pHead
->
msgType
);
int32_t
code
=
sdbProcessWrite
(
tsSdb
Obj
,
pHead
,
pHead
->
msgType
);
if
(
code
<
0
)
return
code
;
}
...
...
@@ -522,6 +504,7 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow) {
void
*
sdbOpenTable
(
SSdbTableDesc
*
pDesc
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
calloc
(
1
,
sizeof
(
SSdbTable
));
if
(
pTable
==
NULL
)
return
NULL
;
strcpy
(
pTable
->
tableName
,
pDesc
->
tableName
);
...
...
@@ -536,7 +519,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
pTable
->
encodeFp
=
pDesc
->
encodeFp
;
pTable
->
decodeFp
=
pDesc
->
decodeFp
;
pTable
->
destroyFp
=
pDesc
->
destroyFp
;
pTable
->
updateAllFp
=
pDesc
->
updateAll
Fp
;
pTable
->
restoredFp
=
pDesc
->
restored
Fp
;
if
(
sdbInitIndexFp
[
pTable
->
keyType
]
!=
NULL
)
{
pTable
->
iHandle
=
(
*
sdbInitIndexFp
[
pTable
->
keyType
])(
pTable
->
maxRowSize
,
sizeof
(
SRowMeta
));
...
...
@@ -575,7 +558,7 @@ void sdbCloseTable(void *handle) {
}
pthread_mutex_destroy
(
&
pTable
->
mutex
);
sdbTrace
(
"table:%s, is closed, numOfTables:%d"
,
pTable
->
tableName
,
tsSdbNumOfTables
);
free
(
pTable
);
}
src/mnode/src/mgmtShell.c
浏览文件 @
0af376f7
...
...
@@ -42,7 +42,6 @@ static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *sec
static
bool
mgmtCheckMsgReadOnly
(
SQueuedMsg
*
pMsg
);
static
void
mgmtProcessMsgFromShell
(
SRpcMsg
*
pMsg
);
static
void
mgmtProcessUnSupportMsg
(
SRpcMsg
*
rpcMsg
);
static
void
mgmtProcessMsgWhileNotReady
(
SRpcMsg
*
rpcMsg
);
static
void
mgmtProcessShowMsg
(
SQueuedMsg
*
queuedMsg
);
static
void
mgmtProcessRetrieveMsg
(
SQueuedMsg
*
queuedMsg
);
static
void
mgmtProcessHeartBeatMsg
(
SQueuedMsg
*
queuedMsg
);
...
...
@@ -142,19 +141,13 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
return
;
}
if
(
mpeerCheckRedirect
())
{
if
(
!
mpeerIsMaster
())
{
// rpcSendRedirectRsp(rpcMsg->handle, mgmtGetMnodeIpListForRedirect());
mgmtSendSimpleResp
(
rpcMsg
->
handle
,
TSDB_CODE_NO_MASTER
);
rpcFreeCont
(
rpcMsg
->
pCont
);
return
;
}
if
(
!
mpeerInServerStatus
())
{
mgmtProcessMsgWhileNotReady
(
rpcMsg
);
rpcFreeCont
(
rpcMsg
->
pCont
);
return
;
}
if
(
grantCheck
(
TSDB_GRANT_TIME
)
!=
TSDB_CODE_SUCCESS
)
{
mgmtSendSimpleResp
(
rpcMsg
->
handle
,
TSDB_CODE_GRANT_EXPIRED
);
rpcFreeCont
(
rpcMsg
->
pCont
);
...
...
@@ -501,18 +494,6 @@ static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg) {
rpcSendResponse
(
&
rpcRsp
);
}
static
void
mgmtProcessMsgWhileNotReady
(
SRpcMsg
*
rpcMsg
)
{
mTrace
(
"%s is ignored since SDB is not ready"
,
taosMsg
[
rpcMsg
->
msgType
]);
SRpcMsg
rpcRsp
=
{
.
msgType
=
0
,
.
pCont
=
0
,
.
contLen
=
0
,
.
code
=
TSDB_CODE_NOT_READY
,
.
handle
=
rpcMsg
->
handle
};
rpcSendResponse
(
&
rpcRsp
);
}
void
mgmtSendSimpleResp
(
void
*
thandle
,
int32_t
code
)
{
SRpcMsg
rpcRsp
=
{
.
msgType
=
0
,
...
...
src/mnode/src/mgmtTable.c
浏览文件 @
0af376f7
...
...
@@ -220,7 +220,7 @@ static int32_t mgmtChildTableActionDecode(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtChildTableAction
UpdateAll
()
{
static
int32_t
mgmtChildTableAction
Restored
()
{
void
*
pNode
=
NULL
;
void
*
pLastNode
=
NULL
;
SChildTableObj
*
pTable
=
NULL
;
...
...
@@ -320,7 +320,7 @@ static int32_t mgmtInitChildTables() {
.
encodeFp
=
mgmtChildTableActionEncode
,
.
decodeFp
=
mgmtChildTableActionDecode
,
.
destroyFp
=
mgmtChildTableActionDestroy
,
.
updateAllFp
=
mgmtChildTableActionUpdateAll
.
restoredFp
=
mgmtChildTableActionRestored
};
tsChildTableSdb
=
sdbOpenTable
(
&
tableDesc
);
...
...
@@ -414,7 +414,7 @@ static int32_t mgmtSuperTableActionDecode(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtSuperTableAction
UpdateAll
()
{
static
int32_t
mgmtSuperTableAction
Restored
()
{
return
0
;
}
...
...
@@ -435,7 +435,7 @@ static int32_t mgmtInitSuperTables() {
.
encodeFp
=
mgmtSuperTableActionEncode
,
.
decodeFp
=
mgmtSuperTableActionDecode
,
.
destroyFp
=
mgmtSuperTableActionDestroy
,
.
updateAllFp
=
mgmtSuperTableActionUpdateAll
.
restoredFp
=
mgmtSuperTableActionRestored
};
tsSuperTableSdb
=
sdbOpenTable
(
&
tableDesc
);
...
...
src/mnode/src/mgmtUser.c
浏览文件 @
0af376f7
...
...
@@ -84,12 +84,14 @@ static int32_t mgmtUserActionDecode(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtUserActionUpdateAll
()
{
SAcctObj
*
pAcct
=
acctGetAcct
(
"root"
);
mgmtCreateUser
(
pAcct
,
"root"
,
"taosdata"
);
mgmtCreateUser
(
pAcct
,
"monitor"
,
tsInternalPass
);
mgmtCreateUser
(
pAcct
,
"_root"
,
tsInternalPass
);
acctReleaseAcct
(
pAcct
);
static
int32_t
mgmtUserActionRestored
()
{
if
(
strcmp
(
tsMasterIp
,
tsPrivateIp
)
==
0
)
{
SAcctObj
*
pAcct
=
acctGetAcct
(
"root"
);
mgmtCreateUser
(
pAcct
,
"root"
,
"taosdata"
);
mgmtCreateUser
(
pAcct
,
"monitor"
,
tsInternalPass
);
mgmtCreateUser
(
pAcct
,
"_root"
,
tsInternalPass
);
acctReleaseAcct
(
pAcct
);
}
return
0
;
}
...
...
@@ -111,7 +113,7 @@ int32_t mgmtInitUsers() {
.
encodeFp
=
mgmtUserActionEncode
,
.
decodeFp
=
mgmtUserActionDecode
,
.
destroyFp
=
mgmtUserActionDestroy
,
.
updateAllFp
=
mgmtUserActionUpdateAll
.
restoredFp
=
mgmtUserActionRestored
};
tsUserSdb
=
sdbOpenTable
(
&
tableDesc
);
...
...
src/mnode/src/mgmtVgroup.c
浏览文件 @
0af376f7
...
...
@@ -152,7 +152,7 @@ static int32_t mgmtVgroupActionDecode(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtVgroupAction
UpdateAll
()
{
static
int32_t
mgmtVgroupAction
Restored
()
{
return
0
;
}
...
...
@@ -173,7 +173,7 @@ int32_t mgmtInitVgroups() {
.
encodeFp
=
mgmtVgroupActionEncode
,
.
decodeFp
=
mgmtVgroupActionDecode
,
.
destroyFp
=
mgmtVgroupActionDestroy
,
.
updateAllFp
=
mgmtVgroupActionUpdateAll
,
.
restoredFp
=
mgmtVgroupActionRestored
,
};
tsVgroupSdb
=
sdbOpenTable
(
&
tableDesc
);
...
...
src/plugins/http/src/httpServer.c
浏览文件 @
0af376f7
...
...
@@ -270,7 +270,7 @@ void httpCleanUpConnect(HttpServer *pServer) {
for
(
i
=
0
;
i
<
pServer
->
numOfThreads
;
++
i
)
{
pThread
=
pServer
->
pThreads
+
i
;
taosCloseSocket
(
pThread
->
pollFd
);
//
taosCloseSocket(pThread->pollFd);
while
(
pThread
->
pHead
)
{
httpCleanUpContext
(
pThread
->
pHead
,
0
);
...
...
@@ -591,7 +591,6 @@ void httpAcceptHttpConnection(void *arg) {
bool
httpInitConnect
(
HttpServer
*
pServer
)
{
int
i
;
pthread_attr_t
thattr
;
HttpThread
*
pThread
;
pServer
->
pThreads
=
(
HttpThread
*
)
malloc
(
sizeof
(
HttpThread
)
*
(
size_t
)
pServer
->
numOfThreads
);
...
...
@@ -601,8 +600,6 @@ bool httpInitConnect(HttpServer *pServer) {
}
memset
(
pServer
->
pThreads
,
0
,
sizeof
(
HttpThread
)
*
(
size_t
)
pServer
->
numOfThreads
);
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
pThread
=
pServer
->
pThreads
;
for
(
i
=
0
;
i
<
pServer
->
numOfThreads
;
++
i
)
{
sprintf
(
pThread
->
label
,
"%s%d"
,
pServer
->
label
,
i
);
...
...
@@ -626,21 +623,27 @@ bool httpInitConnect(HttpServer *pServer) {
return
false
;
}
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
(
pThread
->
thread
),
&
thattr
,
(
void
*
)
httpProcessHttpData
,
(
void
*
)(
pThread
))
!=
0
)
{
httpError
(
"http thread:%s, failed to create HTTP process data thread, reason:%s"
,
pThread
->
label
,
strerror
(
errno
));
return
false
;
}
pthread_attr_destroy
(
&
thattr
);
httpTrace
(
"http thread:%p:%s, initialized"
,
pThread
,
pThread
->
label
);
pThread
++
;
}
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
(
pServer
->
thread
),
&
thattr
,
(
void
*
)
httpAcceptHttpConnection
,
(
void
*
)(
pServer
))
!=
0
)
{
httpError
(
"http server:%s, failed to create Http accept thread, reason:%s"
,
pServer
->
label
,
strerror
(
errno
));
return
false
;
}
pthread_attr_destroy
(
&
thattr
);
httpTrace
(
"http server:%s, initialized, ip:%s:%u, numOfThreads:%d"
,
pServer
->
label
,
pServer
->
serverIp
,
...
...
src/plugins/http/src/httpSystem.c
浏览文件 @
0af376f7
...
...
@@ -54,7 +54,7 @@ static HttpServer *httpServer = NULL;
void
taosInitNote
(
int
numOfNoteLines
,
int
maxNotes
,
char
*
lable
);
int
httpInitSystem
()
{
taos_init
();
//
taos_init();
httpServer
=
(
HttpServer
*
)
malloc
(
sizeof
(
HttpServer
));
memset
(
httpServer
,
0
,
sizeof
(
HttpServer
));
...
...
src/vnode/main/src/vnodeMain.c
浏览文件 @
0af376f7
...
...
@@ -160,7 +160,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
syncInfo
.
writeToCache
=
vnodeWriteToQueue
;
syncInfo
.
confirmForward
=
dnodeSendRpcWriteRsp
;
syncInfo
.
notifyRole
=
vnodeNotifyRole
;
pVnode
->
sync
=
syncStart
(
&
syncInfo
);
;
pVnode
->
sync
=
syncStart
(
&
syncInfo
);
pVnode
->
events
=
NULL
;
pVnode
->
cq
=
NULL
;
...
...
tests/script/tmp/
dnode2
.sim
→
tests/script/tmp/
mnodes
.sim
浏览文件 @
0af376f7
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1
system sh/deploy.sh -n dnode2 -m 192.168.0.1 -i 192.168.0.2
system sh/exec_up.sh -n dnode1 -s start
system sh/exec_up.sh -n dnode2 -s start
sql connect
\ No newline at end of file
system sh/deploy.sh -n dnode3 -m 192.168.0.1 -i 192.168.0.3
system sh/cfg.sh -n dnode1 -c numOfMPeers -v 3
system sh/cfg.sh -n dnode2 -c numOfMPeers -v 3
system sh/cfg.sh -n dnode3 -c numOfMPeers -v 3
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录