Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8d77b0a7
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8d77b0a7
编写于
12月 09, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-10431 vgroup manage
上级
ec389d9b
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
263 addition
and
18 deletion
+263
-18
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+7
-13
source/dnode/mnode/impl/inc/mndInt.h
source/dnode/mnode/impl/inc/mndInt.h
+1
-0
source/dnode/mnode/impl/inc/mndVgroup.h
source/dnode/mnode/impl/inc/mndVgroup.h
+2
-1
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+1
-1
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+252
-3
未找到文件。
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
8d77b0a7
...
...
@@ -223,29 +223,23 @@ typedef struct SDbObj {
typedef
struct
{
int32_t
dnodeId
;
int8_t
role
;
SDnodeObj
*
pDnode
;
ESyncState
role
;
}
SVnodeGid
;
typedef
struct
SVgObj
{
uint32_t
vgId
;
int32_t
numOfVnodes
;
int32_t
vgId
;
int64_t
createdTime
;
int64_t
updateTime
;
int32_t
lbDnodeId
;
int32_t
lbTime
;
int32_t
version
;
char
dbName
[
TSDB_FULL_DB_NAME_LEN
];
int8_t
inUse
;
int8_t
accessState
;
int8_t
status
;
SVnodeGid
vnodeGid
[
TSDB_MAX_REPLICA
];
int32_t
vgCfgVersion
;
int8_t
compact
;
int32_t
numOfTables
;
int32_t
numOfTimeSeries
;
int64_t
totalStorage
;
int64_t
compStorage
;
int64_t
pointsWritten
;
SDbObj
*
pDb
;
int8_t
compact
;
int8_t
replica
;
SVnodeGid
vnodeGid
[
TSDB_MAX_REPLICA
];
}
SVgObj
;
typedef
struct
SSTableObj
{
...
...
source/dnode/mnode/impl/inc/mndInt.h
浏览文件 @
8d77b0a7
...
...
@@ -20,6 +20,7 @@
#include "sdb.h"
#include "tcache.h"
#include "tqueue.h"
#include "ttime.h"
#ifdef __cplusplus
extern
"C"
{
...
...
source/dnode/mnode/impl/inc/mndVgroup.h
浏览文件 @
8d77b0a7
...
...
@@ -24,7 +24,8 @@ extern "C" {
int32_t
mndInitVgroup
(
SMnode
*
pMnode
);
void
mndCleanupVgroup
(
SMnode
*
pMnode
);
int32_t
mndGetVnodesNum
(
SMnode
*
pMnode
,
int32_t
dnodeId
);
SVgObj
*
mndAcquireVgroup
(
SMnode
*
pMnode
,
int32_t
vgId
);
void
mndReleaseVgroup
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
8d77b0a7
...
...
@@ -40,7 +40,7 @@ static int32_t mndRetrieveDbs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int
static
void
mndCancelGetNextDb
(
SMnode
*
pMnode
,
void
*
pIter
);
int32_t
mndInitDb
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_
USER
,
SSdbTable
table
=
{.
sdbType
=
SDB_
DB
,
.
keyType
=
SDB_KEY_BINARY
,
.
encodeFp
=
(
SdbEncodeFp
)
mndDbActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndDbActionDecode
,
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
8d77b0a7
...
...
@@ -15,17 +15,52 @@
#define _DEFAULT_SOURCE
#include "mndVgroup.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndTrans.h"
#include "ttime.h"
#define TSDB_VGROUP_VER_NUM 1
#define TSDB_VGROUP_RESERVE_SIZE 64
static
SSdbRaw
*
mndVgroupActionEncode
(
SVgObj
*
pVgroup
);
static
SSdbRow
*
mndVgroupActionDecode
(
SSdbRaw
*
pRaw
);
static
int32_t
mndVgroupActionInsert
(
SSdb
*
pSdb
,
SVgObj
*
pVgroup
);
static
int32_t
mndVgroupActionDelete
(
SSdb
*
pSdb
,
SVgObj
*
pVgroup
);
static
int32_t
mndVgroupActionUpdate
(
SSdb
*
pSdb
,
SVgObj
*
pOldVgroup
,
SVgObj
*
pNewVgroup
);
static
int32_t
mndProcessCreateVnodeRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessAlterVnodeRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropVnodeRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessSyncVnodeRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessCompactVnodeRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndGetVgroupMeta
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
STableMetaMsg
*
pMeta
);
static
int32_t
mndRetrieveVgroups
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
);
static
void
mndCancelGetNextVgroup
(
SMnode
*
pMnode
,
void
*
pIter
);
static
int32_t
mndGetVnodeMeta
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
STableMetaMsg
*
pMeta
);
static
int32_t
mndRetrieveVnodes
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
);
static
void
mndCancelGetNextVnode
(
SMnode
*
pMnode
,
void
*
pIter
);
int32_t
mndInitVgroup
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_VGROUP
,
.
keyType
=
SDB_KEY_BINARY
,
.
encodeFp
=
(
SdbEncodeFp
)
mndVgroupActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndVgroupActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndVgroupActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndVgroupActionDelete
,
.
deleteFp
=
(
SdbDeleteFp
)
mndVgroupActionUpdate
};
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP
,
mndProcessCreateVnodeRsp
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP
,
mndProcessAlterVnodeRsp
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_DROP_VNODE_IN_RSP
,
mndProcessDropVnodeRsp
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP
,
mndProcessSyncVnodeRsp
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP
,
mndProcessCompactVnodeRsp
);
mndAddShowMetaHandle
(
pMnode
,
TSDB_MGMT_TABLE_VGROUP
,
mndGetVgroupMeta
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_VGROUP
,
mndRetrieveVgroups
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_VGROUP
,
mndCancelGetNextVgroup
);
mndAddShowMetaHandle
(
pMnode
,
TSDB_MGMT_TABLE_VNODES
,
mndGetVnodeMeta
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_VNODES
,
mndRetrieveVnodes
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_VNODES
,
mndCancelGetNextVnode
);
...
...
@@ -35,7 +70,221 @@ int32_t mndInitVgroup(SMnode *pMnode) {
void
mndCleanupVgroup
(
SMnode
*
pMnode
)
{}
int32_t
mndGetVnodesNum
(
SMnode
*
pMnode
,
int32_t
dnodeId
)
{
static
SSdbRaw
*
mndVgroupActionEncode
(
SVgObj
*
pVgroup
)
{
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_DB
,
TSDB_VGROUP_VER_NUM
,
sizeof
(
SDbObj
));
if
(
pRaw
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
SDB_SET_INT32
(
pRaw
,
dataPos
,
pVgroup
->
vgId
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pVgroup
->
createdTime
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pVgroup
->
updateTime
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pVgroup
->
version
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pVgroup
->
dbName
,
TSDB_FULL_DB_NAME_LEN
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pVgroup
->
replica
)
for
(
int8_t
i
=
0
;
i
<
pVgroup
->
replica
;
++
i
)
{
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
i
];
SDB_SET_INT32
(
pRaw
,
dataPos
,
pVgid
->
dnodeId
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pVgid
->
role
)
}
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
TSDB_VGROUP_RESERVE_SIZE
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
);
return
pRaw
;
}
static
SSdbRow
*
mndVgroupActionDecode
(
SSdbRaw
*
pRaw
)
{
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
return
NULL
;
if
(
sver
!=
TSDB_VGROUP_VER_NUM
)
{
mError
(
"failed to decode vgroup since %s"
,
terrstr
());
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
return
NULL
;
}
SSdbRow
*
pRow
=
sdbAllocRow
(
sizeof
(
SDbObj
));
SVgObj
*
pVgroup
=
sdbGetRowObj
(
pRow
);
if
(
pVgroup
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pVgroup
->
vgId
)
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pVgroup
->
createdTime
)
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pVgroup
->
updateTime
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pVgroup
->
version
)
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pVgroup
->
dbName
,
TSDB_FULL_DB_NAME_LEN
)
SDB_GET_INT8
(
pRaw
,
pRow
,
dataPos
,
&
pVgroup
->
replica
)
for
(
int8_t
i
=
0
;
i
<
pVgroup
->
replica
;
++
i
)
{
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
i
];
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pVgid
->
dnodeId
)
SDB_GET_INT8
(
pRaw
,
pRow
,
dataPos
,
(
int8_t
*
)
&
pVgid
->
role
)
}
SDB_GET_RESERVE
(
pRaw
,
pRow
,
dataPos
,
TSDB_VGROUP_RESERVE_SIZE
)
return
pRow
;
}
static
int32_t
mndVgroupActionInsert
(
SSdb
*
pSdb
,
SVgObj
*
pVgroup
)
{
mTrace
(
"vgId:%d, perform insert action"
,
pVgroup
->
vgId
);
return
0
;
}
static
int32_t
mndVgroupActionDelete
(
SSdb
*
pSdb
,
SVgObj
*
pVgroup
)
{
mTrace
(
"vgId:%d, perform delete action"
,
pVgroup
->
vgId
);
return
0
;
}
static
int32_t
mndProcessCreateVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessAlterVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessDropVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessSyncVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessCompactVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndVgroupActionUpdate
(
SSdb
*
pSdb
,
SVgObj
*
pOldVgroup
,
SVgObj
*
pNewVgroup
)
{
mTrace
(
"vgId:%d, perform update action"
,
pOldVgroup
->
vgId
);
pOldVgroup
->
vgId
=
pNewVgroup
->
vgId
;
pOldVgroup
->
createdTime
=
pNewVgroup
->
createdTime
;
pOldVgroup
->
updateTime
=
pNewVgroup
->
updateTime
;
pOldVgroup
->
version
=
pNewVgroup
->
version
;
memcpy
(
pOldVgroup
->
dbName
,
pNewVgroup
->
dbName
,
TSDB_FULL_DB_NAME_LEN
);
pOldVgroup
->
replica
=
pNewVgroup
->
replica
;
memcpy
(
pOldVgroup
->
vnodeGid
,
pNewVgroup
->
vnodeGid
,
TSDB_MAX_REPLICA
*
sizeof
(
SVnodeGid
));
return
0
;
}
SVgObj
*
mndAcquireVgroup
(
SMnode
*
pMnode
,
int32_t
vgId
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
return
sdbAcquire
(
pSdb
,
SDB_VGROUP
,
&
vgId
);
}
void
mndReleaseVgroup
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbRelease
(
pSdb
,
pVgroup
);
}
static
void
mndGetVgroupMaxReplica
(
SMnode
*
pMnode
,
char
*
dbName
,
int8_t
*
pReplica
,
int32_t
*
pNumOfVgroups
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int8_t
replica
=
1
;
int32_t
numOfVgroups
=
0
;
void
*
pIter
=
NULL
;
while
(
1
)
{
SVgObj
*
pVgroup
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
break
;
if
(
strcmp
(
pVgroup
->
dbName
,
dbName
)
==
0
)
{
replica
=
MAX
(
replica
,
pVgroup
->
replica
);
numOfVgroups
++
;
}
sdbRelease
(
pSdb
,
pVgroup
);
}
*
pReplica
=
replica
;
*
pNumOfVgroups
=
numOfVgroups
;
}
static
int32_t
mndGetVgroupMeta
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
STableMetaMsg
*
pMeta
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
mndGetVgroupMaxReplica
(
pMnode
,
pShow
->
db
,
&
pShow
->
replica
,
&
pShow
->
numOfRows
);
int32_t
cols
=
0
;
SSchema
*
pSchema
=
pMeta
->
schema
;
pShow
->
bytes
[
cols
]
=
4
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_INT
;
strcpy
(
pSchema
[
cols
].
name
,
"vgId"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
4
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_INT
;
strcpy
(
pSchema
[
cols
].
name
,
"tables"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
for
(
int32_t
i
=
0
;
i
<
pShow
->
replica
;
++
i
)
{
pShow
->
bytes
[
cols
]
=
4
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_SMALLINT
;
snprintf
(
pSchema
[
cols
].
name
,
TSDB_COL_NAME_LEN
,
"v%d_dnode"
,
i
+
1
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
9
+
VARSTR_HEADER_SIZE
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
snprintf
(
pSchema
[
cols
].
name
,
TSDB_COL_NAME_LEN
,
"v%d_status"
,
i
+
1
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
}
pMeta
->
numOfColumns
=
htons
(
cols
);
pShow
->
numOfColumns
=
cols
;
pShow
->
offset
[
0
]
=
0
;
for
(
int32_t
i
=
1
;
i
<
cols
;
++
i
)
{
pShow
->
offset
[
i
]
=
pShow
->
offset
[
i
-
1
]
+
pShow
->
bytes
[
i
-
1
];
}
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
strcpy
(
pMeta
->
tableFname
,
mndShowStr
(
pShow
->
type
));
return
0
;
}
static
int32_t
mndRetrieveVgroups
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfRows
=
0
;
SVgObj
*
pVgroup
=
NULL
;
int32_t
cols
=
0
;
char
*
pWrite
;
while
(
numOfRows
<
rows
)
{
pShow
->
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pShow
->
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pShow
->
pIter
==
NULL
)
break
;
cols
=
0
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int32_t
*
)
pWrite
=
pVgroup
->
vgId
;
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int32_t
*
)
pWrite
=
pVgroup
->
numOfTables
;
cols
++
;
for
(
int32_t
i
=
0
;
i
<
pShow
->
replica
;
++
i
)
{
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int16_t
*
)
pWrite
=
pVgroup
->
vnodeGid
[
i
].
dnodeId
;
cols
++
;
const
char
*
role
=
mndGetRoleStr
(
pVgroup
->
vnodeGid
[
i
].
role
);
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
role
,
pShow
->
bytes
[
cols
]);
cols
++
;
}
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int8_t
*
)
pWrite
=
pVgroup
->
compact
;
cols
++
;
sdbRelease
(
pSdb
,
pVgroup
);
numOfRows
++
;
}
mnodeVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
pShow
->
numOfReads
+=
numOfRows
;
return
numOfRows
;
}
static
void
mndCancelGetNextVgroup
(
SMnode
*
pMnode
,
void
*
pIter
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
}
static
int32_t
mndGetVnodesNum
(
SMnode
*
pMnode
,
int32_t
dnodeId
)
{
if
(
dnodeId
==
0
)
{
return
0
;
}
...
...
@@ -96,7 +345,7 @@ static int32_t mndRetrieveVnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i
pShow
->
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pShow
->
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pShow
->
pIter
==
NULL
)
break
;
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
&&
numOfRows
<
rows
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
replica
&&
numOfRows
<
rows
;
++
i
)
{
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
i
];
if
(
pVgid
->
dnodeId
!=
dnodeId
)
continue
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录