Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ef3e39c9
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ef3e39c9
编写于
10月 17, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
sdb version
上级
f76dbd58
变更
10
显示空白变更内容
内联
并排
Showing
10 changed file
with
468 addition
and
50 deletion
+468
-50
include/os/osDef.h
include/os/osDef.h
+6
-0
include/server/mnode/mnode.h
include/server/mnode/mnode.h
+6
-15
source/server/dnode/src/dnodeInt.c
source/server/dnode/src/dnodeInt.c
+0
-1
source/server/mnode/CMakeLists.txt
source/server/mnode/CMakeLists.txt
+1
-0
source/server/mnode/inc/mnodeDef.h
source/server/mnode/inc/mnodeDef.h
+15
-7
source/server/mnode/inc/mnodeInt.h
source/server/mnode/inc/mnodeInt.h
+4
-4
source/server/mnode/inc/mnodeSdb.h
source/server/mnode/inc/mnodeSdb.h
+20
-6
source/server/mnode/src/mnodeAcct.c
source/server/mnode/src/mnodeAcct.c
+58
-3
source/server/mnode/src/mnodeSdb.c
source/server/mnode/src/mnodeSdb.c
+352
-8
source/server/mnode/src/mondeInt.c
source/server/mnode/src/mondeInt.c
+6
-6
未找到文件。
include/os/osDef.h
浏览文件 @
ef3e39c9
...
...
@@ -178,6 +178,12 @@ extern "C" {
#define setThreadName(name)
#endif
#if defined(_WIN32)
#define TD_DIRSEP "\\"
#else
#define TD_DIRSEP "/"
#endif
#ifdef __cplusplus
}
#endif
...
...
include/server/mnode/mnode.h
浏览文件 @
ef3e39c9
...
...
@@ -20,6 +20,8 @@
extern
"C"
{
#endif
typedef
enum
{
MN_STATUS_UNINIT
=
0
,
MN_STATUS_INIT
=
1
,
MN_STATUS_READY
=
2
,
MN_STATUS_CLOSING
=
3
}
EMnStatus
;
typedef
struct
{
/**
* Send messages to other dnodes, such as create vnode message.
...
...
@@ -44,17 +46,6 @@ typedef struct {
*/
void
(
*
SendRedirectMsg
)(
struct
SRpcMsg
*
rpcMsg
,
bool
forShell
);
/**
* Get the corresponding endpoint information from dnodeId.
*
* @param dnode, the instance of dDnode module.
* @param dnodeId, the id ot dnode.
* @param ep, the endpoint of dnode.
* @param fqdn, the fqdn of dnode.
* @param port, the port of dnode.
*/
void
(
*
GetDnodeEp
)(
int32_t
dnodeId
,
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
);
}
SMnodeFp
;
typedef
struct
{
...
...
@@ -93,7 +84,7 @@ void mnodeUnDeploy();
*
* @return Server status.
*/
bool
mnodeIsServing
();
EMnStatus
mnodeIsServing
();
typedef
struct
{
int64_t
numOfDnode
;
...
...
source/server/dnode/src/dnodeInt.c
浏览文件 @
ef3e39c9
...
...
@@ -46,7 +46,6 @@ static int32_t dnodeInitVnodeModule(void **unused) {
static
int32_t
dnodeInitMnodeModule
(
void
**
unused
)
{
SMnodePara
para
;
para
.
fp
.
GetDnodeEp
=
dnodeGetDnodeEp
;
para
.
fp
.
SendMsgToDnode
=
dnodeSendMsgToDnode
;
para
.
fp
.
SendMsgToMnode
=
dnodeSendMsgToMnode
;
para
.
fp
.
SendRedirectMsg
=
dnodeSendRedirectMsg
;
...
...
source/server/mnode/CMakeLists.txt
浏览文件 @
ef3e39c9
...
...
@@ -8,4 +8,5 @@ target_include_directories(
target_link_libraries
(
mnode
PUBLIC transport
PUBLIC cjson
)
\ No newline at end of file
source/server/mnode/inc/mnodeDef.h
浏览文件 @
ef3e39c9
...
...
@@ -20,6 +20,7 @@
#include "tlog.h"
#include "trpc.h"
#include "ttimer.h"
#include "cJSON.h"
#include "mnode.h"
#ifdef __cplusplus
...
...
@@ -94,15 +95,15 @@ typedef enum {
MN_AUTH_MAX
}
EMnAuthOp
;
typedef
enum
{
MN_STATUS_UNINIT
=
0
,
MN_STATUS_INIT
=
1
,
MN_STATUS_READY
=
2
,
MN_STATUS_CLOSING
=
3
}
EMnStatus
;
typedef
enum
{
MN_SDB_STAT_AVAIL
=
0
,
MN_SDB_STAT_DROPPED
=
1
}
EMnSdbStat
;
typedef
struct
{
int8_t
type
;
int8_t
status
;
int8_t
align
[
6
];
}
SdbHead
;
typedef
struct
SClusterObj
{
SdbHead
head
;
int64_t
id
;
char
uid
[
TSDB_CLUSTER_ID_LEN
];
int64_t
createdTime
;
...
...
@@ -110,6 +111,7 @@ typedef struct SClusterObj {
}
SClusterObj
;
typedef
struct
SDnodeObj
{
SdbHead
head
;
int32_t
id
;
int32_t
vnodes
;
int64_t
createdTime
;
...
...
@@ -126,6 +128,7 @@ typedef struct SDnodeObj {
}
SDnodeObj
;
typedef
struct
SMnodeObj
{
SdbHead
head
;
int32_t
id
;
int8_t
status
;
int8_t
role
;
...
...
@@ -169,6 +172,7 @@ typedef struct {
}
SAcctInfo
;
typedef
struct
SAcctObj
{
SdbHead
head
;
char
acct
[
TSDB_USER_LEN
];
int64_t
createdTime
;
int64_t
updateTime
;
...
...
@@ -179,6 +183,7 @@ typedef struct SAcctObj {
}
SAcctObj
;
typedef
struct
SUserObj
{
SdbHead
head
;
char
user
[
TSDB_USER_LEN
];
char
pass
[
TSDB_KEY_LEN
];
char
acct
[
TSDB_USER_LEN
];
...
...
@@ -212,6 +217,7 @@ typedef struct {
}
SDbCfg
;
typedef
struct
SDbObj
{
SdbHead
head
;
char
name
[
TSDB_ACCT_ID_LEN
+
TSDB_DB_NAME_LEN
];
char
acct
[
TSDB_USER_LEN
];
int64_t
createdTime
;
...
...
@@ -255,6 +261,7 @@ typedef struct SVgObj {
}
SVgObj
;
typedef
struct
SSTableObj
{
SdbHead
head
;
char
tableId
[
TSDB_TABLE_NAME_LEN
];
uint64_t
uid
;
int64_t
createdTime
;
...
...
@@ -265,6 +272,7 @@ typedef struct SSTableObj {
}
SSTableObj
;
typedef
struct
SFuncObj
{
SdbHead
head
;
char
name
[
TSDB_FUNC_NAME_LEN
];
char
path
[
128
];
int32_t
contLen
;
...
...
source/server/mnode/inc/mnodeInt.h
浏览文件 @
ef3e39c9
...
...
@@ -25,7 +25,7 @@ extern "C" {
tmr_h
mnodeGetTimer
();
int32_t
mnodeGetDnodeId
();
char
*
mnodeGetClusterId
();
bool
mnodeIsRunn
ing
();
EMnStatus
mnodeIsServ
ing
();
void
mnodeSendMsgToDnode
(
struct
SRpcEpSet
*
epSet
,
struct
SRpcMsg
*
rpcMsg
);
void
mnodeSendMsgToMnode
(
struct
SRpcMsg
*
rpcMsg
);
...
...
source/server/mnode/inc/mnodeSdb.h
浏览文件 @
ef3e39c9
...
...
@@ -22,14 +22,28 @@
extern
"C"
{
#endif
int32_t
mnodeInitSdb
();
void
mnodeCleanupSdb
();
typedef
void
(
*
SdbDeployFp
)();
typedef
void
*
(
*
SdbDecodeFp
)(
cJSON
*
root
);
typedef
int32_t
(
*
SdbEncodeFp
)(
void
*
pHead
,
char
*
buf
,
int32_t
maxLen
);
int32_t
mnodeDeploySdb
();
void
mnodeUnDeploySdb
();
int32_t
sdbInit
();
void
sdbCleanup
();
int32_t
mnodeReadSdb
();
int32_t
mnodeCommitSdb
();
int32_t
sdbRead
();
int32_t
sdbCommit
();
int32_t
sdbDeploy
();
void
sdbUnDeploy
();
void
*
sdbInsertRow
(
EMnSdb
sdb
,
void
*
pObj
);
void
sdbDeleteRow
(
EMnSdb
sdb
,
void
*
pHead
);
void
*
sdbUpdateRow
(
EMnSdb
sdb
,
void
*
pHead
);
void
*
sdbGetRow
(
EMnSdb
sdb
,
void
*
pKey
);
void
*
sdbFetchRow
(
EMnSdb
sdb
,
void
*
pIter
);
void
sdbCancelFetch
(
EMnSdb
sdb
,
void
*
pIter
);
int32_t
sdbGetCount
(
EMnSdb
sdb
);
void
sdbSetFp
(
EMnSdb
,
EMnKey
,
SdbDeployFp
,
SdbEncodeFp
,
SdbDecodeFp
,
int32_t
dataSize
);
#ifdef __cplusplus
}
...
...
source/server/mnode/src/mnodeAcct.c
浏览文件 @
ef3e39c9
...
...
@@ -15,7 +15,62 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "mnodeInt.h"
#include "mnodeSdb.h"
static
void
mnodeCreateDefaultAcct
()
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SAcctObj
acctObj
=
{
0
};
tstrncpy
(
acctObj
.
acct
,
TSDB_DEFAULT_USER
,
TSDB_USER_LEN
);
acctObj
.
cfg
=
(
SAcctCfg
){.
maxUsers
=
128
,
.
maxDbs
=
128
,
.
maxTimeSeries
=
INT32_MAX
,
.
maxConnections
=
1024
,
.
maxStreams
=
1000
,
.
maxPointsPerSecond
=
10000000
,
.
maxStorage
=
INT64_MAX
,
.
maxQueryTime
=
INT64_MAX
,
.
maxInbound
=
0
,
.
maxOutbound
=
0
,
.
accessState
=
TSDB_VN_ALL_ACCCESS
};
acctObj
.
acctId
=
1
;
acctObj
.
createdTime
=
taosGetTimestampMs
();
sdbInsertRow
(
MN_SDB_ACCT
,
&
acctObj
);
}
int32_t
mnodeEncodeAcct
(
SAcctObj
*
pAcct
,
char
*
buf
,
int32_t
maxLen
)
{
int32_t
len
=
0
;
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"{
\"
type
\"
:%d, "
,
MN_SDB_ACCT
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
acctId
\"
:
\"
%d
\"
, "
,
pAcct
->
acctId
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
maxUsers
\"
:
\"
%d
\"
, "
,
pAcct
->
cfg
.
maxUsers
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
maxDbs
\"
:
\"
%d
\"
, "
,
pAcct
->
cfg
.
maxDbs
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
maxTimeSeries
\"
:
\"
%d
\"
, "
,
pAcct
->
cfg
.
maxTimeSeries
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
maxConnections
\"
:
\"
%d
\"
, "
,
pAcct
->
cfg
.
maxConnections
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
maxStreams
\"
:
\"
%d
\"
, "
,
pAcct
->
cfg
.
maxStreams
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
maxPointsPerSecond
\"
:
\"
%d
\"
, "
,
pAcct
->
cfg
.
maxPointsPerSecond
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
maxUsers
\"
:
\"
%"
PRIu64
"
\"
, "
,
pAcct
->
cfg
.
maxStorage
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
maxQueryTime
\"
:
\"
%"
PRIu64
"
\"
, "
,
pAcct
->
cfg
.
maxQueryTime
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
maxInbound
\"\"
:%"
PRIu64
"
\"
, "
,
pAcct
->
cfg
.
maxInbound
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
maxOutbound
\"
:
\"
%"
PRIu64
"
\"
, "
,
pAcct
->
cfg
.
maxOutbound
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
accessState
\"
:
\"
%d
\"
, "
,
pAcct
->
cfg
.
accessState
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
createdTime
\"
:
\"
%"
PRIu64
"
\"
, "
,
pAcct
->
createdTime
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
updateTime
\"
:
\"
%"
PRIu64
"
\"
}
\n
"
,
pAcct
->
updateTime
);
return
len
;
}
SAcctObj
*
mnodeDecodeAcct
(
cJSON
*
root
)
{
SAcctObj
*
pAcct
=
calloc
(
1
,
sizeof
(
SAcctObj
));
return
pAcct
;
}
int32_t
mnodeInitAcct
()
{
sdbSetFp
(
MN_SDB_ACCT
,
MN_KEY_BINARY
,
mnodeCreateDefaultAcct
,
(
SdbEncodeFp
)
mnodeEncodeAcct
,
(
SdbDecodeFp
)(
mnodeDecodeAcct
),
sizeof
(
SAcctObj
));
return
0
;
}
int32_t
mnodeInitAcct
()
{
return
0
;
}
void
mnodeCleanupAcct
()
{}
source/server/mnode/src/mnodeSdb.c
浏览文件 @
ef3e39c9
...
...
@@ -15,12 +15,220 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "mnodeInt.h"
#include "thash.h"
#include "tglobal.h"
#include "cJSON.h"
#include "mnodeSdb.h"
int32_t
mnodeInitSdb
()
{
return
0
;
}
void
mnodeCleanupSdb
()
{}
static
struct
{
char
currDir
[
PATH_MAX
];
char
backDir
[
PATH_MAX
];
char
tmpDir
[
PATH_MAX
];
int64_t
version
;
EMnKey
hashKey
[
MN_SDB_MAX
];
int32_t
dataSize
[
MN_SDB_MAX
];
SHashObj
*
hashObj
[
MN_SDB_MAX
];
SdbDeployFp
deployFp
[
MN_SDB_MAX
];
SdbEncodeFp
encodeFp
[
MN_SDB_MAX
];
SdbDecodeFp
decodeFp
[
MN_SDB_MAX
];
}
tsSdb
=
{
0
};
int32_t
mnodeDeploySdb
()
{
static
int32_t
sdbCreateDir
()
{
if
(
!
taosMkDir
(
tsSdb
.
currDir
))
{
mError
(
"failed to create dir:%s"
,
tsSdb
.
currDir
);
return
TAOS_SYSTEM_ERROR
(
errno
);
}
if
(
!
taosMkDir
(
tsSdb
.
backDir
))
{
mError
(
"failed to create dir:%s"
,
tsSdb
.
backDir
);
return
-
1
;
}
if
(
!
taosMkDir
(
tsSdb
.
tmpDir
))
{
mError
(
"failed to create dir:%s"
,
tsSdb
.
tmpDir
);
return
-
1
;
}
return
0
;
}
static
int32_t
sdbRunDeployFp
()
{
for
(
int32_t
i
=
MN_SDB_START
;
i
<
MN_SDB_MAX
;
++
i
)
{
SdbDeployFp
fp
=
tsSdb
.
deployFp
[
i
];
if
(
fp
)
{
(
*
fp
)();
}
}
return
0
;
}
static
int32_t
sdbReadVersion
(
cJSON
*
root
)
{
cJSON
*
ver
=
cJSON_GetObjectItem
(
root
,
"version"
);
if
(
!
ver
||
ver
->
type
!=
cJSON_String
)
{
mError
(
"failed to parse version since version not found"
);
return
-
1
;
}
tsSdb
.
version
=
(
int64_t
)
atoll
(
ver
->
valuestring
);
mTrace
(
"parse version success, version:%"
PRIu64
,
tsSdb
.
version
);
return
0
;
}
static
void
sdbWriteVersion
(
FileFd
fd
)
{
char
content
[
128
];
int32_t
len
=
snprintf
(
content
,
sizeof
(
content
),
"{
\"
type
\"
:0,
\"
version
\"
:
\"
%"
PRIu64
"
\"
,
\"
updateTime
\"
:
\"
%"
PRIu64
"
\"
}
\n
"
,
tsSdb
.
version
,
taosGetTimestampMs
());
taosWriteFile
(
fd
,
content
,
len
);
}
static
int32_t
sdbReadDataFile
()
{
ssize_t
_bytes
=
0
;
size_t
len
=
4096
;
char
*
line
=
calloc
(
1
,
len
);
int32_t
code
=
-
1
;
FILE
*
fp
=
NULL
;
cJSON
*
root
=
NULL
;
char
file
[
PATH_MAX
+
20
];
snprintf
(
file
,
sizeof
(
file
),
"%ssdb.data"
,
tsSdb
.
currDir
);
fp
=
fopen
(
file
,
"r"
);
if
(
!
fp
)
{
mError
(
"failed to open file:%s for read since %s"
,
file
,
strerror
(
errno
));
goto
PARSE_SDB_DATA_ERROR
;
}
while
(
!
feof
(
fp
))
{
memset
(
line
,
0
,
len
);
_bytes
=
tgetline
(
&
line
,
&
len
,
fp
);
if
(
_bytes
<
0
)
{
break
;
}
line
[
len
-
1
]
=
0
;
if
(
len
<=
10
)
continue
;
root
=
cJSON_Parse
(
line
);
if
(
root
==
NULL
)
{
mError
(
"failed to parse since invalid json format, %s"
,
line
);
goto
PARSE_SDB_DATA_ERROR
;
}
cJSON
*
type
=
cJSON_GetObjectItem
(
root
,
"type"
);
if
(
!
type
||
type
->
type
!=
cJSON_Number
)
{
mError
(
"failed to parse since invalid type not found, %s"
,
line
);
goto
PARSE_SDB_DATA_ERROR
;
}
if
(
type
->
valueint
>=
MN_SDB_MAX
||
type
->
valueint
<
MN_SDB_START
)
{
mError
(
"failed to parse since invalid type, %s"
,
line
);
goto
PARSE_SDB_DATA_ERROR
;
}
if
(
type
->
valueint
==
MN_SDB_START
)
{
if
(
sdbReadVersion
(
root
)
!=
0
)
{
mError
(
"failed to parse version, %s"
,
line
);
goto
PARSE_SDB_DATA_ERROR
;
}
cJSON_Delete
(
root
);
root
=
NULL
;
continue
;
}
SdbDecodeFp
func
=
tsSdb
.
decodeFp
[
type
->
valueint
];
SdbHead
*
pHead
=
(
*
func
)(
root
);
if
(
pHead
==
NULL
)
{
mError
(
"failed to parse since decode error, %s"
,
line
);
goto
PARSE_SDB_DATA_ERROR
;
}
sdbInsertRow
(
pHead
->
type
,
pHead
);
cJSON_Delete
(
root
);
root
=
NULL
;
}
code
=
0
;
PARSE_SDB_DATA_ERROR:
tfree
(
line
);
fclose
(
fp
);
cJSON_Delete
(
root
);
return
code
;
}
static
int32_t
sdbWriteDataFile
()
{
char
file
[
PATH_MAX
+
20
]
=
{
0
};
snprintf
(
file
,
sizeof
(
file
),
"%ssdb.data"
,
tsSdb
.
currDir
);
FileFd
fd
=
taosOpenFileCreateWrite
(
file
);
if
(
fd
<=
0
)
{
mError
(
"failed to open file:%s for write since %s"
,
file
,
strerror
(
errno
));
return
-
1
;
}
int32_t
len
;
int32_t
maxLen
=
10240
;
char
*
buf
=
malloc
(
maxLen
);
for
(
int32_t
i
=
MN_SDB_START
;
i
<
MN_SDB_MAX
;
++
i
)
{
SHashObj
*
hash
=
tsSdb
.
hashObj
[
i
];
if
(
!
hash
)
continue
;
SdbEncodeFp
fp
=
tsSdb
.
encodeFp
[
i
];
if
(
!
fp
)
continue
;
SdbHead
*
pHead
=
taosHashIterate
(
hash
,
NULL
);
while
(
pHead
!=
NULL
)
{
len
=
(
*
fp
)(
pHead
,
buf
,
maxLen
);
if
(
len
>=
0
)
{
taosWriteFile
(
fd
,
buf
,
len
);
}
pHead
=
taosHashIterate
(
hash
,
pHead
);
}
}
sdbWriteVersion
(
fd
);
taosFsyncFile
(
fd
);
taosCloseFile
(
fd
);
mInfo
(
"write file:%s successfully"
,
file
);
return
0
;
}
int32_t
sdbCommit
()
{
int32_t
code
=
sdbWriteDataFile
();
if
(
code
!=
0
)
{
return
code
;
}
return
0
;
}
int32_t
sdbRead
()
{
int32_t
code
=
sdbReadDataFile
();
if
(
code
!=
0
)
{
return
code
;
}
mInfo
(
"read sdb file successfully"
);
return
-
1
;
}
int32_t
sdbDeploy
()
{
if
(
sdbCreateDir
()
!=
0
)
{
return
-
1
;
}
if
(
sdbRunDeployFp
()
!=
0
)
{
return
-
1
;
}
if
(
sdbCommit
()
!=
0
)
{
return
-
1
;
}
// if (!taosMkDir())
// if (pMinfos == NULL) { // first deploy
...
...
@@ -42,6 +250,142 @@ int32_t mnodeDeploySdb() {
return
0
;
}
void
mnodeUnDeploySdb
()
{}
int32_t
mnodeReadSdb
()
{
return
0
;
}
int32_t
mnodeCommitSdb
()
{
return
0
;
}
\ No newline at end of file
void
sdbUnDeploy
()
{}
int32_t
sdbInit
()
{
snprintf
(
tsSdb
.
currDir
,
PATH_MAX
,
"%s%scurrent%s"
,
tsMnodeDir
,
TD_DIRSEP
,
TD_DIRSEP
);
snprintf
(
tsSdb
.
backDir
,
PATH_MAX
,
"%s%sbackup%s"
,
tsMnodeDir
,
TD_DIRSEP
,
TD_DIRSEP
);
snprintf
(
tsSdb
.
tmpDir
,
PATH_MAX
,
"%s%stmp%s"
,
tsMnodeDir
,
TD_DIRSEP
,
TD_DIRSEP
);
for
(
int32_t
i
=
0
;
i
<
MN_SDB_MAX
;
++
i
)
{
int32_t
type
;
if
(
tsSdb
.
hashKey
[
i
]
==
MN_KEY_INT32
)
{
type
=
TSDB_DATA_TYPE_INT
;
}
else
if
(
tsSdb
.
hashKey
[
i
]
==
MN_KEY_INT64
)
{
type
=
TSDB_DATA_TYPE_BIGINT
;
}
else
{
type
=
TSDB_DATA_TYPE_BINARY
;
}
SHashObj
*
hash
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
type
),
true
,
HASH_NO_LOCK
);
if
(
hash
==
NULL
)
{
return
-
1
;
}
tsSdb
.
hashObj
[
i
]
=
hash
;
}
return
0
;
}
void
sdbCleanup
()
{
for
(
int32_t
i
=
0
;
i
<
MN_SDB_MAX
;
++
i
)
{
SHashObj
*
hash
=
tsSdb
.
hashObj
[
i
];
if
(
hash
!=
NULL
)
{
taosHashCleanup
(
hash
);
}
tsSdb
.
hashObj
[
i
]
=
NULL
;
}
}
void
sdbSetFp
(
EMnSdb
sdb
,
EMnKey
keyType
,
SdbDeployFp
deployFp
,
SdbEncodeFp
encodeFp
,
SdbDecodeFp
decodeFp
,
int32_t
dataSize
)
{
tsSdb
.
deployFp
[
sdb
]
=
deployFp
;
tsSdb
.
encodeFp
[
sdb
]
=
encodeFp
;
tsSdb
.
decodeFp
[
sdb
]
=
decodeFp
;
tsSdb
.
dataSize
[
sdb
]
=
dataSize
;
tsSdb
.
hashKey
[
sdb
]
=
keyType
;
}
static
SHashObj
*
sdbGetHash
(
int32_t
sdb
)
{
if
(
sdb
>=
MN_SDB_MAX
||
sdb
<=
MN_SDB_START
)
{
return
NULL
;
}
SHashObj
*
hash
=
tsSdb
.
hashObj
[
sdb
];
if
(
hash
==
NULL
)
{
return
NULL
;
}
return
hash
;
}
void
*
sdbInsertRow
(
EMnSdb
sdb
,
void
*
p
)
{
SdbHead
*
pHead
=
p
;
pHead
->
type
=
sdb
;
pHead
->
status
=
MN_SDB_STAT_AVAIL
;
char
*
pKey
=
(
char
*
)
pHead
+
sizeof
(
pHead
);
int32_t
keySize
;
EMnKey
keyType
=
tsSdb
.
hashKey
[
pHead
->
type
];
int32_t
dataSize
=
tsSdb
.
dataSize
[
pHead
->
type
];
SHashObj
*
hash
=
sdbGetHash
(
pHead
->
type
);
if
(
hash
==
NULL
)
{
return
NULL
;
}
if
(
keyType
==
MN_KEY_INT32
)
{
keySize
=
sizeof
(
int32_t
);
}
else
if
(
keyType
==
MN_KEY_BINARY
)
{
keySize
=
strlen
(
pKey
)
+
1
;
}
else
{
keySize
=
sizeof
(
int64_t
);
}
taosHashPut
(
hash
,
pKey
,
keySize
,
pHead
,
dataSize
);
return
taosHashGet
(
hash
,
pKey
,
keySize
);
}
void
sdbDeleteRow
(
EMnSdb
sdb
,
void
*
p
)
{
SdbHead
*
pHead
=
p
;
pHead
->
status
=
MN_SDB_STAT_DROPPED
;
}
void
*
sdbUpdateRow
(
EMnSdb
sdb
,
void
*
pHead
)
{
return
sdbInsertRow
(
sdb
,
pHead
);
}
void
*
sdbGetRow
(
EMnSdb
sdb
,
void
*
pKey
)
{
SHashObj
*
hash
=
sdbGetHash
(
sdb
);
if
(
hash
==
NULL
)
{
return
NULL
;
}
int32_t
keySize
;
EMnKey
keyType
=
tsSdb
.
hashKey
[
sdb
];
if
(
keyType
==
MN_KEY_INT32
)
{
keySize
=
sizeof
(
int32_t
);
}
else
if
(
keyType
==
MN_KEY_BINARY
)
{
keySize
=
strlen
(
pKey
)
+
1
;
}
else
{
keySize
=
sizeof
(
int64_t
);
}
return
taosHashGet
(
hash
,
pKey
,
keySize
);
}
void
*
sdbFetchRow
(
EMnSdb
sdb
,
void
*
pIter
)
{
SHashObj
*
hash
=
sdbGetHash
(
sdb
);
if
(
hash
==
NULL
)
{
return
NULL
;
}
return
taosHashIterate
(
hash
,
pIter
);
}
void
sdbCancelFetch
(
EMnSdb
sdb
,
void
*
pIter
)
{
SHashObj
*
hash
=
sdbGetHash
(
sdb
);
if
(
hash
==
NULL
)
{
return
;
}
taosHashCancelIterate
(
hash
,
pIter
);
}
int32_t
sdbGetCount
(
EMnSdb
sdb
)
{
SHashObj
*
hash
=
sdbGetHash
(
sdb
);
if
(
hash
==
NULL
)
{
return
0
;
}
return
taosHashGetSize
(
hash
);
}
\ No newline at end of file
source/server/mnode/src/mondeInt.c
浏览文件 @
ef3e39c9
...
...
@@ -51,7 +51,7 @@ int32_t mnodeGetDnodeId() { return tsMint.dnodeId; }
char
*
mnodeGetClusterId
()
{
return
tsMint
.
clusterId
;
}
bool
mnodeIsServing
()
{
return
tsMint
.
state
==
MN_STATUS_READY
;
}
EMnStatus
mnodeIsServing
()
{
return
tsMint
.
state
;
}
void
mnodeSendMsgToDnode
(
struct
SRpcEpSet
*
epSet
,
struct
SRpcMsg
*
rpcMsg
)
{
(
*
tsMint
.
fp
.
SendMsgToDnode
)(
epSet
,
rpcMsg
);
...
...
@@ -68,7 +68,6 @@ static int32_t mnodeSetPara(SMnodePara para) {
tsMint
.
dnodeId
=
para
.
dnodeId
;
strncpy
(
tsMint
.
clusterId
,
para
.
clusterId
,
TSDB_CLUSTER_ID_LEN
);
if
(
tsMint
.
fp
.
GetDnodeEp
==
NULL
)
return
-
1
;
if
(
tsMint
.
fp
.
SendMsgToDnode
==
NULL
)
return
-
1
;
if
(
tsMint
.
fp
.
SendMsgToMnode
==
NULL
)
return
-
1
;
if
(
tsMint
.
fp
.
SendRedirectMsg
==
NULL
)
return
-
1
;
...
...
@@ -96,7 +95,7 @@ static int32_t mnodeInitStep1() {
struct
SSteps
*
steps
=
taosStepInit
(
16
,
NULL
);
if
(
steps
==
NULL
)
return
-
1
;
taosStepAdd
(
steps
,
"mnode-sdb"
,
mnodeInitSdb
,
mnodeCleanupSdb
);
taosStepAdd
(
steps
,
"mnode-sdb"
,
sdbInit
,
sdbCleanup
);
taosStepAdd
(
steps
,
"mnode-cluster"
,
mnodeInitCluster
,
mnodeCleanupCluster
);
taosStepAdd
(
steps
,
"mnode-dnode"
,
mnodeInitDnode
,
mnodeCleanupDnode
);
taosStepAdd
(
steps
,
"mnode-mnode"
,
mnodeInitMnode
,
mnodeCleanupMnode
);
...
...
@@ -177,11 +176,12 @@ int32_t mnodeDeploy() {
}
void
mnodeUnDeploy
()
{
mnodeUnDeploySdb
();
sdbUnDeploy
();
mnodeCleanup
();
}
int32_t
mnodeInit
(
SMnodePara
para
)
{
mDebugFlag
=
207
;
if
(
tsMint
.
state
!=
MN_STATUS_UNINIT
)
{
return
0
;
}
else
{
...
...
@@ -202,10 +202,10 @@ int32_t mnodeInit(SMnodePara para) {
return
-
1
;
}
code
=
mnodeReadSdb
();
code
=
sdbRead
();
if
(
code
!=
0
)
{
if
(
mnodeNeedDeploy
())
{
code
=
mnodeDeploySdb
();
code
=
sdbDeploy
();
if
(
code
!=
0
)
{
mnodeCleanupStep1
();
tsMint
.
state
=
MN_STATUS_UNINIT
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录