Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
7a96a2f3
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
7a96a2f3
编写于
3月 19, 2020
作者:
S
slguan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-9] change dnode privateip to dnode id
上级
72d7f581
变更
18
显示空白变更内容
内联
并排
Showing
18 changed file
with
215 addition
and
569 deletion
+215
-569
src/dnode/src/dnodeMClient.c
src/dnode/src/dnodeMClient.c
+2
-0
src/dnode/src/dnodeMgmt.c
src/dnode/src/dnodeMgmt.c
+3
-7
src/inc/mnode.h
src/inc/mnode.h
+22
-33
src/inc/taosmsg.h
src/inc/taosmsg.h
+4
-2
src/mnode/inc/mgmtBalance.h
src/mnode/inc/mgmtBalance.h
+1
-0
src/mnode/inc/mgmtDServer.h
src/mnode/inc/mgmtDServer.h
+0
-15
src/mnode/inc/mgmtDnode.h
src/mnode/inc/mgmtDnode.h
+2
-1
src/mnode/src/mgmtAcct.c
src/mnode/src/mgmtAcct.c
+3
-3
src/mnode/src/mgmtBalance.c
src/mnode/src/mgmtBalance.c
+17
-21
src/mnode/src/mgmtChildTable.c
src/mnode/src/mgmtChildTable.c
+23
-22
src/mnode/src/mgmtDClient.c
src/mnode/src/mgmtDClient.c
+0
-87
src/mnode/src/mgmtDServer.c
src/mnode/src/mgmtDServer.c
+0
-215
src/mnode/src/mgmtDb.c
src/mnode/src/mgmtDb.c
+2
-2
src/mnode/src/mgmtDnode.c
src/mnode/src/mgmtDnode.c
+35
-121
src/mnode/src/mgmtNormalTable.c
src/mnode/src/mgmtNormalTable.c
+1
-1
src/mnode/src/mgmtSuperTable.c
src/mnode/src/mgmtSuperTable.c
+0
-2
src/mnode/src/mgmtTable.c
src/mnode/src/mgmtTable.c
+50
-12
src/mnode/src/mgmtVgroup.c
src/mnode/src/mgmtVgroup.c
+50
-25
未找到文件。
src/dnode/src/dnodeMClient.c
浏览文件 @
7a96a2f3
...
...
@@ -21,6 +21,8 @@
#include "tutil.h"
#include "dnode.h"
#include "dnodeMClient.h"
#include "dnodeModule.h"
#include "dnodeMClient.h"
static
bool
dnodeReadMnodeIpList
();
static
void
dnodeSaveMnodeIpList
();
...
...
src/dnode/src/dnodeMgmt.c
浏览文件 @
7a96a2f3
...
...
@@ -497,8 +497,8 @@ static void dnodeReadDnodeId() {
int32_t
num
=
0
;
fscanf
(
fp
,
"%s %d"
,
option
,
&
value
);
if
(
num
!=
2
)
return
false
;
if
(
strcmp
(
option
,
"dnodeId"
)
!=
0
)
return
false
;
if
(
num
!=
2
)
return
;
if
(
strcmp
(
option
,
"dnodeId"
)
!=
0
)
return
;
tsDnodeId
=
value
;;
fclose
(
fp
);
...
...
@@ -510,16 +510,12 @@ static void dnodeSaveDnodeId() {
sprintf
(
dnodeIdFile
,
"%s/dnodeId"
,
tsDnodeDir
);
FILE
*
fp
=
fopen
(
dnodeIdFile
,
"w"
);
if
(
!
fp
)
{
return
false
;
}
if
(
!
fp
)
return
;
fprintf
(
fp
,
"dnodeId %d
\n
"
,
tsDnodeId
);
fclose
(
fp
);
dPrint
(
"save dnodeId successed"
);
return
true
;
}
void
dnodeUpdateDnodeId
(
int32_t
dnodeId
)
{
...
...
src/inc/mnode.h
浏览文件 @
7a96a2f3
...
...
@@ -42,46 +42,38 @@ extern "C" {
typedef
struct
{
int32_t
dnodeId
;
uint32_t
privateIp
;
int32_t
sid
;
uint32_t
publicIp
;
uint32_t
moduleStatus
;
int32_t
openVnodes
;
int32_t
numOfVnodes
;
int32_t
numOfFreeVnodes
;
int64_t
createdTime
;
uint32_t
publicIp
;
int32_t
status
;
uint32_t
lastAccess
;
uint32_t
rebootTimes
;
uint32_t
lastReboot
;
// time stamp for last reboot
uint16_t
numOfCores
;
// from dnode status msg
uint8_t
alternativeRole
;
// from dnode status msg, 0-any, 1-mgmt, 2-dnode
uint8_t
reserveStatus
;
uint16_t
numOfTotalVnodes
;
// from dnode status msg, config information
uint16_t
unused
;
float
diskAvailable
;
// from dnode status msg
int32_t
bandwidthMb
;
// config by user
int16_t
cpuAvgUsage
;
// calc from sys.cpu
int16_t
memoryAvgUsage
;
// calc from sys.mem
int16_t
diskAvgUsage
;
// calc from sys.disk
int16_t
bandwidthUsage
;
// calc from sys.band
int32_t
openVnodes
;
int32_t
numOfTotalVnodes
;
// from dnode status msg, config information
uint32_t
rack
;
uint16_t
idc
;
uint16_t
slot
;
int32_t
customScore
;
// config by user
uint16_t
numOfCores
;
// from dnode status msg
int8_t
alternativeRole
;
// from dnode status msg, 0-any, 1-mgmt, 2-dnode
int8_t
lbStatus
;
// set in balance function
float
lbScore
;
// calc in balance function
int16_t
lbStatus
;
// set in balance function
int16_t
lastAllocVnode
;
// increase while create vnode
SVnodeLoad
vload
[
TSDB_MAX_VNODES
];
char
reserved
[
16
];
int32_t
customScore
;
// config by user
char
dnodeName
[
TSDB_DNODE_NAME_LEN
+
1
];
char
reserved
[
7
];
char
updateEnd
[
1
];
void
*
thandle
;
SVnodeLoad
vload
[
TSDB_MAX_VNODES
];
int32_t
status
;
uint32_t
lastReboot
;
// time stamp for last reboot
float
diskAvailable
;
// from dnode status msg
int16_t
diskAvgUsage
;
// calc from sys.disk
int16_t
cpuAvgUsage
;
// calc from sys.cpu
int16_t
memoryAvgUsage
;
// calc from sys.mem
int16_t
bandwidthUsage
;
// calc from sys.band
}
SDnodeObj
;
typedef
struct
{
int32_t
dnodeId
;
uint32_t
ip
;
uint32_t
publicIp
;
int32_t
vnode
;
uint32_t
privateIp
;
uint32_t
publicIp
;
}
SVnodeGid
;
typedef
struct
{
...
...
@@ -150,15 +142,13 @@ typedef struct _vg_obj {
uint32_t
vgId
;
char
dbName
[
TSDB_DB_NAME_LEN
+
1
];
int64_t
createdTime
;
uint64_t
lastCreate
;
uint64_t
lastRemove
;
int32_t
numOfVnodes
;
SVnodeGid
vnodeGid
[
TSDB_VNODES_SUPPORT
];
int32_t
numOfVnodes
;
int32_t
numOfTables
;
int32_t
lbIp
;
int32_t
lbTime
;
int8_t
lbStatus
;
int8_t
reserved
[
1
6
];
int8_t
reserved
[
1
4
];
int8_t
updateEnd
[
1
];
struct
_vg_obj
*
prev
,
*
next
;
void
*
idPool
;
...
...
@@ -170,8 +160,7 @@ typedef struct _db_obj {
int8_t
dirty
;
int64_t
createdTime
;
SDbCfg
cfg
;
int8_t
dropStatus
;
char
reserved
[
16
];
char
reserved
[
15
];
char
updateEnd
[
1
];
struct
_db_obj
*
prev
,
*
next
;
int32_t
numOfVgroups
;
...
...
src/inc/taosmsg.h
浏览文件 @
7a96a2f3
...
...
@@ -520,18 +520,20 @@ typedef struct {
typedef
struct
{
int32_t
vgId
;
int32_t
vnode
;
int64_t
totalStorage
;
int64_t
compStorage
;
int64_t
pointsWritten
;
uint8_t
status
;
uint8_t
syncStatus
;
uint8_t
accessState
;
uint8_t
reserved
[
6
];
uint8_t
reserved
[
5
];
}
SVnodeLoad
;
typedef
struct
{
uint32_t
vnode
;
char
accessState
;
uint8_t
accessState
;
uint8_t
reserved
[
3
];
}
SVnodeAccess
;
/*
...
...
src/mnode/inc/mgmtBalance.h
浏览文件 @
7a96a2f3
...
...
@@ -23,6 +23,7 @@ extern "C" {
int32_t
mgmtInitBalance
();
void
mgmtCleanupBalance
();
void
mgmtStartBalance
(
int32_t
afterMs
)
;
int32_t
mgmtAllocVnodes
(
SVgObj
*
pVgroup
);
#ifdef __cplusplus
...
...
src/mnode/inc/mgmtDServer.h
浏览文件 @
7a96a2f3
...
...
@@ -24,21 +24,6 @@ int32_t mgmtInitDServer();
void
mgmtCleanupDServer
();
void
mgmtAddDServerMsgHandle
(
uint8_t
msgType
,
void
(
*
fp
)(
SRpcMsg
*
rpcMsg
));
//extern void *mgmtStatusTimer;
//
//void mgmtSendCreateTableMsg(SMDCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle);
//void mgmtSendDropTableMsg(SMDDropTableMsg *pRemove, SRpcIpSet *ipSet, void *ahandle);
//void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle);
//void mgmtSendDropVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
//
//int32_t mgmtInitDnodeInt();
//void mgmtCleanUpDnodeInt();
//
//void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle);
//void mgmtSendRspToDnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen);
//void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code);
#ifdef __cplusplus
}
#endif
...
...
src/mnode/inc/mgmtDnode.h
浏览文件 @
7a96a2f3
...
...
@@ -25,7 +25,8 @@ int32_t mgmtInitDnodes();
void
mgmtCleanUpDnodes
();
int32_t
mgmtGetDnodesNum
();
int32_t
mgmtUpdateDnode
(
SDnodeObj
*
pDnode
);
SDnodeObj
*
mgmtGetDnode
(
uint32_t
ip
);
SDnodeObj
*
mgmtGetDnode
(
int32_t
dnodeId
);
SDnodeObj
*
mgmtGetDnodeByIp
(
uint32_t
ip
);
bool
mgmtCheckDnodeInRemoveState
(
SDnodeObj
*
pDnode
);
bool
mgmtCheckDnodeInOfflineState
(
SDnodeObj
*
pDnode
);
...
...
src/mnode/src/mgmtAcct.c
浏览文件 @
7a96a2f3
...
...
@@ -24,7 +24,7 @@ void (*mgmtCleanUpAcctsFp)() = NULL;
SAcctObj
*
(
*
mgmtGetAcctFp
)(
char
*
acctName
)
=
NULL
;
int32_t
(
*
mgmtCheckUserLimitFp
)(
SAcctObj
*
pAcct
)
=
NULL
;
int32_t
(
*
mgmtCheckDbLimitFp
)(
SAcctObj
*
pAcct
)
=
NULL
;
int32_t
(
*
mgmtCheckT
imeSeries
LimitFp
)(
SAcctObj
*
pAcct
,
int32_t
numOfTimeSeries
)
=
NULL
;
int32_t
(
*
mgmtCheckT
able
LimitFp
)(
SAcctObj
*
pAcct
,
int32_t
numOfTimeSeries
)
=
NULL
;
int32_t
mgmtAddDbIntoAcct
(
SAcctObj
*
pAcct
,
SDbObj
*
pDb
)
{
pthread_mutex_lock
(
&
pAcct
->
mutex
);
...
...
@@ -137,8 +137,8 @@ int32_t mgmtCheckDbLimit(SAcctObj *pAcct) {
}
int32_t
mgmtCheckTableLimit
(
SAcctObj
*
pAcct
,
int32_t
numOfTimeSeries
)
{
if
(
mgmtCheckT
imeSeries
LimitFp
)
{
return
(
*
mgmtCheckT
imeSeries
LimitFp
)(
pAcct
,
numOfTimeSeries
);
if
(
mgmtCheckT
able
LimitFp
)
{
return
(
*
mgmtCheckT
able
LimitFp
)(
pAcct
,
numOfTimeSeries
);
}
else
{
return
0
;
}
...
...
src/mnode/src/mgmtBalance.c
浏览文件 @
7a96a2f3
...
...
@@ -20,6 +20,7 @@
int32_t
(
*
mgmtInitBalanceFp
)()
=
NULL
;
void
(
*
mgmtCleanupBalanceFp
)()
=
NULL
;
void
(
*
mgmtStartBalanceFp
)(
int32_t
afterMs
)
=
NULL
;
int32_t
(
*
mgmtAllocVnodesFp
)(
SVgObj
*
pVgroup
)
=
NULL
;
int32_t
mgmtInitBalance
()
{
...
...
@@ -36,33 +37,28 @@ void mgmtCleanupBalance() {
}
}
void
mgmtStartBalance
(
int32_t
afterMs
)
{
if
(
mgmtStartBalanceFp
)
{
(
*
mgmtStartBalanceFp
)(
afterMs
);
}
}
int32_t
mgmtAllocVnodes
(
SVgObj
*
pVgroup
)
{
if
(
mgmtAllocVnodesFp
)
{
return
mgmtAllocVnodesFp
(
pVgroup
);
return
(
*
mgmtAllocVnodesFp
)
(
pVgroup
);
}
SDnodeObj
*
pDnode
=
mgmtGetDnode
(
0
);
SDnodeObj
*
pDnode
=
mgmtGetDnode
(
1
);
if
(
pDnode
==
NULL
)
return
TSDB_CODE_OTHERS
;
int32_t
selectedVnode
=
-
1
;
int32_t
lastAllocVode
=
pDnode
->
lastAllocVnode
;
for
(
int32_t
i
=
0
;
i
<
pDnode
->
numOfVnodes
;
i
++
)
{
int32_t
vnode
=
(
i
+
lastAllocVode
)
%
pDnode
->
numOfVnodes
;
if
(
pDnode
->
vload
[
vnode
].
vgId
==
0
&&
pDnode
->
vload
[
vnode
].
status
==
TSDB_VN_STATUS_OFFLINE
)
{
selectedVnode
=
vnode
;
break
;
}
}
if
(
selectedVnode
==
-
1
)
{
mError
(
"alloc vnode failed, free vnodes:%d"
,
pDnode
->
numOfFreeVnodes
);
return
-
1
;
if
(
pDnode
->
openVnodes
<
pDnode
->
numOfTotalVnodes
)
{
pVgroup
->
vnodeGid
[
0
].
dnodeId
=
pDnode
->
dnodeId
;
pVgroup
->
vnodeGid
[
0
].
privateIp
=
pDnode
->
privateIp
;
pVgroup
->
vnodeGid
[
0
].
publicIp
=
pDnode
->
publicIp
;
mTrace
(
"dnode:%d, alloc one vnode to vgroup"
,
pDnode
->
dnodeId
);
return
TSDB_CODE_SUCCESS
;
}
else
{
mTrace
(
"allocate vnode:%d, last allocated vnode:%d"
,
selectedVnode
,
lastAllocVode
);
pVgroup
->
vnodeGid
[
0
].
vnode
=
selectedVnode
;
pDnode
->
lastAllocVnode
=
selectedVnode
+
1
;
if
(
pDnode
->
lastAllocVnode
>=
pDnode
->
numOfVnodes
)
pDnode
->
lastAllocVnode
=
0
;
return
0
;
mError
(
"dnode:%d, failed to alloc vnode to vgroup"
,
pDnode
->
dnodeId
);
return
TSDB_CODE_NO_ENOUGH_DNODES
;
}
}
src/mnode/src/mgmtChildTable.c
浏览文件 @
7a96a2f3
...
...
@@ -77,26 +77,26 @@ void *mgmtChildTableActionInsert(void *row, char *str, int32_t size, int32_t *ss
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
pTable
->
vgId
);
if
(
pVgroup
==
NULL
)
{
mError
(
"
id:%s
not in vgroup:%d"
,
pTable
->
tableId
,
pTable
->
vgId
);
mError
(
"
ctable:%s,
not in vgroup:%d"
,
pTable
->
tableId
,
pTable
->
vgId
);
return
NULL
;
}
SDbObj
*
pDb
=
mgmtGetDb
(
pVgroup
->
dbName
);
if
(
pDb
==
NULL
)
{
mError
(
"
vgroup:%d not in DB:%s"
,
pVgroup
->
vgId
,
pVgroup
->
dbName
);
mError
(
"
ctable:%s, vgroup:%d not in db:%s"
,
pTable
->
tableId
,
pVgroup
->
vgId
,
pVgroup
->
dbName
);
return
NULL
;
}
SAcctObj
*
pAcct
=
mgmtGetAcct
(
pDb
->
cfg
.
acct
);
if
(
pAcct
==
NULL
)
{
mError
(
"
account not exists"
);
mError
(
"
ctable:%s, account:%s not exists"
,
pTable
->
tableId
,
pDb
->
cfg
.
acct
);
return
NULL
;
}
if
(
!
sdbMaster
)
{
int32_t
sid
=
taosAllocateId
(
pVgroup
->
idPool
);
if
(
sid
!=
pTable
->
sid
)
{
mError
(
"
sid:%d is not matched from the master:%d"
,
sid
,
pTable
->
sid
);
mError
(
"
ctable:%s, sid:%d is not matched from the master:%d"
,
pTable
->
tableId
,
sid
,
pTable
->
sid
);
return
NULL
;
}
}
...
...
@@ -128,13 +128,13 @@ void *mgmtChildTableActionDelete(void *row, char *str, int32_t size, int32_t *ss
SDbObj
*
pDb
=
mgmtGetDb
(
pVgroup
->
dbName
);
if
(
pDb
==
NULL
)
{
mError
(
"
vgroup:%d not in DB:%s"
,
pVgroup
->
vgId
,
pVgroup
->
dbName
);
mError
(
"
ctable:%s, vgroup:%d not in DB:%s"
,
pTable
->
tableId
,
pVgroup
->
vgId
,
pVgroup
->
dbName
);
return
NULL
;
}
SAcctObj
*
pAcct
=
mgmtGetAcct
(
pDb
->
cfg
.
acct
);
if
(
pAcct
==
NULL
)
{
mError
(
"
account not exists"
);
mError
(
"
ctable:%s, account:%s not exists"
,
pTable
->
tableId
,
pDb
->
cfg
.
acct
);
return
NULL
;
}
...
...
@@ -313,7 +313,7 @@ void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableObj *pTab
void
*
mgmtCreateChildTable
(
SCMCreateTableMsg
*
pCreate
,
SVgObj
*
pVgroup
,
int32_t
tid
)
{
int32_t
numOfTables
=
sdbGetNumOfRows
(
tsChildTableSdb
);
if
(
numOfTables
>=
tsMaxTables
)
{
mError
(
"table:%s, numOfTables:%d exceed maxTables:%d"
,
pCreate
->
tableId
,
numOfTables
,
tsMaxTables
);
mError
(
"
c
table:%s, numOfTables:%d exceed maxTables:%d"
,
pCreate
->
tableId
,
numOfTables
,
tsMaxTables
);
terrno
=
TSDB_CODE_TOO_MANY_TABLES
;
return
NULL
;
}
...
...
@@ -321,14 +321,14 @@ void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t
char
*
pTagData
=
(
char
*
)
pCreate
->
schema
;
// it is a tag key
SSuperTableObj
*
pSuperTable
=
mgmtGetSuperTable
(
pTagData
);
if
(
pSuperTable
==
NULL
)
{
mError
(
"table:%s, corresponding super table does not exist"
,
pCreate
->
tableId
);
mError
(
"
c
table:%s, corresponding super table does not exist"
,
pCreate
->
tableId
);
terrno
=
TSDB_CODE_INVALID_TABLE
;
return
NULL
;
}
SChildTableObj
*
pTable
=
(
SChildTableObj
*
)
calloc
(
sizeof
(
SChildTableObj
),
1
);
if
(
pTable
==
NULL
)
{
mError
(
"table:%s, failed to alloc memory"
,
pCreate
->
tableId
);
mError
(
"
c
table:%s, failed to alloc memory"
,
pCreate
->
tableId
);
terrno
=
TSDB_CODE_SERV_OUT_OF_MEMORY
;
return
NULL
;
}
...
...
@@ -345,25 +345,25 @@ void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t
if
(
sdbInsertRow
(
tsChildTableSdb
,
pTable
,
0
)
<
0
)
{
free
(
pTable
);
mError
(
"table:%s, update sdb error"
,
pCreate
->
tableId
);
mError
(
"
c
table:%s, update sdb error"
,
pCreate
->
tableId
);
terrno
=
TSDB_CODE_SDB_ERROR
;
return
NULL
;
}
mTrace
(
"table:%s, create ctable in vgroup, uid:%"
PRIu64
,
pTable
->
tableId
,
pTable
->
uid
);
mTrace
(
"
c
table:%s, create ctable in vgroup, uid:%"
PRIu64
,
pTable
->
tableId
,
pTable
->
uid
);
return
pTable
;
}
int32_t
mgmtDropChildTable
(
SQueuedMsg
*
newMsg
,
SChildTableObj
*
pTable
)
{
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
pTable
->
vgId
);
if
(
pVgroup
==
NULL
)
{
mError
(
"table:%s, failed to drop child table, vgroup not exist"
,
pTable
->
tableId
);
mError
(
"
c
table:%s, failed to drop child table, vgroup not exist"
,
pTable
->
tableId
);
return
TSDB_CODE_OTHERS
;
}
SMDDropTableMsg
*
pDrop
=
rpcMallocCont
(
sizeof
(
SMDDropTableMsg
));
if
(
pDrop
==
NULL
)
{
mError
(
"table:%s, failed to drop child table, no enough memory"
,
pTable
->
tableId
);
mError
(
"
c
table:%s, failed to drop child table, no enough memory"
,
pTable
->
tableId
);
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
}
...
...
@@ -375,7 +375,7 @@ int32_t mgmtDropChildTable(SQueuedMsg *newMsg, SChildTableObj *pTable) {
SRpcIpSet
ipSet
=
mgmtGetIpSetFromVgroup
(
pVgroup
);
mTrace
(
"table:%s, send drop table msg"
,
pDrop
->
tableId
);
mTrace
(
"
c
table:%s, send drop table msg"
,
pDrop
->
tableId
);
SRpcMsg
rpcMsg
=
{
.
handle
=
newMsg
,
.
pCont
=
pDrop
,
...
...
@@ -395,6 +395,7 @@ void* mgmtGetChildTable(char *tableId) {
}
int32_t
mgmtModifyChildTableTagValueByName
(
SChildTableObj
*
pTable
,
char
*
tagName
,
char
*
nContent
)
{
// TODO: send message to dnode
// int32_t col = mgmtFindSuperTableTagIndex(pTable->superTable, tagName);
// if (col < 0 || col > pTable->superTable->numOfTags) {
// return TSDB_CODE_APP_ERROR;
...
...
@@ -462,7 +463,7 @@ int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *p
if
(
usePublicIp
)
{
pMeta
->
vpeerDesc
[
i
].
ip
=
pVgroup
->
vnodeGid
[
i
].
publicIp
;
}
else
{
pMeta
->
vpeerDesc
[
i
].
ip
=
pVgroup
->
vnodeGid
[
i
].
i
p
;
pMeta
->
vpeerDesc
[
i
].
ip
=
pVgroup
->
vnodeGid
[
i
].
privateI
p
;
}
pMeta
->
vpeerDesc
[
i
].
vnode
=
htonl
(
pVgroup
->
vnodeGid
[
i
].
vnode
);
pMeta
->
vpeerDesc
[
i
].
vgId
=
htonl
(
pVgroup
->
vgId
);
...
...
src/mnode/src/mgmtDClient.c
浏览文件 @
7a96a2f3
...
...
@@ -83,90 +83,3 @@ static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) {
rpcFreeCont
(
rpcMsg
->
pCont
);
}
//static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) {
// mTrace("drop table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
//}
//
//static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg) {
// mTrace("alter table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
//}
//
//
//static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
// mTrace("drop vnode rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
//}
//
//static void mgmtProcessAlterVnodeRsp(SRpcMsg *rpcMsg) {
// mTrace("alter vnode rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
//}
//
//static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg) {
// mTrace("drop stable rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
//}
//
//static void mgmtProcessAlterStreamRsp(SRpcMsg *rpcMsg) {
// mTrace("alter stream rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
//}
//
//static void mgmtProcessConfigDnodeRsp(SRpcMsg *rpcMsg) {
// mTrace("config dnode rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
//}
//
//
//void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) {
// mTrace("table:%s, send alter stream msg, ahandle:%p", pTable->tableId, pTable->sid, ahandle);
//}
//
//void mgmtSendDropVnodeMsg(int32_t vgId, int32_t vnode, SRpcIpSet *ipSet, void *ahandle) {
// mTrace("vnode:%d send free vnode msg, ahandle:%p", vnode, ahandle);
// SMDDropVnodeMsg *pDrop = rpcMallocCont(sizeof(SMDDropVnodeMsg));
// SRpcMsg rpcMsg = {
// .handle = ahandle,
// .pCont = pDrop,
// .contLen = pDrop ? sizeof(SMDDropVnodeMsg) : 0,
// .code = 0,
// .msgType = TSDB_MSG_TYPE_MD_DROP_VNODE
// };
// rpcSendRequest(tsMgmtDClientRpc, ipSet, &rpcMsg);
//}
//
////
////int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) {
//// char *option, *value;
//// int32_t olen, valen;
////
//// paGetToken(msg, &option, &olen);
//// if (strncasecmp(option, "unremove", 8) == 0) {
//// mgmtSetDnodeUnRemove(pDnode);
//// return TSDB_CODE_SUCCESS;
//// } else if (strncasecmp(option, "score", 5) == 0) {
//// paGetToken(option + olen + 1, &value, &valen);
//// if (valen > 0) {
//// int32_t score = atoi(value);
//// mTrace("dnode:%s, custom score set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->customScore, score);
//// pDnode->customScore = score;
//// mgmtUpdateDnode(pDnode);
//// //mgmtStartBalanceTimer(15);
//// }
//// return TSDB_CODE_INVALID_SQL;
//// } else if (strncasecmp(option, "bandwidth", 9) == 0) {
//// paGetToken(msg, &value, &valen);
//// if (valen > 0) {
//// int32_t bandwidthMb = atoi(value);
//// if (bandwidthMb >= 0 && bandwidthMb < 10000000) {
//// mTrace("dnode:%s, bandwidth(Mb) set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->bandwidthMb, bandwidthMb);
//// pDnode->bandwidthMb = bandwidthMb;
//// mgmtUpdateDnode(pDnode);
//// return TSDB_CODE_SUCCESS;
//// }
//// }
//// return TSDB_CODE_INVALID_SQL;
//// }
////
//// return -1;
////}
////
src/mnode/src/mgmtDServer.c
浏览文件 @
7a96a2f3
...
...
@@ -85,218 +85,3 @@ static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg) {
static
int
mgmtDServerRetrieveAuth
(
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
return
TSDB_CODE_SUCCESS
;
}
//
//
//static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle) {
// SDMConfigTableMsg *pCfg = (SDMConfigTableMsg *) pCont;
// pCfg->dnode = htonl(pCfg->dnode);
// pCfg->vnode = htonl(pCfg->vnode);
// pCfg->sid = htonl(pCfg->sid);
// mTrace("dnode:%s, vnode:%d, sid:%d, receive table config msg", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
//
// if (!sdbMaster) {
// mError("dnode:%s, vnode:%d, sid:%d, not master, redirect it", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
// mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0);
// return;
// }
//
// STableInfo *pTable = mgmtGetTableByPos(pCfg->dnode, pCfg->vnode, pCfg->sid);
// if (pTable == NULL) {
// mError("dnode:%s, vnode:%d, sid:%d, table not found", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
// mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_INVALID_TABLE, NULL, 0);
// return;
// }
//
// mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0);
//
// //TODO
// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode);
// mgmtSendCreateTableMsg(NULL, &ipSet, NULL);
//}
//
//static void mgmtProcessVnodeCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *pConn) {
// if (!sdbMaster) {
// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0);
// return;
// }
//
// SDMConfigVnodeMsg *pCfg = (SDMConfigVnodeMsg *) pCont;
// pCfg->dnode = htonl(pCfg->dnode);
// pCfg->vnode = htonl(pCfg->vnode);
//
// SVgObj *pVgroup = mgmtGetVgroupByVnode(pCfg->dnode, pCfg->vnode);
// if (pVgroup == NULL) {
// mTrace("dnode:%s, vnode:%d, no vgroup info", taosIpStr(pCfg->dnode), pCfg->vnode);
// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_NOT_ACTIVE_VNODE, NULL, 0);
// return;
// }
//
// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0);
//
// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode);
// mgmtSendCreateVnodeMsg(pVgroup, pCfg->vnode, &ipSet, NULL);
//}
//
//static void mgmtProcessCreateTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
// mTrace("create table rsp received, thandle:%p code:%d", thandle, code);
// if (thandle == NULL) return;
//
// SProcessInfo *info = thandle;
// assert(info->type == TSDB_PROCESS_CREATE_TABLE || info->type == TSDB_PROCESS_CREATE_TABLE_GET_META);
// STableInfo *pTable = info->ahandle;
//
// if (code != TSDB_CODE_SUCCESS) {
// mError("table:%s, failed to create in dnode, code:%d, set it dirty", pTable->tableId);
// mgmtSetTableDirty(pTable, true);
// } else {
// mTrace("table:%s, created in dnode", pTable->tableId);
// mgmtSetTableDirty(pTable, false);
// }
//
// if (code != TSDB_CODE_SUCCESS) {
// SRpcMsg rpcMsg = {0};
// rpcMsg.code = code;
// rpcMsg.handle = info->thandle;
// rpcSendResponse(&rpcMsg);
// } else {
// if (info->type == TSDB_PROCESS_CREATE_TABLE_GET_META) {
// mTrace("table:%s, start to process get meta", pTable->tableId);
// mgmtProcessGetTableMeta(pTable, thandle);
// } else {
// SRpcMsg rpcMsg = {0};
// rpcMsg.code = code;
// rpcMsg.handle = info->thandle;
// rpcSendResponse(&rpcMsg);
// }
// }
//
// free(info);
//}
//
//static void mgmtProcessRemoveTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
// mTrace("remove table rsp received, thandle:%p code:%d", thandle, code);
//}
//
//
//static void mgmtProcessDropVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
// mTrace("free vnode rsp received, thandle:%p code:%d", thandle, code);
//}
//
//static void mgmtProcessDropStableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
// mTrace("drop stable rsp received, thandle:%p code:%d", thandle, code);
//}
//
//static void mgmtProcessCreateVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
// mTrace("create vnode rsp received, thandle:%p code:%d", thandle, code);
// if (thandle == NULL) return;
//
// SProcessInfo *info = thandle;
// assert(info->type == TSDB_PROCESS_CREATE_VGROUP || info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META);
// info->received++;
// SVgObj *pVgroup = info->ahandle;
//
// bool isGetMeta = false;
// if (info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META) {
// isGetMeta = true;
// }
//
// mTrace("vgroup:%d, received:%d numOfVnodes:%d", pVgroup->vgId, info->received, pVgroup->numOfVnodes);
// if (info->received == pVgroup->numOfVnodes) {
// mgmtProcessCreateTable(pVgroup, info->cont, info->contLen, info->thandle, isGetMeta);
// free(info);
// }
//}
//
//void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
// if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
// mError("invalid msg type:%d", msgType);
// return;
// }
//
// mTrace("msg:%d:%s is received from dnode, pConn:%p", msgType, taosMsg[(int8_t)msgType], pConn);
//
// if (msgType == TSDB_MSG_TYPE_DM_CONFIG_TABLE) {
// mgmtProcessTableCfgMsg(msgType, pCont, contLen, pConn);
// } else if (msgType == TSDB_MSG_TYPE_DM_CONFIG_VNODE) {
// mgmtProcessVnodeCfgMsg(msgType, pCont, contLen, pConn);
// } else if (msgType == TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP) {
// mgmtProcessCreateTableRsp(msgType, pCont, contLen, pConn, code);
// } else if (msgType == TSDB_MSG_TYPE_MD_DROP_TABLE_RSP) {
// mgmtProcessRemoveTableRsp(msgType, pCont, contLen, pConn, code);
// } else if (msgType == TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP) {
// mgmtProcessCreateVnodeRsp(msgType, pCont, contLen, pConn, code);
// } else if (msgType == TSDB_MSG_TYPE_MD_DROP_VNODE_RSP) {
// mgmtProcessDropVnodeRsp(msgType, pCont, contLen, pConn, code);
// } else if (msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) {
// mgmtProcessDropStableRsp(msgType, pCont, contLen, pConn, code);
// } else if (msgType == TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP) {
// } else if (msgType == TSDB_MSG_TYPE_CM_ALTER_STREAM_RSP) {
// } else if (msgType == TSDB_MSG_TYPE_DM_STATUS) {
// mgmtProcessDnodeStatus(msgType, pCont, contLen, pConn, code);
// } else {
// mError("%s from dnode is not processed", taosMsg[(int8_t)msgType]);
// }
//
// //rpcFreeCont(pCont);
//}
//
//void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) {
// mTrace("table:%s, sid:%d send alter stream msg, ahandle:%p", pTable->tableId, pTable->sid, ahandle);
//}
//
//void mgmtSendDropVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle) {
// mTrace("vnode:%d send free vnode msg, ahandle:%p", vnode, ahandle);
//
// SMDDropVnodeMsg *pFreeVnode = rpcMallocCont(sizeof(SMDDropVnodeMsg));
// if (pFreeVnode != NULL) {
// pFreeVnode->vnode = htonl(vnode);
// mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_MD_DROP_VNODE, pFreeVnode, sizeof(SMDDropVnodeMsg), ahandle);
// }
//}
//
//int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) {
// char *option, *value;
// int32_t olen, valen;
//
// paGetToken(msg, &option, &olen);
// if (strncasecmp(option, "unremove", 8) == 0) {
// mgmtSetDnodeUnRemove(pDnode);
// return TSDB_CODE_SUCCESS;
// } else if (strncasecmp(option, "score", 5) == 0) {
// paGetToken(option + olen + 1, &value, &valen);
// if (valen > 0) {
// int32_t score = atoi(value);
// mTrace("dnode:%s, custom score set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->customScore, score);
// pDnode->customScore = score;
// mgmtUpdateDnode(pDnode);
// //mgmtStartBalanceTimer(15);
// }
// return TSDB_CODE_INVALID_SQL;
// } else if (strncasecmp(option, "bandwidth", 9) == 0) {
// paGetToken(msg, &value, &valen);
// if (valen > 0) {
// int32_t bandwidthMb = atoi(value);
// if (bandwidthMb >= 0 && bandwidthMb < 10000000) {
// mTrace("dnode:%s, bandwidth(Mb) set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->bandwidthMb, bandwidthMb);
// pDnode->bandwidthMb = bandwidthMb;
// mgmtUpdateDnode(pDnode);
// return TSDB_CODE_SUCCESS;
// }
// }
// return TSDB_CODE_INVALID_SQL;
// }
//
// return -1;
//}
//
//
//void mgmtCleanUpDnodeInt() {
// if (mgmtCleanUpDnodeIntFp) {
// mgmtCleanUpDnodeIntFp();
// }
//}
//
\ No newline at end of file
src/mnode/src/mgmtDb.c
浏览文件 @
7a96a2f3
...
...
@@ -679,7 +679,7 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
pDb
->
d
ropStatus
!=
TSDB_DB_STATUS_READY
?
"dropping"
:
"ready"
);
strcpy
(
pWrite
,
pDb
->
d
irty
!=
TSDB_DB_STATUS_READY
?
"dropping"
:
"ready"
);
cols
++
;
numOfRows
++
;
...
...
src/mnode/src/mgmtDnode.c
浏览文件 @
7a96a2f3
...
...
@@ -51,75 +51,22 @@ static void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg);
void
mgmtSetDnodeMaxVnodes
(
SDnodeObj
*
pDnode
)
{
int32_t
maxVnodes
=
pDnode
->
numOfCores
*
tsNumOfVnodesPerCore
;
maxVnodes
=
maxVnodes
>
TSDB_MAX_VNODES
?
TSDB_MAX_VNODES
:
maxVnodes
;
maxVnodes
=
maxVnodes
<
TSDB_MIN_VNODES
?
TSDB_MIN_VNODES
:
maxVnodes
;
if
(
pDnode
->
numOfTotalVnodes
!=
0
)
{
maxVnodes
=
pDnode
->
numOfTotalVnodes
;
if
(
pDnode
->
numOfTotalVnodes
==
0
)
{
pDnode
->
numOfTotalVnodes
=
maxVnodes
;
}
if
(
pDnode
->
alternativeRole
==
TSDB_DNODE_ROLE_MGMT
)
{
max
Vnodes
=
0
;
pDnode
->
numOfTotal
Vnodes
=
0
;
}
pDnode
->
numOfVnodes
=
maxVnodes
;
pDnode
->
numOfFreeVnodes
=
maxVnodes
;
pDnode
->
openVnodes
=
0
;
pDnode
->
status
=
TSDB_DN_STATUS_OFFLINE
;
}
void
mgmtCalcNumOfFreeVnodes
(
SDnodeObj
*
pDnode
)
{
int32_t
totalVnodes
=
0
;
mTrace
(
"dnode:%s, begin calc free vnodes"
,
taosIpStr
(
pDnode
->
privateIp
));
for
(
int32_t
i
=
0
;
i
<
pDnode
->
numOfVnodes
;
++
i
)
{
SVnodeLoad
*
pVload
=
pDnode
->
vload
+
i
;
if
(
pVload
->
vgId
!=
0
)
{
mTrace
(
"dnode:%d, calc free vnodes, vnode:%d, status:%d %s, syncstatus:%d %s"
,
pDnode
->
dnodeId
,
pVload
->
vgId
,
pVload
->
status
,
taosGetVnodeStatusStr
(
pVload
->
status
),
pVload
->
syncStatus
,
taosGetVnodeSyncStatusStr
(
pVload
->
syncStatus
));
totalVnodes
++
;
}
}
pDnode
->
numOfFreeVnodes
=
pDnode
->
numOfVnodes
-
totalVnodes
;
mTrace
(
"dnode:%s, numOfVnodes:%d, numOfFreeVnodes:%d, totalVnodes:%d"
,
taosIpStr
(
pDnode
->
privateIp
),
pDnode
->
numOfVnodes
,
pDnode
->
numOfFreeVnodes
,
totalVnodes
);
}
void
mgmtSetDnodeVgid
(
SVnodeGid
vnodeGid
[],
int32_t
numOfVnodes
,
int32_t
vgId
)
{
SDnodeObj
*
pDnode
;
for
(
int32_t
i
=
0
;
i
<
numOfVnodes
;
++
i
)
{
pDnode
=
mgmtGetDnode
(
vnodeGid
[
i
].
ip
);
if
(
pDnode
)
{
SVnodeLoad
*
pVload
=
pDnode
->
vload
+
vnodeGid
[
i
].
vnode
;
memset
(
pVload
,
0
,
sizeof
(
SVnodeLoad
));
//pVload->vnode = vnodeGid[i].vnode;
pVload
->
vgId
=
vgId
;
mTrace
(
"dnode:%s, vnode:%d add to vgroup:%d"
,
taosIpStr
(
pDnode
->
privateIp
),
vnodeGid
[
i
].
vnode
,
pVload
->
vgId
);
mgmtCalcNumOfFreeVnodes
(
pDnode
);
}
else
{
mError
(
"dnode:%s, not in dnode DB!!!"
,
taosIpStr
(
vnodeGid
[
i
].
ip
));
}
}
}
void
mgmtUnSetDnodeVgid
(
SVnodeGid
vnodeGid
[],
int32_t
numOfVnodes
)
{
SDnodeObj
*
pDnode
;
for
(
int32_t
i
=
0
;
i
<
numOfVnodes
;
++
i
)
{
pDnode
=
mgmtGetDnode
(
vnodeGid
[
i
].
ip
);
if
(
pDnode
)
{
SVnodeLoad
*
pVload
=
pDnode
->
vload
+
vnodeGid
[
i
].
vnode
;
mTrace
(
"dnode:%s, vnode:%d remove from vgroup:%d"
,
taosIpStr
(
vnodeGid
[
i
].
ip
),
vnodeGid
[
i
].
vnode
,
pVload
->
vgId
);
memset
(
pVload
,
0
,
sizeof
(
SVnodeLoad
));
mgmtCalcNumOfFreeVnodes
(
pDnode
);
}
else
{
mError
(
"dnode:%s not in dnode DB!!!"
,
taosIpStr
(
vnodeGid
[
i
].
ip
));
}
}
}
bool
mgmtCheckModuleInDnode
(
SDnodeObj
*
pDnode
,
int32_t
moduleType
)
{
uint32_t
status
=
pDnode
->
moduleStatus
&
(
1
<<
moduleType
);
return
status
>
0
;
...
...
@@ -338,11 +285,10 @@ static int32_t mgmtGetVnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn)
pShow
->
offset
[
0
]
=
0
;
for
(
int32_t
i
=
1
;
i
<
cols
;
++
i
)
pShow
->
offset
[
i
]
=
pShow
->
offset
[
i
-
1
]
+
pShow
->
bytes
[
i
-
1
];
// TODO: if other thread drop dnode ????
SDnodeObj
*
pDnode
=
NULL
;
if
(
pShow
->
payloadLen
>
0
)
{
uint32_t
ip
=
ip2uint
(
pShow
->
payload
);
pDnode
=
mgmtGetDnode
(
ip
);
pDnode
=
mgmtGetDnode
ByIp
(
ip
);
if
(
NULL
==
pDnode
)
{
return
TSDB_CODE_NODE_OFFLINE
;
}
...
...
@@ -434,15 +380,14 @@ int32_t mgmtInitDnodes() {
return
mgmtInitDnodesFp
();
}
else
{
tsDnodeObj
.
dnodeId
=
1
;
tsDnodeObj
.
privateIp
=
inet_addr
(
tsPrivateIp
);;
tsDnodeObj
.
privateIp
=
inet_addr
(
tsPrivateIp
);
tsDnodeObj
.
publicIp
=
inet_addr
(
tsPublicIp
);
tsDnodeObj
.
createdTime
=
taosGetTimestampMs
();
tsDnodeObj
.
lastReboot
=
taosGetTimestampSec
()
;
tsDnodeObj
.
numOfTotalVnodes
=
tsNumOfTotalVnodes
;
tsDnodeObj
.
numOfCores
=
(
uint16_t
)
tsNumOfCores
;
tsDnodeObj
.
status
=
TSDB_DN_STATUS_READY
;
tsDnodeObj
.
alternativeRole
=
TSDB_DNODE_ROLE_ANY
;
tsDnodeObj
.
numOfTotalVnodes
=
tsNumOfTotalVnodes
;
tsDnodeObj
.
thandle
=
(
void
*
)
(
1
);
//hack way
tsDnodeObj
.
status
=
TSDB_DN_STATUS_READY
;
tsDnodeObj
.
status
=
TSDB_DN_STATUS_OFFLINE
;
tsDnodeObj
.
lastReboot
=
taosGetTimestampSec
();
mgmtSetDnodeMaxVnodes
(
&
tsDnodeObj
);
tsDnodeObj
.
moduleStatus
|=
(
1
<<
TSDB_MOD_MGMT
);
...
...
@@ -458,31 +403,30 @@ int32_t mgmtInitDnodes() {
void
mgmtCleanUpDnodes
()
{
if
(
mgmtCleanUpDnodesFp
)
{
mgmtCleanUpDnodesFp
();
(
*
mgmtCleanUpDnodesFp
)
();
}
}
SDnodeObj
*
mgmtGetDnode
(
uint32_t
ip
)
{
SDnodeObj
*
mgmtGetDnode
(
int32_t
dnodeId
)
{
if
(
mgmtGetDnodeFp
)
{
return
mgmtGetDnodeFp
(
ip
);
}
else
{
return
(
*
mgmtGetDnodeFp
)(
dnodeId
);
}
if
(
dnodeId
==
1
)
{
return
&
tsDnodeObj
;
}
return
NULL
;
}
SDnodeObj
*
mgmtGetDnodeByIp
(
int32_t
dnodeId
)
{
SDnodeObj
*
mgmtGetDnodeByIp
(
uint32_t
ip
)
{
if
(
mgmtGetDnodeByIpFp
)
{
return
mgmtGetDnodeByIpFp
(
dnodeId
);
return
(
*
mgmtGetDnodeByIpFp
)(
ip
);
}
if
(
dnodeId
!=
0
)
{
return
&
tsDnodeObj
;
}
return
NULL
;
}
int32_t
mgmtGetDnodesNum
()
{
if
(
mgmtGetDnodesNumFp
)
{
return
mgmtGetDnodesNumFp
();
return
(
*
mgmtGetDnodesNumFp
)
();
}
else
{
return
1
;
}
...
...
@@ -490,7 +434,7 @@ int32_t mgmtGetDnodesNum() {
int32_t
mgmtUpdateDnode
(
SDnodeObj
*
pDnode
)
{
if
(
mgmtUpdateDnodeFp
)
{
return
mgmtUpdateDnodeFp
(
pDnode
);
return
(
*
mgmtUpdateDnodeFp
)
(
pDnode
);
}
else
{
return
0
;
}
...
...
@@ -498,7 +442,7 @@ int32_t mgmtUpdateDnode(SDnodeObj *pDnode) {
void
*
mgmtGetNextDnode
(
SShowObj
*
pShow
,
SDnodeObj
**
pDnode
)
{
if
(
mgmtGetNextDnodeFp
)
{
return
mgmtGetNextDnodeFp
(
pShow
,
pDnode
);
return
(
*
mgmtGetNextDnodeFp
)
(
pShow
,
pDnode
);
}
else
{
if
(
*
pDnode
==
NULL
)
{
*
pDnode
=
&
tsDnodeObj
;
...
...
@@ -512,14 +456,12 @@ void *mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode) {
void
mgmtSetDnodeUnRemove
(
SDnodeObj
*
pDnode
)
{
if
(
mgmtSetDnodeUnRemoveFp
)
{
mgmtSetDnodeUnRemoveFp
(
pDnode
);
(
*
mgmtSetDnodeUnRemoveFp
)
(
pDnode
);
}
}
bool
mgmtCheckConfigShow
(
SGlobalConfig
*
cfg
)
{
if
(
cfg
->
cfgType
&
TSDB_CFG_CTYPE_B_CLUSTER
)
return
false
;
if
(
cfg
->
cfgType
&
TSDB_CFG_CTYPE_B_NOT_PRINT
)
if
(
!
(
cfg
->
cfgType
&
TSDB_CFG_CTYPE_B_SHOW
))
return
false
;
return
true
;
}
...
...
@@ -583,7 +525,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
if
(
pStatus
->
dnodeId
==
0
)
{
pDnode
=
mgmtGetDnodeByIp
(
pStatus
->
privateIp
);
if
(
pDnode
==
NULL
)
{
mTrace
(
"dnode not created
in cluster
, privateIp:%s, name:%s, "
,
taosIpStr
(
htonl
(
pStatus
->
dnodeId
)),
pStatus
->
dnodeName
);
mTrace
(
"dnode not created, privateIp:%s, name:%s, "
,
taosIpStr
(
htonl
(
pStatus
->
dnodeId
)),
pStatus
->
dnodeName
);
mgmtSendSimpleResp
(
rpcMsg
->
handle
,
TSDB_CODE_DNODE_NOT_EXIST
);
return
;
}
...
...
@@ -603,7 +545,6 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
pDnode
->
publicIp
=
htonl
(
pStatus
->
publicIp
);
pDnode
->
lastReboot
=
htonl
(
pStatus
->
lastReboot
);
pDnode
->
numOfTotalVnodes
=
htons
(
pStatus
->
numOfTotalVnodes
);
pDnode
->
openVnodes
=
htons
(
pStatus
->
openVnodes
);
pDnode
->
numOfCores
=
htons
(
pStatus
->
numOfCores
);
pDnode
->
diskAvailable
=
pStatus
->
diskAvailable
;
pDnode
->
alternativeRole
=
pStatus
->
alternativeRole
;
...
...
@@ -619,48 +560,21 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
//mgmtUpdateMnodeIp();
}
for
(
int32_t
j
=
0
;
j
<
pDnode
->
openVnodes
;
++
j
)
{
pStatus
->
load
[
j
].
vgId
=
htonl
(
pStatus
->
load
[
j
].
vgId
);
pStatus
->
load
[
j
].
totalStorage
=
htobe64
(
pStatus
->
load
[
j
].
totalStorage
);
pStatus
->
load
[
j
].
compStorage
=
htobe64
(
pStatus
->
load
[
j
].
compStorage
);
pStatus
->
load
[
j
].
pointsWritten
=
htobe64
(
pStatus
->
load
[
j
].
pointsWritten
);
bool
existInMnode
=
false
;
for
(
int32_t
vnode
=
0
;
vnode
<
pDnode
->
numOfVnodes
;
++
vnode
)
{
SVnodeLoad
*
pVload
=
&
(
pDnode
->
vload
[
vnode
]);
if
(
pVload
->
vgId
==
pStatus
->
load
[
j
].
vgId
)
{
existInMnode
=
true
;
}
}
int32_t
openVnodes
=
htons
(
pStatus
->
openVnodes
);
for
(
int32_t
j
=
0
;
j
<
openVnodes
;
++
j
)
{
pDnode
->
vload
[
j
].
vgId
=
htonl
(
pStatus
->
load
[
j
].
vgId
);
pDnode
->
vload
[
j
].
totalStorage
=
htobe64
(
pStatus
->
load
[
j
].
totalStorage
);
pDnode
->
vload
[
j
].
compStorage
=
htobe64
(
pStatus
->
load
[
j
].
compStorage
);
pDnode
->
vload
[
j
].
pointsWritten
=
htobe64
(
pStatus
->
load
[
j
].
pointsWritten
);
if
(
!
existInMnode
)
{
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
pStatus
->
load
[
j
].
vgId
);
if
(
pVgroup
==
NULL
)
{
SRpcIpSet
ipSet
=
mgmtGetIpSetFromIp
(
pDnode
->
privateIp
);
mPrint
(
"dnode:%d, vnode:%d not exist in mnode, drop it"
,
pDnode
->
dnodeId
,
pStatus
->
load
[
j
].
vgId
);
mgmtSendDropVnodeMsg
(
pStatus
->
load
[
j
].
vgId
,
&
ipSet
,
NULL
);
}
}
for
(
int32_t
vnode
=
0
;
vnode
<
pDnode
->
numOfVnodes
;
++
vnode
)
{
SVnodeLoad
*
pVload
=
&
(
pDnode
->
vload
[
vnode
]);
bool
existInDnode
=
false
;
for
(
int32_t
j
=
0
;
j
<
pDnode
->
openVnodes
;
++
j
)
{
if
(
htonl
(
pStatus
->
load
[
j
].
vgId
)
==
pVload
->
vgId
)
{
existInDnode
=
true
;
break
;
}
}
if
(
!
existInDnode
)
{
mPrint
(
"dnode:%d, vnode:%d not exist in dnode, create it"
,
pDnode
->
dnodeId
,
pVload
->
vgId
);
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
pVload
->
vgId
);
if
(
pVgroup
!=
NULL
)
{
SRpcIpSet
ipSet
=
mgmtGetIpSetFromIp
(
pDnode
->
privateIp
);
mgmtSendCreateVnodeMsg
(
pVgroup
,
&
ipSet
,
NULL
);
}
}
}
if
(
pDnode
->
status
!=
TSDB_DN_STATUS_READY
)
{
mTrace
(
"dnode:%d, from offline to online"
,
pDnode
->
dnodeId
);
pDnode
->
status
=
TSDB_DN_STATUS_READY
;
...
...
src/mnode/src/mgmtNormalTable.c
浏览文件 @
7a96a2f3
...
...
@@ -540,7 +540,7 @@ int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMeta
if
(
usePublicIp
)
{
pMeta
->
vpeerDesc
[
i
].
ip
=
pVgroup
->
vnodeGid
[
i
].
publicIp
;
}
else
{
pMeta
->
vpeerDesc
[
i
].
ip
=
pVgroup
->
vnodeGid
[
i
].
i
p
;
pMeta
->
vpeerDesc
[
i
].
ip
=
pVgroup
->
vnodeGid
[
i
].
privateI
p
;
}
pMeta
->
vpeerDesc
[
i
].
vnode
=
htonl
(
pVgroup
->
vnodeGid
[
i
].
vnode
);
pMeta
->
vpeerDesc
[
i
].
vgId
=
htonl
(
pVgroup
->
vgId
);
...
...
src/mnode/src/mgmtSuperTable.c
浏览文件 @
7a96a2f3
...
...
@@ -65,8 +65,6 @@ static void mgmtSuperTableActionInit() {
mgmtSuperTableActionFp
[
SDB_TYPE_DECODE
]
=
mgmtSuperTableActionDecode
;
mgmtSuperTableActionFp
[
SDB_TYPE_RESET
]
=
mgmtSuperTableActionReset
;
mgmtSuperTableActionFp
[
SDB_TYPE_DESTROY
]
=
mgmtSuperTableActionDestroy
;
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_METRIC
,
mgmtGetShowSuperTableMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_METRIC
,
mgmtRetrieveShowSuperTables
);
}
void
*
mgmtSuperTableActionReset
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
...
...
src/mnode/src/mgmtTable.c
浏览文件 @
7a96a2f3
...
...
@@ -52,6 +52,8 @@ static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *queueMsg);
static
void
mgmtProcessSuperTableMetaMsg
(
SQueuedMsg
*
queueMsg
);
static
void
mgmtProcessCreateTableRsp
(
SRpcMsg
*
rpcMsg
);
static
void
mgmtProcessDropTableRsp
(
SRpcMsg
*
rpcMsg
);
static
void
mgmtProcessAlterTableRsp
(
SRpcMsg
*
rpcMsg
);
static
void
mgmtProcessDropStableRsp
(
SRpcMsg
*
rpcMsg
);
static
int32_t
mgmtGetShowTableMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveShowTables
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
void
mgmtProcessGetTableMeta
(
STableInfo
*
pTable
,
void
*
thandle
);
...
...
@@ -84,8 +86,8 @@ int32_t mgmtInitTables() {
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_TABLE
,
mgmtRetrieveShowTables
);
mgmtAddDClientRspHandle
(
TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP
,
mgmtProcessCreateTableRsp
);
mgmtAddDClientRspHandle
(
TSDB_MSG_TYPE_MD_DROP_TABLE_RSP
,
mgmtProcessDropTableRsp
);
mgmtAddDClientRspHandle
(
TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP
,
NULL
);
mgmtAddDClientRspHandle
(
TSDB_MSG_TYPE_MD_DROP_STABLE_RSP
,
NULL
);
mgmtAddDClientRspHandle
(
TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP
,
mgmtProcessAlterTableRsp
);
mgmtAddDClientRspHandle
(
TSDB_MSG_TYPE_MD_DROP_STABLE_RSP
,
mgmtProcessDropStableRsp
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -109,17 +111,15 @@ STableInfo* mgmtGetTable(char *tableId) {
return
NULL
;
}
STableInfo
*
mgmtGetTableByPos
(
uint32_t
dnodeIp
,
int32_t
vnode
,
int32_t
sid
)
{
SDnodeObj
*
pObj
=
mgmtGetDnode
(
dnodeIp
);
if
(
pObj
!=
NULL
&&
vnode
>=
0
&&
vnode
<
pObj
->
numOfVnodes
)
{
int32_t
vgId
=
pObj
->
vload
[
vnode
].
vgId
;
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
vgId
);
if
(
pVgroup
)
{
return
pVgroup
->
tableList
[
sid
];
}
}
STableInfo
*
mgmtGetTableByPos
(
uint32_t
dnodeId
,
int32_t
vnode
,
int32_t
sid
)
{
SDnodeObj
*
pObj
=
mgmtGetDnode
(
dnodeId
);
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
vnode
);
if
(
pObj
==
NULL
||
pVgroup
==
NULL
)
{
return
NULL
;
}
return
pVgroup
->
tableList
[
sid
];
}
int32_t
mgmtGetTableMeta
(
SDbObj
*
pDb
,
STableInfo
*
pTable
,
STableMeta
*
pMeta
,
bool
usePublicIp
)
{
...
...
@@ -587,7 +587,7 @@ void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) {
void
mgmtProcessGetTableMeta
(
STableInfo
*
pTable
,
void
*
thandle
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
thandle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
SDbObj
*
pDb
=
mgmtGetDbByTableId
(
pTable
->
tableId
);
if
(
pDb
==
NULL
||
pDb
->
d
ropStatus
!=
TSDB_DB_STATUS_READY
)
{
if
(
pDb
==
NULL
||
pDb
->
d
irty
)
{
mError
(
"table:%s, failed to get table meta, db not selected"
,
pTable
->
tableId
);
rpcRsp
.
code
=
TSDB_CODE_DB_NOT_SELECTED
;
rpcSendResponse
(
&
rpcRsp
);
...
...
@@ -783,6 +783,10 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) {
free
(
queueMsg
);
}
static
void
mgmtProcessAlterTableRsp
(
SRpcMsg
*
rpcMsg
)
{
mTrace
(
"alter table rsp received, handle:%p code:%d"
,
rpcMsg
->
handle
,
rpcMsg
->
code
);
}
static
void
mgmtProcessDropTableRsp
(
SRpcMsg
*
rpcMsg
)
{
if
(
rpcMsg
->
handle
==
NULL
)
return
;
...
...
@@ -831,3 +835,37 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) {
mgmtSendSimpleResp
(
queueMsg
->
thandle
,
TSDB_CODE_SUCCESS
);
free
(
queueMsg
);
}
static
void
mgmtProcessDropStableRsp
(
SRpcMsg
*
rpcMsg
)
{
mTrace
(
"drop stable rsp received, handle:%p code:%d"
,
rpcMsg
->
handle
,
rpcMsg
->
code
);
}
//
//
//static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle) {
// SDMConfigTableMsg *pCfg = (SDMConfigTableMsg *) pCont;
// pCfg->dnode = htonl(pCfg->dnode);
// pCfg->vnode = htonl(pCfg->vnode);
// pCfg->sid = htonl(pCfg->sid);
// mTrace("dnode:%s, vnode:%d, sid:%d, receive table config msg", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
//
// if (!sdbMaster) {
// mError("dnode:%s, vnode:%d, sid:%d, not master, redirect it", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
// mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0);
// return;
// }
//
// STableInfo *pTable = mgmtGetTableByPos(pCfg->dnode, pCfg->vnode, pCfg->sid);
// if (pTable == NULL) {
// mError("dnode:%s, vnode:%d, sid:%d, table not found", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
// mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_INVALID_TABLE, NULL, 0);
// return;
// }
//
// mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0);
//
// //TODO
// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode);
// mgmtSendCreateTableMsg(NULL, &ipSet, NULL);
//}
//
\ No newline at end of file
src/mnode/src/mgmtVgroup.c
浏览文件 @
7a96a2f3
...
...
@@ -111,11 +111,11 @@ int32_t mgmtInitVgroups() {
if
(
tsIsCluster
&&
pVgroup
->
vnodeGid
[
0
].
publicIp
==
0
)
{
pVgroup
->
vnodeGid
[
0
].
publicIp
=
inet_addr
(
tsPublicIp
);
pVgroup
->
vnodeGid
[
0
].
i
p
=
inet_addr
(
tsPrivateIp
);
pVgroup
->
vnodeGid
[
0
].
privateI
p
=
inet_addr
(
tsPrivateIp
);
sdbUpdateRow
(
tsVgroupSdb
,
pVgroup
,
tsVgUpdateSize
,
1
);
}
mgmtSetDnodeVgid
(
pVgroup
->
vnodeGid
,
pVgroup
->
numOfVnodes
,
pVgroup
->
vgId
);
//
mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId);
}
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_VGROUP
,
mgmtGetVgroupMeta
);
...
...
@@ -131,9 +131,6 @@ SVgObj *mgmtGetVgroup(int32_t vgId) {
return
(
SVgObj
*
)
sdbGetRow
(
tsVgroupSdb
,
&
vgId
);
}
/*
* TODO: check if there is enough sids
*/
SVgObj
*
mgmtGetAvailableVgroup
(
SDbObj
*
pDb
)
{
return
pDb
->
pHead
;
}
...
...
@@ -162,13 +159,13 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) {
pVgroup
->
idPool
=
taosInitIdPool
(
pDb
->
cfg
.
maxSessions
);
mgmtAddVgroupIntoDb
(
pDb
,
pVgroup
);
mgmtSetDnodeVgid
(
pVgroup
->
vnodeGid
,
pVgroup
->
numOfVnodes
,
pVgroup
->
vgId
);
//
mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId);
sdbInsertRow
(
tsVgroupSdb
,
pVgroup
,
0
);
mPrint
(
"vgroup:%d, is created in mnode, db:%s replica:%d"
,
pVgroup
->
vgId
,
pDb
->
name
,
pVgroup
->
numOfVnodes
);
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
mPrint
(
"vgroup:%d, dnode:%
s vnode:%d"
,
pVgroup
->
vgId
,
taosIpStr
(
pVgroup
->
vnodeGid
[
i
].
ip
),
pVgroup
->
vnodeGid
[
i
].
vnode
);
mPrint
(
"vgroup:%d, dnode:%
d vnode:%d"
,
pVgroup
->
vgId
,
taosIpStr
(
pVgroup
->
vnodeGid
[
i
].
dnodeId
),
pVgroup
->
vnodeGid
[
i
].
vnode
);
}
pMsg
->
ahandle
=
pVgroup
;
...
...
@@ -305,9 +302,9 @@ int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
}
char
*
mgmtGetVnodeStatus
(
SVgObj
*
pVgroup
,
SVnodeGid
*
pVnode
)
{
SDnodeObj
*
pDnode
=
mgmtGetDnode
(
pVnode
->
ip
);
SDnodeObj
*
pDnode
=
mgmtGetDnode
(
pVnode
->
dnodeId
);
if
(
pDnode
==
NULL
)
{
mError
(
"
dnode:%s, vgroup:%d, vnode:%d dnode not exist"
,
taosIpStr
(
pVnode
->
ip
),
pVgroup
->
vgId
,
pVnode
->
vnode
);
mError
(
"
vgroup:%d, not exist in dnode:%d"
,
pVgroup
->
vgId
,
pDnode
->
dnodeId
);
return
"null"
;
}
...
...
@@ -315,14 +312,13 @@ char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) {
return
"offline"
;
}
SVnodeLoad
*
vload
=
pDnode
->
vload
+
pVnode
->
vnode
;
if
(
vload
->
vgId
!=
pVgroup
->
vgId
)
{
mError
(
"dnode:%s, vgroup:%d, not same with dnode vgroup:%d"
,
taosIpStr
(
pVnode
->
ip
),
pVgroup
->
vgId
,
vload
->
vgId
);
return
"null"
;
for
(
int
i
=
0
;
i
<
pDnode
->
openVnodes
;
++
i
)
{
if
(
pDnode
->
vload
[
i
].
vgId
==
pVgroup
->
vgId
)
{
return
(
char
*
)
taosGetVnodeStatusStr
(
pDnode
->
vload
[
i
].
status
);
}
}
return
(
char
*
)
taosGetVnodeStatusStr
(
vload
->
status
)
;
return
"null"
;
}
int32_t
mgmtRetrieveVgroups
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
...
...
@@ -362,7 +358,7 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
cols
++
;
for
(
int32_t
i
=
0
;
i
<
maxReplica
;
++
i
)
{
tinet_ntoa
(
ipstr
,
pVgroup
->
vnodeGid
[
i
].
i
p
);
tinet_ntoa
(
ipstr
,
pVgroup
->
vnodeGid
[
i
].
privateI
p
);
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
ipstr
);
cols
++
;
...
...
@@ -372,7 +368,7 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
if
(
pVgroup
->
vnodeGid
[
i
].
ip
!=
0
)
{
if
(
pVgroup
->
vnodeGid
[
i
].
dnodeId
!=
0
)
{
char
*
vnodeStatus
=
mgmtGetVnodeStatus
(
pVgroup
,
pVgroup
->
vnodeGid
+
i
);
strcpy
(
pWrite
,
vnodeStatus
);
}
else
{
...
...
@@ -394,6 +390,11 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
}
static
void
*
mgmtVgroupActionInsert
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SVgObj
*
pVgroup
=
row
;
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
pVgroup
->
vnodeGid
[
i
].
vnode
=
pVgroup
->
vgId
;
}
return
NULL
;
}
...
...
@@ -405,7 +406,7 @@ static void *mgmtVgroupActionDelete(void *row, char *str, int32_t size, int32_t
mgmtRemoveVgroupFromDb
(
pDb
,
pVgroup
);
}
mgmtUnSetDnodeVgid
(
pVgroup
->
vnodeGid
,
pVgroup
->
numOfVnodes
);
//
mgmtUnSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes);
tfree
(
pVgroup
->
tableList
);
return
NULL
;
...
...
@@ -515,7 +516,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) {
SVnodeDesc
*
vpeerDesc
=
pVnode
->
vpeerDesc
;
for
(
int32_t
j
=
0
;
j
<
pVgroup
->
numOfVnodes
;
++
j
)
{
vpeerDesc
[
j
].
vgId
=
htonl
(
pVgroup
->
vgId
);
vpeerDesc
[
j
].
ip
=
htonl
(
pVgroup
->
vnodeGid
[
j
].
i
p
);
vpeerDesc
[
j
].
ip
=
htonl
(
pVgroup
->
vnodeGid
[
j
].
privateI
p
);
}
return
pVnode
;
...
...
@@ -542,7 +543,7 @@ SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) {
.
port
=
tsDnodeMnodePort
};
for
(
int
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
ipSet
.
ip
[
i
]
=
pVgroup
->
vnodeGid
[
i
].
i
p
;
ipSet
.
ip
[
i
]
=
pVgroup
->
vnodeGid
[
i
].
privateI
p
;
}
return
ipSet
;
}
...
...
@@ -573,7 +574,7 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) {
void
mgmtSendCreateVgroupMsg
(
SVgObj
*
pVgroup
,
void
*
ahandle
)
{
mTrace
(
"vgroup:%d, send create all vnodes msg, ahandle:%p"
,
pVgroup
->
vgId
,
ahandle
);
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
SRpcIpSet
ipSet
=
mgmtGetIpSetFromIp
(
pVgroup
->
vnodeGid
[
i
].
i
p
);
SRpcIpSet
ipSet
=
mgmtGetIpSetFromIp
(
pVgroup
->
vnodeGid
[
i
].
privateI
p
);
mgmtSendCreateVnodeMsg
(
pVgroup
,
&
ipSet
,
ahandle
);
}
}
...
...
@@ -637,7 +638,7 @@ void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) {
static
void
mgmtSendDropVgroupMsg
(
SVgObj
*
pVgroup
,
void
*
ahandle
)
{
mTrace
(
"vgroup:%d, send drop all vnodes msg, ahandle:%p"
,
pVgroup
->
vgId
,
ahandle
);
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
SRpcIpSet
ipSet
=
mgmtGetIpSetFromIp
(
pVgroup
->
vnodeGid
[
i
].
i
p
);
SRpcIpSet
ipSet
=
mgmtGetIpSetFromIp
(
pVgroup
->
vnodeGid
[
i
].
privateI
p
);
mgmtSendDropVnodeMsg
(
pVgroup
->
vgId
,
&
ipSet
,
ahandle
);
}
}
...
...
@@ -686,12 +687,36 @@ void mgmtUpdateVgroupIp(SDnodeObj *pDnode) {
SVnodeGid
*
vnodeGid
=
pVgroup
->
vnodeGid
+
i
;
if
(
vnodeGid
->
dnodeId
==
pDnode
->
dnodeId
)
{
mPrint
(
"vgroup:%d, dnode:%d, privateIp:%s change to %s, publicIp:%s change to %s"
,
pVgroup
->
vgId
,
vnodeGid
->
dnodeId
,
pDnode
->
privateIp
,
taosIpStr
(
vnodeGid
->
i
p
),
pVgroup
->
vgId
,
vnodeGid
->
dnodeId
,
pDnode
->
privateIp
,
taosIpStr
(
vnodeGid
->
privateI
p
),
pDnode
->
publicIp
,
taosIpStr
(
vnodeGid
->
publicIp
));
vnodeGid
->
publicIp
=
pDnode
->
publicIp
;
vnodeGid
->
i
p
=
pDnode
->
privateIp
;
vnodeGid
->
privateI
p
=
pDnode
->
privateIp
;
sdbUpdateRow
(
tsVgroupSdb
,
pVgroup
,
tsVgUpdateSize
,
1
);
}
}
}
}
//static void mgmtProcessVnodeCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *pConn) {
// if (!sdbMaster) {
// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0);
// return;
// }
//
// SDMConfigVnodeMsg *pCfg = (SDMConfigVnodeMsg *) pCont;
// pCfg->dnode = htonl(pCfg->dnode);
// pCfg->vnode = htonl(pCfg->vnode);
//
// SVgObj *pVgroup = mgmtGetVgroupByVnode(pCfg->dnode, pCfg->vnode);
// if (pVgroup == NULL) {
// mTrace("dnode:%s, vnode:%d, no vgroup info", taosIpStr(pCfg->dnode), pCfg->vnode);
// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_NOT_ACTIVE_VNODE, NULL, 0);
// return;
// }
//
// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0);
//
// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode);
// mgmtSendCreateVnodeMsg(pVgroup, pCfg->vnode, &ipSet, NULL);
//}
//
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录