Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
053280da
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
053280da
编写于
3月 18, 2020
作者:
S
slguan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-10] add dnode status message
上级
2a3c43ab
变更
21
隐藏空白更改
内联
并排
Showing
21 changed file
with
534 addition
and
232 deletion
+534
-232
src/dnode/inc/dnodeMClient.h
src/dnode/inc/dnodeMClient.h
+4
-2
src/dnode/inc/dnodeMgmt.h
src/dnode/inc/dnodeMgmt.h
+1
-0
src/dnode/inc/dnodeModule.h
src/dnode/inc/dnodeModule.h
+1
-0
src/dnode/src/dnodeMClient.c
src/dnode/src/dnodeMClient.c
+122
-9
src/dnode/src/dnodeMgmt.c
src/dnode/src/dnodeMgmt.c
+153
-81
src/dnode/src/dnodeMnode.c
src/dnode/src/dnodeMnode.c
+4
-3
src/dnode/src/dnodeModule.c
src/dnode/src/dnodeModule.c
+34
-1
src/dnode/src/dnodeWrite.c
src/dnode/src/dnodeWrite.c
+1
-1
src/inc/mnode.h
src/inc/mnode.h
+1
-0
src/inc/taosdef.h
src/inc/taosdef.h
+1
-0
src/inc/taosmsg.h
src/inc/taosmsg.h
+12
-15
src/mnode/inc/mgmtVgroup.h
src/mnode/inc/mgmtVgroup.h
+3
-3
src/mnode/src/mgmtDServer.c
src/mnode/src/mgmtDServer.c
+1
-59
src/mnode/src/mgmtDnode.c
src/mnode/src/mgmtDnode.c
+145
-17
src/mnode/src/mgmtMnode.c
src/mnode/src/mgmtMnode.c
+2
-1
src/mnode/src/mgmtShell.c
src/mnode/src/mgmtShell.c
+1
-1
src/mnode/src/mgmtVgroup.c
src/mnode/src/mgmtVgroup.c
+37
-18
src/util/inc/ihash.h
src/util/inc/ihash.h
+1
-1
src/util/src/ihash.c
src/util/src/ihash.c
+3
-12
src/util/src/tstatus.c
src/util/src/tstatus.c
+7
-7
src/vnode/tsdb/inc/tsdb.h
src/vnode/tsdb/inc/tsdb.h
+0
-1
未找到文件。
src/dnode/inc/dnodeMClient.h
浏览文件 @
053280da
...
...
@@ -20,8 +20,10 @@
extern
"C"
{
#endif
int32_t
dnodeInitMClient
();
void
dnodeCleanupMClient
();
int32_t
dnodeInitMClient
();
void
dnodeCleanupMClient
();
void
dnodeSendMsgToMnode
(
SRpcMsg
*
rpcMsg
);
uint32_t
dnodeGetMnodeMasteIp
();
#ifdef __cplusplus
}
...
...
src/dnode/inc/dnodeMgmt.h
浏览文件 @
053280da
...
...
@@ -23,6 +23,7 @@ extern "C" {
int32_t
dnodeInitMgmt
();
void
dnodeCleanupMgmt
();
void
dnodeMgmt
(
SRpcMsg
*
rpcMsg
);
void
dnodeUpdateDnodeId
(
int32_t
dnodeId
);
void
*
dnodeGetVnode
(
int32_t
vgId
);
int32_t
dnodeGetVnodeStatus
(
void
*
pVnode
);
...
...
src/dnode/inc/dnodeModule.h
浏览文件 @
053280da
...
...
@@ -23,6 +23,7 @@ extern "C" {
int32_t
dnodeInitModules
();
void
dnodeCleanUpModules
();
void
dnodeStartModules
();
void
dnodeProcessModuleStatus
(
uint32_t
moduleStatus
);
#ifdef __cplusplus
}
...
...
src/dnode/src/dnodeMClient.c
浏览文件 @
053280da
...
...
@@ -18,16 +18,22 @@
#include "taosmsg.h"
#include "tlog.h"
#include "trpc.h"
#include "tutil.h"
#include "dnode.h"
#include "dnodeMClient.h"
static
void
(
*
dnodeProcessMgmtRspFp
[
TSDB_MSG_TYPE_MAX
])(
SRpcMsg
*
);
static
bool
dnodeReadMnodeIpList
();
static
void
dnodeSaveMnodeIpList
();
static
void
dnodeProcessRspFromMnode
(
SRpcMsg
*
pMsg
);
static
void
dnodeProcessStatusRsp
(
SRpcMsg
*
pMsg
);
static
void
*
tsDnodeMClientRpc
;
static
void
(
*
tsDnodeProcessMgmtRspFp
[
TSDB_MSG_TYPE_MAX
])(
SRpcMsg
*
);
static
void
*
tsDnodeMClientRpc
=
NULL
;
static
SRpcIpSet
tsDnodeMnodeIpList
=
{
0
};
int32_t
dnodeInitMClient
()
{
dnodeProcessMgmtRspFp
[
TSDB_MSG_TYPE_DM_STATUS_RSP
]
=
dnodeProcessStatusRsp
;
dnodeReadMnodeIpList
();
tsDnodeProcessMgmtRspFp
[
TSDB_MSG_TYPE_DM_STATUS_RSP
]
=
dnodeProcessStatusRsp
;
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localIp
=
tsAnyIp
?
"0.0.0.0"
:
tsPrivateIp
;
...
...
@@ -35,9 +41,12 @@ int32_t dnodeInitMClient() {
rpcInit
.
label
=
"DND-MC"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
dnodeProcessRspFromMnode
;
rpcInit
.
sessions
=
TSDB_SESSIONS_PER_DNODE
;
rpcInit
.
sessions
=
100
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
2000
;
rpcInit
.
user
=
"t"
;
rpcInit
.
ckey
=
"key"
;
rpcInit
.
secret
=
"secret"
;
tsDnodeMClientRpc
=
rpcOpen
(
&
rpcInit
);
if
(
tsDnodeMClientRpc
==
NULL
)
{
...
...
@@ -53,18 +62,122 @@ void dnodeCleanupMClient() {
if
(
tsDnodeMClientRpc
)
{
rpcClose
(
tsDnodeMClientRpc
);
tsDnodeMClientRpc
=
NULL
;
dPrint
(
"mnode rpc client is closed"
);
}
}
static
void
dnodeProcessRspFromMnode
(
SRpcMsg
*
pMsg
)
{
if
(
d
nodeProcessMgmtRspFp
[
pMsg
->
msgType
])
{
(
*
d
nodeProcessMgmtRspFp
[
pMsg
->
msgType
])(
pMsg
);
if
(
tsD
nodeProcessMgmtRspFp
[
pMsg
->
msgType
])
{
(
*
tsD
nodeProcessMgmtRspFp
[
pMsg
->
msgType
])(
pMsg
);
}
else
{
dError
(
"%s is not processed in mclient"
,
taosMsg
[
pMsg
->
msgType
]);
dError
(
"%s is not processed in m
node rpc
client"
,
taosMsg
[
pMsg
->
msgType
]);
}
rpcFreeCont
(
pMsg
->
pCont
);
}
static
void
dnodeProcessStatusRsp
(
SRpcMsg
*
pMsg
)
{
if
(
pMsg
->
code
!=
TSDB_CODE_SUCCESS
)
{
dError
(
"status rsp is received, reason:%s"
,
tstrerror
(
pMsg
->
code
));
return
;
}
SDMStatusRsp
*
pStatusRsp
=
pMsg
->
pCont
;
if
(
pStatusRsp
->
ipList
.
numOfIps
<=
0
)
{
dError
(
"status msg is invalid, num of ips is %d"
,
pStatusRsp
->
ipList
.
numOfIps
);
return
;
}
pStatusRsp
->
ipList
.
port
=
htons
(
pStatusRsp
->
ipList
.
port
);
for
(
int32_t
i
=
0
;
i
<
pStatusRsp
->
ipList
.
numOfIps
;
++
i
)
{
pStatusRsp
->
ipList
.
ip
[
i
]
=
htonl
(
pStatusRsp
->
ipList
.
ip
[
i
]);
}
dTrace
(
"status msg is received, result:%d"
,
tstrerror
(
pMsg
->
code
));
if
(
memcmp
(
&
(
pStatusRsp
->
ipList
),
&
tsDnodeMnodeIpList
,
sizeof
(
SRpcIpSet
))
!=
0
)
{
dPrint
(
"mnode ip list is changed, numOfIps:%d inUse:%d"
,
pStatusRsp
->
ipList
.
numOfIps
,
pStatusRsp
->
ipList
.
inUse
);
memcpy
(
&
tsDnodeMnodeIpList
,
&
pStatusRsp
->
ipList
,
sizeof
(
SRpcIpSet
));
for
(
int32_t
i
=
0
;
i
<
tsDnodeMnodeIpList
.
numOfIps
;
++
i
)
{
dPrint
(
"mnode IP index:%d ip:%s"
,
i
,
taosIpStr
(
tsDnodeMnodeIpList
.
ip
[
i
]));
}
dnodeSaveMnodeIpList
();
}
SDnodeState
*
pState
=
&
pStatusRsp
->
dnodeState
;
pState
->
numOfVnodes
=
htonl
(
pState
->
numOfVnodes
);
pState
->
moduleStatus
=
htonl
(
pState
->
moduleStatus
);
pState
->
createdTime
=
htonl
(
pState
->
createdTime
);
pState
->
dnodeId
=
htonl
(
pState
->
dnodeId
);
dnodeProcessModuleStatus
(
pState
->
moduleStatus
);
dnodeUpdateDnodeId
(
pState
->
dnodeId
);
}
void
dnodeSendMsgToMnode
(
SRpcMsg
*
rpcMsg
)
{
rpcSendRequest
(
tsDnodeMClientRpc
,
&
tsDnodeMnodeIpList
,
rpcMsg
);
}
static
bool
dnodeReadMnodeIpList
()
{
char
ipFile
[
TSDB_FILENAME_LEN
]
=
{
0
};
sprintf
(
ipFile
,
"%s/iplist"
,
tsDnodeDir
);
FILE
*
fp
=
fopen
(
ipFile
,
"r"
);
if
(
!
fp
)
return
false
;
char
option
[
32
]
=
{
0
};
int32_t
value
=
0
;
int32_t
num
=
0
;
fscanf
(
fp
,
"%s %d"
,
option
,
&
value
);
if
(
num
!=
2
)
return
false
;
if
(
strcmp
(
option
,
"inUse"
)
!=
0
)
return
false
;
tsDnodeMnodeIpList
.
inUse
=
(
int8_t
)
value
;;
num
=
fscanf
(
fp
,
"%s %d"
,
option
,
&
value
);
if
(
num
!=
2
)
return
false
;
if
(
strcmp
(
option
,
"numOfIps"
)
!=
0
)
return
false
;
tsDnodeMnodeIpList
.
numOfIps
=
(
int8_t
)
value
;
num
=
fscanf
(
fp
,
"%s %d"
,
option
,
&
value
);
if
(
num
!=
2
)
return
false
;
if
(
strcmp
(
option
,
"port"
)
!=
0
)
return
false
;
tsDnodeMnodeIpList
.
port
=
(
uint16_t
)
value
;
for
(
int32_t
i
=
0
;
i
<
tsDnodeMnodeIpList
.
numOfIps
;
i
++
)
{
num
=
fscanf
(
fp
,
"%s %d"
,
option
,
&
value
);
if
(
num
!=
2
)
return
false
;
if
(
strncmp
(
option
,
"ip"
,
2
)
!=
0
)
return
false
;
tsDnodeMnodeIpList
.
ip
[
i
]
=
(
uint32_t
)
value
;
}
fclose
(
fp
);
dPrint
(
"read mnode iplist successed"
);
for
(
int32_t
i
=
0
;
i
<
tsDnodeMnodeIpList
.
numOfIps
;
i
++
)
{
dPrint
(
"mnode IP index:%d ip:%s"
,
i
,
taosIpStr
(
tsDnodeMnodeIpList
.
ip
[
i
]));
}
return
true
;
}
static
void
dnodeSaveMnodeIpList
()
{
char
ipFile
[
TSDB_FILENAME_LEN
]
=
{
0
};
sprintf
(
ipFile
,
"%s/iplist"
,
tsDnodeDir
);
FILE
*
fp
=
fopen
(
ipFile
,
"w"
);
if
(
!
fp
)
return
;
fprintf
(
fp
,
"inUse %d
\n
"
,
tsDnodeMnodeIpList
.
inUse
);
fprintf
(
fp
,
"numOfIps %d
\n
"
,
tsDnodeMnodeIpList
.
numOfIps
);
fprintf
(
fp
,
"port %u
\n
"
,
tsDnodeMnodeIpList
.
port
);
for
(
int32_t
i
=
0
;
i
<
tsDnodeMnodeIpList
.
numOfIps
;
i
++
)
{
fprintf
(
fp
,
"ip%d %u
\n
"
,
i
,
tsDnodeMnodeIpList
.
ip
[
i
]);
}
fclose
(
fp
);
dPrint
(
"save mnode iplist successed"
);
}
uint32_t
dnodeGetMnodeMasteIp
()
{
return
tsDnodeMnodeIpList
.
ip
[
0
];
}
\ No newline at end of file
src/dnode/src/dnodeMgmt.c
浏览文件 @
053280da
...
...
@@ -22,16 +22,18 @@
#include "trpc.h"
#include "tstatus.h"
#include "tsdb.h"
#include "ttime.h"
#include "ttimer.h"
#include "dnodeMClient.h"
#include "dnodeMgmt.h"
#include "dnodeRead.h"
#include "dnodeWrite.h"
typedef
struct
{
int32_t
vgId
;
// global vnode group ID
int32_t
vnode
;
int32_t
status
;
// status: master, slave, notready, deleting
int32_t
refCount
;
// reference count
int8_t
dirty
;
int8_t
status
;
// status: master, slave, notready, deleting
int64_t
version
;
void
*
wworker
;
void
*
rworker
;
...
...
@@ -48,6 +50,7 @@ static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir);
static
void
dnodeCleanupVnode
(
SVnodeObj
*
pVnode
);
static
int32_t
dnodeCreateVnode
(
SMDCreateVnodeMsg
*
cfg
);
static
void
dnodeDropVnode
(
SVnodeObj
*
pVnode
);
static
void
dnodeDoDropVnode
(
SVnodeObj
*
pVnode
);
static
void
dnodeProcessCreateVnodeMsg
(
SRpcMsg
*
pMsg
);
static
void
dnodeProcessDropVnodeMsg
(
SRpcMsg
*
pMsg
);
static
void
dnodeProcessAlterVnodeMsg
(
SRpcMsg
*
pMsg
);
...
...
@@ -55,17 +58,23 @@ static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
static
void
dnodeProcessConfigDnodeMsg
(
SRpcMsg
*
pMsg
);
static
void
(
*
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MAX
])(
SRpcMsg
*
pMsg
);
static
void
dnodeSendStatusMsg
(
void
*
handle
,
void
*
tmrId
);
static
void
dnodeReadDnodeId
();
static
void
*
tsDnodeVnodesHash
=
NULL
;
static
void
*
tsDnodeTmr
=
NULL
;
static
void
*
tsStatusTimer
=
NULL
;
static
void
*
tsDnodeVnodesHash
=
NULL
;
static
void
*
tsDnodeTmr
=
NULL
;
static
void
*
tsStatusTimer
=
NULL
;
static
uint32_t
tsRebootTime
;
static
int32_t
tsDnodeId
=
0
;
static
char
tsDnodeName
[
TSDB_DNODE_NAME_LEN
];
int32_t
dnodeInitMgmt
()
{
dnodeReadDnodeId
();
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_CREATE_VNODE
]
=
dnodeProcessCreateVnodeMsg
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_DROP_VNODE
]
=
dnodeProcessDropVnodeMsg
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_ALTER_VNODE
]
=
dnodeProcessAlterVnodeMsg
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_ALTER_STREAM
]
=
dnodeProcessAlterStreamMsg
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_CONFIG_DNODE
]
=
dnodeProcessConfigDnodeMsg
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_ALTER_STREAM
]
=
dnodeProcessAlterStreamMsg
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_CONFIG_DNODE
]
=
dnodeProcessConfigDnodeMsg
;
tsDnodeVnodesHash
=
taosInitIntHash
(
TSDB_MAX_VNODES
,
sizeof
(
SVnodeObj
),
taosHashInt
);
if
(
tsDnodeVnodesHash
==
NULL
)
{
...
...
@@ -73,6 +82,8 @@ int32_t dnodeInitMgmt() {
return
-
1
;
}
tsRebootTime
=
taosGetTimestampSec
();
tsDnodeTmr
=
taosTmrInit
(
100
,
200
,
60000
,
"DND-DM"
);
if
(
tsDnodeTmr
==
NULL
)
{
dError
(
"failed to init dnode timer"
);
...
...
@@ -89,8 +100,16 @@ void dnodeCleanupMgmt() {
tsStatusTimer
=
NULL
;
}
if
(
tsDnodeTmr
!=
NULL
)
{
taosTmrCleanUp
(
tsDnodeTmr
);
tsDnodeTmr
=
NULL
;
}
dnodeCleanupVnodes
();
taosCleanUpIntHash
(
tsDnodeVnodesHash
);
if
(
tsDnodeVnodesHash
==
NULL
)
{
taosCleanUpIntHash
(
tsDnodeVnodesHash
);
tsDnodeVnodesHash
=
NULL
;
}
}
void
dnodeMgmt
(
SRpcMsg
*
pMsg
)
{
...
...
@@ -106,7 +125,7 @@ void dnodeMgmt(SRpcMsg *pMsg) {
rpcSendResponse
(
&
rsp
);
}
rpcFreeCont
(
pMsg
->
pCont
);
// free the received message
rpcFreeCont
(
pMsg
->
pCont
);
}
void
*
dnodeGetVnode
(
int32_t
vgId
)
{
...
...
@@ -145,8 +164,13 @@ void *dnodeGetVnodeTsdb(void *pVnode) {
return
((
SVnodeObj
*
)
pVnode
)
->
tsdb
;
}
void
dnodeReleaseVnode
(
void
*
pVnode
)
{
atomic_sub_fetch_32
(
&
((
SVnodeObj
*
)
pVnode
)
->
refCount
,
1
);
void
dnodeReleaseVnode
(
void
*
pVnodeRaw
)
{
SVnodeObj
*
pVnode
=
pVnodeRaw
;
int32_t
count
=
atomic_sub_fetch_32
(
&
pVnode
->
refCount
,
1
);
if
(
count
==
0
&&
pVnode
->
dirty
)
{
dnodeDoDropVnode
(
pVnode
);
}
}
static
int32_t
dnodeOpenVnodes
()
{
...
...
@@ -194,11 +218,10 @@ static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) {
return
terrno
;
}
//
STsdbRepoInfo *tsdbInfo = tsdbGetStatus(pTsdb);
STsdbRepoInfo
*
tsdbInfo
=
tsdbGetStatus
(
pTsdb
);
SVnodeObj
vnodeObj
;
vnodeObj
.
vgId
=
vnode
;
//tsdbInfo->tsdbCfg.vgId;
vnodeObj
.
vnode
=
vnode
;
//tsdbInfo->tsdbCfg.tsdbId;
SVnodeObj
vnodeObj
=
{
0
};
vnodeObj
.
vgId
=
tsdbInfo
->
tsdbCfg
.
tsdbId
;
vnodeObj
.
status
=
TSDB_VN_STATUS_NOT_READY
;
vnodeObj
.
refCount
=
1
;
vnodeObj
.
version
=
0
;
...
...
@@ -212,7 +235,7 @@ static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) {
taosAddIntHash
(
tsDnodeVnodesHash
,
vnodeObj
.
vgId
,
(
char
*
)
(
&
vnodeObj
));
dTrace
(
"open vnode:%d in %s"
,
vnodeObj
.
v
node
,
rootDir
);
dTrace
(
"open vnode:%d in %s"
,
vnodeObj
.
v
gId
,
rootDir
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -241,14 +264,13 @@ static void dnodeCleanupVnode(SVnodeObj *pVnode) {
pVnode
->
tsdb
=
NULL
;
}
dTrace
(
"cleanup vnode:%d"
,
pVnode
->
v
node
);
dTrace
(
"cleanup vnode:%d"
,
pVnode
->
v
gId
);
}
static
int32_t
dnodeCreateVnode
(
SMDCreateVnodeMsg
*
pVnodeCfg
)
{
STsdbCfg
tsdbCfg
=
{
0
};
tsdbCfg
.
vgId
=
pVnodeCfg
->
cfg
.
vgId
;
tsdbCfg
.
precision
=
pVnodeCfg
->
cfg
.
precision
;
tsdbCfg
.
tsdbId
=
pVnodeCfg
->
vnode
;
tsdbCfg
.
tsdbId
=
pVnodeCfg
->
cfg
.
vgId
;
tsdbCfg
.
maxTables
=
pVnodeCfg
->
cfg
.
maxSessions
;
tsdbCfg
.
daysPerFile
=
pVnodeCfg
->
cfg
.
daysPerFile
;
tsdbCfg
.
minRowsPerFileBlock
=
-
1
;
...
...
@@ -283,13 +305,12 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
void
*
pTsdb
=
tsdbCreateRepo
(
rootDir
,
&
tsdbCfg
,
NULL
);
if
(
pTsdb
==
NULL
)
{
dError
(
"vgroup:%d, failed to create tsdb in vnode
:%d, reason:%s"
,
pVnodeCfg
->
cfg
.
vgId
,
pVnodeCfg
->
vnode
,
tstrerror
(
terrno
));
dError
(
"vgroup:%d, failed to create tsdb in vnode
, reason:%s"
,
pVnodeCfg
->
cfg
.
vgId
,
tstrerror
(
terrno
));
return
terrno
;
}
SVnodeObj
vnodeObj
;
SVnodeObj
vnodeObj
=
{
0
}
;
vnodeObj
.
vgId
=
pVnodeCfg
->
cfg
.
vgId
;
vnodeObj
.
vnode
=
pVnodeCfg
->
vnode
;
vnodeObj
.
status
=
TSDB_VN_STATUS_NOT_READY
;
vnodeObj
.
refCount
=
1
;
vnodeObj
.
version
=
0
;
...
...
@@ -303,18 +324,11 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
taosAddIntHash
(
tsDnodeVnodesHash
,
vnodeObj
.
vgId
,
(
char
*
)
(
&
vnodeObj
));
dPrint
(
"vgroup:%d, vnode
:%d is created"
,
pVnodeCfg
->
cfg
.
vgId
,
pVnodeCfg
->
vnode
);
dPrint
(
"vgroup:%d, vnode
is created"
,
vnodeObj
.
vgId
);
return
TSDB_CODE_SUCCESS
;
}
static
void
dnodeDropVnode
(
SVnodeObj
*
pVnode
)
{
pVnode
->
status
=
TSDB_VN_STATUS_NOT_READY
;
int32_t
count
=
atomic_sub_fetch_32
(
&
pVnode
->
refCount
,
1
);
if
(
count
>
0
)
{
// wait refcount
}
static
void
dnodeDoDropVnode
(
SVnodeObj
*
pVnode
)
{
if
(
pVnode
->
tsdb
)
{
tsdbDropRepo
(
pVnode
->
tsdb
);
pVnode
->
tsdb
=
NULL
;
...
...
@@ -324,21 +338,33 @@ static void dnodeDropVnode(SVnodeObj *pVnode) {
taosDeleteIntHash
(
tsDnodeVnodesHash
,
pVnode
->
vgId
);
}
static
void
dnodeDropVnode
(
SVnodeObj
*
pVnode
)
{
pVnode
->
status
=
TSDB_VN_STATUS_NOT_READY
;
pVnode
->
dirty
=
true
;
int32_t
count
=
atomic_sub_fetch_32
(
&
pVnode
->
refCount
,
1
);
if
(
count
>
0
)
{
dTrace
(
"vgroup:%d, vnode will be dropped until refcount:%d is 0"
,
pVnode
->
vgId
,
count
);
return
;
}
dnodeDoDropVnode
(
pVnode
);
}
static
void
dnodeProcessCreateVnodeMsg
(
SRpcMsg
*
rpcMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
SMDCreateVnodeMsg
*
pCreate
=
rpcMsg
->
pCont
;
pCreate
->
vnode
=
htonl
(
pCreate
->
vnode
);
pCreate
->
cfg
.
vgId
=
htonl
(
pCreate
->
cfg
.
vgId
);
pCreate
->
cfg
.
maxSessions
=
htonl
(
pCreate
->
cfg
.
maxSessions
);
pCreate
->
cfg
.
daysPerFile
=
htonl
(
pCreate
->
cfg
.
daysPerFile
);
dTrace
(
"vgroup:%d, start to create vnode
:%d"
,
pCreate
->
cfg
.
vgId
,
pCreate
->
vnode
);
dTrace
(
"vgroup:%d, start to create vnode
in dnode"
,
pCreate
->
cfg
.
vgId
);
SVnodeObj
*
pVnodeObj
=
(
SVnodeObj
*
)
taosGetIntHashData
(
tsDnodeVnodesHash
,
pCreate
->
cfg
.
vgId
);
if
(
pVnodeObj
!=
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_SUCCESS
;
dPrint
(
"vgroup:%d, vnode
:%d is already exist"
,
pCreate
->
cfg
.
vgId
,
pCreate
->
vnode
);
dPrint
(
"vgroup:%d, vnode
is already exist"
,
pCreate
->
cfg
.
vgId
);
}
else
{
rpcRsp
.
code
=
dnodeCreateVnode
(
pCreate
);
}
...
...
@@ -352,6 +378,8 @@ static void dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
SMDDropVnodeMsg
*
pDrop
=
rpcMsg
->
pCont
;
pDrop
->
vgId
=
htonl
(
pDrop
->
vgId
);
dTrace
(
"vgroup:%d, start to drop vnode in dnode"
,
pDrop
->
vgId
);
SVnodeObj
*
pVnodeObj
=
(
SVnodeObj
*
)
taosGetIntHashData
(
tsDnodeVnodesHash
,
pDrop
->
vgId
);
if
(
pVnodeObj
!=
NULL
)
{
dnodeDropVnode
(
pVnodeObj
);
...
...
@@ -367,11 +395,12 @@ static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
SMDCreateVnodeMsg
*
pCreate
=
rpcMsg
->
pCont
;
pCreate
->
vnode
=
htonl
(
pCreate
->
vnode
);
pCreate
->
cfg
.
vgId
=
htonl
(
pCreate
->
cfg
.
vgId
);
pCreate
->
cfg
.
maxSessions
=
htonl
(
pCreate
->
cfg
.
maxSessions
);
pCreate
->
cfg
.
daysPerFile
=
htonl
(
pCreate
->
cfg
.
daysPerFile
);
dTrace
(
"vgroup:%d, start to alter vnode in dnode"
,
pCreate
->
cfg
.
vgId
);
SVnodeObj
*
pVnodeObj
=
(
SVnodeObj
*
)
taosGetIntHashData
(
tsDnodeVnodesHash
,
pCreate
->
cfg
.
vgId
);
if
(
pVnodeObj
!=
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -401,60 +430,103 @@ static void dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
rpcSendResponse
(
&
rpcRsp
);
}
static
void
dnodeBuildVloadMsg
(
char
*
pNode
,
void
*
param
)
{
SVnodeObj
*
pVnode
=
(
SVnodeObj
*
)
pNode
;
if
(
pVnode
->
dirty
)
return
;
SDMStatusMsg
*
pStatus
=
param
;
if
(
pStatus
->
openVnodes
>=
TSDB_MAX_VNODES
)
return
;
SVnodeLoad
*
pLoad
=
&
pStatus
->
load
[
pStatus
->
openVnodes
++
];
pLoad
->
vgId
=
htonl
(
pVnode
->
vgId
);
pLoad
->
status
=
pVnode
->
status
;
}
static
void
dnodeSendStatusMsg
(
void
*
handle
,
void
*
tmrId
)
{
if
(
tsDnodeTmr
==
NULL
)
{
dError
(
"dnode timer is already released"
);
return
;
}
taosTmrReset
(
dnodeSendStatusMsg
,
tsStatusInterval
*
1000
,
NULL
,
tsDnodeTmr
,
&
tsStatusTimer
);
if
(
tsStatusTimer
==
NULL
)
{
dError
(
"failed to start status timer"
);
return
;
}
// int32_t contLen = sizeof(SDMStatusMsg) + dnodeGetVnodesNum() * sizeof(SVnodeLoad);
// SDMStatusMsg *pStatus = rpcMallocCont(contLen);
// if (pStatus == NULL) {
// dError("Failed to malloc status message");
// return;
// }
//
// int32_t totalVnodes = dnodeGetVnodesNum();
//
// pStatus->version = htonl(tsVersion);
// pStatus->privateIp = htonl(inet_addr(tsPrivateIp));
// pStatus->publicIp = htonl(inet_addr(tsPublicIp));
// pStatus->lastReboot = htonl(tsRebootTime);
// pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes);
// pStatus->openVnodes = htons((uint16_t) totalVnodes);
// pStatus->numOfCores = htons((uint16_t) tsNumOfCores);
// pStatus->diskAvailable = tsAvailDataDirGB;
// pStatus->alternativeRole = (uint8_t) tsAlternativeRole;
//
// SVnodeLoad *pLoad = (SVnodeLoad *)pStatus->load;
//TODO loop all vnodes
// for (int32_t vnode = 0, count = 0; vnode <= totalVnodes; ++vnode) {
// if (vnodeList[vnode].cfg.maxSessions <= 0) continue;
//
// SVnodeObj *pVnode = vnodeList + vnode;
// pLoad->vnode = htonl(vnode);
// pLoad->vgId = htonl(pVnode->cfg.vgId);
// pLoad->status = (uint8_t)vnodeList[vnode].vnodeStatus;
// pLoad->syncStatus =(uint8_t)vnodeList[vnode].syncStatus;
// pLoad->accessState = (uint8_t)(pVnode->accessState);
// pLoad->totalStorage = htobe64(pVnode->vnodeStatistic.totalStorage);
// pLoad->compStorage = htobe64(pVnode->vnodeStatistic.compStorage);
// if (pVnode->vnodeStatus == TSDB_VN_STATUS_MASTER) {
// pLoad->pointsWritten = htobe64(pVnode->vnodeStatistic.pointsWritten);
// } else {
// pLoad->pointsWritten = htobe64(0);
// }
// pLoad++;
//
// if (++count >= tsOpenVnodes) {
// break;
// }
// }
// dnodeSendMsgToMnode(TSDB_MSG_TYPE_STATUS, pStatus, contLen);
int32_t
contLen
=
sizeof
(
SDMStatusMsg
)
+
TSDB_MAX_VNODES
*
sizeof
(
SVnodeLoad
);
SDMStatusMsg
*
pStatus
=
rpcMallocCont
(
contLen
);
if
(
pStatus
==
NULL
)
{
dError
(
"failed to malloc status message"
);
return
;
}
strcpy
(
pStatus
->
dnodeName
,
tsDnodeName
);
pStatus
->
version
=
htonl
(
tsVersion
);
pStatus
->
dnodeId
=
htonl
(
tsDnodeId
);
pStatus
->
privateIp
=
htonl
(
inet_addr
(
tsPrivateIp
));
pStatus
->
publicIp
=
htonl
(
inet_addr
(
tsPublicIp
));
pStatus
->
lastReboot
=
htonl
(
tsRebootTime
);
pStatus
->
numOfTotalVnodes
=
htons
((
uint16_t
)
tsNumOfTotalVnodes
);
pStatus
->
numOfCores
=
htons
((
uint16_t
)
tsNumOfCores
);
pStatus
->
diskAvailable
=
tsAvailDataDirGB
;
pStatus
->
alternativeRole
=
(
uint8_t
)
tsAlternativeRole
;
taosVisitIntHashWithFp
(
tsDnodeVnodesHash
,
dnodeBuildVloadMsg
,
pStatus
);
contLen
=
sizeof
(
SDMStatusMsg
)
+
pStatus
->
openVnodes
*
sizeof
(
SVnodeLoad
);
pStatus
->
openVnodes
=
htons
(
pStatus
->
openVnodes
);
SRpcMsg
rpcMsg
=
{
.
pCont
=
pStatus
,
.
contLen
=
contLen
,
.
msgType
=
TSDB_MSG_TYPE_DM_STATUS
};
dnodeSendMsgToMnode
(
&
rpcMsg
);
}
static
void
dnodeReadDnodeId
()
{
char
dnodeIdFile
[
TSDB_FILENAME_LEN
]
=
{
0
};
sprintf
(
dnodeIdFile
,
"%s/dnodeId"
,
tsDnodeDir
);
FILE
*
fp
=
fopen
(
dnodeIdFile
,
"r"
);
if
(
!
fp
)
return
;
char
option
[
32
]
=
{
0
};
int32_t
value
=
0
;
int32_t
num
=
0
;
fscanf
(
fp
,
"%s %d"
,
option
,
&
value
);
if
(
num
!=
2
)
return
false
;
if
(
strcmp
(
option
,
"dnodeId"
)
!=
0
)
return
false
;
tsDnodeId
=
value
;;
fclose
(
fp
);
dPrint
(
"read dnodeId:%d successed"
,
tsDnodeId
);
}
static
void
dnodeSaveDnodeId
()
{
char
dnodeIdFile
[
TSDB_FILENAME_LEN
]
=
{
0
};
sprintf
(
dnodeIdFile
,
"%s/dnodeId"
,
tsDnodeDir
);
FILE
*
fp
=
fopen
(
dnodeIdFile
,
"w"
);
if
(
!
fp
)
{
return
false
;
}
fprintf
(
fp
,
"dnodeId %d
\n
"
,
tsDnodeId
);
fclose
(
fp
);
dPrint
(
"save dnodeId successed"
);
return
true
;
}
void
dnodeUpdateDnodeId
(
int32_t
dnodeId
)
{
if
(
tsDnodeId
==
0
)
{
dPrint
(
"dnodeId is set to %d"
,
dnodeId
);
tsDnodeId
=
dnodeId
;
dnodeSaveDnodeId
();
}
}
src/dnode/src/dnodeMnode.c
浏览文件 @
053280da
...
...
@@ -40,12 +40,12 @@ int32_t dnodeInitMnode() {
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localIp
=
tsAnyIp
?
"0.0.0.0"
:
tsPrivateIp
;
rpcInit
.
localPort
=
tsDnodeMnodePort
;
rpcInit
.
label
=
"DND-
mnode
"
;
rpcInit
.
label
=
"DND-
MS
"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
dnodeProcessMsgFromMnode
;
rpcInit
.
sessions
=
TSDB_SESSIONS_PER_DNODE
;
rpcInit
.
sessions
=
100
;
rpcInit
.
connType
=
TAOS_CONN_SERVER
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1
000
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
2
000
;
tsDnodeMnodeRpc
=
rpcOpen
(
&
rpcInit
);
if
(
tsDnodeMnodeRpc
==
NULL
)
{
...
...
@@ -61,6 +61,7 @@ void dnodeCleanupMnode() {
if
(
tsDnodeMnodeRpc
)
{
rpcClose
(
tsDnodeMnodeRpc
);
tsDnodeMnodeRpc
=
NULL
;
dPrint
(
"mnode rpc server is closed"
);
}
}
...
...
src/dnode/src/dnodeModule.c
浏览文件 @
053280da
...
...
@@ -74,7 +74,7 @@ int32_t dnodeInitModules() {
for
(
int
mod
=
0
;
mod
<
TSDB_MOD_MAX
;
++
mod
)
{
if
(
tsModule
[
mod
].
num
!=
0
&&
tsModule
[
mod
].
initFp
)
{
if
((
*
tsModule
[
mod
].
initFp
)()
!=
0
)
{
dError
(
"
TDengine initialization failed
"
);
dError
(
"
failed to init modules
"
);
return
-
1
;
}
}
...
...
@@ -92,3 +92,36 @@ void dnodeStartModules() {
}
}
}
void
dnodeProcessModuleStatus
(
uint32_t
moduleStatus
)
{
if
(
moduleStatus
==
tsModuleStatus
)
return
;
dPrint
(
"module status is received, old:%d, new:%d"
,
tsModuleStatus
,
moduleStatus
);
int
news
=
moduleStatus
;
int
olds
=
tsModuleStatus
;
for
(
int
moduleType
=
0
;
moduleType
<
TSDB_MOD_MAX
;
++
moduleType
)
{
int
newStatus
=
news
&
(
1
<<
moduleType
);
int
oldStatus
=
olds
&
(
1
<<
moduleType
);
if
(
oldStatus
>
0
)
{
if
(
newStatus
==
0
)
{
if
(
tsModule
[
moduleType
].
stopFp
)
{
dPrint
(
"module:%s is stopped on this node"
,
tsModule
[
moduleType
].
name
);
(
*
tsModule
[
moduleType
].
stopFp
)();
}
}
}
else
if
(
oldStatus
==
0
)
{
if
(
newStatus
>
0
)
{
if
(
tsModule
[
moduleType
].
startFp
)
{
dPrint
(
"module:%s is started on this node"
,
tsModule
[
moduleType
].
name
);
(
*
tsModule
[
moduleType
].
startFp
)();
}
}
}
else
{
}
}
tsModuleStatus
=
moduleStatus
;
}
src/dnode/src/dnodeWrite.c
浏览文件 @
053280da
...
...
@@ -94,7 +94,7 @@ void dnodeWrite(SRpcMsg *pMsg) {
SRpcContext
*
pRpcContext
=
NULL
;
if
(
pMsg
->
msgType
==
TSDB_MSG_TYPE_SUBMIT
||
pMsg
->
msgType
==
TSDB_MSG_TYPE_MD_DROP_STABLE
)
{
SMsgDesc
*
pDesc
=
pCont
;
SMsgDesc
*
pDesc
=
(
SMsgDesc
*
)
pCont
;
pDesc
->
numOfVnodes
=
htonl
(
pDesc
->
numOfVnodes
);
pCont
+=
sizeof
(
SMsgDesc
);
if
(
pDesc
->
numOfVnodes
>
1
)
{
...
...
src/inc/mnode.h
浏览文件 @
053280da
...
...
@@ -40,6 +40,7 @@ extern "C" {
#include "tutil.h"
typedef
struct
{
int32_t
dnodeId
;
uint32_t
privateIp
;
int32_t
sid
;
uint32_t
moduleStatus
;
...
...
src/inc/taosdef.h
浏览文件 @
053280da
...
...
@@ -165,6 +165,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_MAX_COLUMNS 256
#define TSDB_MIN_COLUMNS 2 //PRIMARY COLUMN(timestamp) + other columns
#define TSDB_DNODE_NAME_LEN 63
#define TSDB_TABLE_NAME_LEN 64
#define TSDB_DB_NAME_LEN 32
#define TSDB_COL_NAME_LEN 64
...
...
src/inc/taosmsg.h
浏览文件 @
053280da
...
...
@@ -25,7 +25,6 @@ extern "C" {
#include "taosdef.h"
#include "taoserror.h"
#include "taosdef.h"
#include "trpc.h"
// message type
...
...
@@ -520,16 +519,14 @@ typedef struct {
}
SRetrieveTableRsp
;
typedef
struct
{
uint32_t
vnode
;
uint32_t
vgId
;
uint8_t
status
;
uint8_t
dropStatus
;
uint8_t
accessState
;
int64_t
totalStorage
;
int64_t
compStorage
;
int64_t
pointsWritten
;
uint8_t
syncStatus
;
uint8_t
reserved
[
15
];
int32_t
vgId
;
int64_t
totalStorage
;
int64_t
compStorage
;
int64_t
pointsWritten
;
uint8_t
status
;
uint8_t
syncStatus
;
uint8_t
accessState
;
uint8_t
reserved
[
6
];
}
SVnodeLoad
;
typedef
struct
{
...
...
@@ -582,14 +579,16 @@ typedef struct {
}
SVnodeStatisticInfo
;
typedef
struct
{
int32_t
dnodeId
;
uint32_t
moduleStatus
;
uint32_t
createdTime
;
uint32_t
numOfVnodes
;
uint32_t
reserved
;
}
SDnodeState
;
typedef
struct
{
uint32_t
version
;
int32_t
dnodeId
;
char
dnodeName
[
TSDB_DNODE_NAME_LEN
];
uint32_t
privateIp
;
uint32_t
publicIp
;
uint32_t
lastReboot
;
// time stamp for last reboot
...
...
@@ -603,14 +602,12 @@ typedef struct {
}
SDMStatusMsg
;
typedef
struct
{
int32_t
code
;
SDnodeState
dnodeState
;
SRpcIpSet
ipList
;
SDnodeState
dnodeState
;
SVnodeAccess
vnodeAccess
[];
}
SDMStatusRsp
;
typedef
struct
{
int32_t
vnode
;
SVnodeCfg
cfg
;
SVnodeDesc
vpeerDesc
[
TSDB_MAX_MPEERS
];
}
SMDCreateVnodeMsg
;
...
...
src/mnode/inc/mgmtVgroup.h
浏览文件 @
053280da
...
...
@@ -32,15 +32,15 @@ SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode);
void
mgmtCreateVgroup
(
SQueuedMsg
*
pMsg
);
void
mgmtDropVgroup
(
SVgObj
*
pVgroup
,
void
*
ahandle
);
void
mgmtUpdateVgroup
(
SVgObj
*
pVgroup
);
void
mgmtUpdateVgroupIp
(
SDnodeObj
*
pDnode
);
void
mgmtSetVgroupIdPool
();
SVgObj
*
mgmtGetAvailableVgroup
(
SDbObj
*
pDb
);
void
mgmtAddTableIntoVgroup
(
SVgObj
*
pVgroup
,
STableInfo
*
pTable
);
void
mgmtRemoveTableFromVgroup
(
SVgObj
*
pVgroup
,
STableInfo
*
pTable
);
SMDCreateVnodeMsg
*
mgmtBuildCreateVnodeMsg
(
SVgObj
*
pVgroup
,
int32_t
vnode
);
void
mgmtSendCreateVnodeMsg
(
SVgObj
*
pVgroup
,
int32_t
vnode
,
SRpcIpSet
*
ipSet
,
void
*
ahandle
);
void
mgmtSendCreateVnodeMsg
(
SVgObj
*
pVgroup
,
SRpcIpSet
*
ipSet
,
void
*
ahandle
);
void
mgmtSendDropVnodeMsg
(
int32_t
vgId
,
SRpcIpSet
*
ipSet
,
void
*
ahandle
);
SRpcIpSet
mgmtGetIpSetFromVgroup
(
SVgObj
*
pVgroup
);
SRpcIpSet
mgmtGetIpSetFromIp
(
uint32_t
ip
);
...
...
src/mnode/src/mgmtDServer.c
浏览文件 @
053280da
...
...
@@ -299,62 +299,4 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s
// mgmtCleanUpDnodeIntFp();
// }
//}
//
//void mgmtProcessDnodeStatus(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
// SDMStatusMsg *pStatus = (SDMStatusMsg *)pCont;
//
// SDnodeObj *pObj = mgmtGetDnode(htonl(pStatus->privateIp));
// if (pObj == NULL) {
// mError("dnode:%s not exist", taosIpStr(pObj->privateIp));
// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_DNODE_NOT_EXIST, NULL, 0);
// return;
// }
//
// pObj->lastReboot = htonl(pStatus->lastReboot);
// pObj->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes);
// pObj->openVnodes = htons(pStatus->openVnodes);
// pObj->numOfCores = htons(pStatus->numOfCores);
// pObj->diskAvailable = pStatus->diskAvailable;
// pObj->alternativeRole = pStatus->alternativeRole;
////
//// if (mgmtProcessDnodeStatusFp) {
//// mgmtProcessDnodeStatusFp(pStatus, pObj, pConn);
//// return;
//// }
//
// pObj->status = TSDB_DN_STATUS_READY;
//
//// // wait vnode dropped
//// for (int32_t vnode = 0; vnode < pObj->numOfVnodes; ++vnode) {
//// SVnodeLoad *pVload = &(pObj->vload[vnode]);
//// if (pVload->dropStatus == TSDB_VN_DROP_STATUS_DROPPING) {
//// bool existInDnode = false;
//// for (int32_t j = 0; j < pObj->openVnodes; ++j) {
//// if (htonl(pStatus->load[j].vnode) == vnode) {
//// existInDnode = true;
//// break;
//// }
//// }
////
//// if (!existInDnode) {
//// pVload->dropStatus = TSDB_VN_DROP_STATUS_READY;
//// pVload->status = TSDB_VN_STATUS_OFFLINE;
//// mgmtUpdateDnode(pObj);
//// mPrint("dnode:%s, vid:%d, drop finished", taosIpStr(pObj->privateIp), vnode);
//// taosTmrStart(mgmtMonitorDbDrop, 10000, NULL, tsMgmtTmr);
//// }
//// } else if (pVload->vgId == 0) {
//// /*
//// * In some cases, vnode information may be reported abnormally, recover it
//// */
//// if (pVload->dropStatus != TSDB_VN_DROP_STATUS_READY || pVload->status != TSDB_VN_STATUS_OFFLINE) {
//// mPrint("dnode:%s, vid:%d, vgroup:%d status:%s dropStatus:%s, set it to avail status",
//// taosIpStr(pObj->privateIp), vnode, pVload->vgId, taosGetVnodeStatusStr(pVload->status),
//// taosGetVnodeDropStatusStr(pVload->dropStatus));
//// pVload->dropStatus = TSDB_VN_DROP_STATUS_READY;
//// pVload->status = TSDB_VN_STATUS_OFFLINE;
//// mgmtUpdateDnode(pObj);
//// }
//// }
//// }
//}
//
\ No newline at end of file
src/mnode/src/mgmtDnode.c
浏览文件 @
053280da
...
...
@@ -23,12 +23,14 @@
#include "mgmtDClient.h"
#include "mgmtMnode.h"
#include "mgmtShell.h"
#include "mgmtDServer.h"
#include "mgmtUser.h"
#include "mgmtVgroup.h"
int32_t
(
*
mgmtInitDnodesFp
)()
=
NULL
;
void
(
*
mgmtCleanUpDnodesFp
)()
=
NULL
;
SDnodeObj
*
(
*
mgmtGetDnodeFp
)(
uint32_t
ip
)
=
NULL
;
SDnodeObj
*
(
*
mgmtGetDnodeByIpFp
)(
int32_t
dnodeId
)
=
NULL
;
int32_t
(
*
mgmtGetDnodesNumFp
)()
=
NULL
;
int32_t
(
*
mgmtUpdateDnodeFp
)(
SDnodeObj
*
pDnode
)
=
NULL
;
void
*
(
*
mgmtGetNextDnodeFp
)(
SShowObj
*
pShow
,
SDnodeObj
**
pDnode
)
=
NULL
;
...
...
@@ -45,6 +47,7 @@ static int32_t mgmtGetVnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn)
static
int32_t
mgmtRetrieveVnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
void
mgmtProcessCfgDnodeMsg
(
SQueuedMsg
*
pMsg
);
static
void
mgmtProcessCfgDnodeMsgRsp
(
SRpcMsg
*
rpcMsg
)
;
static
void
mgmtProcessDnodeStatusMsg
(
SRpcMsg
*
rpcMsg
);
void
mgmtSetDnodeMaxVnodes
(
SDnodeObj
*
pDnode
)
{
int32_t
maxVnodes
=
pDnode
->
numOfCores
*
tsNumOfVnodesPerCore
;
...
...
@@ -70,10 +73,9 @@ void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode) {
for
(
int32_t
i
=
0
;
i
<
pDnode
->
numOfVnodes
;
++
i
)
{
SVnodeLoad
*
pVload
=
pDnode
->
vload
+
i
;
if
(
pVload
->
vgId
!=
0
)
{
mTrace
(
"
%d-dnode:%s, calc free vnodes, exist vnode:%d, vgroup:%d, state:%d %s, dropstate
:%d %s, syncstatus:%d %s"
,
totalVnodes
,
taosIpStr
(
pDnode
->
privateIp
),
i
,
pVload
->
vgId
,
mTrace
(
"
dnode:%d, calc free vnodes, vnode:%d, status
:%d %s, syncstatus:%d %s"
,
pDnode
->
dnodeId
,
pVload
->
vgId
,
pVload
->
status
,
taosGetVnodeStatusStr
(
pVload
->
status
),
pVload
->
dropStatus
,
taosGetVnodeDropStatusStr
(
pVload
->
dropStatus
),
pVload
->
syncStatus
,
taosGetVnodeSyncStatusStr
(
pVload
->
syncStatus
));
totalVnodes
++
;
}
...
...
@@ -92,7 +94,7 @@ void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes, int32_t vgId) {
if
(
pDnode
)
{
SVnodeLoad
*
pVload
=
pDnode
->
vload
+
vnodeGid
[
i
].
vnode
;
memset
(
pVload
,
0
,
sizeof
(
SVnodeLoad
));
pVload
->
vnode
=
vnodeGid
[
i
].
vnode
;
//
pVload->vnode = vnodeGid[i].vnode;
pVload
->
vgId
=
vgId
;
mTrace
(
"dnode:%s, vnode:%d add to vgroup:%d"
,
taosIpStr
(
pDnode
->
privateIp
),
vnodeGid
[
i
].
vnode
,
pVload
->
vgId
);
mgmtCalcNumOfFreeVnodes
(
pDnode
);
...
...
@@ -118,7 +120,6 @@ void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes) {
}
}
bool
mgmtCheckModuleInDnode
(
SDnodeObj
*
pDnode
,
int32_t
moduleType
)
{
uint32_t
status
=
pDnode
->
moduleStatus
&
(
1
<<
moduleType
);
return
status
>
0
;
...
...
@@ -319,12 +320,6 @@ static int32_t mgmtGetVnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn)
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
4
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_INT
;
strcpy
(
pSchema
[
cols
].
name
,
"vgid"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
12
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"status"
);
...
...
@@ -400,10 +395,6 @@ static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, voi
cols
=
0
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
uint32_t
*
)
pWrite
=
pVnode
->
vnode
;
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
uint32_t
*
)
pWrite
=
pVnode
->
vgId
;
cols
++
;
...
...
@@ -437,11 +428,12 @@ int32_t mgmtInitDnodes() {
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_VNODES
,
mgmtRetrieveVnodes
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_CONFIG_DNODE
,
mgmtProcessCfgDnodeMsg
);
mgmtAddDClientRspHandle
(
TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP
,
mgmtProcessCfgDnodeMsgRsp
);
mgmtAddDServerMsgHandle
(
TSDB_MSG_TYPE_DM_STATUS
,
mgmtProcessDnodeStatusMsg
);
if
(
mgmtInitDnodesFp
)
{
return
mgmtInitDnodesFp
();
}
else
{
tsDnodeObj
.
dnodeId
=
1
;
tsDnodeObj
.
privateIp
=
inet_addr
(
tsPrivateIp
);;
tsDnodeObj
.
createdTime
=
taosGetTimestampMs
();
tsDnodeObj
.
lastReboot
=
taosGetTimestampSec
();
...
...
@@ -478,6 +470,16 @@ SDnodeObj *mgmtGetDnode(uint32_t ip) {
}
}
SDnodeObj
*
mgmtGetDnodeByIp
(
int32_t
dnodeId
)
{
if
(
mgmtGetDnodeByIpFp
)
{
return
mgmtGetDnodeByIpFp
(
dnodeId
);
}
if
(
dnodeId
!=
0
)
{
return
&
tsDnodeObj
;
}
return
NULL
;
}
int32_t
mgmtGetDnodesNum
()
{
if
(
mgmtGetDnodesNumFp
)
{
return
mgmtGetDnodesNumFp
();
...
...
@@ -569,4 +571,130 @@ void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) {
static
void
mgmtProcessCfgDnodeMsgRsp
(
SRpcMsg
*
rpcMsg
)
{
mTrace
(
"cfg vnode rsp is received"
);
}
\ No newline at end of file
}
void
mgmtProcessDnodeStatusMsg
(
SRpcMsg
*
rpcMsg
)
{
if
(
mgmtCheckRedirect
(
rpcMsg
->
handle
))
return
;
SDMStatusMsg
*
pStatus
=
rpcMsg
->
pCont
;
pStatus
->
dnodeId
=
htonl
(
pStatus
->
dnodeId
);
SDnodeObj
*
pDnode
=
NULL
;
if
(
pStatus
->
dnodeId
==
0
)
{
pDnode
=
mgmtGetDnodeByIp
(
pStatus
->
privateIp
);
if
(
pDnode
==
NULL
)
{
mTrace
(
"dnode not created in cluster, privateIp:%s, name:%s, "
,
taosIpStr
(
htonl
(
pStatus
->
dnodeId
)),
pStatus
->
dnodeName
);
mgmtSendSimpleResp
(
rpcMsg
->
handle
,
TSDB_CODE_DNODE_NOT_EXIST
);
return
;
}
}
uint32_t
version
=
htonl
(
pStatus
->
version
);
if
(
version
!=
tsVersion
)
{
mError
(
"dnode:%d, status msg version:%d not equal with mnode:%d"
,
pDnode
->
dnodeId
,
version
,
tsVersion
);
mgmtSendSimpleResp
(
rpcMsg
->
handle
,
TSDB_CODE_INVALID_MSG_VERSION
);
return
;
}
uint32_t
lastPrivateIp
=
htonl
(
pDnode
->
privateIp
);
uint32_t
lastPublicIp
=
htonl
(
pDnode
->
publicIp
);
pDnode
->
privateIp
=
htonl
(
pStatus
->
privateIp
);
pDnode
->
publicIp
=
htonl
(
pStatus
->
publicIp
);
pDnode
->
lastReboot
=
htonl
(
pStatus
->
lastReboot
);
pDnode
->
numOfTotalVnodes
=
htons
(
pStatus
->
numOfTotalVnodes
);
pDnode
->
openVnodes
=
htons
(
pStatus
->
openVnodes
);
pDnode
->
numOfCores
=
htons
(
pStatus
->
numOfCores
);
pDnode
->
diskAvailable
=
pStatus
->
diskAvailable
;
pDnode
->
alternativeRole
=
pStatus
->
alternativeRole
;
if
(
pStatus
->
dnodeId
==
0
)
{
mTrace
(
"dnode:%d, first access, privateIp:%s, name:%s, "
,
pDnode
->
dnodeId
,
taosIpStr
(
pStatus
->
dnodeId
),
pStatus
->
dnodeName
);
mgmtSetDnodeMaxVnodes
(
pDnode
);
mgmtUpdateDnode
(
pDnode
);
}
if
(
lastPrivateIp
!=
pDnode
->
privateIp
||
lastPublicIp
!=
pDnode
->
publicIp
)
{
mgmtUpdateVgroupIp
(
pDnode
);
//mgmtUpdateMnodeIp();
}
for
(
int32_t
j
=
0
;
j
<
pDnode
->
openVnodes
;
++
j
)
{
pStatus
->
load
[
j
].
vgId
=
htonl
(
pStatus
->
load
[
j
].
vgId
);
pStatus
->
load
[
j
].
totalStorage
=
htobe64
(
pStatus
->
load
[
j
].
totalStorage
);
pStatus
->
load
[
j
].
compStorage
=
htobe64
(
pStatus
->
load
[
j
].
compStorage
);
pStatus
->
load
[
j
].
pointsWritten
=
htobe64
(
pStatus
->
load
[
j
].
pointsWritten
);
bool
existInMnode
=
false
;
for
(
int32_t
vnode
=
0
;
vnode
<
pDnode
->
numOfVnodes
;
++
vnode
)
{
SVnodeLoad
*
pVload
=
&
(
pDnode
->
vload
[
vnode
]);
if
(
pVload
->
vgId
==
pStatus
->
load
[
j
].
vgId
)
{
existInMnode
=
true
;
}
}
if
(
!
existInMnode
)
{
SRpcIpSet
ipSet
=
mgmtGetIpSetFromIp
(
pDnode
->
privateIp
);
mPrint
(
"dnode:%d, vnode:%d not exist in mnode, drop it"
,
pDnode
->
dnodeId
,
pStatus
->
load
[
j
].
vgId
);
mgmtSendDropVnodeMsg
(
pStatus
->
load
[
j
].
vgId
,
&
ipSet
,
NULL
);
}
}
for
(
int32_t
vnode
=
0
;
vnode
<
pDnode
->
numOfVnodes
;
++
vnode
)
{
SVnodeLoad
*
pVload
=
&
(
pDnode
->
vload
[
vnode
]);
bool
existInDnode
=
false
;
for
(
int32_t
j
=
0
;
j
<
pDnode
->
openVnodes
;
++
j
)
{
if
(
htonl
(
pStatus
->
load
[
j
].
vgId
)
==
pVload
->
vgId
)
{
existInDnode
=
true
;
break
;
}
}
if
(
!
existInDnode
)
{
mPrint
(
"dnode:%d, vnode:%d not exist in dnode, create it"
,
pDnode
->
dnodeId
,
pVload
->
vgId
);
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
pVload
->
vgId
);
if
(
pVgroup
!=
NULL
)
{
SRpcIpSet
ipSet
=
mgmtGetIpSetFromIp
(
pDnode
->
privateIp
);
mgmtSendCreateVnodeMsg
(
pVgroup
,
&
ipSet
,
NULL
);
}
}
}
if
(
pDnode
->
status
!=
TSDB_DN_STATUS_READY
)
{
mTrace
(
"dnode:%d, from offline to online"
,
pDnode
->
dnodeId
);
pDnode
->
status
=
TSDB_DN_STATUS_READY
;
//TODO:
//mgmtStartBalanceTimer(200);
}
int32_t
contLen
=
sizeof
(
SDMStatusRsp
)
+
TSDB_MAX_VNODES
*
sizeof
(
SVnodeAccess
);
SDMStatusRsp
*
pRsp
=
rpcMallocCont
(
contLen
);
if
(
pRsp
==
NULL
)
{
mgmtSendSimpleResp
(
rpcMsg
->
handle
,
TSDB_CODE_SERV_OUT_OF_MEMORY
);
return
;
}
pRsp
->
ipList
=
*
pSdbIpList
;
pRsp
->
ipList
.
port
=
htons
(
pRsp
->
ipList
.
port
);
for
(
int
i
=
0
;
i
<
pRsp
->
ipList
.
numOfIps
;
++
i
)
{
pRsp
->
ipList
.
ip
[
i
]
=
htonl
(
pRsp
->
ipList
.
ip
[
i
]);
}
pRsp
->
dnodeState
.
dnodeId
=
htonl
(
pDnode
->
dnodeId
);
pRsp
->
dnodeState
.
moduleStatus
=
htonl
(
pDnode
->
moduleStatus
);
pRsp
->
dnodeState
.
createdTime
=
htonl
(
pDnode
->
createdTime
/
1000
);
pRsp
->
dnodeState
.
numOfVnodes
=
0
;
contLen
=
sizeof
(
SDMStatusRsp
);
//TODO: set vnode access
SRpcMsg
rpcRsp
=
{
.
code
=
TSDB_CODE_SUCCESS
,
.
pCont
=
pStatus
,
.
contLen
=
contLen
};
rpcSendResponse
(
&
rpcRsp
);
}
src/mnode/src/mgmtMnode.c
浏览文件 @
053280da
...
...
@@ -19,4 +19,5 @@
bool
mgmtCheckRedirect
(
void
*
handle
)
{
return
false
;
}
\ No newline at end of file
}
src/mnode/src/mgmtShell.c
浏览文件 @
053280da
...
...
@@ -392,7 +392,7 @@ static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) {
}
if
(
pConnectMsg
->
db
[
0
])
{
char
dbName
[
TSDB_TABLE_ID_LEN
]
=
{
0
};
char
dbName
[
TSDB_TABLE_ID_LEN
*
3
]
=
{
0
};
sprintf
(
dbName
,
"%x%s%s"
,
pAcct
->
acctId
,
TS_PATH_DELIMITER
,
pConnectMsg
->
db
);
SDbObj
*
pDb
=
mgmtGetDb
(
dbName
);
if
(
pDb
==
NULL
)
{
...
...
src/mnode/src/mgmtVgroup.c
浏览文件 @
053280da
...
...
@@ -316,9 +316,9 @@ char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) {
}
SVnodeLoad
*
vload
=
pDnode
->
vload
+
pVnode
->
vnode
;
if
(
vload
->
vgId
!=
pVgroup
->
vgId
||
vload
->
vnode
!=
pVnode
->
vnode
)
{
mError
(
"dnode:%s, vgroup:%d,
vnode:%d not same with dnode vgroup:%d vnode
:%d"
,
taosIpStr
(
pVnode
->
ip
),
pVgroup
->
vgId
,
pVnode
->
vnode
,
vload
->
vgId
,
vload
->
vnode
);
if
(
vload
->
vgId
!=
pVgroup
->
vgId
)
{
mError
(
"dnode:%s, vgroup:%d,
not same with dnode vgroup
:%d"
,
taosIpStr
(
pVnode
->
ip
),
pVgroup
->
vgId
,
vload
->
vgId
);
return
"null"
;
}
...
...
@@ -489,15 +489,14 @@ void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable) {
taosFreeId
(
pVgroup
->
idPool
,
pTable
->
sid
);
}
SMDCreateVnodeMsg
*
mgmtBuildCreateVnodeMsg
(
SVgObj
*
pVgroup
,
int32_t
vnode
)
{
SMDCreateVnodeMsg
*
mgmtBuildCreateVnodeMsg
(
SVgObj
*
pVgroup
)
{
SDbObj
*
pDb
=
mgmtGetDb
(
pVgroup
->
dbName
);
if
(
pDb
==
NULL
)
return
NULL
;
SMDCreateVnodeMsg
*
pVnode
=
rpcMallocCont
(
sizeof
(
SMDCreateVnodeMsg
));
if
(
pVnode
==
NULL
)
return
NULL
;
pVnode
->
vnode
=
htonl
(
vnode
);
pVnode
->
cfg
=
pDb
->
cfg
;
pVnode
->
cfg
=
pDb
->
cfg
;
SVnodeCfg
*
pCfg
=
&
pVnode
->
cfg
;
pCfg
->
vgId
=
htonl
(
pVgroup
->
vgId
);
...
...
@@ -517,7 +516,6 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode) {
for
(
int32_t
j
=
0
;
j
<
pVgroup
->
numOfVnodes
;
++
j
)
{
vpeerDesc
[
j
].
vgId
=
htonl
(
pVgroup
->
vgId
);
vpeerDesc
[
j
].
ip
=
htonl
(
pVgroup
->
vnodeGid
[
j
].
ip
);
vpeerDesc
[
j
].
vnode
=
htonl
(
pVgroup
->
vnodeGid
[
j
].
vnode
);
}
return
pVnode
;
...
...
@@ -559,9 +557,9 @@ SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) {
return
ipSet
;
}
void
mgmtSendCreateVnodeMsg
(
SVgObj
*
pVgroup
,
int32_t
vnode
,
SRpcIpSet
*
ipSet
,
void
*
ahandle
)
{
mTrace
(
"vgroup:%d, send create
vnode:%d msg, ahandle:%p"
,
pVgroup
->
vgId
,
vnode
,
ahandle
);
SMDCreateVnodeMsg
*
pCreate
=
mgmtBuildCreateVnodeMsg
(
pVgroup
,
vnode
);
void
mgmtSendCreateVnodeMsg
(
SVgObj
*
pVgroup
,
SRpcIpSet
*
ipSet
,
void
*
ahandle
)
{
mTrace
(
"vgroup:%d, send create
msg, ahandle:%p"
,
pVgroup
->
vgId
,
ahandle
);
SMDCreateVnodeMsg
*
pCreate
=
mgmtBuildCreateVnodeMsg
(
pVgroup
);
SRpcMsg
rpcMsg
=
{
.
handle
=
ahandle
,
.
pCont
=
pCreate
,
...
...
@@ -576,7 +574,7 @@ void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
mTrace
(
"vgroup:%d, send create all vnodes msg, ahandle:%p"
,
pVgroup
->
vgId
,
ahandle
);
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
SRpcIpSet
ipSet
=
mgmtGetIpSetFromIp
(
pVgroup
->
vnodeGid
[
i
].
ip
);
mgmtSendCreateVnodeMsg
(
pVgroup
,
pVgroup
->
vnodeGid
[
i
].
vnode
,
&
ipSet
,
ahandle
);
mgmtSendCreateVnodeMsg
(
pVgroup
,
&
ipSet
,
ahandle
);
}
}
...
...
@@ -615,17 +613,17 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
free
(
queueMsg
);
}
static
SMDDropVnodeMsg
*
mgmtBuildDropVnodeMsg
(
SVgObj
*
pVgroup
)
{
static
SMDDropVnodeMsg
*
mgmtBuildDropVnodeMsg
(
int32_t
vgId
)
{
SMDDropVnodeMsg
*
pDrop
=
rpcMallocCont
(
sizeof
(
SMDDropVnodeMsg
));
if
(
pDrop
==
NULL
)
return
NULL
;
pDrop
->
vgId
=
htonl
(
pVgroup
->
vgId
);
pDrop
->
vgId
=
htonl
(
vgId
);
return
pDrop
;
}
static
void
mgmtSendDropVnodeMsg
(
SVgObj
*
pVgroup
,
SRpcIpSet
*
ipSet
,
void
*
ahandle
)
{
mTrace
(
"vgroup:%d, send drop vnode msg, ahandle:%p"
,
pVgroup
->
vgId
,
ahandle
);
SMDDropVnodeMsg
*
pDrop
=
mgmtBuildDropVnodeMsg
(
pVgroup
);
void
mgmtSendDropVnodeMsg
(
int32_t
vgId
,
SRpcIpSet
*
ipSet
,
void
*
ahandle
)
{
mTrace
(
"vgroup:%d, send drop vnode msg, ahandle:%p"
,
vgId
,
ahandle
);
SMDDropVnodeMsg
*
pDrop
=
mgmtBuildDropVnodeMsg
(
vgId
);
SRpcMsg
rpcMsg
=
{
.
handle
=
ahandle
,
.
pCont
=
pDrop
,
...
...
@@ -640,7 +638,7 @@ static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
mTrace
(
"vgroup:%d, send drop all vnodes msg, ahandle:%p"
,
pVgroup
->
vgId
,
ahandle
);
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
SRpcIpSet
ipSet
=
mgmtGetIpSetFromIp
(
pVgroup
->
vnodeGid
[
i
].
ip
);
mgmtSendDropVnodeMsg
(
pVgroup
,
&
ipSet
,
ahandle
);
mgmtSendDropVnodeMsg
(
pVgroup
->
vgId
,
&
ipSet
,
ahandle
);
}
}
...
...
@@ -675,4 +673,25 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
mgmtAddToShellQueue
(
newMsg
);
free
(
queueMsg
);
}
\ No newline at end of file
}
void
mgmtUpdateVgroupIp
(
SDnodeObj
*
pDnode
)
{
void
*
pNode
=
NULL
;
SVgObj
*
pVgroup
=
NULL
;
while
(
1
)
{
pNode
=
sdbFetchRow
(
tsVgroupSdb
,
pNode
,
(
void
**
)
&
pVgroup
);
if
(
pVgroup
==
NULL
)
break
;
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
SVnodeGid
*
vnodeGid
=
pVgroup
->
vnodeGid
+
i
;
if
(
vnodeGid
->
dnodeId
==
pDnode
->
dnodeId
)
{
mPrint
(
"vgroup:%d, dnode:%d, privateIp:%s change to %s, publicIp:%s change to %s"
,
pVgroup
->
vgId
,
vnodeGid
->
dnodeId
,
pDnode
->
privateIp
,
taosIpStr
(
vnodeGid
->
ip
),
pDnode
->
publicIp
,
taosIpStr
(
vnodeGid
->
publicIp
));
vnodeGid
->
publicIp
=
pDnode
->
publicIp
;
vnodeGid
->
ip
=
pDnode
->
privateIp
;
sdbUpdateRow
(
tsVgroupSdb
,
pVgroup
,
tsVgUpdateSize
,
1
);
}
}
}
}
src/util/inc/ihash.h
浏览文件 @
053280da
...
...
@@ -36,7 +36,7 @@ int32_t taosHashInt(void *handle, uint64_t key);
void
taosCleanUpIntHashWithFp
(
void
*
handle
,
void
(
*
fp
)(
char
*
));
char
*
taosVisitIntHashWithFp
(
void
*
handle
,
int
(
*
fp
)(
char
*
)
);
void
taosVisitIntHashWithFp
(
void
*
handle
,
void
(
*
fp
)(
char
*
,
void
*
),
void
*
param
);
int32_t
taosGetIntHashSize
(
void
*
handle
);
...
...
src/util/src/ihash.c
浏览文件 @
053280da
...
...
@@ -187,7 +187,6 @@ void taosCleanUpIntHash(void *handle) {
free
(
pObj
);
}
void
taosCleanUpIntHashWithFp
(
void
*
handle
,
void
(
*
fp
)(
char
*
))
{
IHashObj
*
pObj
;
IHashNode
*
pNode
,
*
pNext
;
...
...
@@ -202,7 +201,7 @@ void taosCleanUpIntHashWithFp(void *handle, void (*fp)(char *)) {
pNode
=
pObj
->
hashList
[
i
];
while
(
pNode
)
{
pNext
=
pNode
->
next
;
if
(
fp
!=
NULL
)
fp
(
pNode
->
data
);
if
(
fp
!=
NULL
)
(
*
fp
)
(
pNode
->
data
);
free
(
pNode
);
pNode
=
pNext
;
}
...
...
@@ -219,7 +218,7 @@ void taosCleanUpIntHashWithFp(void *handle, void (*fp)(char *)) {
free
(
pObj
);
}
char
*
taosVisitIntHashWithFp
(
void
*
handle
,
int
(
*
fp
)(
char
*
)
)
{
void
taosVisitIntHashWithFp
(
void
*
handle
,
int
(
*
fp
)(
char
*
,
void
*
),
void
*
param
)
{
IHashObj
*
pObj
;
IHashNode
*
pNode
,
*
pNext
;
char
*
pData
=
NULL
;
...
...
@@ -234,21 +233,13 @@ char *taosVisitIntHashWithFp(void *handle, int (*fp)(char *)) {
pNode
=
pObj
->
hashList
[
i
];
while
(
pNode
)
{
pNext
=
pNode
->
next
;
int
flag
=
fp
(
pNode
->
data
);
if
(
flag
)
{
pData
=
pNode
->
data
;
goto
VisitEnd
;
}
(
*
fp
)(
pNode
->
data
,
param
);
pNode
=
pNext
;
}
}
}
VisitEnd:
pthread_mutex_unlock
(
&
pObj
->
mutex
);
return
pData
;
}
int32_t
taosGetIntHashSize
(
void
*
handle
)
{
...
...
src/util/src/tstatus.c
浏览文件 @
053280da
...
...
@@ -18,13 +18,13 @@
char
*
taosGetVgroupStatusStr
(
int32_t
vgroupStatus
)
{
switch
(
vgroupStatus
)
{
case
TSDB_VG_STATUS_READY
:
return
tstrerror
(
vgroupStatus
);
case
TSDB_VG_STATUS_IN_PROGRESS
:
return
tstrerror
(
vgroupStatus
);
case
TSDB_VG_STATUS_NO_DISK_PERMISSIONS
:
return
tstrerror
(
vgroupStatus
);
case
TSDB_VG_STATUS_SERVER_NO_PACE
:
return
tstrerror
(
vgroupStatus
);
case
TSDB_VG_STATUS_SERV_OUT_OF_MEMORY
:
return
tstrerror
(
vgroupStatus
);
case
TSDB_VG_STATUS_INIT_FAILED
:
return
tstrerror
(
vgroupStatus
);
case
TSDB_VG_STATUS_FULL
:
return
tstrerror
(
vgroupStatus
);
case
TSDB_VG_STATUS_READY
:
return
(
char
*
)
tstrerror
(
vgroupStatus
);
case
TSDB_VG_STATUS_IN_PROGRESS
:
return
(
char
*
)
tstrerror
(
vgroupStatus
);
case
TSDB_VG_STATUS_NO_DISK_PERMISSIONS
:
return
(
char
*
)
tstrerror
(
vgroupStatus
);
case
TSDB_VG_STATUS_SERVER_NO_PACE
:
return
(
char
*
)
tstrerror
(
vgroupStatus
);
case
TSDB_VG_STATUS_SERV_OUT_OF_MEMORY
:
return
(
char
*
)
tstrerror
(
vgroupStatus
);
case
TSDB_VG_STATUS_INIT_FAILED
:
return
(
char
*
)
tstrerror
(
vgroupStatus
);
case
TSDB_VG_STATUS_FULL
:
return
(
char
*
)
tstrerror
(
vgroupStatus
);
default:
return
"undefined"
;
}
}
...
...
src/vnode/tsdb/inc/tsdb.h
浏览文件 @
053280da
...
...
@@ -35,7 +35,6 @@ extern "C" {
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION
typedef
struct
{
int8_t
precision
;
int32_t
vgId
;
int32_t
tsdbId
;
int32_t
maxTables
;
// maximum number of tables this repository can have
int32_t
daysPerFile
;
// day per file sharding policy
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录