Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
98ec34b4
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
98ec34b4
编写于
11月 18, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-2046
上级
97e41cdf
变更
10
显示空白变更内容
内联
并排
Showing
10 changed file
with
234 addition
and
238 deletion
+234
-238
src/mnode/inc/mnodeSdb.h
src/mnode/inc/mnodeSdb.h
+2
-2
src/mnode/src/mnodeAcct.c
src/mnode/src/mnodeAcct.c
+9
-9
src/mnode/src/mnodeCluster.c
src/mnode/src/mnodeCluster.c
+5
-5
src/mnode/src/mnodeDb.c
src/mnode/src/mnodeDb.c
+12
-12
src/mnode/src/mnodeDnode.c
src/mnode/src/mnodeDnode.c
+11
-11
src/mnode/src/mnodeMnode.c
src/mnode/src/mnodeMnode.c
+11
-15
src/mnode/src/mnodeSdb.c
src/mnode/src/mnodeSdb.c
+114
-114
src/mnode/src/mnodeTable.c
src/mnode/src/mnodeTable.c
+40
-40
src/mnode/src/mnodeUser.c
src/mnode/src/mnodeUser.c
+11
-11
src/mnode/src/mnodeVgroup.c
src/mnode/src/mnodeVgroup.c
+19
-19
未找到文件。
src/mnode/inc/mnodeSdb.h
浏览文件 @
98ec34b4
...
...
@@ -57,7 +57,7 @@ typedef struct SSWriteMsg {
void
*
rowData
;
int32_t
(
*
fpReq
)(
SMnodeMsg
*
pMsg
);
int32_t
(
*
fpWrite
)(
SMnodeMsg
*
pMsg
,
int32_t
code
);
void
*
p
Obj
;
void
*
p
Row
;
SMnodeMsg
*
pMsg
;
struct
SSdbTable
*
pTable
;
}
SSWriteMsg
;
...
...
@@ -75,7 +75,7 @@ typedef struct {
int32_t
(
*
fpEncode
)(
SSWriteMsg
*
pWrite
);
int32_t
(
*
fpDecode
)(
SSWriteMsg
*
pWrite
);
int32_t
(
*
fpDestroy
)(
SSWriteMsg
*
pWrite
);
int32_t
(
*
fp
D
estored
)();
int32_t
(
*
fp
R
estored
)();
}
SSdbTableDesc
;
int32_t
sdbInit
();
...
...
src/mnode/src/mnodeAcct.c
浏览文件 @
98ec34b4
...
...
@@ -32,14 +32,14 @@ static int32_t tsAcctUpdateSize;
static
int32_t
mnodeCreateRootAcct
();
static
int32_t
mnodeAcctActionDestroy
(
SSWriteMsg
*
pWMsg
)
{
SAcctObj
*
pAcct
=
pWMsg
->
p
Obj
;
SAcctObj
*
pAcct
=
pWMsg
->
p
Row
;
pthread_mutex_destroy
(
&
pAcct
->
mutex
);
tfree
(
pWMsg
->
p
Obj
);
tfree
(
pWMsg
->
p
Row
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeAcctActionInsert
(
SSWriteMsg
*
pWMsg
)
{
SAcctObj
*
pAcct
=
pWMsg
->
p
Obj
;
SAcctObj
*
pAcct
=
pWMsg
->
p
Row
;
memset
(
&
pAcct
->
acctInfo
,
0
,
sizeof
(
SAcctInfo
));
pAcct
->
acctInfo
.
accessState
=
TSDB_VN_ALL_ACCCESS
;
pthread_mutex_init
(
&
pAcct
->
mutex
,
NULL
);
...
...
@@ -47,14 +47,14 @@ static int32_t mnodeAcctActionInsert(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeAcctActionDelete
(
SSWriteMsg
*
pWMsg
)
{
SAcctObj
*
pAcct
=
pWMsg
->
p
Obj
;
SAcctObj
*
pAcct
=
pWMsg
->
p
Row
;
mnodeDropAllUsers
(
pAcct
);
mnodeDropAllDbs
(
pAcct
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeAcctActionUpdate
(
SSWriteMsg
*
pWMsg
)
{
SAcctObj
*
pAcct
=
pWMsg
->
p
Obj
;
SAcctObj
*
pAcct
=
pWMsg
->
p
Row
;
SAcctObj
*
pSaved
=
mnodeGetAcct
(
pAcct
->
user
);
if
(
pAcct
!=
pSaved
)
{
memcpy
(
pSaved
,
pAcct
,
tsAcctUpdateSize
);
...
...
@@ -65,7 +65,7 @@ static int32_t mnodeAcctActionUpdate(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeAcctActionEncode
(
SSWriteMsg
*
pWMsg
)
{
SAcctObj
*
pAcct
=
pWMsg
->
p
Obj
;
SAcctObj
*
pAcct
=
pWMsg
->
p
Row
;
memcpy
(
pWMsg
->
rowData
,
pAcct
,
tsAcctUpdateSize
);
pWMsg
->
rowSize
=
tsAcctUpdateSize
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -76,7 +76,7 @@ static int32_t mnodeAcctActionDecode(SSWriteMsg *pWMsg) {
if
(
pAcct
==
NULL
)
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
memcpy
(
pAcct
,
pWMsg
->
rowData
,
tsAcctUpdateSize
);
pWMsg
->
p
Obj
=
pAcct
;
pWMsg
->
p
Row
=
pAcct
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -112,7 +112,7 @@ int32_t mnodeInitAccts() {
.
fpEncode
=
mnodeAcctActionEncode
,
.
fpDecode
=
mnodeAcctActionDecode
,
.
fpDestroy
=
mnodeAcctActionDestroy
,
.
fp
D
estored
=
mnodeAcctActionRestored
.
fp
R
estored
=
mnodeAcctActionRestored
};
tsAcctSdb
=
sdbOpenTable
(
&
tableDesc
);
...
...
@@ -229,7 +229,7 @@ static int32_t mnodeCreateRootAcct() {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsAcctSdb
,
.
p
Obj
=
pAcct
,
.
p
Row
=
pAcct
,
};
return
sdbInsertRow
(
&
wmsg
);
...
...
src/mnode/src/mnodeCluster.c
浏览文件 @
98ec34b4
...
...
@@ -33,7 +33,7 @@ static int32_t mnodeGetClusterMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *
static
int32_t
mnodeRetrieveClusters
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
mnodeClusterActionDestroy
(
SSWriteMsg
*
pWMsg
)
{
tfree
(
pWMsg
->
p
Obj
);
tfree
(
pWMsg
->
p
Row
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -50,7 +50,7 @@ static int32_t mnodeClusterActionUpdate(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeClusterActionEncode
(
SSWriteMsg
*
pWMsg
)
{
SClusterObj
*
pCluster
=
pWMsg
->
p
Obj
;
SClusterObj
*
pCluster
=
pWMsg
->
p
Row
;
memcpy
(
pWMsg
->
rowData
,
pCluster
,
tsClusterUpdateSize
);
pWMsg
->
rowSize
=
tsClusterUpdateSize
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -61,7 +61,7 @@ static int32_t mnodeClusterActionDecode(SSWriteMsg *pWMsg) {
if
(
pCluster
==
NULL
)
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
memcpy
(
pCluster
,
pWMsg
->
rowData
,
tsClusterUpdateSize
);
pWMsg
->
p
Obj
=
pCluster
;
pWMsg
->
p
Row
=
pCluster
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -97,7 +97,7 @@ int32_t mnodeInitCluster() {
.
fpEncode
=
mnodeClusterActionEncode
,
.
fpDecode
=
mnodeClusterActionDecode
,
.
fpDestroy
=
mnodeClusterActionDestroy
,
.
fp
D
estored
=
mnodeClusterActionRestored
.
fp
R
estored
=
mnodeClusterActionRestored
};
tsClusterSdb
=
sdbOpenTable
(
&
tableDesc
);
...
...
@@ -148,7 +148,7 @@ static int32_t mnodeCreateCluster() {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsClusterSdb
,
.
p
Obj
=
pCluster
,
.
p
Row
=
pCluster
,
};
return
sdbInsertRow
(
&
wmsg
);
...
...
src/mnode/src/mnodeDb.c
浏览文件 @
98ec34b4
...
...
@@ -57,7 +57,7 @@ static void mnodeDestroyDb(SDbObj *pDb) {
}
static
int32_t
mnodeDbActionDestroy
(
SSWriteMsg
*
pWMsg
)
{
mnodeDestroyDb
(
pWMsg
->
p
Obj
);
mnodeDestroyDb
(
pWMsg
->
p
Row
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -66,7 +66,7 @@ int64_t mnodeGetDbNum() {
}
static
int32_t
mnodeDbActionInsert
(
SSWriteMsg
*
pWMsg
)
{
SDbObj
*
pDb
=
pWMsg
->
p
Obj
;
SDbObj
*
pDb
=
pWMsg
->
p
Row
;
SAcctObj
*
pAcct
=
mnodeGetAcct
(
pDb
->
acct
);
pthread_mutex_init
(
&
pDb
->
mutex
,
NULL
);
...
...
@@ -92,7 +92,7 @@ static int32_t mnodeDbActionInsert(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeDbActionDelete
(
SSWriteMsg
*
pWMsg
)
{
SDbObj
*
pDb
=
pWMsg
->
p
Obj
;
SDbObj
*
pDb
=
pWMsg
->
p
Row
;
SAcctObj
*
pAcct
=
mnodeGetAcct
(
pDb
->
acct
);
mnodeDropAllChildTables
(
pDb
);
...
...
@@ -108,7 +108,7 @@ static int32_t mnodeDbActionDelete(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeDbActionUpdate
(
SSWriteMsg
*
pWMsg
)
{
SDbObj
*
pNew
=
pWMsg
->
p
Obj
;
SDbObj
*
pNew
=
pWMsg
->
p
Row
;
SDbObj
*
pDb
=
mnodeGetDb
(
pNew
->
name
);
if
(
pDb
!=
NULL
&&
pNew
!=
pDb
)
{
memcpy
(
pDb
,
pNew
,
pWMsg
->
rowSize
);
...
...
@@ -121,7 +121,7 @@ static int32_t mnodeDbActionUpdate(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeDbActionEncode
(
SSWriteMsg
*
pWMsg
)
{
SDbObj
*
pDb
=
pWMsg
->
p
Obj
;
SDbObj
*
pDb
=
pWMsg
->
p
Row
;
memcpy
(
pWMsg
->
rowData
,
pDb
,
tsDbUpdateSize
);
pWMsg
->
rowSize
=
tsDbUpdateSize
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -132,7 +132,7 @@ static int32_t mnodeDbActionDecode(SSWriteMsg *pWMsg) {
if
(
pDb
==
NULL
)
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
memcpy
(
pDb
,
pWMsg
->
rowData
,
tsDbUpdateSize
);
pWMsg
->
p
Obj
=
pDb
;
pWMsg
->
p
Row
=
pDb
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -157,7 +157,7 @@ int32_t mnodeInitDbs() {
.
fpEncode
=
mnodeDbActionEncode
,
.
fpDecode
=
mnodeDbActionDecode
,
.
fpDestroy
=
mnodeDbActionDestroy
,
.
fp
D
estored
=
mnodeDbActionRestored
.
fp
R
estored
=
mnodeDbActionRestored
};
tsDbSdb
=
sdbOpenTable
(
&
tableDesc
);
...
...
@@ -415,7 +415,7 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsDbSdb
,
.
p
Obj
=
pDb
,
.
p
Row
=
pDb
,
.
rowSize
=
sizeof
(
SDbObj
),
.
pMsg
=
pMsg
,
.
fpWrite
=
mnodeCreateDbCb
...
...
@@ -810,7 +810,7 @@ static int32_t mnodeSetDbDropping(SDbObj *pDb) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsDbSdb
,
.
p
Obj
=
pDb
.
p
Row
=
pDb
};
int32_t
code
=
sdbUpdateRow
(
&
wmsg
);
...
...
@@ -1022,7 +1022,7 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsDbSdb
,
.
p
Obj
=
pDb
,
.
p
Row
=
pDb
,
.
pMsg
=
pMsg
,
.
fpWrite
=
mnodeAlterDbCb
};
...
...
@@ -1074,7 +1074,7 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsDbSdb
,
.
p
Obj
=
pDb
,
.
p
Row
=
pDb
,
.
pMsg
=
pMsg
,
.
fpWrite
=
mnodeDropDbCb
};
...
...
@@ -1137,7 +1137,7 @@ void mnodeDropAllDbs(SAcctObj *pAcct) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_LOCAL
,
.
pTable
=
tsDbSdb
,
.
p
Obj
=
pDb
.
p
Row
=
pDb
};
sdbDeleteRow
(
&
wmsg
);
...
...
src/mnode/src/mnodeDnode.c
浏览文件 @
98ec34b4
...
...
@@ -88,12 +88,12 @@ static char* offlineReason[] = {
};
static
int32_t
mnodeDnodeActionDestroy
(
SSWriteMsg
*
pWMsg
)
{
tfree
(
pWMsg
->
p
Obj
);
tfree
(
pWMsg
->
p
Row
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeDnodeActionInsert
(
SSWriteMsg
*
pWMsg
)
{
SDnodeObj
*
pDnode
=
pWMsg
->
p
Obj
;
SDnodeObj
*
pDnode
=
pWMsg
->
p
Row
;
if
(
pDnode
->
status
!=
TAOS_DN_STATUS_DROPPING
)
{
pDnode
->
status
=
TAOS_DN_STATUS_OFFLINE
;
pDnode
->
lastAccess
=
tsAccessSquence
;
...
...
@@ -108,7 +108,7 @@ static int32_t mnodeDnodeActionInsert(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeDnodeActionDelete
(
SSWriteMsg
*
pWMsg
)
{
SDnodeObj
*
pDnode
=
pWMsg
->
p
Obj
;
SDnodeObj
*
pDnode
=
pWMsg
->
p
Row
;
#ifndef _SYNC
mnodeDropAllDnodeVgroups
(
pDnode
);
...
...
@@ -122,7 +122,7 @@ static int32_t mnodeDnodeActionDelete(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeDnodeActionUpdate
(
SSWriteMsg
*
pWMsg
)
{
SDnodeObj
*
pNew
=
pWMsg
->
p
Obj
;
SDnodeObj
*
pNew
=
pWMsg
->
p
Row
;
SDnodeObj
*
pDnode
=
mnodeGetDnode
(
pNew
->
dnodeId
);
if
(
pDnode
!=
NULL
&&
pNew
!=
pDnode
)
{
memcpy
(
pDnode
,
pNew
,
pWMsg
->
rowSize
);
...
...
@@ -135,7 +135,7 @@ static int32_t mnodeDnodeActionUpdate(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeDnodeActionEncode
(
SSWriteMsg
*
pWMsg
)
{
SDnodeObj
*
pDnode
=
pWMsg
->
p
Obj
;
SDnodeObj
*
pDnode
=
pWMsg
->
p
Row
;
memcpy
(
pWMsg
->
rowData
,
pDnode
,
tsDnodeUpdateSize
);
pWMsg
->
rowSize
=
tsDnodeUpdateSize
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -146,7 +146,7 @@ static int32_t mnodeDnodeActionDecode(SSWriteMsg *pWMsg) {
if
(
pDnode
==
NULL
)
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
memcpy
(
pDnode
,
pWMsg
->
rowData
,
tsDnodeUpdateSize
);
pWMsg
->
p
Obj
=
pDnode
;
pWMsg
->
p
Row
=
pDnode
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -184,7 +184,7 @@ int32_t mnodeInitDnodes() {
.
fpEncode
=
mnodeDnodeActionEncode
,
.
fpDecode
=
mnodeDnodeActionDecode
,
.
fpDestroy
=
mnodeDnodeActionDestroy
,
.
fp
D
estored
=
mnodeDnodeActionRestored
.
fp
R
estored
=
mnodeDnodeActionRestored
};
tsDnodeSdb
=
sdbOpenTable
(
&
tableDesc
);
...
...
@@ -299,7 +299,7 @@ void mnodeUpdateDnode(SDnodeObj *pDnode) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsDnodeSdb
,
.
p
Obj
=
pDnode
.
p
Row
=
pDnode
};
int32_t
code
=
sdbUpdateRow
(
&
wmsg
);
...
...
@@ -647,7 +647,7 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsDnodeSdb
,
.
p
Obj
=
pDnode
,
.
p
Row
=
pDnode
,
.
rowSize
=
sizeof
(
SDnodeObj
),
.
pMsg
=
pMsg
};
...
...
@@ -668,7 +668,7 @@ int32_t mnodeDropDnode(SDnodeObj *pDnode, void *pMsg) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsDnodeSdb
,
.
p
Obj
=
pDnode
,
.
p
Row
=
pDnode
,
.
pMsg
=
pMsg
};
...
...
@@ -1141,7 +1141,7 @@ static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, vo
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
mnodeGetMnodeRoleStr
(
pVgid
->
role
)
);
strcpy
(
pWrite
,
syncRole
[
pVgid
->
role
]
);
cols
++
;
}
}
...
...
src/mnode/src/mnodeMnode.c
浏览文件 @
98ec34b4
...
...
@@ -59,12 +59,12 @@ static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, vo
#endif
static
int32_t
mnodeMnodeActionDestroy
(
SSWriteMsg
*
pWMsg
)
{
tfree
(
pWMsg
->
p
Obj
);
tfree
(
pWMsg
->
p
Row
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeMnodeActionInsert
(
SSWriteMsg
*
pWMsg
)
{
SMnodeObj
*
pMnode
=
pWMsg
->
p
Obj
;
SMnodeObj
*
pMnode
=
pWMsg
->
p
Row
;
SDnodeObj
*
pDnode
=
mnodeGetDnode
(
pMnode
->
mnodeId
);
if
(
pDnode
==
NULL
)
return
TSDB_CODE_MND_DNODE_NOT_EXIST
;
...
...
@@ -77,7 +77,7 @@ static int32_t mnodeMnodeActionInsert(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeMnodeActionDelete
(
SSWriteMsg
*
pWMsg
)
{
SMnodeObj
*
pMnode
=
pWMsg
->
p
Obj
;
SMnodeObj
*
pMnode
=
pWMsg
->
p
Row
;
SDnodeObj
*
pDnode
=
mnodeGetDnode
(
pMnode
->
mnodeId
);
if
(
pDnode
==
NULL
)
return
TSDB_CODE_MND_DNODE_NOT_EXIST
;
...
...
@@ -89,7 +89,7 @@ static int32_t mnodeMnodeActionDelete(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeMnodeActionUpdate
(
SSWriteMsg
*
pWMsg
)
{
SMnodeObj
*
pMnode
=
pWMsg
->
p
Obj
;
SMnodeObj
*
pMnode
=
pWMsg
->
p
Row
;
SMnodeObj
*
pSaved
=
mnodeGetMnode
(
pMnode
->
mnodeId
);
if
(
pMnode
!=
pSaved
)
{
memcpy
(
pSaved
,
pMnode
,
pWMsg
->
rowSize
);
...
...
@@ -100,7 +100,7 @@ static int32_t mnodeMnodeActionUpdate(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeMnodeActionEncode
(
SSWriteMsg
*
pWMsg
)
{
SMnodeObj
*
pMnode
=
pWMsg
->
p
Obj
;
SMnodeObj
*
pMnode
=
pWMsg
->
p
Row
;
memcpy
(
pWMsg
->
rowData
,
pMnode
,
tsMnodeUpdateSize
);
pWMsg
->
rowSize
=
tsMnodeUpdateSize
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -111,7 +111,7 @@ static int32_t mnodeMnodeActionDecode(SSWriteMsg *pWMsg) {
if
(
pMnode
==
NULL
)
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
memcpy
(
pMnode
,
pWMsg
->
rowData
,
tsMnodeUpdateSize
);
pWMsg
->
p
Obj
=
pMnode
;
pWMsg
->
p
Row
=
pMnode
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -150,7 +150,7 @@ int32_t mnodeInitMnodes() {
.
fpEncode
=
mnodeMnodeActionEncode
,
.
fpDecode
=
mnodeMnodeActionDecode
,
.
fpDestroy
=
mnodeMnodeActionDestroy
,
.
fp
D
estored
=
mnodeMnodeActionRestored
.
fp
R
estored
=
mnodeMnodeActionRestored
};
tsMnodeSdb
=
sdbOpenTable
(
&
tableDesc
);
...
...
@@ -192,10 +192,6 @@ void *mnodeGetNextMnode(void *pIter, SMnodeObj **pMnode) {
return
sdbFetchRow
(
tsMnodeSdb
,
pIter
,
(
void
**
)
pMnode
);
}
char
*
mnodeGetMnodeRoleStr
(
int32_t
role
)
{
return
syncRole
[
role
];
}
void
mnodeUpdateMnodeEpSet
()
{
mInfo
(
"update mnodes epSet, numOfEps:%d "
,
mnodeGetMnodesNum
());
...
...
@@ -332,7 +328,7 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsMnodeSdb
,
.
p
Obj
=
pMnode
,
.
p
Row
=
pMnode
,
.
fpWrite
=
mnodeCreateMnodeCb
};
...
...
@@ -356,7 +352,7 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
void
mnodeDropMnodeLocal
(
int32_t
dnodeId
)
{
SMnodeObj
*
pMnode
=
mnodeGetMnode
(
dnodeId
);
if
(
pMnode
!=
NULL
)
{
SSWriteMsg
wmsg
=
{.
type
=
SDB_OPER_LOCAL
,
.
pTable
=
tsMnodeSdb
,
.
p
Obj
=
pMnode
};
SSWriteMsg
wmsg
=
{.
type
=
SDB_OPER_LOCAL
,
.
pTable
=
tsMnodeSdb
,
.
p
Row
=
pMnode
};
sdbDeleteRow
(
&
wmsg
);
mnodeDecMnodeRef
(
pMnode
);
}
...
...
@@ -374,7 +370,7 @@ int32_t mnodeDropMnode(int32_t dnodeId) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsMnodeSdb
,
.
p
Obj
=
pMnode
.
p
Row
=
pMnode
};
int32_t
code
=
sdbDeleteRow
(
&
wmsg
);
...
...
@@ -469,7 +465,7 @@ static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, vo
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
char
*
roles
=
mnodeGetMnodeRoleStr
(
pMnode
->
role
)
;
char
*
roles
=
syncRole
[
pMnode
->
role
]
;
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
roles
,
pShow
->
bytes
[
cols
]);
cols
++
;
...
...
src/mnode/src/mnodeSdb.c
浏览文件 @
98ec34b4
...
...
@@ -37,15 +37,15 @@
#define SDB_SYNC_HACK 16
typedef
enum
{
SDB_ACTION_INSERT
,
SDB_ACTION_DELETE
,
SDB_ACTION_UPDATE
SDB_ACTION_INSERT
=
0
,
SDB_ACTION_DELETE
=
1
,
SDB_ACTION_UPDATE
=
2
}
ESdbAction
;
typedef
enum
{
SDB_STATUS_OFFLINE
,
SDB_STATUS_SERVING
,
SDB_STATUS_CLOSING
SDB_STATUS_OFFLINE
=
0
,
SDB_STATUS_SERVING
=
1
,
SDB_STATUS_CLOSING
=
2
}
ESdbStatus
;
typedef
struct
SSdbTable
{
...
...
@@ -64,7 +64,7 @@ typedef struct SSdbTable {
int32_t
(
*
fpDecode
)(
SSWriteMsg
*
pWrite
);
int32_t
(
*
fpEncode
)(
SSWriteMsg
*
pWrite
);
int32_t
(
*
fpDestroy
)(
SSWriteMsg
*
pWrite
);
int32_t
(
*
fp
D
estored
)();
int32_t
(
*
fp
R
estored
)();
pthread_mutex_t
mutex
;
}
SSdbTable
;
...
...
@@ -78,7 +78,7 @@ typedef struct {
int32_t
numOfTables
;
SSdbTable
*
tableList
[
SDB_TABLE_MAX
];
pthread_mutex_t
mutex
;
}
SSdb
Objec
t
;
}
SSdb
Mgm
t
;
typedef
struct
{
pthread_t
thread
;
...
...
@@ -92,7 +92,7 @@ typedef struct {
extern
void
*
tsMnodeTmr
;
static
void
*
tsSdbTmr
;
static
SSdb
Object
tsSdbObj
=
{
0
};
static
SSdb
Mgmt
tsSdbMgmt
=
{
0
};
static
taos_qset
tsSdbWQset
;
static
taos_qall
tsSdbWQall
;
static
taos_queue
tsSdbWQueue
;
...
...
@@ -121,15 +121,15 @@ int64_t sdbGetNumOfRows(void *pTable) {
}
uint64_t
sdbGetVersion
()
{
return
tsSdb
Obj
.
version
;
return
tsSdb
Mgmt
.
version
;
}
bool
sdbIsMaster
()
{
return
tsSdb
Obj
.
role
==
TAOS_SYNC_ROLE_MASTER
;
return
tsSdb
Mgmt
.
role
==
TAOS_SYNC_ROLE_MASTER
;
}
bool
sdbIsServing
()
{
return
tsSdb
Obj
.
status
==
SDB_STATUS_SERVING
;
return
tsSdb
Mgmt
.
status
==
SDB_STATUS_SERVING
;
}
static
void
*
sdbGetObjKey
(
SSdbTable
*
pTable
,
void
*
key
)
{
...
...
@@ -172,21 +172,21 @@ static char *sdbGetObjStr(SSdbTable *pTable, void *key) {
}
static
void
*
sdbGetTableFromId
(
int32_t
tableId
)
{
return
tsSdb
Obj
.
tableList
[
tableId
];
return
tsSdb
Mgmt
.
tableList
[
tableId
];
}
static
int32_t
sdbInitWal
()
{
SWalCfg
walCfg
=
{.
vgId
=
1
,
.
walLevel
=
TAOS_WAL_FSYNC
,
.
keep
=
TAOS_WAL_KEEP
,
.
fsyncPeriod
=
0
};
char
temp
[
TSDB_FILENAME_LEN
]
=
{
0
};
sprintf
(
temp
,
"%s/wal"
,
tsMnodeDir
);
tsSdb
Obj
.
wal
=
walOpen
(
temp
,
&
walCfg
);
if
(
tsSdb
Obj
.
wal
==
NULL
)
{
tsSdb
Mgmt
.
wal
=
walOpen
(
temp
,
&
walCfg
);
if
(
tsSdb
Mgmt
.
wal
==
NULL
)
{
sdbError
(
"vgId:1, failed to open wal in %s"
,
tsMnodeDir
);
return
-
1
;
}
sdbInfo
(
"vgId:1, open wal for restore"
);
int
code
=
walRestore
(
tsSdb
Obj
.
wal
,
NULL
,
sdbWrite
);
int
code
=
walRestore
(
tsSdb
Mgmt
.
wal
,
NULL
,
sdbWrite
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
sdbError
(
"vgId:1, failed to open wal for restore since %s"
,
tstrerror
(
code
));
return
-
1
;
...
...
@@ -200,8 +200,8 @@ static void sdbRestoreTables() {
for
(
int32_t
tableId
=
0
;
tableId
<
SDB_TABLE_MAX
;
++
tableId
)
{
SSdbTable
*
pTable
=
sdbGetTableFromId
(
tableId
);
if
(
pTable
==
NULL
)
continue
;
if
(
pTable
->
fp
D
estored
)
{
(
*
pTable
->
fp
D
estored
)();
if
(
pTable
->
fp
R
estored
)
{
(
*
pTable
->
fp
R
estored
)();
}
totalRows
+=
pTable
->
numOfRows
;
...
...
@@ -209,22 +209,22 @@ static void sdbRestoreTables() {
sdbDebug
(
"vgId:1, sdb:%s is restored, rows:%"
PRId64
,
pTable
->
tableName
,
pTable
->
numOfRows
);
}
sdbInfo
(
"vgId:1, sdb is restored, mver:%"
PRIu64
" rows:%d tables:%d"
,
tsSdb
Obj
.
version
,
totalRows
,
numOfTables
);
sdbInfo
(
"vgId:1, sdb is restored, mver:%"
PRIu64
" rows:%d tables:%d"
,
tsSdb
Mgmt
.
version
,
totalRows
,
numOfTables
);
}
void
sdbUpdateMnodeRoles
()
{
if
(
tsSdb
Obj
.
sync
<=
0
)
return
;
if
(
tsSdb
Mgmt
.
sync
<=
0
)
return
;
SNodesRole
roles
=
{
0
};
syncGetNodesRole
(
tsSdb
Obj
.
sync
,
&
roles
);
syncGetNodesRole
(
tsSdb
Mgmt
.
sync
,
&
roles
);
sdbInfo
(
"vgId:1, update mnodes role
s, replica:%d"
,
tsSdbObj
.
cfg
.
replica
);
for
(
int32_t
i
=
0
;
i
<
tsSdb
Obj
.
cfg
.
replica
;
++
i
)
{
sdbInfo
(
"vgId:1, update mnodes role
, replica:%d"
,
tsSdbMgmt
.
cfg
.
replica
);
for
(
int32_t
i
=
0
;
i
<
tsSdb
Mgmt
.
cfg
.
replica
;
++
i
)
{
SMnodeObj
*
pMnode
=
mnodeGetMnode
(
roles
.
nodeId
[
i
]);
if
(
pMnode
!=
NULL
)
{
pMnode
->
role
=
roles
.
role
[
i
];
sdbInfo
(
"vgId:1, mnode:%d, role:%s"
,
pMnode
->
mnodeId
,
mnodeGetMnodeRoleStr
(
pMnode
->
role
)
);
if
(
pMnode
->
mnodeId
==
dnodeGetDnodeId
())
tsSdb
Obj
.
role
=
pMnode
->
role
;
sdbInfo
(
"vgId:1, mnode:%d, role:%s"
,
pMnode
->
mnodeId
,
syncRole
[
pMnode
->
role
]
);
if
(
pMnode
->
mnodeId
==
dnodeGetDnodeId
())
tsSdb
Mgmt
.
role
=
pMnode
->
role
;
mnodeDecMnodeRef
(
pMnode
);
}
}
...
...
@@ -239,16 +239,16 @@ static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint3
}
static
int32_t
sdbGetWalInfo
(
void
*
ahandle
,
char
*
fileName
,
int64_t
*
fileId
)
{
return
walGetWalFile
(
tsSdb
Obj
.
wal
,
fileName
,
fileId
);
return
walGetWalFile
(
tsSdb
Mgmt
.
wal
,
fileName
,
fileId
);
}
static
void
sdbNotifyRole
(
void
*
ahandle
,
int8_t
role
)
{
sdbInfo
(
"vgId:1, mnode role changed from %s to %s"
,
mnodeGetMnodeRoleStr
(
tsSdbObj
.
role
),
mnodeGetMnodeRoleStr
(
role
)
);
sdbInfo
(
"vgId:1, mnode role changed from %s to %s"
,
syncRole
[
tsSdbMgmt
.
role
],
syncRole
[
role
]
);
if
(
role
==
TAOS_SYNC_ROLE_MASTER
&&
tsSdb
Obj
.
role
!=
TAOS_SYNC_ROLE_MASTER
)
{
if
(
role
==
TAOS_SYNC_ROLE_MASTER
&&
tsSdb
Mgmt
.
role
!=
TAOS_SYNC_ROLE_MASTER
)
{
balanceReset
();
}
tsSdb
Obj
.
role
=
role
;
tsSdb
Mgmt
.
role
=
role
;
sdbUpdateMnodeRoles
();
}
...
...
@@ -276,7 +276,7 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
if
(
pWrite
->
retCode
!=
TSDB_CODE_SUCCESS
)
{
SWalHead
*
pHead
=
(
void
*
)
pWrite
+
sizeof
(
SSWriteMsg
)
+
SDB_SYNC_HACK
;
int32_t
action
=
pHead
->
msgType
%
10
;
sdbError
(
"vgId:1, key:%p:%s hver:%"
PRIu64
" action:%d, failed to foward since %s"
,
pWrite
->
p
Obj
,
sdbError
(
"vgId:1, key:%p:%s hver:%"
PRIu64
" action:%d, failed to foward since %s"
,
pWrite
->
p
Row
,
sdbGetKeyStr
(
pWrite
->
pTable
,
pHead
->
cont
),
pHead
->
version
,
action
,
tstrerror
(
pWrite
->
retCode
));
if
(
action
==
SDB_ACTION_INSERT
)
{
// It's better to create a table in two stages, create it first and then set it success
...
...
@@ -284,7 +284,7 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
pWrite
->
pTable
,
.
p
Obj
=
pWrite
->
pObj
.
p
Row
=
pWrite
->
pRow
};
sdbDeleteRow
(
&
wmsg
);
}
...
...
@@ -297,7 +297,7 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
// if ahandle, means this func is called by sdb write
if
(
ahandle
==
NULL
)
{
sdbDecRef
(
pWrite
->
pTable
,
pWrite
->
p
Obj
);
sdbDecRef
(
pWrite
->
pTable
,
pWrite
->
p
Row
);
}
taosFreeQitem
(
pWrite
);
...
...
@@ -369,7 +369,7 @@ void sdbUpdateSync(void *pMnodes) {
return
;
}
if
(
memcmp
(
&
syncCfg
,
&
tsSdb
Obj
.
cfg
,
sizeof
(
SSyncCfg
))
==
0
)
{
if
(
memcmp
(
&
syncCfg
,
&
tsSdb
Mgmt
.
cfg
,
sizeof
(
SSyncCfg
))
==
0
)
{
sdbDebug
(
"vgId:1, update sync config, info not changed"
);
return
;
}
...
...
@@ -391,18 +391,18 @@ void sdbUpdateSync(void *pMnodes) {
syncInfo
.
writeToCache
=
sdbWriteToQueue
;
syncInfo
.
confirmForward
=
sdbConfirmForward
;
syncInfo
.
notifyRole
=
sdbNotifyRole
;
tsSdb
Obj
.
cfg
=
syncCfg
;
tsSdb
Mgmt
.
cfg
=
syncCfg
;
if
(
tsSdb
Obj
.
sync
)
{
syncReconfig
(
tsSdb
Obj
.
sync
,
&
syncCfg
);
if
(
tsSdb
Mgmt
.
sync
)
{
syncReconfig
(
tsSdb
Mgmt
.
sync
,
&
syncCfg
);
}
else
{
tsSdb
Obj
.
sync
=
syncStart
(
&
syncInfo
);
tsSdb
Mgmt
.
sync
=
syncStart
(
&
syncInfo
);
}
sdbUpdateMnodeRoles
();
}
int32_t
sdbInit
()
{
pthread_mutex_init
(
&
tsSdb
Obj
.
mutex
,
NULL
);
pthread_mutex_init
(
&
tsSdb
Mgmt
.
mutex
,
NULL
);
if
(
sdbInitWorker
()
!=
0
)
{
return
-
1
;
...
...
@@ -415,55 +415,55 @@ int32_t sdbInit() {
sdbRestoreTables
();
if
(
mnodeGetMnodesNum
()
==
1
)
{
tsSdb
Obj
.
role
=
TAOS_SYNC_ROLE_MASTER
;
tsSdb
Mgmt
.
role
=
TAOS_SYNC_ROLE_MASTER
;
}
tsSdb
Obj
.
status
=
SDB_STATUS_SERVING
;
tsSdb
Mgmt
.
status
=
SDB_STATUS_SERVING
;
return
TSDB_CODE_SUCCESS
;
}
void
sdbCleanUp
()
{
if
(
tsSdb
Obj
.
status
!=
SDB_STATUS_SERVING
)
return
;
if
(
tsSdb
Mgmt
.
status
!=
SDB_STATUS_SERVING
)
return
;
tsSdb
Obj
.
status
=
SDB_STATUS_CLOSING
;
tsSdb
Mgmt
.
status
=
SDB_STATUS_CLOSING
;
sdbCleanupWorker
();
sdbDebug
(
"vgId:1, sdb will be closed, mver:%"
PRIu64
,
tsSdb
Obj
.
version
);
sdbDebug
(
"vgId:1, sdb will be closed, mver:%"
PRIu64
,
tsSdb
Mgmt
.
version
);
if
(
tsSdb
Obj
.
sync
)
{
syncStop
(
tsSdb
Obj
.
sync
);
tsSdb
Obj
.
sync
=
-
1
;
if
(
tsSdb
Mgmt
.
sync
)
{
syncStop
(
tsSdb
Mgmt
.
sync
);
tsSdb
Mgmt
.
sync
=
-
1
;
}
if
(
tsSdb
Obj
.
wal
)
{
walClose
(
tsSdb
Obj
.
wal
);
tsSdb
Obj
.
wal
=
NULL
;
if
(
tsSdb
Mgmt
.
wal
)
{
walClose
(
tsSdb
Mgmt
.
wal
);
tsSdb
Mgmt
.
wal
=
NULL
;
}
pthread_mutex_destroy
(
&
tsSdb
Obj
.
mutex
);
pthread_mutex_destroy
(
&
tsSdb
Mgmt
.
mutex
);
}
void
sdbIncRef
(
void
*
tparam
,
void
*
p
Obj
)
{
if
(
p
Obj
==
NULL
||
tparam
==
NULL
)
return
;
void
sdbIncRef
(
void
*
tparam
,
void
*
p
Row
)
{
if
(
p
Row
==
NULL
||
tparam
==
NULL
)
return
;
SSdbTable
*
pTable
=
tparam
;
int32_t
*
pRefCount
=
(
int32_t
*
)(
p
Obj
+
pTable
->
refCountPos
);
int32_t
*
pRefCount
=
(
int32_t
*
)(
p
Row
+
pTable
->
refCountPos
);
int32_t
refCount
=
atomic_add_fetch_32
(
pRefCount
,
1
);
sdbTrace
(
"vgId:1, sdb:%s, inc ref to key:%p:%s:%d"
,
pTable
->
tableName
,
p
Obj
,
sdbGetObjStr
(
pTable
,
pObj
),
refCount
);
sdbTrace
(
"vgId:1, sdb:%s, inc ref to key:%p:%s:%d"
,
pTable
->
tableName
,
p
Row
,
sdbGetObjStr
(
pTable
,
pRow
),
refCount
);
}
void
sdbDecRef
(
void
*
tparam
,
void
*
p
Obj
)
{
if
(
p
Obj
==
NULL
||
tparam
==
NULL
)
return
;
void
sdbDecRef
(
void
*
tparam
,
void
*
p
Row
)
{
if
(
p
Row
==
NULL
||
tparam
==
NULL
)
return
;
SSdbTable
*
pTable
=
tparam
;
int32_t
*
pRefCount
=
(
int32_t
*
)(
p
Obj
+
pTable
->
refCountPos
);
int32_t
*
pRefCount
=
(
int32_t
*
)(
p
Row
+
pTable
->
refCountPos
);
int32_t
refCount
=
atomic_sub_fetch_32
(
pRefCount
,
1
);
sdbTrace
(
"vgId:1, sdb:%s, dec ref to key:%p:%s:%d"
,
pTable
->
tableName
,
p
Obj
,
sdbGetObjStr
(
pTable
,
pObj
),
refCount
);
sdbTrace
(
"vgId:1, sdb:%s, dec ref to key:%p:%s:%d"
,
pTable
->
tableName
,
p
Row
,
sdbGetObjStr
(
pTable
,
pRow
),
refCount
);
int32_t
*
updateEnd
=
p
Obj
+
pTable
->
refCountPos
-
4
;
int32_t
*
updateEnd
=
p
Row
+
pTable
->
refCountPos
-
4
;
if
(
refCount
<=
0
&&
*
updateEnd
)
{
sdbTrace
(
"vgId:1, sdb:%s, key:%p:%s:%d destroyed"
,
pTable
->
tableName
,
p
Obj
,
sdbGetObjStr
(
pTable
,
pObj
),
refCount
);
SSWriteMsg
wmsg
=
{.
p
Obj
=
pObj
};
sdbTrace
(
"vgId:1, sdb:%s, key:%p:%s:%d destroyed"
,
pTable
->
tableName
,
p
Row
,
sdbGetObjStr
(
pTable
,
pRow
),
refCount
);
SSWriteMsg
wmsg
=
{.
p
Row
=
pRow
};
(
*
pTable
->
fpDestroy
)(
&
wmsg
);
}
}
...
...
@@ -502,7 +502,7 @@ static void *sdbGetRowFromObj(SSdbTable *pTable, void *key) {
}
static
int32_t
sdbInsertHash
(
SSdbTable
*
pTable
,
SSWriteMsg
*
pWrite
)
{
void
*
key
=
sdbGetObjKey
(
pTable
,
pWrite
->
p
Obj
);
void
*
key
=
sdbGetObjKey
(
pTable
,
pWrite
->
p
Row
);
int32_t
keySize
=
sizeof
(
int32_t
);
if
(
pTable
->
keyType
==
SDB_KEY_STRING
||
pTable
->
keyType
==
SDB_KEY_VAR_STRING
)
{
...
...
@@ -510,25 +510,25 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
}
pthread_mutex_lock
(
&
pTable
->
mutex
);
taosHashPut
(
pTable
->
iHandle
,
key
,
keySize
,
&
pWrite
->
p
Obj
,
sizeof
(
int64_t
));
taosHashPut
(
pTable
->
iHandle
,
key
,
keySize
,
&
pWrite
->
p
Row
,
sizeof
(
int64_t
));
pthread_mutex_unlock
(
&
pTable
->
mutex
);
sdbIncRef
(
pTable
,
pWrite
->
p
Obj
);
sdbIncRef
(
pTable
,
pWrite
->
p
Row
);
atomic_add_fetch_32
(
&
pTable
->
numOfRows
,
1
);
if
(
pTable
->
keyType
==
SDB_KEY_AUTO
)
{
pTable
->
autoIndex
=
MAX
(
pTable
->
autoIndex
,
*
((
uint32_t
*
)
pWrite
->
p
Obj
));
pTable
->
autoIndex
=
MAX
(
pTable
->
autoIndex
,
*
((
uint32_t
*
)
pWrite
->
p
Row
));
}
else
{
atomic_add_fetch_32
(
&
pTable
->
autoIndex
,
1
);
}
sdbDebug
(
"vgId:1, sdb:%s, insert key:%s to hash, rowSize:%d rows:%"
PRId64
", msg:%p"
,
pTable
->
tableName
,
sdbGetObjStr
(
pTable
,
pWrite
->
p
Obj
),
pWrite
->
rowSize
,
pTable
->
numOfRows
,
pWrite
->
pMsg
);
sdbGetObjStr
(
pTable
,
pWrite
->
p
Row
),
pWrite
->
rowSize
,
pTable
->
numOfRows
,
pWrite
->
pMsg
);
int32_t
code
=
(
*
pTable
->
fpInsert
)(
pWrite
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
sdbError
(
"vgId:1, sdb:%s, failed to insert key:%s to hash, remove it"
,
pTable
->
tableName
,
sdbGetObjStr
(
pTable
,
pWrite
->
p
Obj
));
sdbGetObjStr
(
pTable
,
pWrite
->
p
Row
));
sdbDeleteHash
(
pTable
,
pWrite
);
}
...
...
@@ -536,17 +536,17 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
}
static
int32_t
sdbDeleteHash
(
SSdbTable
*
pTable
,
SSWriteMsg
*
pWrite
)
{
int32_t
*
updateEnd
=
pWrite
->
p
Obj
+
pTable
->
refCountPos
-
4
;
int32_t
*
updateEnd
=
pWrite
->
p
Row
+
pTable
->
refCountPos
-
4
;
bool
set
=
atomic_val_compare_exchange_32
(
updateEnd
,
0
,
1
)
==
0
;
if
(
!
set
)
{
sdbError
(
"vgId:1, sdb:%s, failed to delete key:%s from hash, for it already removed"
,
pTable
->
tableName
,
sdbGetObjStr
(
pTable
,
pWrite
->
p
Obj
));
sdbGetObjStr
(
pTable
,
pWrite
->
p
Row
));
return
TSDB_CODE_MND_SDB_OBJ_NOT_THERE
;
}
(
*
pTable
->
fpDelete
)(
pWrite
);
void
*
key
=
sdbGetObjKey
(
pTable
,
pWrite
->
p
Obj
);
void
*
key
=
sdbGetObjKey
(
pTable
,
pWrite
->
p
Row
);
int32_t
keySize
=
sizeof
(
int32_t
);
if
(
pTable
->
keyType
==
SDB_KEY_STRING
||
pTable
->
keyType
==
SDB_KEY_VAR_STRING
)
{
keySize
=
strlen
((
char
*
)
key
);
...
...
@@ -559,16 +559,16 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
atomic_sub_fetch_32
(
&
pTable
->
numOfRows
,
1
);
sdbDebug
(
"vgId:1, sdb:%s, delete key:%s from hash, numOfRows:%"
PRId64
", msg:%p"
,
pTable
->
tableName
,
sdbGetObjStr
(
pTable
,
pWrite
->
p
Obj
),
pTable
->
numOfRows
,
pWrite
->
pMsg
);
sdbGetObjStr
(
pTable
,
pWrite
->
p
Row
),
pTable
->
numOfRows
,
pWrite
->
pMsg
);
sdbDecRef
(
pTable
,
pWrite
->
p
Obj
);
sdbDecRef
(
pTable
,
pWrite
->
p
Row
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
sdbUpdateHash
(
SSdbTable
*
pTable
,
SSWriteMsg
*
pWrite
)
{
sdbDebug
(
"vgId:1, sdb:%s, update key:%s in hash, numOfRows:%"
PRId64
", msg:%p"
,
pTable
->
tableName
,
sdbGetObjStr
(
pTable
,
pWrite
->
p
Obj
),
pTable
->
numOfRows
,
pWrite
->
pMsg
);
sdbGetObjStr
(
pTable
,
pWrite
->
p
Row
),
pTable
->
numOfRows
,
pWrite
->
pMsg
);
(
*
pTable
->
fpUpdate
)(
pWrite
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -583,42 +583,42 @@ static int sdbWrite(void *wparam, void *hparam, int32_t qtype, void *unused) {
SSdbTable
*
pTable
=
sdbGetTableFromId
(
tableId
);
assert
(
pTable
!=
NULL
);
pthread_mutex_lock
(
&
tsSdb
Obj
.
mutex
);
pthread_mutex_lock
(
&
tsSdb
Mgmt
.
mutex
);
if
(
pHead
->
version
==
0
)
{
// assign version
tsSdb
Obj
.
version
++
;
pHead
->
version
=
tsSdb
Obj
.
version
;
tsSdb
Mgmt
.
version
++
;
pHead
->
version
=
tsSdb
Mgmt
.
version
;
}
else
{
// for data from WAL or forward, version may be smaller
if
(
pHead
->
version
<=
tsSdb
Obj
.
version
)
{
pthread_mutex_unlock
(
&
tsSdb
Obj
.
mutex
);
if
(
pHead
->
version
<=
tsSdb
Mgmt
.
version
)
{
pthread_mutex_unlock
(
&
tsSdb
Mgmt
.
mutex
);
sdbDebug
(
"vgId:1, sdb:%s, failed to restore %s key:%s from source(%d), hver:%"
PRIu64
" too large, mver:%"
PRIu64
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetKeyStr
(
pTable
,
pHead
->
cont
),
qtype
,
pHead
->
version
,
tsSdb
Obj
.
version
);
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetKeyStr
(
pTable
,
pHead
->
cont
),
qtype
,
pHead
->
version
,
tsSdb
Mgmt
.
version
);
return
TSDB_CODE_SUCCESS
;
}
else
if
(
pHead
->
version
!=
tsSdb
Obj
.
version
+
1
)
{
pthread_mutex_unlock
(
&
tsSdb
Obj
.
mutex
);
}
else
if
(
pHead
->
version
!=
tsSdb
Mgmt
.
version
+
1
)
{
pthread_mutex_unlock
(
&
tsSdb
Mgmt
.
mutex
);
sdbError
(
"vgId:1, sdb:%s, failed to restore %s key:%s from source(%d), hver:%"
PRIu64
" too large, mver:%"
PRIu64
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetKeyStr
(
pTable
,
pHead
->
cont
),
qtype
,
pHead
->
version
,
tsSdb
Obj
.
version
);
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetKeyStr
(
pTable
,
pHead
->
cont
),
qtype
,
pHead
->
version
,
tsSdb
Mgmt
.
version
);
return
TSDB_CODE_SYN_INVALID_VERSION
;
}
else
{
tsSdb
Obj
.
version
=
pHead
->
version
;
tsSdb
Mgmt
.
version
=
pHead
->
version
;
}
}
int32_t
code
=
walWrite
(
tsSdb
Obj
.
wal
,
pHead
);
int32_t
code
=
walWrite
(
tsSdb
Mgmt
.
wal
,
pHead
);
if
(
code
<
0
)
{
pthread_mutex_unlock
(
&
tsSdb
Obj
.
mutex
);
pthread_mutex_unlock
(
&
tsSdb
Mgmt
.
mutex
);
return
code
;
}
pthread_mutex_unlock
(
&
tsSdb
Obj
.
mutex
);
pthread_mutex_unlock
(
&
tsSdb
Mgmt
.
mutex
);
// from app, wmsg is created
if
(
pWrite
!=
NULL
)
{
// forward to peers
pWrite
->
processedCount
=
0
;
int32_t
syncCode
=
syncForwardToPeer
(
tsSdb
Obj
.
sync
,
pHead
,
pWrite
,
TAOS_QTYPE_RPC
);
int32_t
syncCode
=
syncForwardToPeer
(
tsSdb
Mgmt
.
sync
,
pHead
,
pWrite
,
TAOS_QTYPE_RPC
);
if
(
syncCode
<=
0
)
pWrite
->
processedCount
=
1
;
if
(
syncCode
<
0
)
{
...
...
@@ -638,7 +638,7 @@ static int sdbWrite(void *wparam, void *hparam, int32_t qtype, void *unused) {
sdbGetActionStr
(
action
),
sdbGetKeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
);
// even it is WAL/FWD, it shall be called to update version in sync
syncForwardToPeer
(
tsSdb
Obj
.
sync
,
pHead
,
pWrite
,
TAOS_QTYPE_RPC
);
syncForwardToPeer
(
tsSdb
Mgmt
.
sync
,
pHead
,
pWrite
,
TAOS_QTYPE_RPC
);
// from wal or forward msg, wmsg not created, should add into hash
if
(
action
==
SDB_ACTION_INSERT
)
{
...
...
@@ -652,7 +652,7 @@ static int sdbWrite(void *wparam, void *hparam, int32_t qtype, void *unused) {
sdbGetKeyStr
(
pTable
,
pHead
->
cont
));
return
TSDB_CODE_SUCCESS
;
}
SSWriteMsg
wmsg
=
{.
pTable
=
pTable
,
.
p
Obj
=
pRow
};
SSWriteMsg
wmsg
=
{.
pTable
=
pTable
,
.
p
Row
=
pRow
};
return
sdbDeleteHash
(
pTable
,
&
wmsg
);
}
else
if
(
action
==
SDB_ACTION_UPDATE
)
{
void
*
pRow
=
sdbGetRowMeta
(
pTable
,
pHead
->
cont
);
...
...
@@ -673,19 +673,19 @@ int32_t sdbInsertRow(SSWriteMsg *pWrite) {
SSdbTable
*
pTable
=
pWrite
->
pTable
;
if
(
pTable
==
NULL
)
return
TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE
;
if
(
sdbGetRowFromObj
(
pTable
,
pWrite
->
p
Obj
))
{
if
(
sdbGetRowFromObj
(
pTable
,
pWrite
->
p
Row
))
{
sdbError
(
"vgId:1, sdb:%s, failed to insert key:%s, already exist"
,
pTable
->
tableName
,
sdbGetObjStr
(
pTable
,
pWrite
->
p
Obj
));
sdbDecRef
(
pTable
,
pWrite
->
p
Obj
);
sdbGetObjStr
(
pTable
,
pWrite
->
p
Row
));
sdbDecRef
(
pTable
,
pWrite
->
p
Row
);
return
TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE
;
}
if
(
pTable
->
keyType
==
SDB_KEY_AUTO
)
{
*
((
uint32_t
*
)
pWrite
->
p
Obj
)
=
atomic_add_fetch_32
(
&
pTable
->
autoIndex
,
1
);
*
((
uint32_t
*
)
pWrite
->
p
Row
)
=
atomic_add_fetch_32
(
&
pTable
->
autoIndex
,
1
);
// let vgId increase from 2
if
(
pTable
->
autoIndex
==
1
&&
strcmp
(
pTable
->
tableName
,
"vgroups"
)
==
0
)
{
*
((
uint32_t
*
)
pWrite
->
p
Obj
)
=
atomic_add_fetch_32
(
&
pTable
->
autoIndex
,
1
);
*
((
uint32_t
*
)
pWrite
->
p
Row
)
=
atomic_add_fetch_32
(
&
pTable
->
autoIndex
,
1
);
}
}
...
...
@@ -727,10 +727,10 @@ int32_t sdbInsertRowImp(SSWriteMsg *pWrite) {
if
(
pNewOper
->
pMsg
!=
NULL
)
{
sdbDebug
(
"vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s, insert action is add to sdb queue"
,
pNewOper
->
pMsg
->
rpcMsg
.
ahandle
,
pNewOper
->
pMsg
,
pTable
->
tableName
,
pWrite
->
p
Obj
,
sdbGetObjStr
(
pTable
,
pWrite
->
pObj
));
pNewOper
->
pMsg
,
pTable
->
tableName
,
pWrite
->
p
Row
,
sdbGetObjStr
(
pTable
,
pWrite
->
pRow
));
}
sdbIncRef
(
pNewOper
->
pTable
,
pNewOper
->
p
Obj
);
sdbIncRef
(
pNewOper
->
pTable
,
pNewOper
->
p
Row
);
taosWriteQitem
(
tsSdbWQueue
,
TAOS_QTYPE_RPC
,
pNewOper
);
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
...
...
@@ -748,24 +748,24 @@ int32_t sdbDeleteRow(SSWriteMsg *pWrite) {
SSdbTable
*
pTable
=
pWrite
->
pTable
;
if
(
pTable
==
NULL
)
return
TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE
;
void
*
pRow
=
sdbGetRowMetaFromObj
(
pTable
,
pWrite
->
p
Obj
);
void
*
pRow
=
sdbGetRowMetaFromObj
(
pTable
,
pWrite
->
p
Row
);
if
(
pRow
==
NULL
)
{
sdbDebug
(
"vgId:1, sdb:%s, record is not there, delete failed"
,
pTable
->
tableName
);
return
TSDB_CODE_MND_SDB_OBJ_NOT_THERE
;
}
sdbIncRef
(
pTable
,
pWrite
->
p
Obj
);
sdbIncRef
(
pTable
,
pWrite
->
p
Row
);
int32_t
code
=
sdbDeleteHash
(
pTable
,
pWrite
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
sdbError
(
"vgId:1, sdb:%s, failed to delete from hash"
,
pTable
->
tableName
);
sdbDecRef
(
pTable
,
pWrite
->
p
Obj
);
sdbDecRef
(
pTable
,
pWrite
->
p
Row
);
return
code
;
}
// just delete data from memory
if
(
pWrite
->
type
!=
SDB_OPER_GLOBAL
)
{
sdbDecRef
(
pTable
,
pWrite
->
p
Obj
);
sdbDecRef
(
pTable
,
pWrite
->
p
Row
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -795,7 +795,7 @@ int32_t sdbDeleteRowImp(SSWriteMsg *pWrite) {
if
(
pNewOper
->
pMsg
!=
NULL
)
{
sdbDebug
(
"vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s, delete action is add to sdb queue"
,
pNewOper
->
pMsg
->
rpcMsg
.
ahandle
,
pNewOper
->
pMsg
,
pTable
->
tableName
,
pWrite
->
p
Obj
,
sdbGetObjStr
(
pTable
,
pWrite
->
pObj
));
pNewOper
->
pMsg
,
pTable
->
tableName
,
pWrite
->
p
Row
,
sdbGetObjStr
(
pTable
,
pWrite
->
pRow
));
}
taosWriteQitem
(
tsSdbWQueue
,
TAOS_QTYPE_RPC
,
pNewOper
);
...
...
@@ -807,7 +807,7 @@ int32_t sdbUpdateRow(SSWriteMsg *pWrite) {
SSdbTable
*
pTable
=
pWrite
->
pTable
;
if
(
pTable
==
NULL
)
return
TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE
;
void
*
pRow
=
sdbGetRowMetaFromObj
(
pTable
,
pWrite
->
p
Obj
);
void
*
pRow
=
sdbGetRowMetaFromObj
(
pTable
,
pWrite
->
p
Row
);
if
(
pRow
==
NULL
)
{
sdbDebug
(
"vgId:1, sdb:%s, record is not there, update failed"
,
pTable
->
tableName
);
return
TSDB_CODE_MND_SDB_OBJ_NOT_THERE
;
...
...
@@ -850,10 +850,10 @@ int32_t sdbUpdateRowImp(SSWriteMsg *pWrite) {
if
(
pNewOper
->
pMsg
!=
NULL
)
{
sdbDebug
(
"vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s, update action is add to sdb queue"
,
pNewOper
->
pMsg
->
rpcMsg
.
ahandle
,
pNewOper
->
pMsg
,
pTable
->
tableName
,
pWrite
->
p
Obj
,
sdbGetObjStr
(
pTable
,
pWrite
->
pObj
));
pNewOper
->
pMsg
,
pTable
->
tableName
,
pWrite
->
p
Row
,
sdbGetObjStr
(
pTable
,
pWrite
->
pRow
));
}
sdbIncRef
(
pNewOper
->
pTable
,
pNewOper
->
p
Obj
);
sdbIncRef
(
pNewOper
->
pTable
,
pNewOper
->
p
Row
);
taosWriteQitem
(
tsSdbWQueue
,
TAOS_QTYPE_RPC
,
pNewOper
);
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
...
...
@@ -910,7 +910,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
pTable
->
fpEncode
=
pDesc
->
fpEncode
;
pTable
->
fpDecode
=
pDesc
->
fpDecode
;
pTable
->
fpDestroy
=
pDesc
->
fpDestroy
;
pTable
->
fp
Destored
=
pDesc
->
fpD
estored
;
pTable
->
fp
Restored
=
pDesc
->
fpR
estored
;
_hash_fn_t
hashFp
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
);
if
(
pTable
->
keyType
==
SDB_KEY_STRING
||
pTable
->
keyType
==
SDB_KEY_VAR_STRING
)
{
...
...
@@ -918,8 +918,8 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
}
pTable
->
iHandle
=
taosHashInit
(
pTable
->
hashSessions
,
hashFp
,
true
,
true
);
tsSdb
Obj
.
numOfTables
++
;
tsSdb
Obj
.
tableList
[
pTable
->
tableId
]
=
pTable
;
tsSdb
Mgmt
.
numOfTables
++
;
tsSdb
Mgmt
.
tableList
[
pTable
->
tableId
]
=
pTable
;
return
pTable
;
}
...
...
@@ -927,8 +927,8 @@ void sdbCloseTable(void *handle) {
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
if
(
pTable
==
NULL
)
return
;
tsSdb
Obj
.
numOfTables
--
;
tsSdb
Obj
.
tableList
[
pTable
->
tableId
]
=
NULL
;
tsSdb
Mgmt
.
numOfTables
--
;
tsSdb
Mgmt
.
tableList
[
pTable
->
tableId
]
=
NULL
;
SHashMutableIterator
*
pIter
=
taosHashCreateIter
(
pTable
->
iHandle
);
while
(
taosHashIterNext
(
pIter
))
{
...
...
@@ -936,7 +936,7 @@ void sdbCloseTable(void *handle) {
if
(
ppRow
==
NULL
)
continue
;
SSWriteMsg
wmsg
=
{
.
p
Obj
=
*
ppRow
,
.
p
Row
=
*
ppRow
,
.
pTable
=
pTable
,
};
...
...
@@ -947,7 +947,7 @@ void sdbCloseTable(void *handle) {
taosHashCleanup
(
pTable
->
iHandle
);
pthread_mutex_destroy
(
&
pTable
->
mutex
);
sdbDebug
(
"vgId:1, sdb:%s, is closed, numOfTables:%d"
,
pTable
->
tableName
,
tsSdb
Obj
.
numOfTables
);
sdbDebug
(
"vgId:1, sdb:%s, is closed, numOfTables:%d"
,
pTable
->
tableName
,
tsSdb
Mgmt
.
numOfTables
);
free
(
pTable
);
}
...
...
@@ -1072,7 +1072,7 @@ static void *sdbWorkerFp(void *pWorker) {
pHead
=
(
void
*
)
pWrite
+
sizeof
(
SSWriteMsg
)
+
SDB_SYNC_HACK
;
if
(
pWrite
->
pMsg
!=
NULL
)
{
sdbDebug
(
"vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s hver:%"
PRIu64
", will be processed in sdb queue"
,
pWrite
->
pMsg
->
rpcMsg
.
ahandle
,
pWrite
->
pMsg
,
pWrite
->
pTable
->
tableName
,
pWrite
->
p
Obj
,
pWrite
->
pMsg
->
rpcMsg
.
ahandle
,
pWrite
->
pMsg
,
pWrite
->
pTable
->
tableName
,
pWrite
->
p
Row
,
sdbGetKeyStr
(
pWrite
->
pTable
,
pHead
->
cont
),
pHead
->
version
);
}
}
else
{
...
...
@@ -1089,7 +1089,7 @@ static void *sdbWorkerFp(void *pWorker) {
}
}
walFsync
(
tsSdb
Obj
.
wal
,
true
);
walFsync
(
tsSdb
Mgmt
.
wal
,
true
);
// browse all items, and process them one by one
taosResetQitems
(
tsSdbWQall
);
...
...
@@ -1101,7 +1101,7 @@ static void *sdbWorkerFp(void *pWorker) {
sdbConfirmForward
(
NULL
,
pWrite
,
pWrite
->
retCode
);
}
else
if
(
qtype
==
TAOS_QTYPE_FWD
)
{
pHead
=
(
SWalHead
*
)
item
;
syncConfirmForward
(
tsSdb
Obj
.
sync
,
pHead
->
version
,
pHead
->
len
);
syncConfirmForward
(
tsSdb
Mgmt
.
sync
,
pHead
->
version
,
pHead
->
len
);
taosFreeQitem
(
item
);
}
else
{
taosFreeQitem
(
item
);
...
...
src/mnode/src/mnodeTable.c
浏览文件 @
98ec34b4
...
...
@@ -100,12 +100,12 @@ static void mnodeDestroyChildTable(SCTableObj *pTable) {
}
static
int32_t
mnodeChildTableActionDestroy
(
SSWriteMsg
*
pWMsg
)
{
mnodeDestroyChildTable
(
pWMsg
->
p
Obj
);
mnodeDestroyChildTable
(
pWMsg
->
p
Row
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeChildTableActionInsert
(
SSWriteMsg
*
pWMsg
)
{
SCTableObj
*
pTable
=
pWMsg
->
p
Obj
;
SCTableObj
*
pTable
=
pWMsg
->
p
Row
;
SVgObj
*
pVgroup
=
mnodeGetVgroup
(
pTable
->
vgId
);
if
(
pVgroup
==
NULL
)
{
...
...
@@ -154,7 +154,7 @@ static int32_t mnodeChildTableActionInsert(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeChildTableActionDelete
(
SSWriteMsg
*
pWMsg
)
{
SCTableObj
*
pTable
=
pWMsg
->
p
Obj
;
SCTableObj
*
pTable
=
pWMsg
->
p
Row
;
if
(
pTable
->
vgId
==
0
)
{
return
TSDB_CODE_MND_VGROUP_NOT_EXIST
;
}
...
...
@@ -190,7 +190,7 @@ static int32_t mnodeChildTableActionDelete(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeChildTableActionUpdate
(
SSWriteMsg
*
pWMsg
)
{
SCTableObj
*
pNew
=
pWMsg
->
p
Obj
;
SCTableObj
*
pNew
=
pWMsg
->
p
Row
;
SCTableObj
*
pTable
=
mnodeGetChildTable
(
pNew
->
info
.
tableId
);
if
(
pTable
!=
pNew
)
{
void
*
oldTableId
=
pTable
->
info
.
tableId
;
...
...
@@ -217,7 +217,7 @@ static int32_t mnodeChildTableActionUpdate(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeChildTableActionEncode
(
SSWriteMsg
*
pWMsg
)
{
SCTableObj
*
pTable
=
pWMsg
->
p
Obj
;
SCTableObj
*
pTable
=
pWMsg
->
p
Row
;
assert
(
pTable
!=
NULL
&&
pWMsg
->
rowData
!=
NULL
);
int32_t
len
=
strlen
(
pTable
->
info
.
tableId
);
...
...
@@ -282,7 +282,7 @@ static int32_t mnodeChildTableActionDecode(SSWriteMsg *pWMsg) {
}
}
pWMsg
->
p
Obj
=
pTable
;
pWMsg
->
p
Row
=
pTable
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -297,7 +297,7 @@ static int32_t mnodeChildTableActionRestored() {
SDbObj
*
pDb
=
mnodeGetDbByTableId
(
pTable
->
info
.
tableId
);
if
(
pDb
==
NULL
||
pDb
->
status
!=
TSDB_DB_STATUS_READY
)
{
mError
(
"ctable:%s, failed to get db or db in dropping, discard it"
,
pTable
->
info
.
tableId
);
SSWriteMsg
desc
=
{.
type
=
SDB_OPER_LOCAL
,
.
p
Obj
=
pTable
,
.
pTable
=
tsChildTableSdb
};
SSWriteMsg
desc
=
{.
type
=
SDB_OPER_LOCAL
,
.
p
Row
=
pTable
,
.
pTable
=
tsChildTableSdb
};
sdbDeleteRow
(
&
desc
);
mnodeDecTableRef
(
pTable
);
mnodeDecDbRef
(
pDb
);
...
...
@@ -309,7 +309,7 @@ static int32_t mnodeChildTableActionRestored() {
if
(
pVgroup
==
NULL
)
{
mError
(
"ctable:%s, failed to get vgId:%d tid:%d, discard it"
,
pTable
->
info
.
tableId
,
pTable
->
vgId
,
pTable
->
tid
);
pTable
->
vgId
=
0
;
SSWriteMsg
desc
=
{.
type
=
SDB_OPER_LOCAL
,
.
p
Obj
=
pTable
,
.
pTable
=
tsChildTableSdb
};
SSWriteMsg
desc
=
{.
type
=
SDB_OPER_LOCAL
,
.
p
Row
=
pTable
,
.
pTable
=
tsChildTableSdb
};
sdbDeleteRow
(
&
desc
);
mnodeDecTableRef
(
pTable
);
continue
;
...
...
@@ -320,7 +320,7 @@ static int32_t mnodeChildTableActionRestored() {
mError
(
"ctable:%s, db:%s not match with vgId:%d db:%s sid:%d, discard it"
,
pTable
->
info
.
tableId
,
pDb
->
name
,
pTable
->
vgId
,
pVgroup
->
dbName
,
pTable
->
tid
);
pTable
->
vgId
=
0
;
SSWriteMsg
desc
=
{.
type
=
SDB_OPER_LOCAL
,
.
p
Obj
=
pTable
,
.
pTable
=
tsChildTableSdb
};
SSWriteMsg
desc
=
{.
type
=
SDB_OPER_LOCAL
,
.
p
Row
=
pTable
,
.
pTable
=
tsChildTableSdb
};
sdbDeleteRow
(
&
desc
);
mnodeDecTableRef
(
pTable
);
continue
;
...
...
@@ -331,7 +331,7 @@ static int32_t mnodeChildTableActionRestored() {
if
(
pSuperTable
==
NULL
)
{
mError
(
"ctable:%s, stable:%"
PRIu64
" not exist"
,
pTable
->
info
.
tableId
,
pTable
->
suid
);
pTable
->
vgId
=
0
;
SSWriteMsg
desc
=
{.
type
=
SDB_OPER_LOCAL
,
.
p
Obj
=
pTable
,
.
pTable
=
tsChildTableSdb
};
SSWriteMsg
desc
=
{.
type
=
SDB_OPER_LOCAL
,
.
p
Row
=
pTable
,
.
pTable
=
tsChildTableSdb
};
sdbDeleteRow
(
&
desc
);
mnodeDecTableRef
(
pTable
);
continue
;
...
...
@@ -364,7 +364,7 @@ static int32_t mnodeInitChildTables() {
.
fpEncode
=
mnodeChildTableActionEncode
,
.
fpDecode
=
mnodeChildTableActionDecode
,
.
fpDestroy
=
mnodeChildTableActionDestroy
,
.
fp
D
estored
=
mnodeChildTableActionRestored
.
fp
R
estored
=
mnodeChildTableActionRestored
};
tsChildTableSdb
=
sdbOpenTable
(
&
tableDesc
);
...
...
@@ -431,12 +431,12 @@ static void mnodeDestroySuperTable(SSTableObj *pStable) {
}
static
int32_t
mnodeSuperTableActionDestroy
(
SSWriteMsg
*
pWMsg
)
{
mnodeDestroySuperTable
(
pWMsg
->
p
Obj
);
mnodeDestroySuperTable
(
pWMsg
->
p
Row
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeSuperTableActionInsert
(
SSWriteMsg
*
pWMsg
)
{
SSTableObj
*
pStable
=
pWMsg
->
p
Obj
;
SSTableObj
*
pStable
=
pWMsg
->
p
Row
;
SDbObj
*
pDb
=
mnodeGetDbByTableId
(
pStable
->
info
.
tableId
);
if
(
pDb
!=
NULL
&&
pDb
->
status
==
TSDB_DB_STATUS_READY
)
{
mnodeAddSuperTableIntoDb
(
pDb
);
...
...
@@ -447,7 +447,7 @@ static int32_t mnodeSuperTableActionInsert(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeSuperTableActionDelete
(
SSWriteMsg
*
pWMsg
)
{
SSTableObj
*
pStable
=
pWMsg
->
p
Obj
;
SSTableObj
*
pStable
=
pWMsg
->
p
Row
;
SDbObj
*
pDb
=
mnodeGetDbByTableId
(
pStable
->
info
.
tableId
);
if
(
pDb
!=
NULL
)
{
mnodeRemoveSuperTableFromDb
(
pDb
);
...
...
@@ -459,7 +459,7 @@ static int32_t mnodeSuperTableActionDelete(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeSuperTableActionUpdate
(
SSWriteMsg
*
pWMsg
)
{
SSTableObj
*
pNew
=
pWMsg
->
p
Obj
;
SSTableObj
*
pNew
=
pWMsg
->
p
Row
;
SSTableObj
*
pTable
=
mnodeGetSuperTable
(
pNew
->
info
.
tableId
);
if
(
pTable
!=
NULL
&&
pTable
!=
pNew
)
{
void
*
oldTableId
=
pTable
->
info
.
tableId
;
...
...
@@ -484,8 +484,8 @@ static int32_t mnodeSuperTableActionUpdate(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeSuperTableActionEncode
(
SSWriteMsg
*
pWMsg
)
{
SSTableObj
*
pStable
=
pWMsg
->
p
Obj
;
assert
(
pWMsg
->
p
Obj
!=
NULL
&&
pWMsg
->
rowData
!=
NULL
);
SSTableObj
*
pStable
=
pWMsg
->
p
Row
;
assert
(
pWMsg
->
p
Row
!=
NULL
&&
pWMsg
->
rowData
!=
NULL
);
int32_t
len
=
strlen
(
pStable
->
info
.
tableId
);
if
(
len
>=
TSDB_TABLE_FNAME_LEN
)
len
=
TSDB_CODE_MND_INVALID_TABLE_ID
;
...
...
@@ -531,7 +531,7 @@ static int32_t mnodeSuperTableActionDecode(SSWriteMsg *pWMsg) {
memcpy
(
pStable
->
schema
,
pWMsg
->
rowData
+
len
,
schemaSize
);
pWMsg
->
p
Obj
=
pStable
;
pWMsg
->
p
Row
=
pStable
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -557,7 +557,7 @@ static int32_t mnodeInitSuperTables() {
.
fpEncode
=
mnodeSuperTableActionEncode
,
.
fpDecode
=
mnodeSuperTableActionDecode
,
.
fpDestroy
=
mnodeSuperTableActionDestroy
,
.
fp
D
estored
=
mnodeSuperTableActionRestored
.
fp
R
estored
=
mnodeSuperTableActionRestored
};
tsSuperTableSdb
=
sdbOpenTable
(
&
tableDesc
);
...
...
@@ -828,7 +828,7 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
}
else
{
mError
(
"app:%p:%p, stable:%s, failed to create in sdb, reason:%s"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pTable
->
info
.
tableId
,
tstrerror
(
code
));
SSWriteMsg
desc
=
{.
type
=
SDB_OPER_GLOBAL
,
.
p
Obj
=
pTable
,
.
pTable
=
tsSuperTableSdb
};
SSWriteMsg
desc
=
{.
type
=
SDB_OPER_GLOBAL
,
.
p
Row
=
pTable
,
.
pTable
=
tsSuperTableSdb
};
sdbDeleteRow
(
&
desc
);
}
...
...
@@ -881,7 +881,7 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsSuperTableSdb
,
.
p
Obj
=
pStable
,
.
p
Row
=
pStable
,
.
rowSize
=
sizeof
(
SSTableObj
)
+
schemaSize
,
.
pMsg
=
pMsg
,
.
fpWrite
=
mnodeCreateSuperTableCb
...
...
@@ -940,7 +940,7 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsSuperTableSdb
,
.
p
Obj
=
pStable
,
.
p
Row
=
pStable
,
.
pMsg
=
pMsg
,
.
fpWrite
=
mnodeDropSuperTableCb
};
...
...
@@ -1013,7 +1013,7 @@ static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSchema schema[], int32_t
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsSuperTableSdb
,
.
p
Obj
=
pStable
,
.
p
Row
=
pStable
,
.
pMsg
=
pMsg
,
.
fpWrite
=
mnodeAddSuperTableTagCb
};
...
...
@@ -1047,7 +1047,7 @@ static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, char *tagName) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsSuperTableSdb
,
.
p
Obj
=
pStable
,
.
p
Row
=
pStable
,
.
pMsg
=
pMsg
,
.
fpWrite
=
mnodeDropSuperTableTagCb
};
...
...
@@ -1091,7 +1091,7 @@ static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, c
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsSuperTableSdb
,
.
p
Obj
=
pStable
,
.
p
Row
=
pStable
,
.
pMsg
=
pMsg
,
.
fpWrite
=
mnodeModifySuperTableTagNameCb
};
...
...
@@ -1165,7 +1165,7 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsSuperTableSdb
,
.
p
Obj
=
pStable
,
.
p
Row
=
pStable
,
.
pMsg
=
pMsg
,
.
fpWrite
=
mnodeAddSuperTableColumnCb
};
...
...
@@ -1210,7 +1210,7 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsSuperTableSdb
,
.
p
Obj
=
pStable
,
.
p
Row
=
pStable
,
.
pMsg
=
pMsg
,
.
fpWrite
=
mnodeDropSuperTableColumnCb
};
...
...
@@ -1254,7 +1254,7 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg, char *oldName, char
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsSuperTableSdb
,
.
p
Obj
=
pStable
,
.
p
Row
=
pStable
,
.
pMsg
=
pMsg
,
.
fpWrite
=
mnodeChangeSuperTableColumnCb
};
...
...
@@ -1420,7 +1420,7 @@ void mnodeDropAllSuperTables(SDbObj *pDropDb) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_LOCAL
,
.
pTable
=
tsSuperTableSdb
,
.
p
Obj
=
pTable
,
.
p
Row
=
pTable
,
};
sdbDeleteRow
(
&
wmsg
);
numOfTables
++
;
...
...
@@ -1694,7 +1694,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
}
else
{
mError
(
"app:%p:%p, table:%s, failed to create table sid:%d, uid:%"
PRIu64
", reason:%s"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pTable
->
info
.
tableId
,
pTable
->
tid
,
pTable
->
uid
,
tstrerror
(
code
));
SSWriteMsg
desc
=
{.
type
=
SDB_OPER_GLOBAL
,
.
p
Obj
=
pTable
,
.
pTable
=
tsChildTableSdb
};
SSWriteMsg
desc
=
{.
type
=
SDB_OPER_GLOBAL
,
.
p
Row
=
pTable
,
.
pTable
=
tsChildTableSdb
};
sdbDeleteRow
(
&
desc
);
return
code
;
}
...
...
@@ -1782,7 +1782,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
SSWriteMsg
desc
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
p
Obj
=
pTable
,
.
p
Row
=
pTable
,
.
pTable
=
tsChildTableSdb
,
.
pMsg
=
pMsg
,
.
fpReq
=
mnodeDoCreateChildTableFp
...
...
@@ -1904,7 +1904,7 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsChildTableSdb
,
.
p
Obj
=
pTable
,
.
p
Row
=
pTable
,
.
pMsg
=
pMsg
,
.
fpWrite
=
mnodeDropChildTableCb
};
...
...
@@ -2008,7 +2008,7 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsChildTableSdb
,
.
p
Obj
=
pTable
,
.
p
Row
=
pTable
,
.
pMsg
=
pMsg
,
.
fpWrite
=
mnodeAlterNormalTableColumnCb
};
...
...
@@ -2041,7 +2041,7 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsChildTableSdb
,
.
p
Obj
=
pTable
,
.
p
Row
=
pTable
,
.
pMsg
=
pMsg
,
.
fpWrite
=
mnodeAlterNormalTableColumnCb
};
...
...
@@ -2078,7 +2078,7 @@ static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsChildTableSdb
,
.
p
Obj
=
pTable
,
.
p
Row
=
pTable
,
.
pMsg
=
pMsg
,
.
fpWrite
=
mnodeAlterNormalTableColumnCb
};
...
...
@@ -2221,7 +2221,7 @@ void mnodeDropAllChildTablesInVgroups(SVgObj *pVgroup) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_LOCAL
,
.
pTable
=
tsChildTableSdb
,
.
p
Obj
=
pTable
,
.
p
Row
=
pTable
,
};
sdbDeleteRow
(
&
wmsg
);
numOfTables
++
;
...
...
@@ -2254,7 +2254,7 @@ void mnodeDropAllChildTables(SDbObj *pDropDb) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_LOCAL
,
.
pTable
=
tsChildTableSdb
,
.
p
Obj
=
pTable
,
.
p
Row
=
pTable
,
};
sdbDeleteRow
(
&
wmsg
);
numOfTables
++
;
...
...
@@ -2283,7 +2283,7 @@ static void mnodeDropAllChildTablesInStable(SSTableObj *pStable) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_LOCAL
,
.
pTable
=
tsChildTableSdb
,
.
p
Obj
=
pTable
,
.
p
Row
=
pTable
,
};
sdbDeleteRow
(
&
wmsg
);
numOfTables
++
;
...
...
@@ -2412,7 +2412,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
if
(
rpcMsg
->
code
==
TSDB_CODE_SUCCESS
||
rpcMsg
->
code
==
TSDB_CODE_TDB_TABLE_ALREADY_EXIST
)
{
SSWriteMsg
desc
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
p
Obj
=
pTable
,
.
p
Row
=
pTable
,
.
pTable
=
tsChildTableSdb
,
.
pMsg
=
mnodeMsg
,
.
fpWrite
=
mnodeDoCreateChildTableCb
...
...
@@ -2440,7 +2440,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
mnodeMsg
->
rpcMsg
.
ahandle
,
mnodeMsg
,
pTable
->
info
.
tableId
,
pTable
->
vgId
,
pTable
->
tid
,
pTable
->
uid
,
tstrerror
(
rpcMsg
->
code
),
mnodeMsg
->
rpcMsg
.
handle
,
mnodeMsg
->
incomingTs
,
sec
,
mnodeMsg
->
retry
);
SSWriteMsg
wmsg
=
{.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsChildTableSdb
,
.
p
Obj
=
pTable
};
SSWriteMsg
wmsg
=
{.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsChildTableSdb
,
.
p
Row
=
pTable
};
sdbDeleteRow
(
&
wmsg
);
if
(
rpcMsg
->
code
==
TSDB_CODE_APP_NOT_READY
)
{
...
...
src/mnode/src/mnodeUser.c
浏览文件 @
98ec34b4
...
...
@@ -43,12 +43,12 @@ static int32_t mnodeProcessDropUserMsg(SMnodeMsg *pMsg);
static
int32_t
mnodeProcessAuthMsg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mnodeUserActionDestroy
(
SSWriteMsg
*
pWMsg
)
{
tfree
(
pWMsg
->
p
Obj
);
tfree
(
pWMsg
->
p
Row
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeUserActionInsert
(
SSWriteMsg
*
pWMsg
)
{
SUserObj
*
pUser
=
pWMsg
->
p
Obj
;
SUserObj
*
pUser
=
pWMsg
->
p
Row
;
SAcctObj
*
pAcct
=
mnodeGetAcct
(
pUser
->
acct
);
if
(
pAcct
!=
NULL
)
{
...
...
@@ -63,7 +63,7 @@ static int32_t mnodeUserActionInsert(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeUserActionDelete
(
SSWriteMsg
*
pWMsg
)
{
SUserObj
*
pUser
=
pWMsg
->
p
Obj
;
SUserObj
*
pUser
=
pWMsg
->
p
Row
;
SAcctObj
*
pAcct
=
mnodeGetAcct
(
pUser
->
acct
);
if
(
pAcct
!=
NULL
)
{
...
...
@@ -75,7 +75,7 @@ static int32_t mnodeUserActionDelete(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeUserActionUpdate
(
SSWriteMsg
*
pWMsg
)
{
SUserObj
*
pUser
=
pWMsg
->
p
Obj
;
SUserObj
*
pUser
=
pWMsg
->
p
Row
;
SUserObj
*
pSaved
=
mnodeGetUser
(
pUser
->
user
);
if
(
pUser
!=
pSaved
)
{
memcpy
(
pSaved
,
pUser
,
tsUserUpdateSize
);
...
...
@@ -86,7 +86,7 @@ static int32_t mnodeUserActionUpdate(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeUserActionEncode
(
SSWriteMsg
*
pWMsg
)
{
SUserObj
*
pUser
=
pWMsg
->
p
Obj
;
SUserObj
*
pUser
=
pWMsg
->
p
Row
;
memcpy
(
pWMsg
->
rowData
,
pUser
,
tsUserUpdateSize
);
pWMsg
->
rowSize
=
tsUserUpdateSize
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -97,7 +97,7 @@ static int32_t mnodeUserActionDecode(SSWriteMsg *pWMsg) {
if
(
pUser
==
NULL
)
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
memcpy
(
pUser
,
pWMsg
->
rowData
,
tsUserUpdateSize
);
pWMsg
->
p
Obj
=
pUser
;
pWMsg
->
p
Row
=
pUser
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -163,7 +163,7 @@ int32_t mnodeInitUsers() {
.
fpEncode
=
mnodeUserActionEncode
,
.
fpDecode
=
mnodeUserActionDecode
,
.
fpDestroy
=
mnodeUserActionDestroy
,
.
fp
D
estored
=
mnodeUserActionRestored
.
fp
R
estored
=
mnodeUserActionRestored
};
tsUserSdb
=
sdbOpenTable
(
&
tableDesc
);
...
...
@@ -208,7 +208,7 @@ static int32_t mnodeUpdateUser(SUserObj *pUser, void *pMsg) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsUserSdb
,
.
p
Obj
=
pUser
,
.
p
Row
=
pUser
,
.
pMsg
=
pMsg
};
...
...
@@ -262,7 +262,7 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsUserSdb
,
.
p
Obj
=
pUser
,
.
p
Row
=
pUser
,
.
rowSize
=
sizeof
(
SUserObj
),
.
pMsg
=
pMsg
};
...
...
@@ -282,7 +282,7 @@ static int32_t mnodeDropUser(SUserObj *pUser, void *pMsg) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsUserSdb
,
.
p
Obj
=
pUser
,
.
p
Row
=
pUser
,
.
pMsg
=
pMsg
};
...
...
@@ -565,7 +565,7 @@ void mnodeDropAllUsers(SAcctObj *pAcct) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_LOCAL
,
.
pTable
=
tsUserSdb
,
.
p
Obj
=
pUser
,
.
p
Row
=
pUser
,
};
sdbDeleteRow
(
&
wmsg
);
numOfUsers
++
;
...
...
src/mnode/src/mnodeVgroup.c
浏览文件 @
98ec34b4
...
...
@@ -73,12 +73,12 @@ static void mnodeDestroyVgroup(SVgObj *pVgroup) {
}
static
int32_t
mnodeVgroupActionDestroy
(
SSWriteMsg
*
pWMsg
)
{
mnodeDestroyVgroup
(
pWMsg
->
p
Obj
);
mnodeDestroyVgroup
(
pWMsg
->
p
Row
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeVgroupActionInsert
(
SSWriteMsg
*
pWMsg
)
{
SVgObj
*
pVgroup
=
pWMsg
->
p
Obj
;
SVgObj
*
pVgroup
=
pWMsg
->
p
Row
;
// refer to db
SDbObj
*
pDb
=
mnodeGetDb
(
pVgroup
->
dbName
);
...
...
@@ -116,7 +116,7 @@ static int32_t mnodeVgroupActionInsert(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeVgroupActionDelete
(
SSWriteMsg
*
pWMsg
)
{
SVgObj
*
pVgroup
=
pWMsg
->
p
Obj
;
SVgObj
*
pVgroup
=
pWMsg
->
p
Row
;
if
(
pVgroup
->
pDb
==
NULL
)
{
mError
(
"vgId:%d, db:%s is not exist while insert into hash"
,
pVgroup
->
vgId
,
pVgroup
->
dbName
);
...
...
@@ -138,7 +138,7 @@ static int32_t mnodeVgroupActionDelete(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeVgroupActionUpdate
(
SSWriteMsg
*
pWMsg
)
{
SVgObj
*
pNew
=
pWMsg
->
p
Obj
;
SVgObj
*
pNew
=
pWMsg
->
p
Row
;
SVgObj
*
pVgroup
=
mnodeGetVgroup
(
pNew
->
vgId
);
if
(
pVgroup
!=
pNew
)
{
...
...
@@ -177,7 +177,7 @@ static int32_t mnodeVgroupActionUpdate(SSWriteMsg *pWMsg) {
}
static
int32_t
mnodeVgroupActionEncode
(
SSWriteMsg
*
pWMsg
)
{
SVgObj
*
pVgroup
=
pWMsg
->
p
Obj
;
SVgObj
*
pVgroup
=
pWMsg
->
p
Row
;
memcpy
(
pWMsg
->
rowData
,
pVgroup
,
tsVgUpdateSize
);
SVgObj
*
pTmpVgroup
=
pWMsg
->
rowData
;
for
(
int32_t
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
...
...
@@ -194,7 +194,7 @@ static int32_t mnodeVgroupActionDecode(SSWriteMsg *pWMsg) {
if
(
pVgroup
==
NULL
)
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
memcpy
(
pVgroup
,
pWMsg
->
rowData
,
tsVgUpdateSize
);
pWMsg
->
p
Obj
=
pVgroup
;
pWMsg
->
p
Row
=
pVgroup
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -219,7 +219,7 @@ int32_t mnodeInitVgroups() {
.
fpEncode
=
mnodeVgroupActionEncode
,
.
fpDecode
=
mnodeVgroupActionDecode
,
.
fpDestroy
=
mnodeVgroupActionDestroy
,
.
fp
D
estored
=
mnodeVgroupActionRestored
,
.
fp
R
estored
=
mnodeVgroupActionRestored
,
};
tsVgroupSdb
=
sdbOpenTable
(
&
tableDesc
);
...
...
@@ -256,7 +256,7 @@ void mnodeUpdateVgroup(SVgObj *pVgroup) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsVgroupSdb
,
.
p
Obj
=
pVgroup
.
p
Row
=
pVgroup
};
int32_t
code
=
sdbUpdateRow
(
&
wmsg
);
...
...
@@ -519,14 +519,14 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"app:%p:%p, vgId:%d, failed to create in sdb, reason:%s"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pVgroup
->
vgId
,
tstrerror
(
code
));
SSWriteMsg
desc
=
{.
type
=
SDB_OPER_GLOBAL
,
.
p
Obj
=
pVgroup
,
.
pTable
=
tsVgroupSdb
};
SSWriteMsg
desc
=
{.
type
=
SDB_OPER_GLOBAL
,
.
p
Row
=
pVgroup
,
.
pTable
=
tsVgroupSdb
};
sdbDeleteRow
(
&
desc
);
return
code
;
}
else
{
mInfo
(
"app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pVgroup
->
vgId
,
pDb
->
name
,
pVgroup
->
numOfVnodes
);
pVgroup
->
status
=
TAOS_VG_STATUS_READY
;
SSWriteMsg
desc
=
{.
type
=
SDB_OPER_GLOBAL
,
.
p
Obj
=
pVgroup
,
.
pTable
=
tsVgroupSdb
};
SSWriteMsg
desc
=
{.
type
=
SDB_OPER_GLOBAL
,
.
p
Row
=
pVgroup
,
.
pTable
=
tsVgroupSdb
};
(
void
)
sdbUpdateRow
(
&
desc
);
dnodeReprocessMWriteMsg
(
pMsg
);
...
...
@@ -535,7 +535,7 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
// mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
// pDb->name, pVgroup->numOfVnodes);
// pVgroup->status = TAOS_VG_STATUS_READY;
// SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .p
Obj
= pVgroup, .pTable = tsVgroupSdb};
// SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .p
Row
= pVgroup, .pTable = tsVgroupSdb};
// (void)sdbUpdateRow(&desc);
// dnodeReprocessMWriteMsg(pMsg);
// return TSDB_CODE_MND_ACTION_IN_PROGRESS;
...
...
@@ -574,7 +574,7 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsVgroupSdb
,
.
p
Obj
=
pVgroup
,
.
p
Row
=
pVgroup
,
.
rowSize
=
sizeof
(
SVgObj
),
.
pMsg
=
pMsg
,
.
fpReq
=
mnodeCreateVgroupFp
...
...
@@ -598,7 +598,7 @@ void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsVgroupSdb
,
.
p
Obj
=
pVgroup
.
p
Row
=
pVgroup
};
sdbDeleteRow
(
&
wmsg
);
}
...
...
@@ -770,7 +770,7 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v
SDnodeObj
*
pDnode
=
pVgroup
->
vnodeGid
[
i
].
pDnode
;
const
char
*
role
=
"NULL"
;
if
(
pDnode
!=
NULL
)
{
role
=
mnodeGetMnodeRoleStr
(
pVgroup
->
vnodeGid
[
i
].
role
)
;
role
=
syncRole
[
pVgroup
->
vnodeGid
[
i
].
role
]
;
}
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
...
...
@@ -960,7 +960,7 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsVgroupSdb
,
.
p
Obj
=
pVgroup
,
.
p
Row
=
pVgroup
,
.
rowSize
=
sizeof
(
SVgObj
),
.
pMsg
=
mnodeMsg
,
.
fpWrite
=
mnodeCreateVgroupCb
...
...
@@ -976,7 +976,7 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsVgroupSdb
,
.
p
Obj
=
pVgroup
.
p
Row
=
pVgroup
};
sdbDeleteRow
(
&
wmsg
);
dnodeSendRpcMWriteRsp
(
mnodeMsg
,
mnodeMsg
->
code
);
...
...
@@ -1034,7 +1034,7 @@ static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsVgroupSdb
,
.
p
Obj
=
pVgroup
.
p
Row
=
pVgroup
};
int32_t
code
=
sdbDeleteRow
(
&
wmsg
);
if
(
code
!=
0
)
{
...
...
@@ -1087,7 +1087,7 @@ void mnodeDropAllDnodeVgroups(SDnodeObj *pDropDnode) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_LOCAL
,
.
pTable
=
tsVgroupSdb
,
.
p
Obj
=
pVgroup
,
.
p
Row
=
pVgroup
,
};
sdbDeleteRow
(
&
wmsg
);
numOfVgroups
++
;
...
...
@@ -1138,7 +1138,7 @@ void mnodeDropAllDbVgroups(SDbObj *pDropDb) {
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_LOCAL
,
.
pTable
=
tsVgroupSdb
,
.
p
Obj
=
pVgroup
,
.
p
Row
=
pVgroup
,
};
sdbDeleteRow
(
&
wmsg
);
numOfVgroups
++
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录