Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
6a47a011
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
6a47a011
编写于
4月 18, 2020
作者:
S
slguan
提交者:
GitHub
4月 18, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #1655 from taosdata/feature/mpeer
[TD-52] refactor sdb write codes
上级
c65b3c5c
a09dac94
变更
19
隐藏空白更改
内联
并排
Showing
19 changed file
with
471 addition
and
353 deletion
+471
-353
src/dnode/CMakeLists.txt
src/dnode/CMakeLists.txt
+1
-1
src/dnode/src/dnodeMClient.c
src/dnode/src/dnodeMClient.c
+4
-3
src/inc/mnode.h
src/inc/mnode.h
+1
-0
src/inc/tbalance.h
src/inc/tbalance.h
+8
-9
src/mnode/inc/mgmtMnode.h
src/mnode/inc/mgmtMnode.h
+2
-3
src/mnode/inc/mgmtSdb.h
src/mnode/inc/mgmtSdb.h
+21
-28
src/mnode/src/mgmtAcct.c
src/mnode/src/mgmtAcct.c
+8
-8
src/mnode/src/mgmtBalance.c
src/mnode/src/mgmtBalance.c
+6
-7
src/mnode/src/mgmtDServer.c
src/mnode/src/mgmtDServer.c
+1
-1
src/mnode/src/mgmtDb.c
src/mnode/src/mgmtDb.c
+11
-11
src/mnode/src/mgmtDnode.c
src/mnode/src/mgmtDnode.c
+13
-13
src/mnode/src/mgmtMain.c
src/mnode/src/mgmtMain.c
+7
-7
src/mnode/src/mgmtMnode.c
src/mnode/src/mgmtMnode.c
+12
-16
src/mnode/src/mgmtSdb.c
src/mnode/src/mgmtSdb.c
+312
-185
src/mnode/src/mgmtShell.c
src/mnode/src/mgmtShell.c
+1
-1
src/mnode/src/mgmtTable.c
src/mnode/src/mgmtTable.c
+34
-34
src/mnode/src/mgmtUser.c
src/mnode/src/mgmtUser.c
+11
-11
src/mnode/src/mgmtVgroup.c
src/mnode/src/mgmtVgroup.c
+15
-15
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+3
-0
未找到文件。
src/dnode/CMakeLists.txt
浏览文件 @
6a47a011
...
...
@@ -27,7 +27,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
ENDIF
()
IF
(
TD_SYNC
)
TARGET_LINK_LIBRARIES
(
taosd
replica
sync
)
TARGET_LINK_LIBRARIES
(
taosd
balance
sync
)
ENDIF
()
SET
(
PREPARE_ENV_CMD
"prepare_env_cmd"
)
...
...
src/dnode/src/dnodeMClient.c
浏览文件 @
6a47a011
...
...
@@ -23,12 +23,13 @@
#include "tsync.h"
#include "ttime.h"
#include "ttimer.h"
#include "treplica.h"
#include "tbalance.h"
#include "vnode.h"
#include "mnode.h"
#include "dnode.h"
#include "dnodeMClient.h"
#include "dnodeModule.h"
#include "dnodeMgmt.h"
#include "vnode.h"
#define MPEER_CONTENT_LEN 2000
...
...
@@ -181,7 +182,7 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
tsMnodeInfos
.
nodeInfos
[
i
].
nodeName
);
}
dnodeSaveMnodeIpList
();
replicaNotify
();
sdbUpdateSync
();
}
taosTmrReset
(
dnodeSendStatusMsg
,
tsStatusInterval
*
1000
,
NULL
,
tsDnodeTmr
,
&
tsStatusTimer
);
...
...
src/inc/mnode.h
浏览文件 @
6a47a011
...
...
@@ -24,6 +24,7 @@ int32_t mgmtInitSystem();
int32_t
mgmtStartSystem
();
void
mgmtCleanUpSystem
();
void
mgmtStopSystem
();
void
sdbUpdateSync
();
#ifdef __cplusplus
}
...
...
src/inc/t
replica
.h
→
src/inc/t
balance
.h
浏览文件 @
6a47a011
...
...
@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_
REPLICA
_H
#define TDENGINE_
REPLICA
_H
#ifndef TDENGINE_
BALANCE
_H
#define TDENGINE_
BALANCE
_H
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -23,13 +23,12 @@ extern "C" {
struct
SVgObj
;
struct
SDnodeObj
;
int32_t
replicaInit
();
void
replicaCleanUp
();
void
replicaNotify
();
void
replicaReset
();
int32_t
replicaAllocVnodes
(
struct
SVgObj
*
pVgroup
);
int32_t
replicaForwardReqToPeer
(
void
*
pHead
);
int32_t
replicaDropDnode
(
struct
SDnodeObj
*
pDnode
);
int32_t
balanceInit
();
void
balanceCleanUp
();
void
balanceNotify
();
void
balanceReset
();
int32_t
balanceAllocVnodes
(
struct
SVgObj
*
pVgroup
);
int32_t
balanceDropDnode
(
struct
SDnodeObj
*
pDnode
);
#ifdef __cplusplus
}
...
...
src/mnode/inc/mgmtMnode.h
浏览文件 @
6a47a011
...
...
@@ -39,10 +39,9 @@ int32_t mgmtGetMnodesNum();
void
*
mgmtGetNextMnode
(
void
*
pNode
,
struct
SMnodeObj
**
pMnode
);
void
mgmtReleaseMnode
(
struct
SMnodeObj
*
pMnode
);
bool
mgmtIsMaster
();
char
*
mgmtGetMnodeRoleStr
();
void
mgmtGetMnodeIpList
(
SRpcIpSet
*
ipSet
,
bool
usePublicIp
);
void
mgmtGetMnodeList
(
void
*
m
peer
s
);
void
mgmtGetMnodeList
(
void
*
m
node
s
);
#ifdef __cplusplus
}
...
...
src/mnode/inc/mgmtSdb.h
浏览文件 @
6a47a011
...
...
@@ -36,54 +36,47 @@ typedef enum {
SDB_KEY_STRING
,
SDB_KEY_INT
,
SDB_KEY_AUTO
}
ESdbKey
Type
;
}
ESdbKey
;
typedef
enum
{
SDB_OPER_GLOBAL
,
SDB_OPER_LOCAL
}
ESdbOper
Type
;
}
ESdbOper
;
typedef
struct
{
ESdbOper
Type
type
;
void
*
table
;
void
*
pObj
;
int32_t
rowSize
;
void
*
rowData
;
}
SSdbOper
Desc
;
ESdbOper
type
;
void
*
table
;
void
*
pObj
;
int32_t
rowSize
;
void
*
rowData
;
}
SSdbOper
;
typedef
struct
{
char
*
tableName
;
int32_t
hashSessions
;
int32_t
maxRowSize
;
int32_t
refCountPos
;
ESdbTable
tableId
;
ESdbKey
Type
keyType
;
int32_t
(
*
insertFp
)(
SSdbOper
Desc
*
pOper
);
int32_t
(
*
deleteFp
)(
SSdbOper
Desc
*
pOper
);
int32_t
(
*
updateFp
)(
SSdbOper
Desc
*
pOper
);
int32_t
(
*
encodeFp
)(
SSdbOper
Desc
*
pOper
);
int32_t
(
*
decodeFp
)(
SSdbOper
Desc
*
pDesc
);
int32_t
(
*
destroyFp
)(
SSdbOper
Desc
*
pDesc
);
ESdbTable
tableId
;
ESdbKey
keyType
;
int32_t
(
*
insertFp
)(
SSdbOper
*
pOper
);
int32_t
(
*
deleteFp
)(
SSdbOper
*
pOper
);
int32_t
(
*
updateFp
)(
SSdbOper
*
pOper
);
int32_t
(
*
encodeFp
)(
SSdbOper
*
pOper
);
int32_t
(
*
decodeFp
)(
SSdbOper
*
pDesc
);
int32_t
(
*
destroyFp
)(
SSdbOper
*
pDesc
);
int32_t
(
*
restoredFp
)();
}
SSdbTableDesc
;
typedef
struct
{
int64_t
version
;
void
*
wal
;
pthread_mutex_t
mutex
;
}
SSdbObject
;
int32_t
sdbInit
();
void
sdbCleanUp
();
SSdbObject
*
sdbGetObj
();
void
*
sdbOpenTable
(
SSdbTableDesc
*
desc
);
void
sdbCloseTable
(
void
*
handle
);
int
sdbProcessWrite
(
void
*
param
,
void
*
data
,
int
type
);
bool
sdbIsMaster
();
void
sdbUpdateMnodeRoles
();
int32_t
sdbInsertRow
(
SSdbOper
Desc
*
pOper
);
int32_t
sdbDeleteRow
(
SSdbOper
Desc
*
pOper
);
int32_t
sdbUpdateRow
(
SSdbOper
Desc
*
pOper
);
int32_t
sdbInsertRow
(
SSdbOper
*
pOper
);
int32_t
sdbDeleteRow
(
SSdbOper
*
pOper
);
int32_t
sdbUpdateRow
(
SSdbOper
*
pOper
);
void
*
sdbGetRow
(
void
*
handle
,
void
*
key
);
void
*
sdbFetchRow
(
void
*
handle
,
void
*
pNode
,
void
**
ppRow
);
...
...
src/mnode/src/mgmtAcct.c
浏览文件 @
6a47a011
...
...
@@ -30,28 +30,28 @@ void * tsAcctSdb = NULL;
int32_t
tsAcctUpdateSize
;
static
void
mgmtCreateRootAcct
();
static
int32_t
mgmtActionAcctDestroy
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtActionAcctDestroy
(
SSdbOper
*
pOper
)
{
SAcctObj
*
pAcct
=
pOper
->
pObj
;
pthread_mutex_destroy
(
&
pAcct
->
mutex
);
tfree
(
pOper
->
pObj
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtAcctActionInsert
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtAcctActionInsert
(
SSdbOper
*
pOper
)
{
SAcctObj
*
pAcct
=
pOper
->
pObj
;
memset
(
&
pAcct
->
acctInfo
,
0
,
sizeof
(
SAcctInfo
));
pthread_mutex_init
(
&
pAcct
->
mutex
,
NULL
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtActionAcctDelete
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtActionAcctDelete
(
SSdbOper
*
pOper
)
{
SAcctObj
*
pAcct
=
pOper
->
pObj
;
mgmtDropAllUsers
(
pAcct
);
mgmtDropAllDbs
(
pAcct
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtActionAcctUpdate
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtActionAcctUpdate
(
SSdbOper
*
pOper
)
{
SAcctObj
*
pAcct
=
pOper
->
pObj
;
SAcctObj
*
pSaved
=
mgmtGetAcct
(
pAcct
->
user
);
if
(
pAcct
!=
pSaved
)
{
...
...
@@ -61,14 +61,14 @@ static int32_t mgmtActionAcctUpdate(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtActionActionEncode
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtActionActionEncode
(
SSdbOper
*
pOper
)
{
SAcctObj
*
pAcct
=
pOper
->
pObj
;
memcpy
(
pOper
->
rowData
,
pAcct
,
tsAcctUpdateSize
);
pOper
->
rowSize
=
tsAcctUpdateSize
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtActionAcctDecode
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtActionAcctDecode
(
SSdbOper
*
pOper
)
{
SAcctObj
*
pAcct
=
(
SAcctObj
*
)
calloc
(
1
,
sizeof
(
SAcctObj
));
if
(
pAcct
==
NULL
)
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
...
...
@@ -110,7 +110,7 @@ int32_t mgmtInitAccts() {
return
-
1
;
}
mTrace
(
"
account
table is created"
);
mTrace
(
"
table:accounts
table is created"
);
return
acctInit
();
}
...
...
@@ -179,7 +179,7 @@ static void mgmtCreateRootAcct() {
pAcct
->
acctId
=
sdbGetId
(
tsAcctSdb
);
pAcct
->
createdTime
=
taosGetTimestampMs
();
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsAcctSdb
,
.
pObj
=
pAcct
,
...
...
src/mnode/src/mgmt
Replica
.c
→
src/mnode/src/mgmt
Balance
.c
浏览文件 @
6a47a011
...
...
@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "trpc.h"
#include "t
replica
.h"
#include "t
balance
.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtMnode.h"
...
...
@@ -25,13 +25,12 @@
#ifndef _SYNC
int32_t
replicaInit
()
{
return
TSDB_CODE_SUCCESS
;
}
void
replicaCleanUp
()
{}
void
replicaNotify
()
{}
void
replicaReset
()
{}
int32_t
replicaForwardReqToPeer
(
void
*
pHead
)
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
balanceInit
()
{
return
TSDB_CODE_SUCCESS
;
}
void
balanceCleanUp
()
{}
void
balanceNotify
()
{}
void
balanceReset
()
{}
int32_t
replica
AllocVnodes
(
SVgObj
*
pVgroup
)
{
int32_t
balance
AllocVnodes
(
SVgObj
*
pVgroup
)
{
void
*
pNode
=
NULL
;
SDnodeObj
*
pDnode
=
NULL
;
SDnodeObj
*
pSelDnode
=
NULL
;
...
...
src/mnode/src/mgmtDServer.c
浏览文件 @
6a47a011
...
...
@@ -21,7 +21,7 @@
#include "tsystem.h"
#include "tutil.h"
#include "tgrant.h"
#include "t
replica
.h"
#include "t
balance
.h"
#include "tglobalcfg.h"
#include "dnode.h"
#include "mgmtDef.h"
...
...
src/mnode/src/mgmtDb.c
浏览文件 @
6a47a011
...
...
@@ -46,12 +46,12 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg);
static
void
mgmtProcessAlterDbMsg
(
SQueuedMsg
*
pMsg
);
static
void
mgmtProcessDropDbMsg
(
SQueuedMsg
*
pMsg
);
static
int32_t
mgmtDbActionDestroy
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtDbActionDestroy
(
SSdbOper
*
pOper
)
{
tfree
(
pOper
->
pObj
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtDbActionInsert
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtDbActionInsert
(
SSdbOper
*
pOper
)
{
SDbObj
*
pDb
=
pOper
->
pObj
;
SAcctObj
*
pAcct
=
mgmtGetAcct
(
pDb
->
cfg
.
acct
);
...
...
@@ -72,7 +72,7 @@ static int32_t mgmtDbActionInsert(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtDbActionDelete
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtDbActionDelete
(
SSdbOper
*
pOper
)
{
SDbObj
*
pDb
=
pOper
->
pObj
;
SAcctObj
*
pAcct
=
mgmtGetAcct
(
pDb
->
cfg
.
acct
);
...
...
@@ -84,7 +84,7 @@ static int32_t mgmtDbActionDelete(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtDbActionUpdate
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtDbActionUpdate
(
SSdbOper
*
pOper
)
{
SDbObj
*
pDb
=
pOper
->
pObj
;
SDbObj
*
pSaved
=
mgmtGetDb
(
pDb
->
name
);
if
(
pDb
!=
pSaved
)
{
...
...
@@ -94,14 +94,14 @@ static int32_t mgmtDbActionUpdate(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtDbActionEncode
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtDbActionEncode
(
SSdbOper
*
pOper
)
{
SDbObj
*
pDb
=
pOper
->
pObj
;
memcpy
(
pOper
->
rowData
,
pDb
,
tsDbUpdateSize
);
pOper
->
rowSize
=
tsDbUpdateSize
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtDbActionDecode
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtDbActionDecode
(
SSdbOper
*
pOper
)
{
SDbObj
*
pDb
=
(
SDbObj
*
)
calloc
(
1
,
sizeof
(
SDbObj
));
if
(
pDb
==
NULL
)
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
...
...
@@ -146,7 +146,7 @@ int32_t mgmtInitDbs() {
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_DB
,
mgmtGetDbMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_DB
,
mgmtRetrieveDbs
);
mTrace
(
"
db data is initializ
ed"
);
mTrace
(
"
table:dbs table is creat
ed"
);
return
0
;
}
...
...
@@ -318,7 +318,7 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
pDb
->
createdTime
=
taosGetTimestampMs
();
pDb
->
cfg
=
*
pCreate
;
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsDbSdb
,
.
pObj
=
pDb
,
...
...
@@ -671,7 +671,7 @@ static int32_t mgmtSetDbDropping(SDbObj *pDb) {
if
(
pDb
->
status
)
return
TSDB_CODE_SUCCESS
;
pDb
->
status
=
true
;
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsDbSdb
,
.
pObj
=
pDb
,
...
...
@@ -756,7 +756,7 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
if
(
memcmp
(
&
newCfg
,
&
pDb
->
cfg
,
sizeof
(
SDbCfg
))
!=
0
)
{
pDb
->
cfg
=
newCfg
;
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsDbSdb
,
.
pObj
=
pDb
,
...
...
@@ -814,7 +814,7 @@ static void mgmtDropDb(SQueuedMsg *pMsg) {
SDbObj
*
pDb
=
pMsg
->
pDb
;
mPrint
(
"db:%s, drop db from sdb"
,
pDb
->
name
);
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsDbSdb
,
.
pObj
=
pDb
...
...
src/mnode/src/mgmtDnode.c
浏览文件 @
6a47a011
...
...
@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tgrant.h"
#include "t
replica
.h"
#include "t
balance
.h"
#include "tglobalcfg.h"
#include "ttime.h"
#include "tutil.h"
...
...
@@ -52,12 +52,12 @@ static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, voi
static
int32_t
mgmtGetDnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveDnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
mgmtDnodeActionDestroy
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtDnodeActionDestroy
(
SSdbOper
*
pOper
)
{
tfree
(
pOper
->
pObj
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtDnodeActionInsert
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtDnodeActionInsert
(
SSdbOper
*
pOper
)
{
SDnodeObj
*
pDnode
=
pOper
->
pObj
;
if
(
pDnode
->
status
!=
TAOS_DN_STATUS_DROPPING
)
{
pDnode
->
status
=
TAOS_DN_STATUS_OFFLINE
;
...
...
@@ -72,7 +72,7 @@ static int32_t mgmtDnodeActionInsert(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtDnodeActionDelete
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtDnodeActionDelete
(
SSdbOper
*
pOper
)
{
SDnodeObj
*
pDnode
=
pOper
->
pObj
;
void
*
pNode
=
NULL
;
void
*
pLastNode
=
NULL
;
...
...
@@ -85,7 +85,7 @@ static int32_t mgmtDnodeActionDelete(SSdbOperDesc *pOper) {
if
(
pVgroup
==
NULL
)
break
;
if
(
pVgroup
->
vnodeGid
[
0
].
dnodeId
==
pDnode
->
dnodeId
)
{
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_LOCAL
,
.
table
=
tsVgroupSdb
,
.
pObj
=
pVgroup
,
...
...
@@ -101,7 +101,7 @@ static int32_t mgmtDnodeActionDelete(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtDnodeActionUpdate
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtDnodeActionUpdate
(
SSdbOper
*
pOper
)
{
SDnodeObj
*
pDnode
=
pOper
->
pObj
;
SDnodeObj
*
pSaved
=
mgmtGetDnode
(
pDnode
->
dnodeId
);
if
(
pDnode
!=
pSaved
)
{
...
...
@@ -111,14 +111,14 @@ static int32_t mgmtDnodeActionUpdate(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtDnodeActionEncode
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtDnodeActionEncode
(
SSdbOper
*
pOper
)
{
SDnodeObj
*
pDnode
=
pOper
->
pObj
;
memcpy
(
pOper
->
rowData
,
pDnode
,
tsDnodeUpdateSize
);
pOper
->
rowSize
=
tsDnodeUpdateSize
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtDnodeActionDecode
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtDnodeActionDecode
(
SSdbOper
*
pOper
)
{
SDnodeObj
*
pDnode
=
(
SDnodeObj
*
)
calloc
(
1
,
sizeof
(
SDnodeObj
));
if
(
pDnode
==
NULL
)
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
...
...
@@ -180,7 +180,7 @@ int32_t mgmtInitDnodes() {
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_DNODE
,
mgmtGetDnodeMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_DNODE
,
mgmtRetrieveDnodes
);
mTrace
(
"dnodes table is created"
);
mTrace
(
"
table:
dnodes table is created"
);
return
0
;
}
...
...
@@ -221,7 +221,7 @@ void mgmtReleaseDnode(SDnodeObj *pDnode) {
}
void
mgmtUpdateDnode
(
SDnodeObj
*
pDnode
)
{
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsDnodeSdb
,
.
pObj
=
pDnode
,
...
...
@@ -340,7 +340,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
if
(
pDnode
->
status
==
TAOS_DN_STATUS_OFFLINE
)
{
mTrace
(
"dnode:%d, from offline to online"
,
pDnode
->
dnodeId
);
pDnode
->
status
=
TAOS_DN_STATUS_READY
;
replica
Notify
();
balance
Notify
();
}
mgmtReleaseDnode
(
pDnode
);
...
...
@@ -393,7 +393,7 @@ static int32_t mgmtCreateDnode(uint32_t ip) {
pDnode
->
totalVnodes
=
TSDB_INVALID_VNODE_NUM
;
sprintf
(
pDnode
->
dnodeName
,
"n%d"
,
sdbGetId
(
tsDnodeSdb
)
+
1
);
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsDnodeSdb
,
.
pObj
=
pDnode
,
...
...
@@ -413,7 +413,7 @@ static int32_t mgmtCreateDnode(uint32_t ip) {
}
int32_t
mgmtDropDnode
(
SDnodeObj
*
pDnode
)
{
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsDnodeSdb
,
.
pObj
=
pDnode
...
...
src/mnode/src/mgmtMain.c
浏览文件 @
6a47a011
...
...
@@ -17,7 +17,7 @@
#include "os.h"
#include "taosdef.h"
#include "tsched.h"
#include "t
replica
.h"
#include "t
balance
.h"
#include "tgrant.h"
#include "ttimer.h"
#include "dnode.h"
...
...
@@ -62,7 +62,7 @@ int32_t mgmtStartSystem() {
}
if
(
grantInit
()
<
0
)
{
mError
(
"failed to init grant
s
"
);
mError
(
"failed to init grant"
);
return
-
1
;
}
...
...
@@ -92,7 +92,7 @@ int32_t mgmtStartSystem() {
}
if
(
mgmtInitMnodes
()
<
0
)
{
mError
(
"failed to init m
peer
s"
);
mError
(
"failed to init m
node
s"
);
return
-
1
;
}
...
...
@@ -101,8 +101,8 @@ int32_t mgmtStartSystem() {
return
-
1
;
}
if
(
replica
Init
()
<
0
)
{
mError
(
"failed to init
replica
"
)
if
(
balance
Init
()
<
0
)
{
mError
(
"failed to init
balance
"
)
}
if
(
mgmtInitDClient
()
<
0
)
{
...
...
@@ -144,7 +144,7 @@ void mgmtCleanUpSystem() {
mPrint
(
"starting to clean up mgmt"
);
grantCleanUp
();
mgmtCleanupMnodes
();
replica
CleanUp
();
balance
CleanUp
();
mgmtCleanUpShell
();
mgmtCleanupDClient
();
mgmtCleanupDServer
();
...
...
@@ -161,7 +161,7 @@ void mgmtCleanUpSystem() {
}
void
mgmtStopSystem
()
{
if
(
mgmt
IsMaster
())
{
if
(
sdb
IsMaster
())
{
mTrace
(
"it is a master mgmt node, it could not be stopped"
);
return
;
}
...
...
src/mnode/src/mgmtMnode.c
浏览文件 @
6a47a011
...
...
@@ -18,7 +18,7 @@
#include "taoserror.h"
#include "trpc.h"
#include "tsync.h"
#include "t
replica
.h"
#include "t
balance
.h"
#include "tutil.h"
#include "ttime.h"
#include "tsocket.h"
...
...
@@ -30,18 +30,17 @@
#include "mgmtShell.h"
#include "mgmtUser.h"
int32_t
tsMnodeIsMaster
=
true
;
static
void
*
tsMnodeSdb
=
NULL
;
static
int32_t
tsMnodeUpdateSize
=
0
;
static
int32_t
mgmtGetMnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveMnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
mgmtMnodeActionDestroy
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtMnodeActionDestroy
(
SSdbOper
*
pOper
)
{
tfree
(
pOper
->
pObj
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtMnodeActionInsert
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtMnodeActionInsert
(
SSdbOper
*
pOper
)
{
SMnodeObj
*
pMnode
=
pOper
->
pObj
;
SDnodeObj
*
pDnode
=
mgmtGetDnode
(
pMnode
->
mnodeId
);
if
(
pDnode
==
NULL
)
return
TSDB_CODE_DNODE_NOT_EXIST
;
...
...
@@ -53,7 +52,7 @@ static int32_t mgmtMnodeActionInsert(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtMnodeActionDelete
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtMnodeActionDelete
(
SSdbOper
*
pOper
)
{
SMnodeObj
*
pMnode
=
pOper
->
pObj
;
SDnodeObj
*
pDnode
=
mgmtGetDnode
(
pMnode
->
mnodeId
);
...
...
@@ -65,7 +64,7 @@ static int32_t mgmtMnodeActionDelete(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtMnodeActionUpdate
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtMnodeActionUpdate
(
SSdbOper
*
pOper
)
{
SMnodeObj
*
pMnode
=
pOper
->
pObj
;
SMnodeObj
*
pSaved
=
mgmtGetMnode
(
pMnode
->
mnodeId
);
if
(
pMnode
!=
pSaved
)
{
...
...
@@ -76,14 +75,14 @@ static int32_t mgmtMnodeActionUpdate(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtMnodeActionEncode
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtMnodeActionEncode
(
SSdbOper
*
pOper
)
{
SMnodeObj
*
pMnode
=
pOper
->
pObj
;
memcpy
(
pOper
->
rowData
,
pMnode
,
tsMnodeUpdateSize
);
pOper
->
rowSize
=
tsMnodeUpdateSize
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtMnodeActionDecode
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtMnodeActionDecode
(
SSdbOper
*
pOper
)
{
SMnodeObj
*
pMnode
=
calloc
(
1
,
sizeof
(
SMnodeObj
));
if
(
pMnode
==
NULL
)
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
...
...
@@ -133,7 +132,7 @@ int32_t mgmtInitMnodes() {
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_MNODE
,
mgmtGetMnodeMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_MNODE
,
mgmtRetrieveMnodes
);
mTrace
(
"mnodes table is created"
);
mTrace
(
"
table:
mnodes table is created"
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -157,7 +156,7 @@ void *mgmtGetNextMnode(void *pNode, SMnodeObj **pMnode) {
return
sdbFetchRow
(
tsMnodeSdb
,
pNode
,
(
void
**
)
pMnode
);
}
static
char
*
mgmtGetMnodeRoleStr
(
int32_t
role
)
{
char
*
mgmtGetMnodeRoleStr
(
int32_t
role
)
{
switch
(
role
)
{
case
TAOS_SYNC_ROLE_OFFLINE
:
return
"offline"
;
...
...
@@ -172,8 +171,6 @@ static char *mgmtGetMnodeRoleStr(int32_t role) {
}
}
bool
mgmtIsMaster
()
{
return
tsMnodeIsMaster
;
}
void
mgmtGetMnodeIpList
(
SRpcIpSet
*
ipSet
,
bool
usePublicIp
)
{
void
*
pNode
=
NULL
;
while
(
1
)
{
...
...
@@ -213,10 +210,8 @@ void mgmtGetMnodeList(void *param) {
mnodes
->
nodeInfos
[
index
].
nodeIp
=
htonl
(
pMnode
->
pDnode
->
privateIp
);
mnodes
->
nodeInfos
[
index
].
nodePort
=
htons
(
pMnode
->
pDnode
->
mnodeDnodePort
);
strcpy
(
mnodes
->
nodeInfos
[
index
].
nodeName
,
pMnode
->
pDnode
->
dnodeName
);
mPrint
(
"node:%d role:%s"
,
pMnode
->
mnodeId
,
mgmtGetMnodeRoleStr
(
pMnode
->
role
));
if
(
pMnode
->
role
==
TAOS_SYNC_ROLE_MASTER
)
{
mnodes
->
inUse
=
index
;
mPrint
(
"node:%d inUse:%d"
,
pMnode
->
mnodeId
,
mnodes
->
inUse
);
}
index
++
;
...
...
@@ -231,7 +226,7 @@ int32_t mgmtAddMnode(int32_t dnodeId) {
pMnode
->
mnodeId
=
dnodeId
;
pMnode
->
createdTime
=
taosGetTimestampMs
();
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsMnodeSdb
,
.
pObj
=
pMnode
,
...
...
@@ -252,7 +247,7 @@ int32_t mgmtDropMnode(int32_t dnodeId) {
return
TSDB_CODE_DNODE_NOT_EXIST
;
}
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsMnodeSdb
,
.
pObj
=
pMnode
...
...
@@ -268,6 +263,7 @@ int32_t mgmtDropMnode(int32_t dnodeId) {
}
static
int32_t
mgmtGetMnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
sdbUpdateMnodeRoles
();
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
if
(
pUser
==
NULL
)
return
0
;
...
...
src/mnode/src/mgmtSdb.c
浏览文件 @
6a47a011
...
...
@@ -18,65 +18,93 @@
#include "taoserror.h"
#include "tlog.h"
#include "trpc.h"
#include "treplica.h"
#include "tutil.h"
#include "tbalance.h"
#include "tqueue.h"
#include "twal.h"
#include "tsync.h"
#include "hashint.h"
#include "hashstr.h"
#include "dnode.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtMnode.h"
#include "mgmtSdb.h"
typedef
enum
{
SDB_ACTION_INSERT
,
SDB_ACTION_DELETE
,
SDB_ACTION_UPDATE
}
ESdbAction
;
typedef
enum
{
SDB_STATUS_OFFLINE
,
SDB_STATUS_SERVING
,
SDB_ACTION_CLOSING
}
ESdbStatus
;
typedef
struct
_SSdbTable
{
char
tableName
[
TSDB_DB_NAME_LEN
+
1
];
ESdbTable
tableId
;
ESdbKey
Type
keyType
;
int32_t
hashSessions
;
int32_t
maxRowSize
;
int32_t
refCountPos
;
int32_t
autoIndex
;
int64_t
numOfRows
;
void
*
iHandle
;
int32_t
(
*
insertFp
)(
SSdbOperDesc
*
pDesc
);
int32_t
(
*
deleteFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
updateFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
decodeFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
encodeFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
destroyFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
restoredFp
)();
char
tableName
[
TSDB_DB_NAME_LEN
+
1
];
ESdbTable
tableId
;
ESdbKey
keyType
;
int32_t
hashSessions
;
int32_t
maxRowSize
;
int32_t
refCountPos
;
int32_t
autoIndex
;
int64_t
numOfRows
;
void
*
iHandle
;
int32_t
(
*
insertFp
)(
SSdbOper
*
pDesc
);
int32_t
(
*
deleteFp
)(
SSdbOper
*
pOper
);
int32_t
(
*
updateFp
)(
SSdbOper
*
pOper
);
int32_t
(
*
decodeFp
)(
SSdbOper
*
pOper
);
int32_t
(
*
encodeFp
)(
SSdbOper
*
pOper
);
int32_t
(
*
destroyFp
)(
SSdbOper
*
pOper
);
int32_t
(
*
restoredFp
)();
pthread_mutex_t
mutex
;
}
SSdbTable
;
typedef
struct
{
ESyncRole
role
;
ESdbStatus
status
;
int64_t
version
;
void
*
sync
;
void
*
wal
;
SSyncCfg
cfg
;
sem_t
sem
;
int32_t
code
;
int32_t
numOfTables
;
SSdbTable
*
tableList
[
SDB_TABLE_MAX
];
pthread_mutex_t
mutex
;
}
SSdbObject
;
typedef
struct
{
int32_t
rowSize
;
void
*
row
;
}
SRowMeta
;
typedef
enum
{
SDB_ACTION_INSERT
,
SDB_ACTION_DELETE
,
SDB_ACTION_UPDATE
}
ESdbActionType
;
static
SSdbTable
*
tsSdbTableList
[
SDB_TABLE_MAX
]
=
{
0
};
static
int32_t
tsSdbNumOfTables
=
0
;
static
SSdbObject
*
tsSdbObj
;
}
SSdbRow
;
static
SSdbObject
tsSdbObj
=
{
0
};
static
void
*
(
*
sdbInitIndexFp
[])(
int32_t
maxRows
,
int32_t
dataSize
)
=
{
sdbOpenStrHash
,
sdbOpenIntHash
,
sdbOpenIntHash
};
static
void
*
(
*
sdbAddIndexFp
[])(
void
*
handle
,
void
*
key
,
void
*
data
)
=
{
sdbAddStrHash
,
sdbAddIntHash
,
sdbAddIntHash
};
static
void
(
*
sdbDeleteIndexFp
[])(
void
*
handle
,
void
*
key
)
=
{
sdbDeleteStrHash
,
sdbDeleteIntHash
,
sdbDeleteIntHash
};
static
void
*
(
*
sdbGetIndexFp
[])(
void
*
handle
,
void
*
key
)
=
{
sdbGetStrHashData
,
sdbGetIntHashData
,
sdbGetIntHashData
};
static
void
(
*
sdbCleanUpIndexFp
[])(
void
*
handle
)
=
{
sdbCloseStrHash
,
sdbCloseIntHash
,
sdbCloseIntHash
};
static
void
*
(
*
sdbFetchRowFp
[])(
void
*
handle
,
void
*
ptr
,
void
**
ppRow
)
=
{
sdbFetchStrHashData
,
sdbFetchIntHashData
,
sdbFetchIntHashData
};
static
int
sdbWrite
(
void
*
param
,
void
*
data
,
int
type
);
int32_t
sdbGetId
(
void
*
handle
)
{
return
((
SSdbTable
*
)
handle
)
->
autoIndex
;
}
int64_t
sdbGetNumOfRows
(
void
*
handle
)
{
return
((
SSdbTable
*
)
handle
)
->
numOfRows
;
}
int32_t
sdbGetId
(
void
*
handle
)
{
return
((
SSdbTable
*
)
handle
)
->
autoIndex
;
}
int64_t
sdbGetNumOfRows
(
void
*
handle
)
{
return
((
SSdbTable
*
)
handle
)
->
numOfRows
;
}
uint64_t
sdbGetVersion
()
{
if
(
tsSdbObj
)
return
tsSdbObj
->
version
;
else
return
0
;
return
tsSdbObj
.
version
;
}
bool
sdbIsMaster
()
{
return
tsSdbObj
.
role
==
TAOS_SYNC_ROLE_MASTER
;
}
static
char
*
sdbGetActionStr
(
int32_t
action
)
{
...
...
@@ -106,26 +134,26 @@ static char *sdbGetkeyStr(SSdbTable *pTable, void *row) {
}
static
void
*
sdbGetTableFromId
(
int32_t
tableId
)
{
return
tsSdb
T
ableList
[
tableId
];
return
tsSdb
Obj
.
t
ableList
[
tableId
];
}
int32_t
sdbInit
()
{
tsSdbObj
=
calloc
(
1
,
sizeof
(
SSdbObject
));
pthread_mutex_init
(
&
tsSdbObj
->
mutex
,
NULL
);
static
int32_t
sdbInitWal
()
{
SWalCfg
walCfg
=
{.
commitLog
=
2
,
.
wals
=
2
,
.
keep
=
1
};
tsSdbObj
->
wal
=
walOpen
(
tsMnodeDir
,
&
walCfg
);
if
(
tsSdbObj
->
wal
==
NULL
)
{
sdbError
(
"failed to open sdb in %s"
,
tsMnodeDir
);
tsSdbObj
.
wal
=
walOpen
(
tsMnodeDir
,
&
walCfg
);
if
(
tsSdbObj
.
wal
==
NULL
)
{
sdbError
(
"failed to open sdb
wal
in %s"
,
tsMnodeDir
);
return
-
1
;
}
sdbTrace
(
"open sdb file for read"
);
walRestore
(
tsSdbObj
->
wal
,
tsSdbObj
,
sdbProcessWrite
);
sdbTrace
(
"open sdb wal for restore"
);
walRestore
(
tsSdbObj
.
wal
,
&
tsSdbObj
,
sdbWrite
);
return
0
;
}
static
void
sdbRestoreTables
()
{
int32_t
totalRows
=
0
;
int32_t
numOfTables
=
0
;
for
(
int32_t
tableId
=
SDB_TABLE_DNODE
;
tableId
<
SDB_TABLE_MAX
;
++
tableId
)
{
for
(
int32_t
tableId
=
0
;
tableId
<
SDB_TABLE_MAX
;
++
tableId
)
{
SSdbTable
*
pTable
=
sdbGetTableFromId
(
tableId
);
if
(
pTable
==
NULL
)
continue
;
if
(
pTable
->
restoredFp
)
{
...
...
@@ -134,23 +162,170 @@ int32_t sdbInit() {
totalRows
+=
pTable
->
numOfRows
;
numOfTables
++
;
sdbTrace
(
"table:%s, is initialized, numOfRows:%d"
,
pTable
->
tableName
,
pTable
->
numOfRows
);
sdbTrace
(
"table:%s, is restored, numOfRows:%d"
,
pTable
->
tableName
,
pTable
->
numOfRows
);
}
sdbTrace
(
"sdb is restored, version:%d totalRows:%d numOfTables:%d"
,
tsSdbObj
.
version
,
totalRows
,
numOfTables
);
}
void
sdbUpdateMnodeRoles
()
{
if
(
tsSdbObj
.
sync
==
NULL
)
return
;
SNodesRole
roles
=
{
0
};
syncGetNodesRole
(
tsSdbObj
.
sync
,
&
roles
);
mPrint
(
"update mnodes:%d sync roles"
,
tsSdbObj
.
cfg
.
replica
);
for
(
int32_t
i
=
0
;
i
<
tsSdbObj
.
cfg
.
replica
;
++
i
)
{
SMnodeObj
*
pMnode
=
mgmtGetMnode
(
roles
.
nodeId
[
i
]);
if
(
pMnode
!=
NULL
)
{
pMnode
->
role
=
roles
.
role
[
i
];
mPrint
(
"mnode:%d, role:%s"
,
pMnode
->
mnodeId
,
mgmtGetMnodeRoleStr
(
pMnode
->
role
));
mgmtReleaseMnode
(
pMnode
);
}
}
}
static
uint32_t
sdbGetFileInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
,
int32_t
*
size
)
{
sdbUpdateMnodeRoles
();
return
0
;
}
static
int
sdbGetWalInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
)
{
strcpy
(
name
,
"wal0"
);
return
0
;
}
static
void
sdbNotifyRole
(
void
*
ahandle
,
int8_t
role
)
{
mPrint
(
"mnode role changed from %s to %s"
,
mgmtGetMnodeRoleStr
(
tsSdbObj
.
role
),
mgmtGetMnodeRoleStr
(
role
));
if
(
role
==
TAOS_SYNC_ROLE_MASTER
&&
tsSdbObj
.
role
!=
TAOS_SYNC_ROLE_MASTER
)
{
balanceReset
();
}
tsSdbObj
.
role
=
role
;
sdbUpdateMnodeRoles
();
}
static
void
sdbConfirmForward
(
void
*
ahandle
,
void
*
param
,
int32_t
code
)
{
tsSdbObj
.
code
=
code
;
sem_post
(
&
tsSdbObj
.
sem
);
mPrint
(
"sdb forward request confirmed, result:%s"
,
tstrerror
(
code
));
}
static
int32_t
sdbForwardToPeer
(
void
*
pHead
)
{
if
(
tsSdbObj
.
sync
==
NULL
)
return
TSDB_CODE_SUCCESS
;
int32_t
code
=
syncForwardToPeer
(
tsSdbObj
.
sync
,
pHead
,
NULL
);
if
(
code
>
0
)
{
sem_wait
(
&
tsSdbObj
.
sem
);
return
tsSdbObj
.
code
;
}
return
code
;
}
void
sdbUpdateSync
()
{
SSyncCfg
syncCfg
=
{
0
};
int32_t
index
=
0
;
SDMNodeInfos
*
mnodes
=
dnodeGetMnodeList
();
for
(
int32_t
i
=
0
;
i
<
mnodes
->
nodeNum
;
++
i
)
{
SDMNodeInfo
*
node
=
&
mnodes
->
nodeInfos
[
i
];
syncCfg
.
nodeInfo
[
i
].
nodeId
=
node
->
nodeId
;
syncCfg
.
nodeInfo
[
i
].
nodeIp
=
node
->
nodeIp
;
strcpy
(
syncCfg
.
nodeInfo
[
i
].
name
,
node
->
nodeName
);
index
++
;
}
if
(
index
==
0
)
{
void
*
pNode
=
NULL
;
while
(
1
)
{
SMnodeObj
*
pMnode
=
NULL
;
pNode
=
mgmtGetNextMnode
(
pNode
,
&
pMnode
);
if
(
pMnode
==
NULL
)
break
;
syncCfg
.
nodeInfo
[
index
].
nodeId
=
pMnode
->
mnodeId
;
syncCfg
.
nodeInfo
[
index
].
nodeIp
=
pMnode
->
pDnode
->
privateIp
;
strcpy
(
syncCfg
.
nodeInfo
[
index
].
name
,
pMnode
->
pDnode
->
dnodeName
);
index
++
;
mgmtReleaseMnode
(
pMnode
);
}
}
syncCfg
.
replica
=
index
;
syncCfg
.
arbitratorIp
=
syncCfg
.
nodeInfo
[
0
].
nodeIp
;
if
(
syncCfg
.
replica
==
1
)
{
syncCfg
.
quorum
=
1
;
}
else
{
syncCfg
.
quorum
=
2
;
}
bool
hasThisDnode
=
false
;
for
(
int32_t
i
=
0
;
i
<
syncCfg
.
replica
;
++
i
)
{
if
(
syncCfg
.
nodeInfo
[
i
].
nodeId
==
dnodeGetDnodeId
())
{
hasThisDnode
=
true
;
break
;
}
}
if
(
!
hasThisDnode
)
return
;
if
(
memcmp
(
&
syncCfg
,
&
tsSdbObj
.
cfg
,
sizeof
(
SSyncCfg
))
==
0
)
return
;
mPrint
(
"work as mnode, replica:%d arbitratorIp:%s"
,
syncCfg
.
replica
,
taosIpStr
(
syncCfg
.
arbitratorIp
));
for
(
int32_t
i
=
0
;
i
<
syncCfg
.
replica
;
++
i
)
{
mPrint
(
"mnode:%d, ip:%s name:%s"
,
syncCfg
.
nodeInfo
[
i
].
nodeId
,
taosIpStr
(
syncCfg
.
nodeInfo
[
i
].
nodeIp
),
syncCfg
.
nodeInfo
[
i
].
name
);
}
sdbTrace
(
"sdb is initialized, version:%d totalRows:%d numOfTables:%d"
,
tsSdbObj
->
version
,
totalRows
,
numOfTables
);
SSyncInfo
syncInfo
;
syncInfo
.
vgId
=
1
;
syncInfo
.
version
=
sdbGetVersion
();
syncInfo
.
syncCfg
=
syncCfg
;
sprintf
(
syncInfo
.
path
,
"%s/"
,
tsMnodeDir
);
syncInfo
.
ahandle
=
NULL
;
syncInfo
.
getWalInfo
=
sdbGetWalInfo
;
syncInfo
.
getFileInfo
=
sdbGetFileInfo
;
syncInfo
.
writeToCache
=
sdbWrite
;
syncInfo
.
confirmForward
=
sdbConfirmForward
;
syncInfo
.
notifyRole
=
sdbNotifyRole
;
tsSdbObj
.
cfg
=
syncCfg
;
if
(
tsSdbObj
.
sync
)
{
syncReconfig
(
tsSdbObj
.
sync
,
&
syncCfg
);
}
else
{
tsSdbObj
.
sync
=
syncStart
(
&
syncInfo
);
}
}
int32_t
sdbInit
()
{
pthread_mutex_init
(
&
tsSdbObj
.
mutex
,
NULL
);
sem_init
(
&
tsSdbObj
.
sem
,
0
,
0
);
if
(
sdbInitWal
()
!=
0
)
{
return
-
1
;
}
replicaNotify
();
sdbRestoreTables
();
if
(
mgmtGetMnodesNum
()
==
1
)
{
tsSdbObj
.
role
=
TAOS_SYNC_ROLE_MASTER
;
}
sdbUpdateSync
();
tsSdbObj
.
status
=
SDB_STATUS_SERVING
;
return
TSDB_CODE_SUCCESS
;
}
void
sdbCleanUp
()
{
if
(
tsSdbObj
)
{
pthread_mutex_destroy
(
&
tsSdbObj
->
mutex
);
walClose
(
tsSdbObj
->
wal
);
free
(
tsSdbObj
);
tsSdbObj
=
NULL
;
}
if
(
tsSdbObj
.
status
!=
SDB_STATUS_SERVING
)
return
;
syncStop
(
tsSdbObj
.
sync
);
free
(
tsSdbObj
.
sync
);
walClose
(
tsSdbObj
.
wal
);
sem_destroy
(
&
tsSdbObj
.
sem
);
pthread_mutex_destroy
(
&
tsSdbObj
.
mutex
);
memset
(
&
tsSdbObj
,
0
,
sizeof
(
tsSdbObj
));
}
void
sdbIncRef
(
void
*
handle
,
void
*
pRow
)
{
...
...
@@ -178,15 +353,15 @@ void sdbDecRef(void *handle, void *pRow) {
if
(
refCount
<=
0
&&
*
updateEnd
)
{
sdbTrace
(
"table:%s, record:%s:%s:%d is destroyed"
,
pTable
->
tableName
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pRow
),
*
pRefCount
);
SSdbOper
Desc
oper
=
{.
pObj
=
pRow
};
SSdbOper
oper
=
{.
pObj
=
pRow
};
(
*
pTable
->
destroyFp
)(
&
oper
);
}
}
}
static
S
RowMeta
*
sdbGetRowMeta
(
void
*
handle
,
void
*
key
)
{
static
S
SdbRow
*
sdbGetRowMeta
(
void
*
handle
,
void
*
key
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
S
RowMeta
*
pMeta
;
S
SdbRow
*
pMeta
;
if
(
handle
==
NULL
)
return
NULL
;
...
...
@@ -197,7 +372,7 @@ static SRowMeta *sdbGetRowMeta(void *handle, void *key) {
void
*
sdbGetRow
(
void
*
handle
,
void
*
key
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
S
RowMeta
*
pMeta
;
S
SdbRow
*
pMeta
;
if
(
handle
==
NULL
)
return
NULL
;
...
...
@@ -213,8 +388,8 @@ void *sdbGetRow(void *handle, void *key) {
return
pMeta
->
row
;
}
static
int32_t
sdbInsert
Local
(
SSdbTable
*
pTable
,
SSdbOperDesc
*
pOper
)
{
S
RowMeta
rowMeta
;
static
int32_t
sdbInsert
Hash
(
SSdbTable
*
pTable
,
SSdbOper
*
pOper
)
{
S
SdbRow
rowMeta
;
rowMeta
.
rowSize
=
pOper
->
rowSize
;
rowMeta
.
row
=
pOper
->
pObj
;
...
...
@@ -229,20 +404,20 @@ static int32_t sdbInsertLocal(SSdbTable *pTable, SSdbOperDesc *pOper) {
pthread_mutex_unlock
(
&
pTable
->
mutex
);
sdbTrace
(
"table:%s, insert record:%s, numOfRows:%d"
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pOper
->
pObj
),
sdbTrace
(
"table:%s, insert record:%s
to hash
, numOfRows:%d"
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pOper
->
pObj
),
pTable
->
numOfRows
);
(
*
pTable
->
insertFp
)(
pOper
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
sdbDelete
Local
(
SSdbTable
*
pTable
,
SSdbOperDesc
*
pOper
)
{
static
int32_t
sdbDelete
Hash
(
SSdbTable
*
pTable
,
SSdbOper
*
pOper
)
{
pthread_mutex_lock
(
&
pTable
->
mutex
);
(
*
sdbDeleteIndexFp
[
pTable
->
keyType
])(
pTable
->
iHandle
,
pOper
->
pObj
);
pTable
->
numOfRows
--
;
pthread_mutex_unlock
(
&
pTable
->
mutex
);
sdbTrace
(
"table:%s, delete record:%s, numOfRows:%d"
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pOper
->
pObj
),
sdbTrace
(
"table:%s, delete record:%s
from hash
, numOfRows:%d"
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pOper
->
pObj
),
pTable
->
numOfRows
);
(
*
pTable
->
deleteFp
)(
pOper
);
...
...
@@ -253,127 +428,76 @@ static int32_t sdbDeleteLocal(SSdbTable *pTable, SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
sdbUpdate
Local
(
SSdbTable
*
pTable
,
SSdbOperDesc
*
pOper
)
{
sdbTrace
(
"table:%s, update record:%s, numOfRows:%d"
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pOper
->
pObj
),
static
int32_t
sdbUpdate
Hash
(
SSdbTable
*
pTable
,
SSdbOper
*
pOper
)
{
sdbTrace
(
"table:%s, update record:%s
in hash
, numOfRows:%d"
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pOper
->
pObj
),
pTable
->
numOfRows
);
(
*
pTable
->
updateFp
)(
pOper
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
sdbProcessWriteFromApp
(
SSdbTable
*
pTable
,
SWalHead
*
pHead
,
int32_t
action
)
{
int32_t
code
=
0
;
static
int
sdbWrite
(
void
*
param
,
void
*
data
,
int
type
)
{
SWalHead
*
pHead
=
data
;
int32_t
tableId
=
pHead
->
msgType
/
10
;
int32_t
action
=
pHead
->
msgType
%
10
;
pthread_mutex_lock
(
&
tsSdbObj
->
mutex
);
tsSdbObj
->
version
++
;
pHead
->
version
=
tsSdbObj
->
version
;
SSdbTable
*
pTable
=
sdbGetTableFromId
(
tableId
);
assert
(
pTable
!=
NULL
);
code
=
replicaForwardReqToPeer
(
pHead
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pthread_mutex_unlock
(
&
tsSdbObj
->
mutex
);
sdbError
(
"table:%s, failed to forward %s record:%s from file, version:%"
PRId64
", reason:%s"
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetkeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
,
tstrerror
(
code
));
return
code
;
pthread_mutex_lock
(
&
tsSdbObj
.
mutex
);
if
(
pHead
->
version
==
0
)
{
// assign version
tsSdbObj
.
version
++
;
pHead
->
version
=
tsSdbObj
.
version
;
}
else
{
// for data from WAL or forward, version may be smaller
if
(
pHead
->
version
<=
tsSdbObj
.
version
)
{
pthread_mutex_unlock
(
&
tsSdbObj
.
mutex
);
return
TSDB_CODE_SUCCESS
;
}
else
if
(
pHead
->
version
!=
tsSdbObj
.
version
+
1
)
{
pthread_mutex_unlock
(
&
tsSdbObj
.
mutex
);
sdbError
(
"table:%s, failed to restore %s record:%s from wal, version:%"
PRId64
" too large, sdb version:%"
PRId64
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetkeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
,
tsSdbObj
.
version
);
return
TSDB_CODE_OTHERS
;
}
else
{
tsSdbObj
.
version
=
pHead
->
version
;
}
}
code
=
walWrite
(
tsSdbObj
->
wal
,
pHead
);
pthread_mutex_unlock
(
&
tsSdbObj
->
mutex
);
int32_t
code
=
walWrite
(
tsSdbObj
.
wal
,
pHead
);
if
(
code
<
0
)
{
sdbError
(
"table:%s, failed to %s record:%s to file, version:%"
PRId64
", reason:%s"
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetkeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
,
tstrerror
(
code
));
}
else
{
sdbTrace
(
"table:%s, success to %s record:%s to file, version:%"
PRId64
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetkeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
);
pthread_mutex_unlock
(
&
tsSdbObj
.
mutex
);
return
code
;
}
walFsync
(
tsSdbObj
.
wal
);
walFsync
(
tsSdbObj
->
wal
);
taosFreeQitem
(
pHead
);
return
code
;
}
static
int32_t
sdbProcessWriteFromWal
(
SSdbTable
*
pTable
,
SWalHead
*
pHead
,
int32_t
action
)
{
pthread_mutex_lock
(
&
tsSdbObj
->
mutex
);
if
(
pHead
->
version
<=
tsSdbObj
->
version
)
{
pthread_mutex_unlock
(
&
tsSdbObj
->
mutex
);
return
TSDB_CODE_SUCCESS
;
}
else
if
(
pHead
->
version
!=
tsSdbObj
->
version
+
1
)
{
pthread_mutex_unlock
(
&
tsSdbObj
->
mutex
);
sdbError
(
"table:%s, failed to restore %s record:%s from file, version:%"
PRId64
" too large, sdb version:%"
PRId64
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetkeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
,
tsSdbObj
->
version
);
return
TSDB_CODE_OTHERS
;
}
sdbForwardToPeer
(
pHead
);
pthread_mutex_unlock
(
&
tsSdbObj
.
mutex
);
tsSdbObj
->
version
=
pHead
->
version
;
sdbTrace
(
"table:%s, success to restore %s record:%s from file, version:%"
PRId64
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetkeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
);
// from app, oper is created
if
(
param
==
NULL
)
return
code
;
int32_t
code
=
-
1
;
// from wal, should create oper
if
(
action
==
SDB_ACTION_INSERT
)
{
SSdbOperDesc
oper
=
{
.
rowSize
=
pHead
->
len
,
.
rowData
=
pHead
->
cont
,
.
table
=
pTable
,
};
SSdbOper
oper
=
{.
rowSize
=
pHead
->
len
,
.
rowData
=
pHead
->
cont
,
.
table
=
pTable
};
code
=
(
*
pTable
->
decodeFp
)(
&
oper
);
if
(
code
<
0
)
{
sdbTrace
(
"table:%s, failed to decode %s record:%s from file, version:%"
PRId64
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetkeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
);
pthread_mutex_unlock
(
&
tsSdbObj
->
mutex
);
return
code
;
}
code
=
sdbInsertLocal
(
pTable
,
&
oper
);
return
sdbInsertHash
(
pTable
,
&
oper
);
}
else
if
(
action
==
SDB_ACTION_DELETE
)
{
S
RowMeta
*
rowMeta
=
sdbGetRowMeta
(
pTable
,
pHead
->
cont
);
S
SdbRow
*
rowMeta
=
sdbGetRowMeta
(
pTable
,
pHead
->
cont
);
assert
(
rowMeta
!=
NULL
&&
rowMeta
->
row
!=
NULL
);
SSdbOperDesc
oper
=
{
.
table
=
pTable
,
.
pObj
=
rowMeta
->
row
,
};
code
=
sdbDeleteLocal
(
pTable
,
&
oper
);
SSdbOper
oper
=
{.
table
=
pTable
,
.
pObj
=
rowMeta
->
row
};
return
sdbDeleteHash
(
pTable
,
&
oper
);
}
else
if
(
action
==
SDB_ACTION_UPDATE
)
{
S
RowMeta
*
rowMeta
=
sdbGetRowMeta
(
pTable
,
pHead
->
cont
);
S
SdbRow
*
rowMeta
=
sdbGetRowMeta
(
pTable
,
pHead
->
cont
);
assert
(
rowMeta
!=
NULL
&&
rowMeta
->
row
!=
NULL
);
SSdbOperDesc
oper
=
{
.
rowSize
=
pHead
->
len
,
.
rowData
=
pHead
->
cont
,
.
table
=
pTable
,
};
SSdbOper
oper
=
{.
rowSize
=
pHead
->
len
,
.
rowData
=
pHead
->
cont
,
.
table
=
pTable
};
code
=
(
*
pTable
->
decodeFp
)(
&
oper
);
if
(
code
<
0
)
{
sdbTrace
(
"table:%s, failed to decode %s record:%s from file, version:%"
PRId64
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetkeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
);
pthread_mutex_unlock
(
&
tsSdbObj
->
mutex
);
return
code
;
}
code
=
sdbUpdateLocal
(
pTable
,
&
oper
);
}
pthread_mutex_unlock
(
&
tsSdbObj
->
mutex
);
return
code
;
return
sdbUpdateHash
(
pTable
,
&
oper
);
}
else
{
return
TSDB_CODE_INVALID_MSG_TYPE
;
}
}
int
sdbProcessWrite
(
void
*
param
,
void
*
data
,
int
type
)
{
SWalHead
*
pHead
=
data
;
int32_t
tableId
=
pHead
->
msgType
/
10
;
int32_t
action
=
pHead
->
msgType
%
10
;
SSdbTable
*
pTable
=
sdbGetTableFromId
(
tableId
);
assert
(
pTable
!=
NULL
);
if
(
pHead
->
version
==
0
)
{
return
sdbProcessWriteFromApp
(
pTable
,
pHead
,
action
);
}
else
{
return
sdbProcessWriteFromWal
(
pTable
,
pHead
,
action
);
}
}
int32_t
sdbInsertRow
(
SSdbOperDesc
*
pOper
)
{
int32_t
sdbInsertRow
(
SSdbOper
*
pOper
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
pOper
->
table
;
if
(
pTable
==
NULL
)
return
-
1
;
...
...
@@ -405,19 +529,19 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) {
(
*
pTable
->
encodeFp
)(
pOper
);
pHead
->
len
=
pOper
->
rowSize
;
int32_t
code
=
sdbProcessWrite
(
tsSdbObj
,
pHead
,
pHead
->
msgType
);
int32_t
code
=
sdbWrite
(
NULL
,
pHead
,
pHead
->
msgType
);
taosFreeQitem
(
pHead
);
if
(
code
<
0
)
return
code
;
}
return
sdbInsert
Local
(
pTable
,
pOper
);
}
return
sdbInsert
Hash
(
pTable
,
pOper
);
}
// row here can be object or null-terminated string
int32_t
sdbDeleteRow
(
SSdbOperDesc
*
pOper
)
{
int32_t
sdbDeleteRow
(
SSdbOper
*
pOper
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
pOper
->
table
;
if
(
pTable
==
NULL
)
return
-
1
;
S
RowMeta
*
pMeta
=
sdbGetRowMeta
(
pTable
,
pOper
->
pObj
);
S
SdbRow
*
pMeta
=
sdbGetRowMeta
(
pTable
,
pOper
->
pObj
);
if
(
pMeta
==
NULL
)
{
sdbTrace
(
"table:%s, record is not there, delete failed"
,
pTable
->
tableName
);
return
-
1
;
...
...
@@ -447,18 +571,19 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) {
pHead
->
msgType
=
pTable
->
tableId
*
10
+
SDB_ACTION_DELETE
;
memcpy
(
pHead
->
cont
,
pOper
->
pObj
,
rowSize
);
int32_t
code
=
sdbProcessWrite
(
tsSdbObj
,
pHead
,
pHead
->
msgType
);
int32_t
code
=
sdbWrite
(
NULL
,
pHead
,
pHead
->
msgType
);
taosFreeQitem
(
pHead
);
if
(
code
<
0
)
return
code
;
}
return
sdbDelete
Local
(
pTable
,
pOper
);
}
return
sdbDelete
Hash
(
pTable
,
pOper
);
}
int32_t
sdbUpdateRow
(
SSdbOper
Desc
*
pOper
)
{
int32_t
sdbUpdateRow
(
SSdbOper
*
pOper
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
pOper
->
table
;
if
(
pTable
==
NULL
)
return
-
1
;
S
RowMeta
*
pMeta
=
sdbGetRowMeta
(
pTable
,
pOper
->
pObj
);
S
SdbRow
*
pMeta
=
sdbGetRowMeta
(
pTable
,
pOper
->
pObj
);
if
(
pMeta
==
NULL
)
{
sdbTrace
(
"table:%s, record is not there, delete failed"
,
pTable
->
tableName
);
return
-
1
;
...
...
@@ -477,16 +602,17 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) {
(
*
pTable
->
encodeFp
)(
pOper
);
pHead
->
len
=
pOper
->
rowSize
;
int32_t
code
=
sdbProcessWrite
(
tsSdbObj
,
pHead
,
pHead
->
msgType
);
int32_t
code
=
sdbWrite
(
NULL
,
pHead
,
pHead
->
msgType
);
taosFreeQitem
(
pHead
);
if
(
code
<
0
)
return
code
;
}
}
return
sdbUpdate
Local
(
pTable
,
pOper
);
return
sdbUpdate
Hash
(
pTable
,
pOper
);
}
void
*
sdbFetchRow
(
void
*
handle
,
void
*
pNode
,
void
**
ppRow
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
S
RowMeta
*
pMeta
;
S
SdbRow
*
pMeta
;
*
ppRow
=
NULL
;
if
(
pTable
==
NULL
)
return
NULL
;
...
...
@@ -520,13 +646,13 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
pTable
->
restoredFp
=
pDesc
->
restoredFp
;
if
(
sdbInitIndexFp
[
pTable
->
keyType
]
!=
NULL
)
{
pTable
->
iHandle
=
(
*
sdbInitIndexFp
[
pTable
->
keyType
])(
pTable
->
maxRowSize
,
sizeof
(
S
RowMeta
));
pTable
->
iHandle
=
(
*
sdbInitIndexFp
[
pTable
->
keyType
])(
pTable
->
maxRowSize
,
sizeof
(
S
SdbRow
));
}
pthread_mutex_init
(
&
pTable
->
mutex
,
NULL
);
tsSdb
N
umOfTables
++
;
tsSdb
T
ableList
[
pTable
->
tableId
]
=
pTable
;
tsSdb
Obj
.
n
umOfTables
++
;
tsSdb
Obj
.
t
ableList
[
pTable
->
tableId
]
=
pTable
;
return
pTable
;
}
...
...
@@ -534,16 +660,16 @@ void sdbCloseTable(void *handle) {
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
if
(
pTable
==
NULL
)
return
;
tsSdb
N
umOfTables
--
;
tsSdb
T
ableList
[
pTable
->
tableId
]
=
NULL
;
tsSdb
Obj
.
n
umOfTables
--
;
tsSdb
Obj
.
t
ableList
[
pTable
->
tableId
]
=
NULL
;
void
*
pNode
=
NULL
;
while
(
1
)
{
S
RowMeta
*
pMeta
;
S
SdbRow
*
pMeta
;
pNode
=
(
*
sdbFetchRowFp
[
pTable
->
keyType
])(
pTable
->
iHandle
,
pNode
,
(
void
**
)
&
pMeta
);
if
(
pMeta
==
NULL
)
break
;
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
pObj
=
pMeta
->
row
,
.
table
=
pTable
,
};
...
...
@@ -557,6 +683,7 @@ void sdbCloseTable(void *handle) {
pthread_mutex_destroy
(
&
pTable
->
mutex
);
sdbTrace
(
"table:%s, is closed, numOfTables:%d"
,
pTable
->
tableName
,
tsSdb
N
umOfTables
);
sdbTrace
(
"table:%s, is closed, numOfTables:%d"
,
pTable
->
tableName
,
tsSdb
Obj
.
n
umOfTables
);
free
(
pTable
);
}
src/mnode/src/mgmtShell.c
浏览文件 @
6a47a011
...
...
@@ -144,7 +144,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
return
;
}
if
(
!
mgmt
IsMaster
())
{
if
(
!
sdb
IsMaster
())
{
// rpcSendRedirectRsp(rpcMsg->handle, mgmtGetMnodeIpListForRedirect());
mgmtSendSimpleResp
(
rpcMsg
->
handle
,
TSDB_CODE_NO_MASTER
);
rpcFreeCont
(
rpcMsg
->
pCont
);
...
...
src/mnode/src/mgmtTable.c
浏览文件 @
6a47a011
...
...
@@ -83,12 +83,12 @@ static void mgmtDestroyChildTable(SChildTableObj *pTable) {
tfree
(
pTable
);
}
static
int32_t
mgmtChildTableActionDestroy
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtChildTableActionDestroy
(
SSdbOper
*
pOper
)
{
mgmtDestroyChildTable
(
pOper
->
pObj
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtChildTableActionInsert
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtChildTableActionInsert
(
SSdbOper
*
pOper
)
{
SChildTableObj
*
pTable
=
pOper
->
pObj
;
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
pTable
->
vgId
);
...
...
@@ -128,7 +128,7 @@ static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtChildTableActionDelete
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtChildTableActionDelete
(
SSdbOper
*
pOper
)
{
SChildTableObj
*
pTable
=
pOper
->
pObj
;
if
(
pTable
->
vgId
==
0
)
{
return
TSDB_CODE_INVALID_VGROUP_ID
;
...
...
@@ -169,7 +169,7 @@ static int32_t mgmtChildTableActionDelete(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtChildTableActionUpdate
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtChildTableActionUpdate
(
SSdbOper
*
pOper
)
{
SChildTableObj
*
pNew
=
pOper
->
pObj
;
SChildTableObj
*
pTable
=
mgmtGetChildTable
(
pNew
->
info
.
tableId
);
if
(
pTable
!=
pNew
)
{
...
...
@@ -186,7 +186,7 @@ static int32_t mgmtChildTableActionUpdate(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtChildTableActionEncode
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtChildTableActionEncode
(
SSdbOper
*
pOper
)
{
const
int32_t
maxRowSize
=
sizeof
(
SChildTableObj
)
+
sizeof
(
SSchema
)
*
TSDB_MAX_COLUMNS
;
SChildTableObj
*
pTable
=
pOper
->
pObj
;
assert
(
pTable
!=
NULL
&&
pOper
->
rowData
!=
NULL
);
...
...
@@ -208,7 +208,7 @@ static int32_t mgmtChildTableActionEncode(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtChildTableActionDecode
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtChildTableActionDecode
(
SSdbOper
*
pOper
)
{
assert
(
pOper
->
rowData
!=
NULL
);
SChildTableObj
*
pTable
=
calloc
(
1
,
sizeof
(
SChildTableObj
));
if
(
pTable
==
NULL
)
{
...
...
@@ -252,7 +252,7 @@ static int32_t mgmtChildTableActionRestored() {
SDbObj
*
pDb
=
mgmtGetDbByTableId
(
pTable
->
info
.
tableId
);
if
(
pDb
==
NULL
)
{
mError
(
"ctable:%s, failed to get db, discard it"
,
pTable
->
info
.
tableId
);
SSdbOper
Desc
desc
=
{
0
};
SSdbOper
desc
=
{
0
};
desc
.
type
=
SDB_OPER_LOCAL
;
desc
.
pObj
=
pTable
;
desc
.
table
=
tsChildTableSdb
;
...
...
@@ -266,7 +266,7 @@ static int32_t mgmtChildTableActionRestored() {
if
(
pVgroup
==
NULL
)
{
mError
(
"ctable:%s, failed to get vgroup:%d sid:%d, discard it"
,
pTable
->
info
.
tableId
,
pTable
->
vgId
,
pTable
->
sid
);
pTable
->
vgId
=
0
;
SSdbOper
Desc
desc
=
{
0
};
SSdbOper
desc
=
{
0
};
desc
.
type
=
SDB_OPER_LOCAL
;
desc
.
pObj
=
pTable
;
desc
.
table
=
tsChildTableSdb
;
...
...
@@ -280,7 +280,7 @@ static int32_t mgmtChildTableActionRestored() {
mError
(
"ctable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it"
,
pTable
->
info
.
tableId
,
pDb
->
name
,
pTable
->
vgId
,
pVgroup
->
dbName
,
pTable
->
sid
);
pTable
->
vgId
=
0
;
SSdbOper
Desc
desc
=
{
0
};
SSdbOper
desc
=
{
0
};
desc
.
type
=
SDB_OPER_LOCAL
;
desc
.
pObj
=
pTable
;
desc
.
table
=
tsChildTableSdb
;
...
...
@@ -292,7 +292,7 @@ static int32_t mgmtChildTableActionRestored() {
if
(
pVgroup
->
tableList
==
NULL
)
{
mError
(
"ctable:%s, vgroup:%d tableList is null"
,
pTable
->
info
.
tableId
,
pTable
->
vgId
);
pTable
->
vgId
=
0
;
SSdbOper
Desc
desc
=
{
0
};
SSdbOper
desc
=
{
0
};
desc
.
type
=
SDB_OPER_LOCAL
;
desc
.
pObj
=
pTable
;
desc
.
table
=
tsChildTableSdb
;
...
...
@@ -306,7 +306,7 @@ static int32_t mgmtChildTableActionRestored() {
if
(
pSuperTable
==
NULL
)
{
mError
(
"ctable:%s, stable:%s not exist"
,
pTable
->
info
.
tableId
,
pTable
->
superTableId
);
pTable
->
vgId
=
0
;
SSdbOper
Desc
desc
=
{
0
};
SSdbOper
desc
=
{
0
};
desc
.
type
=
SDB_OPER_LOCAL
;
desc
.
pObj
=
pTable
;
desc
.
table
=
tsChildTableSdb
;
...
...
@@ -347,7 +347,7 @@ static int32_t mgmtInitChildTables() {
return
-
1
;
}
mTrace
(
"
child table is initializ
ed"
);
mTrace
(
"
table:ctables is creat
ed"
);
return
0
;
}
...
...
@@ -392,12 +392,12 @@ static void mgmtDestroySuperTable(SSuperTableObj *pStable) {
tfree
(
pStable
);
}
static
int32_t
mgmtSuperTableActionDestroy
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtSuperTableActionDestroy
(
SSdbOper
*
pOper
)
{
mgmtDestroySuperTable
(
pOper
->
pObj
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtSuperTableActionInsert
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtSuperTableActionInsert
(
SSdbOper
*
pOper
)
{
SSuperTableObj
*
pStable
=
pOper
->
pObj
;
SDbObj
*
pDb
=
mgmtGetDbByTableId
(
pStable
->
info
.
tableId
);
if
(
pDb
!=
NULL
)
{
...
...
@@ -408,7 +408,7 @@ static int32_t mgmtSuperTableActionInsert(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtSuperTableActionDelete
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtSuperTableActionDelete
(
SSdbOper
*
pOper
)
{
SSuperTableObj
*
pStable
=
pOper
->
pObj
;
SDbObj
*
pDb
=
mgmtGetDbByTableId
(
pStable
->
info
.
tableId
);
if
(
pDb
!=
NULL
)
{
...
...
@@ -420,7 +420,7 @@ static int32_t mgmtSuperTableActionDelete(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtSuperTableActionUpdate
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtSuperTableActionUpdate
(
SSdbOper
*
pOper
)
{
SSuperTableObj
*
pNew
=
pOper
->
pObj
;
SSuperTableObj
*
pTable
=
mgmtGetSuperTable
(
pNew
->
info
.
tableId
);
if
(
pTable
!=
pNew
)
{
...
...
@@ -435,7 +435,7 @@ static int32_t mgmtSuperTableActionUpdate(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtSuperTableActionEncode
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtSuperTableActionEncode
(
SSdbOper
*
pOper
)
{
const
int32_t
maxRowSize
=
sizeof
(
SChildTableObj
)
+
sizeof
(
SSchema
)
*
TSDB_MAX_COLUMNS
;
SSuperTableObj
*
pStable
=
pOper
->
pObj
;
...
...
@@ -454,7 +454,7 @@ static int32_t mgmtSuperTableActionEncode(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtSuperTableActionDecode
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtSuperTableActionDecode
(
SSdbOper
*
pOper
)
{
assert
(
pOper
->
rowData
!=
NULL
);
SSuperTableObj
*
pStable
=
(
SSuperTableObj
*
)
calloc
(
1
,
sizeof
(
SSuperTableObj
));
...
...
@@ -505,7 +505,7 @@ static int32_t mgmtInitSuperTables() {
return
-
1
;
}
mTrace
(
"
stables is initializ
ed"
);
mTrace
(
"
table:stables is creat
ed"
);
return
0
;
}
...
...
@@ -731,7 +731,7 @@ static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg) {
tschema
[
col
].
bytes
=
htons
(
tschema
[
col
].
bytes
);
}
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsSuperTableSdb
,
.
pObj
=
pStable
,
...
...
@@ -755,7 +755,7 @@ static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) {
mError
(
"stable:%s, numOfTables:%d not 0"
,
pStable
->
info
.
tableId
,
pStable
->
numOfTables
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_OTHERS
);
}
else
{
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsSuperTableSdb
,
.
pObj
=
pStable
...
...
@@ -806,7 +806,7 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i
pStable
->
numOfColumns
+=
ntags
;
pStable
->
sversion
++
;
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsSuperTableSdb
,
.
pObj
=
pStable
,
...
...
@@ -837,7 +837,7 @@ static int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) {
int32_t
schemaSize
=
sizeof
(
SSchema
)
*
(
pStable
->
numOfTags
+
pStable
->
numOfColumns
);
pStable
->
schema
=
realloc
(
pStable
->
schema
,
schemaSize
);
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsSuperTableSdb
,
.
pObj
=
pStable
,
...
...
@@ -872,7 +872,7 @@ static int32_t mgmtModifySuperTableTagName(SSuperTableObj *pStable, char *oldTag
SSchema
*
schema
=
(
SSchema
*
)
(
pStable
->
schema
+
(
pStable
->
numOfColumns
+
col
)
*
sizeof
(
SSchema
));
strncpy
(
schema
->
name
,
newTagName
,
TSDB_COL_NAME_LEN
);
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsSuperTableSdb
,
.
pObj
=
pStable
,
...
...
@@ -931,7 +931,7 @@ static int32_t mgmtAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SSc
mgmtDecAcctRef
(
pAcct
);
}
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsSuperTableSdb
,
.
pObj
=
pStable
,
...
...
@@ -968,7 +968,7 @@ static int32_t mgmtDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, ch
mgmtDecAcctRef
(
pAcct
);
}
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsSuperTableSdb
,
.
pObj
=
pStable
,
...
...
@@ -1116,7 +1116,7 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) {
}
if
(
strncmp
(
pDropDb
->
name
,
pTable
->
info
.
tableId
,
dbNameLen
)
==
0
)
{
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_LOCAL
,
.
table
=
tsSuperTableSdb
,
.
pObj
=
pTable
,
...
...
@@ -1354,7 +1354,7 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj
}
}
SSdbOper
Desc
desc
=
{
0
};
SSdbOper
desc
=
{
0
};
desc
.
type
=
SDB_OPER_GLOBAL
;
desc
.
pObj
=
pTable
;
desc
.
table
=
tsChildTableSdb
;
...
...
@@ -1508,7 +1508,7 @@ static int32_t mgmtAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSc
mgmtDecAcctRef
(
pAcct
);
}
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsChildTableSdb
,
.
pObj
=
pTable
,
...
...
@@ -1542,7 +1542,7 @@ static int32_t mgmtDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, ch
mgmtDecAcctRef
(
pAcct
);
}
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsChildTableSdb
,
.
pObj
=
pTable
,
...
...
@@ -1687,7 +1687,7 @@ void mgmtDropAllChildTables(SDbObj *pDropDb) {
}
if
(
strncmp
(
pDropDb
->
name
,
pTable
->
info
.
tableId
,
dbNameLen
)
==
0
)
{
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_LOCAL
,
.
table
=
tsChildTableSdb
,
.
pObj
=
pTable
,
...
...
@@ -1716,7 +1716,7 @@ static void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) {
}
if
(
pTable
->
superTable
==
pStable
)
{
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_LOCAL
,
.
table
=
tsChildTableSdb
,
.
pObj
=
pTable
,
...
...
@@ -1805,7 +1805,7 @@ static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
return
;
}
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsChildTableSdb
,
.
pObj
=
pTable
...
...
@@ -1848,7 +1848,7 @@ static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
mError
(
"table:%s, failed to create in dnode, thandle:%p result:%s"
,
pTable
->
info
.
tableId
,
queueMsg
->
thandle
,
tstrerror
(
rpcMsg
->
code
));
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsChildTableSdb
,
.
pObj
=
pTable
...
...
src/mnode/src/mgmtUser.c
浏览文件 @
6a47a011
...
...
@@ -36,12 +36,12 @@ static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg);
static
void
mgmtProcessAlterUserMsg
(
SQueuedMsg
*
pMsg
);
static
void
mgmtProcessDropUserMsg
(
SQueuedMsg
*
pMsg
);
static
int32_t
mgmtUserActionDestroy
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtUserActionDestroy
(
SSdbOper
*
pOper
)
{
tfree
(
pOper
->
pObj
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtUserActionInsert
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtUserActionInsert
(
SSdbOper
*
pOper
)
{
SUserObj
*
pUser
=
pOper
->
pObj
;
SAcctObj
*
pAcct
=
mgmtGetAcct
(
pUser
->
acct
);
...
...
@@ -56,7 +56,7 @@ static int32_t mgmtUserActionInsert(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtUserActionDelete
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtUserActionDelete
(
SSdbOper
*
pOper
)
{
SUserObj
*
pUser
=
pOper
->
pObj
;
SAcctObj
*
pAcct
=
mgmtGetAcct
(
pUser
->
acct
);
...
...
@@ -67,7 +67,7 @@ static int32_t mgmtUserActionDelete(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtUserActionUpdate
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtUserActionUpdate
(
SSdbOper
*
pOper
)
{
SUserObj
*
pUser
=
pOper
->
pObj
;
SUserObj
*
pSaved
=
mgmtGetUser
(
pUser
->
user
);
if
(
pUser
!=
pSaved
)
{
...
...
@@ -77,14 +77,14 @@ static int32_t mgmtUserActionUpdate(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtUserActionEncode
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtUserActionEncode
(
SSdbOper
*
pOper
)
{
SUserObj
*
pUser
=
pOper
->
pObj
;
memcpy
(
pOper
->
rowData
,
pUser
,
tsUserUpdateSize
);
pOper
->
rowSize
=
tsUserUpdateSize
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtUserActionDecode
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtUserActionDecode
(
SSdbOper
*
pOper
)
{
SUserObj
*
pUser
=
(
SUserObj
*
)
calloc
(
1
,
sizeof
(
SUserObj
));
if
(
pUser
==
NULL
)
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
...
...
@@ -137,7 +137,7 @@ int32_t mgmtInitUsers() {
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_USER
,
mgmtGetUserMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_USER
,
mgmtRetrieveUsers
);
mTrace
(
"
user data is initializ
ed"
);
mTrace
(
"
table:users table is creat
ed"
);
return
0
;
}
...
...
@@ -154,7 +154,7 @@ void mgmtReleaseUser(SUserObj *pUser) {
}
static
int32_t
mgmtUpdateUser
(
SUserObj
*
pUser
)
{
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsUserSdb
,
.
pObj
=
pUser
,
...
...
@@ -202,7 +202,7 @@ int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) {
pUser
->
superAuth
=
1
;
}
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsUserSdb
,
.
pObj
=
pUser
,
...
...
@@ -219,7 +219,7 @@ int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) {
}
static
int32_t
mgmtDropUser
(
SUserObj
*
pUser
)
{
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsUserSdb
,
.
pObj
=
pUser
...
...
@@ -493,7 +493,7 @@ void mgmtDropAllUsers(SAcctObj *pAcct) {
if
(
pUser
==
NULL
)
break
;
if
(
strncmp
(
pUser
->
acct
,
pAcct
->
user
,
acctNameLen
)
==
0
)
{
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_LOCAL
,
.
table
=
tsUserSdb
,
.
pObj
=
pUser
,
...
...
src/mnode/src/mgmtVgroup.c
浏览文件 @
6a47a011
...
...
@@ -22,7 +22,7 @@
#include "tidpool.h"
#include "tsync.h"
#include "ttime.h"
#include "t
replica
.h"
#include "t
balance
.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtDb.h"
...
...
@@ -48,7 +48,7 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) ;
static
void
mgmtSendDropVgroupMsg
(
SVgObj
*
pVgroup
,
void
*
ahandle
);
static
void
mgmtSendCreateVgroupMsg
(
SVgObj
*
pVgroup
,
void
*
ahandle
);
static
int32_t
mgmtVgroupActionDestroy
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtVgroupActionDestroy
(
SSdbOper
*
pOper
)
{
SVgObj
*
pVgroup
=
pOper
->
pObj
;
if
(
pVgroup
->
idPool
)
{
taosIdPoolCleanUp
(
pVgroup
->
idPool
);
...
...
@@ -62,7 +62,7 @@ static int32_t mgmtVgroupActionDestroy(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtVgroupActionInsert
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtVgroupActionInsert
(
SSdbOper
*
pOper
)
{
SVgObj
*
pVgroup
=
pOper
->
pObj
;
SDbObj
*
pDb
=
mgmtGetDb
(
pVgroup
->
dbName
);
if
(
pDb
==
NULL
)
{
...
...
@@ -104,7 +104,7 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtVgroupActionDelete
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtVgroupActionDelete
(
SSdbOper
*
pOper
)
{
SVgObj
*
pVgroup
=
pOper
->
pObj
;
if
(
pVgroup
->
pDb
!=
NULL
)
{
...
...
@@ -124,7 +124,7 @@ static int32_t mgmtVgroupActionDelete(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtVgroupActionUpdate
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtVgroupActionUpdate
(
SSdbOper
*
pOper
)
{
SVgObj
*
pNew
=
pOper
->
pObj
;
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
pNew
->
vgId
);
if
(
pVgroup
!=
pNew
)
{
...
...
@@ -147,14 +147,14 @@ static int32_t mgmtVgroupActionUpdate(SSdbOperDesc *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtVgroupActionEncode
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtVgroupActionEncode
(
SSdbOper
*
pOper
)
{
SVgObj
*
pVgroup
=
pOper
->
pObj
;
memcpy
(
pOper
->
rowData
,
pVgroup
,
tsVgUpdateSize
);
pOper
->
rowSize
=
tsVgUpdateSize
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtVgroupActionDecode
(
SSdbOper
Desc
*
pOper
)
{
static
int32_t
mgmtVgroupActionDecode
(
SSdbOper
*
pOper
)
{
SVgObj
*
pVgroup
=
(
SVgObj
*
)
calloc
(
1
,
sizeof
(
SVgObj
));
if
(
pVgroup
==
NULL
)
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
...
...
@@ -199,7 +199,7 @@ int32_t mgmtInitVgroups() {
mgmtAddDClientRspHandle
(
TSDB_MSG_TYPE_MD_DROP_VNODE_RSP
,
mgmtProcessDropVnodeRsp
);
mgmtAddDServerMsgHandle
(
TSDB_MSG_TYPE_DM_CONFIG_VNODE
,
mgmtProcessVnodeCfgMsg
);
mTrace
(
"
vgroup is initializ
ed"
);
mTrace
(
"
table:vgroups is creat
ed"
);
return
0
;
}
...
...
@@ -213,7 +213,7 @@ SVgObj *mgmtGetVgroup(int32_t vgId) {
}
void
mgmtUpdateVgroup
(
SVgObj
*
pVgroup
)
{
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsVgroupSdb
,
.
pObj
=
pVgroup
,
...
...
@@ -249,7 +249,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) {
strcpy
(
pVgroup
->
dbName
,
pDb
->
name
);
pVgroup
->
numOfVnodes
=
pDb
->
cfg
.
replications
;
pVgroup
->
createdTime
=
taosGetTimestampMs
();
if
(
replica
AllocVnodes
(
pVgroup
)
!=
0
)
{
if
(
balance
AllocVnodes
(
pVgroup
)
!=
0
)
{
mError
(
"db:%s, no enough dnode to alloc %d vnodes to vgroup"
,
pDb
->
name
,
pVgroup
->
numOfVnodes
);
free
(
pVgroup
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_NO_ENOUGH_DNODES
);
...
...
@@ -257,7 +257,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) {
return
;
}
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsVgroupSdb
,
.
pObj
=
pVgroup
,
...
...
@@ -289,7 +289,7 @@ void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle) {
}
else
{
mTrace
(
"vgroup:%d, replica:%d is deleting from sdb"
,
pVgroup
->
vgId
,
pVgroup
->
numOfVnodes
);
mgmtSendDropVgroupMsg
(
pVgroup
,
NULL
);
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsVgroupSdb
,
.
pObj
=
pVgroup
...
...
@@ -596,7 +596,7 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
SQueuedMsg
*
newMsg
=
mgmtCloneQueuedMsg
(
queueMsg
);
mgmtAddToShellQueue
(
newMsg
);
}
else
{
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsVgroupSdb
,
.
pObj
=
pVgroup
...
...
@@ -659,7 +659,7 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
if
(
queueMsg
->
received
!=
queueMsg
->
expected
)
return
;
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsVgroupSdb
,
.
pObj
=
pVgroup
...
...
@@ -716,7 +716,7 @@ void mgmtDropAllVgroups(SDbObj *pDropDb) {
if
(
pVgroup
==
NULL
)
break
;
if
(
strncmp
(
pDropDb
->
name
,
pVgroup
->
dbName
,
dbNameLen
)
==
0
)
{
SSdbOper
Desc
oper
=
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_LOCAL
,
.
table
=
tsVgroupSdb
,
.
pObj
=
pVgroup
,
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
6a47a011
...
...
@@ -45,6 +45,9 @@ static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
#ifndef _SYNC
tsync_h
syncStart
(
const
SSyncInfo
*
info
)
{
return
NULL
;
}
int
syncForwardToPeer
(
tsync_h
shandle
,
void
*
pHead
,
void
*
mhandle
)
{
return
0
;
}
void
syncStop
(
tsync_h
shandle
)
{}
int
syncReconfig
(
tsync_h
shandle
,
const
SSyncCfg
*
cfg
)
{
return
0
;
}
int
syncGetNodesRole
(
tsync_h
shandle
,
SNodesRole
*
cfg
)
{
return
0
;
}
#endif
static
void
vnodeInit
()
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录