Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
4b7e463b
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4b7e463b
编写于
3月 19, 2020
作者:
S
slguan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-15] refactor sdb
上级
75bd0eba
变更
23
显示空白变更内容
内联
并排
Showing
23 changed file
with
300 addition
and
628 deletion
+300
-628
src/inc/mnode.h
src/inc/mnode.h
+0
-1
src/inc/sdb.h
src/inc/sdb.h
+0
-131
src/inc/taosdef.h
src/inc/taosdef.h
+10
-0
src/kit/CMakeLists.txt
src/kit/CMakeLists.txt
+2
-2
src/mnode/inc/mgmtMnode.h
src/mnode/inc/mgmtMnode.h
+5
-2
src/mnode/inc/mgmtSdb.h
src/mnode/inc/mgmtSdb.h
+43
-108
src/mnode/src/mgmtChildTable.c
src/mnode/src/mgmtChildTable.c
+5
-11
src/mnode/src/mgmtDClient.c
src/mnode/src/mgmtDClient.c
+1
-1
src/mnode/src/mgmtDServer.c
src/mnode/src/mgmtDServer.c
+1
-1
src/mnode/src/mgmtDb.c
src/mnode/src/mgmtDb.c
+3
-9
src/mnode/src/mgmtDnode.c
src/mnode/src/mgmtDnode.c
+1
-1
src/mnode/src/mgmtMain.c
src/mnode/src/mgmtMain.c
+7
-5
src/mnode/src/mgmtMnode.c
src/mnode/src/mgmtMnode.c
+55
-15
src/mnode/src/mgmtNormalTable.c
src/mnode/src/mgmtNormalTable.c
+4
-11
src/mnode/src/mgmtSdb.c
src/mnode/src/mgmtSdb.c
+122
-249
src/mnode/src/mgmtShell.c
src/mnode/src/mgmtShell.c
+12
-30
src/mnode/src/mgmtSuperTable.c
src/mnode/src/mgmtSuperTable.c
+3
-9
src/mnode/src/mgmtTable.c
src/mnode/src/mgmtTable.c
+1
-0
src/mnode/src/mgmtUser.c
src/mnode/src/mgmtUser.c
+2
-8
src/mnode/src/mgmtVgroup.c
src/mnode/src/mgmtVgroup.c
+2
-2
src/util/inc/tglobalcfg.h
src/util/inc/tglobalcfg.h
+0
-6
src/util/inc/tlog.h
src/util/inc/tlog.h
+19
-0
src/util/src/tglobalcfg.c
src/util/src/tglobalcfg.c
+2
-26
未找到文件。
src/inc/mnode.h
浏览文件 @
4b7e463b
...
@@ -25,7 +25,6 @@ extern "C" {
...
@@ -25,7 +25,6 @@ extern "C" {
#include "taosdef.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "taoserror.h"
#include "sdb.h"
#include "tglobalcfg.h"
#include "tglobalcfg.h"
#include "thash.h"
#include "thash.h"
#include "tidpool.h"
#include "tidpool.h"
...
...
src/inc/sdb.h
已删除
100644 → 0
浏览文件 @
75bd0eba
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_SDB_H
#define TDENGINE_SDB_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "taosmsg.h"
#include "taosdef.h"
extern
uint16_t
tsMgmtMgmtPort
;
extern
uint16_t
tsMgmtSyncPort
;
extern
int
tsMgmtPeerHBTimer
;
// seconds
extern
char
*
sdbStatusStr
[];
extern
char
*
sdbRoleStr
[];
extern
int
sdbMaster
;
extern
SRpcIpSet
*
pSdbIpList
;
extern
SRpcIpSet
*
pSdbPublicIpList
;
extern
void
(
*
sdbWorkAsMasterCallback
)();
// this function pointer will be set by taosd
enum
_keytype
{
SDB_KEYTYPE_STRING
,
SDB_KEYTYPE_UINT32
,
SDB_KEYTYPE_AUTO
,
SDB_KEYTYPE_RECYCLE
,
SDB_KEYTYPE_MAX
};
#define SDB_ROLE_UNAPPROVED 0
#define SDB_ROLE_UNDECIDED 1
#define SDB_ROLE_MASTER 2
#define SDB_ROLE_SLAVE 3
#define SDB_STATUS_OFFLINE 0
#define SDB_STATUS_UNSYNCED 1
#define SDB_STATUS_SYNCING 2
#define SDB_STATUS_SERVING 3
#define SDB_STATUS_DELETED 4
enum
_sdbaction
{
SDB_TYPE_INSERT
,
SDB_TYPE_DELETE
,
SDB_TYPE_UPDATE
,
SDB_TYPE_DECODE
,
SDB_TYPE_ENCODE
,
SDB_TYPE_BEFORE_BATCH_UPDATE
,
SDB_TYPE_BATCH_UPDATE
,
SDB_TYPE_AFTER_BATCH_UPDATE
,
SDB_TYPE_RESET
,
SDB_TYPE_DESTROY
,
SDB_MAX_ACTION_TYPES
};
#define SDB_MAX_PEERS 4
typedef
struct
{
uint32_t
ip
;
uint32_t
publicIp
;
char
ipstr
[
20
];
char
zone
[
12
];
char
role
;
int64_t
createdTime
;
uint64_t
dbVersion
;
int64_t
lostTime
;
char
status
;
char
numOfMnodes
;
int
numOfDnodes
;
char
updateEnd
[
1
];
// internal
int
syncFd
;
void
*
hbTimer
;
void
*
pSync
;
}
SSdbPeer
;
extern
SSdbPeer
*
sdbPeer
[];
#define sdbInited (sdbPeer[0])
#define sdbStatus (sdbPeer[0]->status)
void
*
sdbOpenTable
(
int
maxRows
,
int32_t
maxRowSize
,
char
*
name
,
uint8_t
keyType
,
char
*
directory
,
void
*
(
*
appTool
)(
char
,
void
*
,
char
*
,
int
,
int
*
));
void
*
sdbGetRow
(
void
*
handle
,
void
*
key
);
int64_t
sdbInsertRow
(
void
*
handle
,
void
*
row
,
int
rowSize
);
int
sdbDeleteRow
(
void
*
handle
,
void
*
key
);
int
sdbUpdateRow
(
void
*
handle
,
void
*
row
,
int
updateSize
,
char
isUpdated
);
void
*
sdbFetchRow
(
void
*
handle
,
void
*
pNode
,
void
**
ppRow
);
int
sdbBatchUpdateRow
(
void
*
handle
,
void
*
row
,
int
rowSize
);
int64_t
sdbGetId
(
void
*
handle
);
int64_t
sdbGetNumOfRows
(
void
*
handle
);
void
sdbSaveSnapShot
(
void
*
handle
);
void
sdbCloseTable
(
void
*
handle
);
int
sdbRemovePeerByIp
(
uint32_t
ip
);
int
sdbInitPeers
(
char
*
directory
);
void
sdbCleanUpPeers
();
int64_t
sdbGetVersion
();
int32_t
sdbGetRunStatus
();
#define TSDB_MAX_NORMAL_TABLES 10000
#define TSDB_MAX_SUPER_TABLES 1000
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_SDB_H
src/inc/taosdef.h
浏览文件 @
4b7e463b
...
@@ -309,6 +309,16 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
...
@@ -309,6 +309,16 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_SESSIONS_PER_VNODE (300)
#define TSDB_SESSIONS_PER_VNODE (300)
#define TSDB_SESSIONS_PER_DNODE (TSDB_SESSIONS_PER_VNODE * TSDB_MAX_VNODES)
#define TSDB_SESSIONS_PER_DNODE (TSDB_SESSIONS_PER_VNODE * TSDB_MAX_VNODES)
#define TSDB_MAX_MNODES 5
#define TSDB_MAX_DNODES 10
#define TSDB_MAX_ACCOUNTS 10
#define TSDB_MAX_USERS 20
#define TSDB_MAX_DBS 100
#define TSDB_MAX_VGROUPS 1000
#define TSDB_MAX_SUPER_TABLES 100
#define TSDB_MAX_NORMAL_TABLES 1000
#define TSDB_MAX_CHILD_TABLES 100000
enum
{
enum
{
TSDB_PRECISION_MILLI
,
TSDB_PRECISION_MILLI
,
TSDB_PRECISION_MICRO
,
TSDB_PRECISION_MICRO
,
...
...
src/kit/CMakeLists.txt
浏览文件 @
4b7e463b
...
@@ -2,5 +2,5 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
...
@@ -2,5 +2,5 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT
(
TDengine
)
PROJECT
(
TDengine
)
ADD_SUBDIRECTORY
(
shell
)
ADD_SUBDIRECTORY
(
shell
)
ADD_SUBDIRECTORY
(
taosdemo
)
#
ADD_SUBDIRECTORY(taosdemo)
ADD_SUBDIRECTORY
(
taosdump
)
#
ADD_SUBDIRECTORY(taosdump)
src/mnode/inc/mgmtMnode.h
浏览文件 @
4b7e463b
...
@@ -20,9 +20,12 @@
...
@@ -20,9 +20,12 @@
extern
"C"
{
extern
"C"
{
#endif
#endif
bool
mgmtCheckRedirect
(
void
*
handle
);
int32_t
mgmtInitMnodes
();
void
mgmtCleanupMnodes
();
void
mgmtGetMnodeIpList
(
SRpcIpSet
*
ipSet
);
bool
mgmtCheckRedirect
(
void
*
handle
);
void
mgmtGetMnodePrivateIpList
(
SRpcIpSet
*
ipSet
);
void
mgmtGetMnodePublicIpList
(
SRpcIpSet
*
ipSet
);
int32_t
mgmtAddMnode
(
uint32_t
privateIp
,
uint32_t
publicIp
);
int32_t
mgmtAddMnode
(
uint32_t
privateIp
,
uint32_t
publicIp
);
int32_t
mgmtRemoveMnode
(
uint32_t
privateIp
);
int32_t
mgmtRemoveMnode
(
uint32_t
privateIp
);
...
...
src/mnode/inc/mgmtSdb.h
浏览文件 @
4b7e463b
...
@@ -13,8 +13,12 @@
...
@@ -13,8 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#ifndef _sdbint_header_
#ifndef TDENGINE_MNODE_SDB_H
#define _sdbint_header_
#define TDENGINE_MNODE_SDB_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include <errno.h>
#include <errno.h>
#include <pthread.h>
#include <pthread.h>
...
@@ -27,115 +31,46 @@
...
@@ -27,115 +31,46 @@
#include "hashint.h"
#include "hashint.h"
#include "hashstr.h"
#include "hashstr.h"
#include "sdb.h"
#include "tchecksum.h"
#include "tchecksum.h"
#include "tlog.h"
#include "tlog.h"
#include "trpc.h"
#include "trpc.h"
#include "tutil.h"
#include "tutil.h"
#define sdbError(...) \
enum
_keytype
{
if (sdbDebugFlag & DEBUG_ERROR) { \
SDB_KEYTYPE_STRING
,
tprintf("ERROR MND-SDB ", 255, __VA_ARGS__); \
SDB_KEYTYPE_AUTO
,
}
SDB_KEYTYPE_MAX
#define sdbWarn(...) \
};
if (sdbDebugFlag & DEBUG_WARN) { \
tprintf("WARN MND-SDB ", sdbDebugFlag, __VA_ARGS__); \
enum
_sdbaction
{
}
SDB_TYPE_INSERT
,
#define sdbTrace(...) \
SDB_TYPE_DELETE
,
if (sdbDebugFlag & DEBUG_TRACE) { \
SDB_TYPE_UPDATE
,
tprintf("MND-SDB ", sdbDebugFlag, __VA_ARGS__); \
SDB_TYPE_DECODE
,
}
SDB_TYPE_ENCODE
,
#define sdbPrint(...) \
SDB_TYPE_DESTROY
,
{ tprintf("MND-SDB ", 255, __VA_ARGS__); }
SDB_MAX_ACTION_TYPES
};
#define mpeerError(...) \
if (sdbDebugFlag & DEBUG_ERROR) { \
uint64_t
sdbGetVersion
();
tprintf("ERROR MND-MPEER ", 255, __VA_ARGS__); \
bool
sdbInServerState
();
}
bool
sdbIsMaster
();
#define mpeerWarn(...) \
if (sdbDebugFlag & DEBUG_WARN) { \
void
*
sdbOpenTable
(
int32_t
maxRows
,
int32_t
maxRowSize
,
char
*
name
,
uint8_t
keyType
,
char
*
directory
,
tprintf("WARN MND-MPEER ", sdbDebugFlag, __VA_ARGS__); \
void
*
(
*
appTool
)(
char
,
void
*
,
char
*
,
int32_t
,
int32_t
*
));
}
void
sdbCloseTable
(
void
*
handle
);
#define mpeerTrace(...) \
if (sdbDebugFlag & DEBUG_TRACE) { \
void
*
sdbGetRow
(
void
*
handle
,
void
*
key
);
tprintf("MND-MPEER ", sdbDebugFlag, __VA_ARGS__); \
void
*
sdbFetchRow
(
void
*
handle
,
void
*
pNode
,
void
**
ppRow
);
}
int64_t
sdbGetId
(
void
*
handle
);
#define mpeerPrint(...) \
int64_t
sdbGetNumOfRows
(
void
*
handle
);
{ tprintf("MND-MPEER ", 255, __VA_ARGS__); }
int64_t
sdbInsertRow
(
void
*
handle
,
void
*
row
,
int32_t
rowSize
);
#define sdbLError(...) taosLogError(__VA_ARGS__) sdbError(__VA_ARGS__)
int32_t
sdbDeleteRow
(
void
*
handle
,
void
*
key
);
#define sdbLWarn(...) taosLogWarn(__VA_ARGS__) sdbWarn(__VA_ARGS__)
int32_t
sdbUpdateRow
(
void
*
handle
,
void
*
row
,
int32_t
updateSize
,
char
isUpdated
);
#define sdbLPrint(...) taosLogPrint(__VA_ARGS__) sdbPrint(__VA_ARGS__)
#ifdef __cplusplus
#define SDB_MAX_PEERS 4
}
#define SDB_DELIMITER 0xFFF00F00
#endif
#define SDB_ENDCOMMIT 0xAFFFAAAF
typedef
struct
{
uint64_t
swVersion
;
int16_t
sdbFileVersion
;
char
reserved
[
6
];
TSCKSUM
checkSum
;
}
SSdbHeader
;
typedef
struct
{
char
type
;
// short rowSize;
char
*
row
;
}
SSdbUpdate
;
typedef
struct
_SSdbTable
{
SSdbHeader
header
;
int
maxRows
;
int
dbId
;
int32_t
maxRowSize
;
char
name
[
TSDB_DB_NAME_LEN
];
char
fn
[
128
];
int
keyType
;
uint32_t
autoIndex
;
int64_t
numOfRows
;
int64_t
id
;
int64_t
size
;
void
*
iHandle
;
int
fd
;
void
*
(
*
appTool
)(
char
,
void
*
,
char
*
,
int
,
int
*
);
pthread_mutex_t
mutex
;
SSdbUpdate
*
update
;
int
numOfUpdates
;
int
updatePos
;
}
SSdbTable
;
typedef
struct
{
int64_t
id
;
int64_t
offset
;
int
rowSize
;
void
*
row
;
}
SRowMeta
;
typedef
struct
{
int32_t
delimiter
;
int32_t
rowSize
;
int64_t
id
;
char
data
[];
}
SRowHead
;
typedef
struct
{
uint8_t
dbId
;
char
type
;
uint64_t
version
;
short
dataLen
;
char
data
[];
}
SForwardMsg
;
extern
SSdbTable
*
tableList
[];
extern
int
sdbMaxPeers
;
extern
int
sdbNumOfTables
;
extern
int64_t
sdbVersion
;
int
sdbForwardDbReqToPeer
(
SSdbTable
*
pTable
,
char
type
,
char
*
data
,
int
dataLen
);
int
mpeerRetrieveRows
(
int
fd
,
SSdbTable
*
pTable
,
uint64_t
version
);
void
sdbResetTable
(
SSdbTable
*
pTable
);
extern
const
int16_t
sdbFileVersion
;
#endif
#endif
\ No newline at end of file
src/mnode/src/mgmtChildTable.c
浏览文件 @
4b7e463b
...
@@ -16,6 +16,7 @@
...
@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include "os.h"
#include "os.h"
#include "taosmsg.h"
#include "taosmsg.h"
#include "taosdef.h"
#include "tschemautil.h"
#include "tschemautil.h"
#include "tscompression.h"
#include "tscompression.h"
#include "tskiplist.h"
#include "tskiplist.h"
...
@@ -26,10 +27,11 @@
...
@@ -26,10 +27,11 @@
#include "mgmtAcct.h"
#include "mgmtAcct.h"
#include "mgmtChildTable.h"
#include "mgmtChildTable.h"
#include "mgmtDb.h"
#include "mgmtDb.h"
#include "mgmtDClient.h"
#include "mgmtGrant.h"
#include "mgmtGrant.h"
#include "mgmtProfile.h"
#include "mgmtProfile.h"
#include "mgmtSdb.h"
#include "mgmtShell.h"
#include "mgmtShell.h"
#include "mgmtDClient.h"
#include "mgmtSuperTable.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtTable.h"
#include "mgmtVgroup.h"
#include "mgmtVgroup.h"
...
@@ -56,7 +58,6 @@ static void mgmtChildTableActionInit() {
...
@@ -56,7 +58,6 @@ static void mgmtChildTableActionInit() {
mgmtChildTableActionFp
[
SDB_TYPE_UPDATE
]
=
mgmtChildTableActionUpdate
;
mgmtChildTableActionFp
[
SDB_TYPE_UPDATE
]
=
mgmtChildTableActionUpdate
;
mgmtChildTableActionFp
[
SDB_TYPE_ENCODE
]
=
mgmtChildTableActionEncode
;
mgmtChildTableActionFp
[
SDB_TYPE_ENCODE
]
=
mgmtChildTableActionEncode
;
mgmtChildTableActionFp
[
SDB_TYPE_DECODE
]
=
mgmtChildTableActionDecode
;
mgmtChildTableActionFp
[
SDB_TYPE_DECODE
]
=
mgmtChildTableActionDecode
;
mgmtChildTableActionFp
[
SDB_TYPE_RESET
]
=
mgmtChildTableActionReset
;
mgmtChildTableActionFp
[
SDB_TYPE_DESTROY
]
=
mgmtChildTableActionDestroy
;
mgmtChildTableActionFp
[
SDB_TYPE_DESTROY
]
=
mgmtChildTableActionDestroy
;
}
}
...
@@ -93,7 +94,7 @@ void *mgmtChildTableActionInsert(void *row, char *str, int32_t size, int32_t *ss
...
@@ -93,7 +94,7 @@ void *mgmtChildTableActionInsert(void *row, char *str, int32_t size, int32_t *ss
return
NULL
;
return
NULL
;
}
}
if
(
!
sdb
Master
)
{
if
(
!
sdb
IsMaster
()
)
{
int32_t
sid
=
taosAllocateId
(
pVgroup
->
idPool
);
int32_t
sid
=
taosAllocateId
(
pVgroup
->
idPool
);
if
(
sid
!=
pTable
->
sid
)
{
if
(
sid
!=
pTable
->
sid
)
{
mError
(
"ctable:%s, sid:%d is not matched from the master:%d"
,
pTable
->
tableId
,
sid
,
pTable
->
sid
);
mError
(
"ctable:%s, sid:%d is not matched from the master:%d"
,
pTable
->
tableId
,
sid
,
pTable
->
sid
);
...
@@ -311,13 +312,6 @@ void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableObj *pTab
...
@@ -311,13 +312,6 @@ void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableObj *pTab
}
}
void
*
mgmtCreateChildTable
(
SCMCreateTableMsg
*
pCreate
,
SVgObj
*
pVgroup
,
int32_t
tid
)
{
void
*
mgmtCreateChildTable
(
SCMCreateTableMsg
*
pCreate
,
SVgObj
*
pVgroup
,
int32_t
tid
)
{
int32_t
numOfTables
=
sdbGetNumOfRows
(
tsChildTableSdb
);
if
(
numOfTables
>=
tsMaxTables
)
{
mError
(
"ctable:%s, numOfTables:%d exceed maxTables:%d"
,
pCreate
->
tableId
,
numOfTables
,
tsMaxTables
);
terrno
=
TSDB_CODE_TOO_MANY_TABLES
;
return
NULL
;
}
char
*
pTagData
=
(
char
*
)
pCreate
->
schema
;
// it is a tag key
char
*
pTagData
=
(
char
*
)
pCreate
->
schema
;
// it is a tag key
SSuperTableObj
*
pSuperTable
=
mgmtGetSuperTable
(
pTagData
);
SSuperTableObj
*
pSuperTable
=
mgmtGetSuperTable
(
pTagData
);
if
(
pSuperTable
==
NULL
)
{
if
(
pSuperTable
==
NULL
)
{
...
@@ -338,7 +332,7 @@ void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t
...
@@ -338,7 +332,7 @@ void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t
pTable
->
type
=
TSDB_CHILD_TABLE
;
pTable
->
type
=
TSDB_CHILD_TABLE
;
pTable
->
createdTime
=
taosGetTimestampMs
();
pTable
->
createdTime
=
taosGetTimestampMs
();
pTable
->
uid
=
(((
uint64_t
)
pTable
->
vgId
)
<<
40
)
+
((((
uint64_t
)
pTable
->
sid
)
&
((
1ul
<<
24
)
-
1ul
))
<<
16
)
+
pTable
->
uid
=
(((
uint64_t
)
pTable
->
vgId
)
<<
40
)
+
((((
uint64_t
)
pTable
->
sid
)
&
((
1ul
<<
24
)
-
1ul
))
<<
16
)
+
(
(
uint64_t
)
sdbGetVersion
()
&
((
1ul
<<
16
)
-
1ul
));
(
sdbGetVersion
()
&
((
1ul
<<
16
)
-
1ul
));
pTable
->
sid
=
tid
;
pTable
->
sid
=
tid
;
pTable
->
vgId
=
pVgroup
->
vgId
;
pTable
->
vgId
=
pVgroup
->
vgId
;
pTable
->
superTable
=
pSuperTable
;
pTable
->
superTable
=
pSuperTable
;
...
...
src/mnode/src/mgmtDClient.c
浏览文件 @
4b7e463b
...
@@ -42,7 +42,7 @@ int32_t mgmtInitDClient() {
...
@@ -42,7 +42,7 @@ int32_t mgmtInitDClient() {
rpcInit
.
label
=
"MND-DC"
;
rpcInit
.
label
=
"MND-DC"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
mgmtProcessRspFromDnode
;
rpcInit
.
cfp
=
mgmtProcessRspFromDnode
;
rpcInit
.
sessions
=
tsMaxDnodes
*
5
;
rpcInit
.
sessions
=
100
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
user
=
"mgmtDClient"
;
rpcInit
.
user
=
"mgmtDClient"
;
...
...
src/mnode/src/mgmtDServer.c
浏览文件 @
4b7e463b
...
@@ -45,7 +45,7 @@ int32_t mgmtInitDServer() {
...
@@ -45,7 +45,7 @@ int32_t mgmtInitDServer() {
rpcInit
.
label
=
"MND-DS"
;
rpcInit
.
label
=
"MND-DS"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
mgmtProcessMsgFromDnode
;
rpcInit
.
cfp
=
mgmtProcessMsgFromDnode
;
rpcInit
.
sessions
=
tsMaxDnodes
*
5
;
rpcInit
.
sessions
=
100
;
rpcInit
.
connType
=
TAOS_CONN_SERVER
;
rpcInit
.
connType
=
TAOS_CONN_SERVER
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
afp
=
mgmtDServerRetrieveAuth
;
rpcInit
.
afp
=
mgmtDServerRetrieveAuth
;
...
...
src/mnode/src/mgmtDb.c
浏览文件 @
4b7e463b
...
@@ -24,11 +24,12 @@
...
@@ -24,11 +24,12 @@
#include "mgmtBalance.h"
#include "mgmtBalance.h"
#include "mgmtDb.h"
#include "mgmtDb.h"
#include "mgmtDnode.h"
#include "mgmtDnode.h"
#include "mgmtMnode.h"
#include "mgmtGrant.h"
#include "mgmtGrant.h"
#include "mgmtShell.h"
#include "mgmtShell.h"
#include "mgmtMnode.h"
#include "mgmtNormalTable.h"
#include "mgmtNormalTable.h"
#include "mgmtChildTable.h"
#include "mgmtChildTable.h"
#include "mgmtSdb.h"
#include "mgmtSuperTable.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtTable.h"
#include "mgmtUser.h"
#include "mgmtUser.h"
...
@@ -62,7 +63,6 @@ static void mgmtDbActionInit() {
...
@@ -62,7 +63,6 @@ static void mgmtDbActionInit() {
mgmtDbActionFp
[
SDB_TYPE_UPDATE
]
=
mgmtDbActionUpdate
;
mgmtDbActionFp
[
SDB_TYPE_UPDATE
]
=
mgmtDbActionUpdate
;
mgmtDbActionFp
[
SDB_TYPE_ENCODE
]
=
mgmtDbActionEncode
;
mgmtDbActionFp
[
SDB_TYPE_ENCODE
]
=
mgmtDbActionEncode
;
mgmtDbActionFp
[
SDB_TYPE_DECODE
]
=
mgmtDbActionDecode
;
mgmtDbActionFp
[
SDB_TYPE_DECODE
]
=
mgmtDbActionDecode
;
mgmtDbActionFp
[
SDB_TYPE_RESET
]
=
mgmtDbActionReset
;
mgmtDbActionFp
[
SDB_TYPE_DESTROY
]
=
mgmtDbActionDestroy
;
mgmtDbActionFp
[
SDB_TYPE_DESTROY
]
=
mgmtDbActionDestroy
;
}
}
...
@@ -83,7 +83,7 @@ int32_t mgmtInitDbs() {
...
@@ -83,7 +83,7 @@ int32_t mgmtInitDbs() {
SDbObj
tObj
;
SDbObj
tObj
;
tsDbUpdateSize
=
tObj
.
updateEnd
-
(
char
*
)
&
tObj
;
tsDbUpdateSize
=
tObj
.
updateEnd
-
(
char
*
)
&
tObj
;
tsDbSdb
=
sdbOpenTable
(
tsMaxDbs
,
tsDbUpdateSize
,
"dbs"
,
SDB_KEYTYPE_STRING
,
tsMnodeDir
,
mgmtDbAction
);
tsDbSdb
=
sdbOpenTable
(
TSDB_MAX_DBS
,
tsDbUpdateSize
,
"dbs"
,
SDB_KEYTYPE_STRING
,
tsMnodeDir
,
mgmtDbAction
);
if
(
tsDbSdb
==
NULL
)
{
if
(
tsDbSdb
==
NULL
)
{
mError
(
"failed to init db data"
);
mError
(
"failed to init db data"
);
return
-
1
;
return
-
1
;
...
@@ -252,12 +252,6 @@ static int32_t mgmtCheckDbParams(SCMCreateDbMsg *pCreate) {
...
@@ -252,12 +252,6 @@ static int32_t mgmtCheckDbParams(SCMCreateDbMsg *pCreate) {
}
}
static
int32_t
mgmtCreateDb
(
SAcctObj
*
pAcct
,
SCMCreateDbMsg
*
pCreate
)
{
static
int32_t
mgmtCreateDb
(
SAcctObj
*
pAcct
,
SCMCreateDbMsg
*
pCreate
)
{
int32_t
numOfDbs
=
sdbGetNumOfRows
(
tsDbSdb
);
if
(
numOfDbs
>=
tsMaxDbs
)
{
mWarn
(
"numOfDbs:%d, exceed tsMaxDbs:%d"
,
numOfDbs
,
tsMaxDbs
);
return
TSDB_CODE_TOO_MANY_DATABASES
;
}
int32_t
code
=
mgmtCheckDbLimit
(
pAcct
);
int32_t
code
=
mgmtCheckDbLimit
(
pAcct
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
return
code
;
return
code
;
...
...
src/mnode/src/mgmtDnode.c
浏览文件 @
4b7e463b
...
@@ -597,7 +597,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
...
@@ -597,7 +597,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
return
;
return
;
}
}
mgmtGetMnodeIpList
(
&
pRsp
->
ipList
);
mgmtGetMnode
Private
IpList
(
&
pRsp
->
ipList
);
pRsp
->
dnodeState
.
dnodeId
=
htonl
(
pDnode
->
dnodeId
);
pRsp
->
dnodeState
.
dnodeId
=
htonl
(
pDnode
->
dnodeId
);
pRsp
->
dnodeState
.
moduleStatus
=
htonl
(
pDnode
->
moduleStatus
);
pRsp
->
dnodeState
.
moduleStatus
=
htonl
(
pDnode
->
moduleStatus
);
...
...
src/mnode/src/mgmtMain.c
浏览文件 @
4b7e463b
...
@@ -25,6 +25,8 @@
...
@@ -25,6 +25,8 @@
#include "mgmtDClient.h"
#include "mgmtDClient.h"
#include "mgmtDnode.h"
#include "mgmtDnode.h"
#include "mgmtDServer.h"
#include "mgmtDServer.h"
#include "mgmtMnode.h"
#include "mgmtSdb.h"
#include "mgmtVgroup.h"
#include "mgmtVgroup.h"
#include "mgmtUser.h"
#include "mgmtUser.h"
#include "mgmtTable.h"
#include "mgmtTable.h"
...
@@ -65,7 +67,7 @@ int32_t mgmtStartSystem() {
...
@@ -65,7 +67,7 @@ int32_t mgmtStartSystem() {
return
0
;
return
0
;
}
}
tsMgmtTmr
=
taosTmrInit
((
tsMax
Dnodes
+
tsMax
ShellConns
)
*
3
,
200
,
3600000
,
"MND"
);
tsMgmtTmr
=
taosTmrInit
((
tsMaxShellConns
)
*
3
,
200
,
3600000
,
"MND"
);
if
(
tsMgmtTmr
==
NULL
)
{
if
(
tsMgmtTmr
==
NULL
)
{
mError
(
"failed to init timer"
);
mError
(
"failed to init timer"
);
return
-
1
;
return
-
1
;
...
@@ -109,8 +111,8 @@ int32_t mgmtStartSystem() {
...
@@ -109,8 +111,8 @@ int32_t mgmtStartSystem() {
return
-
1
;
return
-
1
;
}
}
if
(
sdbInitPeers
(
tsMnodeDir
)
<
0
)
{
if
(
mgmtInitMnodes
(
)
<
0
)
{
mError
(
"failed to init
peer
s"
);
mError
(
"failed to init
mnode
s"
);
return
-
1
;
return
-
1
;
}
}
...
@@ -125,7 +127,7 @@ int32_t mgmtStartSystem() {
...
@@ -125,7 +127,7 @@ int32_t mgmtStartSystem() {
void
mgmtStopSystem
()
{
void
mgmtStopSystem
()
{
if
(
sdb
Master
)
{
if
(
sdb
IsMaster
()
)
{
mTrace
(
"it is a master mgmt node, it could not be stopped"
);
mTrace
(
"it is a master mgmt node, it could not be stopped"
);
return
;
return
;
}
}
...
@@ -136,7 +138,7 @@ void mgmtStopSystem() {
...
@@ -136,7 +138,7 @@ void mgmtStopSystem() {
void
mgmtCleanUpSystem
()
{
void
mgmtCleanUpSystem
()
{
mPrint
(
"starting to clean up mgmt"
);
mPrint
(
"starting to clean up mgmt"
);
sdbCleanUpPeer
s
();
mgmtCleanupMnode
s
();
mgmtCleanupBalance
();
mgmtCleanupBalance
();
mgmtCleanUpShell
();
mgmtCleanUpShell
();
mgmtCleanupDClient
();
mgmtCleanupDClient
();
...
...
src/mnode/src/mgmtMnode.c
浏览文件 @
4b7e463b
...
@@ -18,48 +18,81 @@
...
@@ -18,48 +18,81 @@
#include "trpc.h"
#include "trpc.h"
#include "tschemautil.h"
#include "tschemautil.h"
#include "mgmtMnode.h"
#include "mgmtMnode.h"
#include "mgmtSdb.h"
#include "mgmtUser.h"
#include "mgmtUser.h"
int32_t
(
*
mgmtAddMnodeFp
)(
uint32_t
privateIp
,
uint32_t
publicIp
)
=
NULL
;
int32_t
(
*
mpeerAddMnodeFp
)(
uint32_t
privateIp
,
uint32_t
publicIp
)
=
NULL
;
int32_t
(
*
mgmtRemoveMnodeFp
)(
uint32_t
privateIp
)
=
NULL
;
int32_t
(
*
mpeerRemoveMnodeFp
)(
uint32_t
privateIp
)
=
NULL
;
int32_t
(
*
mgmtGetMnodesNumFp
)()
=
NULL
;
int32_t
(
*
mpeerGetMnodesNumFp
)()
=
NULL
;
void
*
(
*
mgmtGetNextMnodeFp
)(
SShowObj
*
pShow
,
SMnodeObj
**
pMnode
)
=
NULL
;
void
*
(
*
mpeerGetNextMnodeFp
)(
SShowObj
*
pShow
,
SMnodeObj
**
pMnode
)
=
NULL
;
int32_t
(
*
mpeerInitMnodesFp
)()
=
NULL
;
void
(
*
mpeerCleanUpMnodesFp
)()
=
NULL
;
static
SMnodeObj
tsMnodeObj
=
{
0
};
static
SMnodeObj
tsMnodeObj
=
{
0
};
static
int32_t
mgmtGetMnodeMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtGetMnodeMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveMnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
mgmtRetrieveMnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
char
*
mgmtMnodeStatusStr
[]
=
{
"offline"
,
"unsynced"
,
"syncing"
,
"serving"
,
"null"
};
static
char
*
mgmtMnodeRoleStr
[]
=
{
"unauthed"
,
"undecided"
,
"master"
,
"slave"
,
"null"
};
int32_t
mgmtInitMnodes
()
{
if
(
mpeerInitMnodesFp
)
{
return
(
*
mpeerInitMnodesFp
)();
}
else
{
return
0
;
}
}
void
mgmtCleanupMnodes
()
{
if
(
mpeerCleanUpMnodesFp
)
{
(
*
mpeerCleanUpMnodesFp
)();
}
}
bool
mgmtCheckRedirect
(
void
*
handle
)
{
bool
mgmtCheckRedirect
(
void
*
handle
)
{
return
false
;
return
false
;
}
}
int32_t
mgmtAddMnode
(
uint32_t
privateIp
,
uint32_t
publicIp
)
{
int32_t
mgmtAddMnode
(
uint32_t
privateIp
,
uint32_t
publicIp
)
{
if
(
m
gmt
AddMnodeFp
)
{
if
(
m
peer
AddMnodeFp
)
{
return
(
*
m
gmt
AddMnodeFp
)(
privateIp
,
publicIp
);
return
(
*
m
peer
AddMnodeFp
)(
privateIp
,
publicIp
);
}
else
{
}
else
{
return
0
;
return
0
;
}
}
}
}
int32_t
mgmtRemoveMnode
(
uint32_t
privateIp
)
{
int32_t
mgmtRemoveMnode
(
uint32_t
privateIp
)
{
if
(
m
gmt
RemoveMnodeFp
)
{
if
(
m
peer
RemoveMnodeFp
)
{
return
(
*
m
gmt
RemoveMnodeFp
)(
privateIp
);
return
(
*
m
peer
RemoveMnodeFp
)(
privateIp
);
}
else
{
}
else
{
return
0
;
return
0
;
}
}
}
}
static
int32_t
mgmtGetMnodesNum
()
{
static
int32_t
mgmtGetMnodesNum
()
{
if
(
m
gmt
GetMnodesNumFp
)
{
if
(
m
peer
GetMnodesNumFp
)
{
return
(
*
m
gmt
GetMnodesNumFp
)();
return
(
*
m
peer
GetMnodesNumFp
)();
}
else
{
}
else
{
return
1
;
return
1
;
}
}
}
}
static
void
*
mgmtGetNextMnode
(
SShowObj
*
pShow
,
SMnodeObj
**
pMnode
)
{
static
void
*
mgmtGetNextMnode
(
SShowObj
*
pShow
,
SMnodeObj
**
pMnode
)
{
if
(
m
gmt
GetNextMnodeFp
)
{
if
(
m
peer
GetNextMnodeFp
)
{
return
(
*
m
gmt
GetNextMnodeFp
)(
pShow
,
pMnode
);
return
(
*
m
peer
GetNextMnodeFp
)(
pShow
,
pMnode
);
}
else
{
}
else
{
if
(
*
pMnode
==
NULL
)
{
if
(
*
pMnode
==
NULL
)
{
*
pMnode
=
&
tsMnodeObj
;
*
pMnode
=
&
tsMnodeObj
;
...
@@ -149,11 +182,11 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi
...
@@ -149,11 +182,11 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi
cols
++
;
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
sdbStatusStr
[(
uint8_t
)
pMnode
->
status
]);
strcpy
(
pWrite
,
mgmtMnodeStatusStr
[
pMnode
->
status
]);
cols
++
;
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
sdbRoleStr
[(
uint8_t
)
pMnode
->
role
]);
strcpy
(
pWrite
,
mgmtMnodeRoleStr
[
pMnode
->
role
]);
cols
++
;
cols
++
;
tinet_ntoa
(
ipstr
,
pMnode
->
publicIp
);
tinet_ntoa
(
ipstr
,
pMnode
->
publicIp
);
...
@@ -168,7 +201,14 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi
...
@@ -168,7 +201,14 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi
return
numOfRows
;
return
numOfRows
;
}
}
void
mgmtGetMnodeIpList
(
SRpcIpSet
*
ipSet
)
{
void
mgmtGetMnodePrivateIpList
(
SRpcIpSet
*
ipSet
)
{
ipSet
->
inUse
=
0
;
ipSet
->
port
=
htons
(
tsMnodeDnodePort
);
ipSet
->
numOfIps
=
1
;
ipSet
->
ip
[
0
]
=
htonl
(
inet_addr
(
tsMasterIp
));
}
void
mgmtGetMnodePublicIpList
(
SRpcIpSet
*
ipSet
)
{
ipSet
->
inUse
=
0
;
ipSet
->
inUse
=
0
;
ipSet
->
port
=
htons
(
tsMnodeDnodePort
);
ipSet
->
port
=
htons
(
tsMnodeDnodePort
);
ipSet
->
numOfIps
=
1
;
ipSet
->
numOfIps
=
1
;
...
...
src/mnode/src/mgmtNormalTable.c
浏览文件 @
4b7e463b
...
@@ -27,6 +27,7 @@
...
@@ -27,6 +27,7 @@
#include "mgmtDClient.h"
#include "mgmtDClient.h"
#include "mgmtGrant.h"
#include "mgmtGrant.h"
#include "mgmtNormalTable.h"
#include "mgmtNormalTable.h"
#include "mgmtSdb.h"
#include "mgmtSuperTable.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtTable.h"
#include "mgmtVgroup.h"
#include "mgmtVgroup.h"
...
@@ -55,7 +56,6 @@ static void mgmtNormalTableActionInit() {
...
@@ -55,7 +56,6 @@ static void mgmtNormalTableActionInit() {
mgmtNormalTableActionFp
[
SDB_TYPE_UPDATE
]
=
mgmtNormalTableActionUpdate
;
mgmtNormalTableActionFp
[
SDB_TYPE_UPDATE
]
=
mgmtNormalTableActionUpdate
;
mgmtNormalTableActionFp
[
SDB_TYPE_ENCODE
]
=
mgmtNormalTableActionEncode
;
mgmtNormalTableActionFp
[
SDB_TYPE_ENCODE
]
=
mgmtNormalTableActionEncode
;
mgmtNormalTableActionFp
[
SDB_TYPE_DECODE
]
=
mgmtNormalTableActionDecode
;
mgmtNormalTableActionFp
[
SDB_TYPE_DECODE
]
=
mgmtNormalTableActionDecode
;
mgmtNormalTableActionFp
[
SDB_TYPE_RESET
]
=
mgmtNormalTableActionReset
;
mgmtNormalTableActionFp
[
SDB_TYPE_DESTROY
]
=
mgmtNormalTableActionDestroy
;
mgmtNormalTableActionFp
[
SDB_TYPE_DESTROY
]
=
mgmtNormalTableActionDestroy
;
}
}
...
@@ -98,7 +98,7 @@ void *mgmtNormalTableActionInsert(void *row, char *str, int32_t size, int32_t *s
...
@@ -98,7 +98,7 @@ void *mgmtNormalTableActionInsert(void *row, char *str, int32_t size, int32_t *s
return
NULL
;
return
NULL
;
}
}
if
(
!
sdb
Master
)
{
if
(
!
sdb
IsMaster
()
)
{
int32_t
sid
=
taosAllocateId
(
pVgroup
->
idPool
);
int32_t
sid
=
taosAllocateId
(
pVgroup
->
idPool
);
if
(
sid
!=
pTable
->
sid
)
{
if
(
sid
!=
pTable
->
sid
)
{
mError
(
"sid:%d is not matched from the master:%d"
,
sid
,
pTable
->
sid
);
mError
(
"sid:%d is not matched from the master:%d"
,
sid
,
pTable
->
sid
);
...
@@ -222,7 +222,7 @@ int32_t mgmtInitNormalTables() {
...
@@ -222,7 +222,7 @@ int32_t mgmtInitNormalTables() {
SNormalTableObj
tObj
;
SNormalTableObj
tObj
;
tsNormalTableUpdateSize
=
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
tsNormalTableUpdateSize
=
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
tsNormalTableSdb
=
sdbOpenTable
(
tsMaxTables
,
sizeof
(
SNormalTableObj
)
+
sizeof
(
SSchema
)
*
TSDB_MAX_COLUMNS
,
tsNormalTableSdb
=
sdbOpenTable
(
TSDB_MAX_NORMAL_TABLES
,
sizeof
(
SNormalTableObj
)
+
sizeof
(
SSchema
)
*
TSDB_MAX_COLUMNS
,
"ntables"
,
SDB_KEYTYPE_STRING
,
tsMnodeDir
,
mgmtNormalTableAction
);
"ntables"
,
SDB_KEYTYPE_STRING
,
tsMnodeDir
,
mgmtNormalTableAction
);
if
(
tsNormalTableSdb
==
NULL
)
{
if
(
tsNormalTableSdb
==
NULL
)
{
mError
(
"failed to init ntables data"
);
mError
(
"failed to init ntables data"
);
...
@@ -323,13 +323,6 @@ void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable) {
...
@@ -323,13 +323,6 @@ void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable) {
}
}
void
*
mgmtCreateNormalTable
(
SCMCreateTableMsg
*
pCreate
,
SVgObj
*
pVgroup
,
int32_t
sid
)
{
void
*
mgmtCreateNormalTable
(
SCMCreateTableMsg
*
pCreate
,
SVgObj
*
pVgroup
,
int32_t
sid
)
{
int32_t
numOfTables
=
sdbGetNumOfRows
(
tsNormalTableSdb
);
if
(
numOfTables
>=
TSDB_MAX_NORMAL_TABLES
)
{
mError
(
"table:%s, numOfTables:%d exceed maxTables:%d"
,
pCreate
->
tableId
,
numOfTables
,
TSDB_MAX_NORMAL_TABLES
);
terrno
=
TSDB_CODE_TOO_MANY_TABLES
;
return
NULL
;
}
SNormalTableObj
*
pTable
=
(
SNormalTableObj
*
)
calloc
(
sizeof
(
SNormalTableObj
),
1
);
SNormalTableObj
*
pTable
=
(
SNormalTableObj
*
)
calloc
(
sizeof
(
SNormalTableObj
),
1
);
if
(
pTable
==
NULL
)
{
if
(
pTable
==
NULL
)
{
mError
(
"table:%s, failed to alloc memory"
,
pCreate
->
tableId
);
mError
(
"table:%s, failed to alloc memory"
,
pCreate
->
tableId
);
...
@@ -341,7 +334,7 @@ void *mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t
...
@@ -341,7 +334,7 @@ void *mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t
pTable
->
type
=
TSDB_NORMAL_TABLE
;
pTable
->
type
=
TSDB_NORMAL_TABLE
;
pTable
->
vgId
=
pVgroup
->
vgId
;
pTable
->
vgId
=
pVgroup
->
vgId
;
pTable
->
createdTime
=
taosGetTimestampMs
();
pTable
->
createdTime
=
taosGetTimestampMs
();
pTable
->
uid
=
(((
uint64_t
)
pTable
->
createdTime
)
<<
16
)
+
(
(
uint64_t
)
sdbGetVersion
()
&
((
1ul
<<
16
)
-
1ul
));
pTable
->
uid
=
(((
uint64_t
)
pTable
->
createdTime
)
<<
16
)
+
(
sdbGetVersion
()
&
((
1ul
<<
16
)
-
1ul
));
pTable
->
sid
=
sid
;
pTable
->
sid
=
sid
;
pTable
->
sversion
=
0
;
pTable
->
sversion
=
0
;
pTable
->
numOfColumns
=
htons
(
pCreate
->
numOfColumns
);
pTable
->
numOfColumns
=
htons
(
pCreate
->
numOfColumns
);
...
...
src/mnode/src/mgmtSdb.c
浏览文件 @
4b7e463b
...
@@ -15,60 +15,107 @@
...
@@ -15,60 +15,107 @@
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include "os.h"
#include "os.h"
#include "
sdb
.h"
#include "
taosdef
.h"
#include "tutil.h"
#include "tutil.h"
#include "mgmtSdb.h"
#include "mgmtSdb.h"
#define abs(x) (((x) < 0) ? -(x) : (x))
#define abs(x) (((x) < 0) ? -(x) : (x))
#define SDB_MAX_PEERS 4
#define SDB_DELIMITER 0xFFF00F00
#define SDB_ENDCOMMIT 0xAFFFAAAF
#define SDB_STATUS_OFFLINE 0
#define SDB_STATUS_SERVING 1
typedef
struct
{
uint64_t
swVersion
;
int16_t
sdbFileVersion
;
char
reserved
[
6
];
TSCKSUM
checkSum
;
}
SSdbHeader
;
typedef
struct
_SSdbTable
{
SSdbHeader
header
;
int
maxRows
;
int
dbId
;
int32_t
maxRowSize
;
char
name
[
TSDB_DB_NAME_LEN
];
char
fn
[
128
];
int
keyType
;
uint32_t
autoIndex
;
int64_t
numOfRows
;
int64_t
id
;
int64_t
size
;
void
*
iHandle
;
int
fd
;
void
*
(
*
appTool
)(
char
,
void
*
,
char
*
,
int
,
int
*
);
pthread_mutex_t
mutex
;
}
SSdbTable
;
typedef
struct
{
int64_t
id
;
int64_t
offset
;
int
rowSize
;
void
*
row
;
}
SRowMeta
;
typedef
struct
{
int32_t
delimiter
;
int32_t
rowSize
;
int64_t
id
;
char
data
[];
}
SRowHead
;
typedef
struct
{
uint8_t
dbId
;
char
type
;
uint64_t
version
;
short
dataLen
;
char
data
[];
}
SForwardMsg
;
extern
char
version
[];
extern
char
version
[];
const
int16_t
sdbFileVersion
=
0
;
const
int16_t
sdbFileVersion
=
2
;
SRpcIpSet
*
pSdbIpList
=
NULL
;
int32_t
(
*
mpeerForwardRequestFp
)(
SSdbTable
*
pTable
,
char
type
,
void
*
cont
,
int32_t
contLen
)
=
NULL
;
SRpcIpSet
*
pSdbPublicIpList
=
NULL
;
SSdbPeer
*
sdbPeer
[
SDB_MAX_PEERS
];
// first slot for self
#ifdef CLUSTER
int
sdbMaster
=
0
;
#else
int
sdbMaster
=
1
;
#endif
void
*
(
*
sdbInitIndexFp
[])(
int
maxRows
,
int
dataSize
)
=
{
sdbOpenStrHash
,
sdbOpenIntHash
,
sdbOpenIntHash
};
void
*
(
*
sdbAddIndexFp
[])(
void
*
handle
,
void
*
key
,
void
*
data
)
=
{
sdbAddStrHash
,
sdbAddIntHash
,
sdbAddIntHash
};
void
(
*
sdbDeleteIndexFp
[])(
void
*
handle
,
void
*
key
)
=
{
sdbDeleteStrHash
,
sdbDeleteIntHash
,
sdbDeleteIntHash
};
void
*
(
*
sdbGetIndexFp
[])(
void
*
handle
,
void
*
key
)
=
{
sdbGetStrHashData
,
sdbGetIntHashData
,
sdbGetIntHashData
};
static
SSdbTable
*
sdbTableList
[
10
]
=
{
0
};
static
int32_t
sdbNumOfTables
=
0
;
static
uint64_t
sdbVersion
=
0
;
static
int32_t
sdbMaster
=
0
;
static
int32_t
sdbStatus
=
SDB_STATUS_OFFLINE
;
void
(
*
sdbCleanUpIndexFp
[])(
void
*
handle
)
=
{
sdbCloseStrHash
,
sdbCloseIntHash
,
sdbCloseIntHash
,
};
void
*
(
*
sdbFetchRowFp
[])(
void
*
handle
,
void
*
ptr
,
void
**
ppRow
)
=
{
// #ifdef CLUSTER
sdbFetchStrHashData
,
sdbFetchIntHashData
,
sdbFetchIntHashData
,
// int32_t sdbMaster = 0;
};
// #else
// int32_t sdbMaster = 1;
// #endif
SSdbTable
*
tableList
[
20
];
static
void
*
(
*
sdbInitIndexFp
[])(
int32_t
maxRows
,
int32_t
dataSize
)
=
{
sdbOpenStrHash
,
sdbOpenIntHash
};
int
sdbNumOfTables
;
static
void
*
(
*
sdbAddIndexFp
[])(
void
*
handle
,
void
*
key
,
void
*
data
)
=
{
sdbAddStrHash
,
sdbAddIntHash
};
int64_t
sdbVersion
;
static
void
(
*
sdbDeleteIndexFp
[])(
void
*
handle
,
void
*
key
)
=
{
sdbDeleteStrHash
,
sdbDeleteIntHash
};
static
void
*
(
*
sdbGetIndexFp
[])(
void
*
handle
,
void
*
key
)
=
{
sdbGetStrHashData
,
sdbGetIntHashData
};
static
void
(
*
sdbCleanUpIndexFp
[])(
void
*
handle
)
=
{
sdbCloseStrHash
,
sdbCloseIntHash
};
static
void
*
(
*
sdbFetchRowFp
[])(
void
*
handle
,
void
*
ptr
,
void
**
ppRow
)
=
{
sdbFetchStrHashData
,
sdbFetchIntHashData
};
int64_t
sdbGetVersion
()
{
void
sdbResetTable
(
SSdbTable
*
pTable
);
return
sdbVersion
;
void
sdbSaveSnapShot
(
void
*
handle
);
};
int32_t
sdbGetRunStatus
()
{
uint64_t
sdbGetVersion
()
{
return
sdbVersion
;
}
if
(
!
tsIsCluster
)
{
bool
sdbInServerState
()
{
return
sdbStatus
==
SDB_STATUS_SERVING
;
}
return
SDB_STATUS_SERVING
;
bool
sdbIsMaster
()
{
return
sdbMaster
;
}
}
int64_t
sdbGetId
(
void
*
handle
)
{
return
((
SSdbTable
*
)
handle
)
->
id
;
}
int64_t
sdbGetNumOfRows
(
void
*
handle
)
{
return
((
SSdbTable
*
)
handle
)
->
numOfRows
;
}
if
(
sdbInited
==
NULL
)
{
static
int32_t
sdbForwardDbReqToPeer
(
SSdbTable
*
pTable
,
char
type
,
char
*
data
,
int32_t
dataLen
)
{
return
SDB_STATUS_OFFLINE
;
if
(
mpeerForwardRequestFp
)
{
return
mpeerForwardRequestFp
(
pTable
,
type
,
data
,
dataLen
);
}
else
{
return
0
;
}
}
return
sdbStatus
;
}
}
void
sdbFinishCommit
(
void
*
handle
)
{
static
void
sdbFinishCommit
(
void
*
handle
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
uint32_t
sdbEcommit
=
SDB_ENDCOMMIT
;
uint32_t
sdbEcommit
=
SDB_ENDCOMMIT
;
...
@@ -78,7 +125,7 @@ void sdbFinishCommit(void *handle) {
...
@@ -78,7 +125,7 @@ void sdbFinishCommit(void *handle) {
pTable
->
size
+=
sizeof
(
sdbEcommit
);
pTable
->
size
+=
sizeof
(
sdbEcommit
);
}
}
in
t
sdbOpenSdbFile
(
SSdbTable
*
pTable
)
{
static
int32_
t
sdbOpenSdbFile
(
SSdbTable
*
pTable
)
{
struct
stat
fstat
,
ofstat
;
struct
stat
fstat
,
ofstat
;
uint64_t
size
;
uint64_t
size
;
char
*
dirc
=
NULL
;
char
*
dirc
=
NULL
;
...
@@ -91,7 +138,7 @@ int sdbOpenSdbFile(SSdbTable *pTable) {
...
@@ -91,7 +138,7 @@ int sdbOpenSdbFile(SSdbTable *pTable) {
memcpy
(
swVersion
.
cversion
,
version
,
sizeof
(
uint64_t
));
memcpy
(
swVersion
.
cversion
,
version
,
sizeof
(
uint64_t
));
// check sdb.db and .sdb.db status
// check sdb.db and .sdb.db status
char
fn
[
128
]
=
"
\0
"
;
char
fn
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
dirc
=
strdup
(
pTable
->
fn
);
dirc
=
strdup
(
pTable
->
fn
);
basec
=
strdup
(
pTable
->
fn
);
basec
=
strdup
(
pTable
->
fn
);
sprintf
(
fn
,
"%s/.%s"
,
dirname
(
dirc
),
basename
(
basec
));
sprintf
(
fn
,
"%s/.%s"
,
dirname
(
dirc
),
basename
(
basec
));
...
@@ -168,27 +215,15 @@ int sdbOpenSdbFile(SSdbTable *pTable) {
...
@@ -168,27 +215,15 @@ int sdbOpenSdbFile(SSdbTable *pTable) {
return
pTable
->
fd
;
return
pTable
->
fd
;
}
}
// TODO: Change here
static
int32_t
sdbInitTableByFile
(
SSdbTable
*
pTable
)
{
void
sdbAddIntoUpdateList
(
SSdbTable
*
pTable
,
char
type
,
char
*
row
)
{
pTable
->
numOfUpdates
++
;
pTable
->
updatePos
=
pTable
->
numOfUpdates
%
pTable
->
maxRows
;
if
(
pTable
->
update
[
pTable
->
updatePos
].
type
==
SDB_TYPE_DELETE
)
(
*
(
pTable
->
appTool
))(
SDB_TYPE_DESTROY
,
pTable
->
update
[
pTable
->
updatePos
].
row
,
NULL
,
0
,
NULL
);
pTable
->
update
[
pTable
->
updatePos
].
type
=
type
;
pTable
->
update
[
pTable
->
updatePos
].
row
=
row
;
}
int
sdbInitTableByFile
(
SSdbTable
*
pTable
)
{
SRowMeta
rowMeta
;
SRowMeta
rowMeta
;
int
numOfDels
=
0
;
int
32_t
numOfDels
=
0
;
int
bytes
=
0
;
int
32_t
bytes
=
0
;
int64_t
oldId
=
0
;
int64_t
oldId
=
0
;
void
*
pMetaRow
=
NULL
;
void
*
pMetaRow
=
NULL
;
int
total_size
=
0
;
int
32_t
total_size
=
0
;
int
real_size
=
0
;
int
32_t
real_size
=
0
;
int
maxAutoIndex
=
0
;
int
32_t
maxAutoIndex
=
0
;
oldId
=
pTable
->
id
;
oldId
=
pTable
->
id
;
if
(
sdbOpenSdbFile
(
pTable
)
<
0
)
return
-
1
;
if
(
sdbOpenSdbFile
(
pTable
)
<
0
)
return
-
1
;
...
@@ -277,7 +312,7 @@ int sdbInitTableByFile(SSdbTable *pTable) {
...
@@ -277,7 +312,7 @@ int sdbInitTableByFile(SSdbTable *pTable) {
numOfDels
++
;
numOfDels
++
;
}
else
{
// Reset the object TODO: is it possible to merge reset and
}
else
{
// Reset the object TODO: is it possible to merge reset and
// update ??
// update ??
(
*
(
pTable
->
appTool
))(
SDB_TYPE_RESET
,
pMetaRow
,
rowHead
->
data
,
rowHead
->
rowSize
,
NULL
);
//
(*(pTable->appTool))(SDB_TYPE_RESET, pMetaRow, rowHead->data, rowHead->rowSize, NULL);
}
}
numOfDels
++
;
numOfDels
++
;
}
}
...
@@ -293,9 +328,6 @@ int sdbInitTableByFile(SSdbTable *pTable) {
...
@@ -293,9 +328,6 @@ int sdbInitTableByFile(SSdbTable *pTable) {
sdbVersion
+=
(
pTable
->
id
-
oldId
);
sdbVersion
+=
(
pTable
->
id
-
oldId
);
if
(
numOfDels
>
pTable
->
maxRows
/
4
)
sdbSaveSnapShot
(
pTable
);
if
(
numOfDels
>
pTable
->
maxRows
/
4
)
sdbSaveSnapShot
(
pTable
);
pTable
->
numOfUpdates
=
0
;
pTable
->
updatePos
=
0
;
tfree
(
rowHead
);
tfree
(
rowHead
);
return
0
;
return
0
;
...
@@ -304,20 +336,12 @@ sdb_exit1:
...
@@ -304,20 +336,12 @@ sdb_exit1:
return
-
1
;
return
-
1
;
}
}
void
*
sdbOpenTable
(
int
maxRows
,
int32_t
maxRowSize
,
char
*
name
,
uint8_t
keyType
,
char
*
directory
,
void
*
sdbOpenTable
(
int
32_t
maxRows
,
int32_t
maxRowSize
,
char
*
name
,
uint8_t
keyType
,
char
*
directory
,
void
*
(
*
appTool
)(
char
,
void
*
,
char
*
,
int
,
in
t
*
))
{
void
*
(
*
appTool
)(
char
,
void
*
,
char
*
,
int
32_t
,
int32_
t
*
))
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
malloc
(
sizeof
(
SSdbTable
));
SSdbTable
*
pTable
=
(
SSdbTable
*
)
malloc
(
sizeof
(
SSdbTable
));
if
(
pTable
==
NULL
)
return
NULL
;
if
(
pTable
==
NULL
)
return
NULL
;
memset
(
pTable
,
0
,
sizeof
(
SSdbTable
));
memset
(
pTable
,
0
,
sizeof
(
SSdbTable
));
int
size
=
sizeof
(
SSdbUpdate
)
*
maxRows
;
pTable
->
update
=
(
SSdbUpdate
*
)
malloc
(
size
);
if
(
pTable
->
update
==
NULL
)
{
free
(
pTable
);
return
NULL
;
};
memset
(
pTable
->
update
,
0
,
size
);
strcpy
(
pTable
->
name
,
name
);
strcpy
(
pTable
->
name
,
name
);
pTable
->
keyType
=
keyType
;
pTable
->
keyType
=
keyType
;
pTable
->
maxRows
=
maxRows
;
pTable
->
maxRows
=
maxRows
;
...
@@ -332,14 +356,14 @@ void *sdbOpenTable(int maxRows, int32_t maxRowSize, char *name, uint8_t keyType,
...
@@ -332,14 +356,14 @@ void *sdbOpenTable(int maxRows, int32_t maxRowSize, char *name, uint8_t keyType,
if
(
sdbInitTableByFile
(
pTable
)
<
0
)
return
NULL
;
if
(
sdbInitTableByFile
(
pTable
)
<
0
)
return
NULL
;
pTable
->
dbId
=
sdbNumOfTables
++
;
pTable
->
dbId
=
sdbNumOfTables
++
;
t
ableList
[
pTable
->
dbId
]
=
pTable
;
sdbT
ableList
[
pTable
->
dbId
]
=
pTable
;
sdbTrace
(
"table:%s is initialized, numOfRows:%d, numOfTables:%d"
,
pTable
->
name
,
pTable
->
numOfRows
,
sdbNumOfTables
);
sdbTrace
(
"table:%s is initialized, numOfRows:%d, numOfTables:%d"
,
pTable
->
name
,
pTable
->
numOfRows
,
sdbNumOfTables
);
return
pTable
;
return
pTable
;
}
}
SRowMeta
*
sdbGetRowMeta
(
void
*
handle
,
void
*
key
)
{
static
SRowMeta
*
sdbGetRowMeta
(
void
*
handle
,
void
*
key
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
SRowMeta
*
pMeta
;
SRowMeta
*
pMeta
;
...
@@ -365,15 +389,14 @@ void *sdbGetRow(void *handle, void *key) {
...
@@ -365,15 +389,14 @@ void *sdbGetRow(void *handle, void *key) {
return
pMeta
->
row
;
return
pMeta
->
row
;
}
}
// row here must be encoded string (rowSize > 0) or the object it self (rowSize
// row here must be encoded string (rowSize > 0) or the object it self (rowSize = 0)
// = 0)
int64_t
sdbInsertRow
(
void
*
handle
,
void
*
row
,
int32_t
rowSize
)
{
int64_t
sdbInsertRow
(
void
*
handle
,
void
*
row
,
int
rowSize
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
SRowMeta
rowMeta
;
SRowMeta
rowMeta
;
int64_t
id
=
-
1
;
int64_t
id
=
-
1
;
void
*
pObj
=
NULL
;
void
*
pObj
=
NULL
;
int
total_size
=
0
;
int
32_t
total_size
=
0
;
int
real_size
=
0
;
int
32_t
real_size
=
0
;
/* char action = SDB_TYPE_INSERT; */
/* char action = SDB_TYPE_INSERT; */
if
(
pTable
==
NULL
)
{
if
(
pTable
==
NULL
)
{
...
@@ -398,9 +421,6 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) {
...
@@ -398,9 +421,6 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) {
case
SDB_KEYTYPE_STRING
:
case
SDB_KEYTYPE_STRING
:
sdbError
(
"table:%s, failed to insert record:%s sdbVersion:%"
PRId64
" id:%"
PRId64
,
pTable
->
name
,
(
char
*
)
row
,
sdbVersion
,
pTable
->
id
);
sdbError
(
"table:%s, failed to insert record:%s sdbVersion:%"
PRId64
" id:%"
PRId64
,
pTable
->
name
,
(
char
*
)
row
,
sdbVersion
,
pTable
->
id
);
break
;
break
;
case
SDB_KEYTYPE_UINT32
:
//dnodes or mnodes
sdbError
(
"table:%s, failed to insert record:%s sdbVersion:%"
PRId64
" id:%"
PRId64
,
pTable
->
name
,
taosIpStr
(
*
(
int32_t
*
)
row
),
sdbVersion
,
pTable
->
id
);
break
;
case
SDB_KEYTYPE_AUTO
:
case
SDB_KEYTYPE_AUTO
:
sdbError
(
"table:%s, failed to insert record:%d sdbVersion:%"
PRId64
" id:%"
PRId64
,
pTable
->
name
,
*
(
int32_t
*
)
row
,
sdbVersion
,
pTable
->
id
);
sdbError
(
"table:%s, failed to insert record:%d sdbVersion:%"
PRId64
" id:%"
PRId64
,
pTable
->
name
,
*
(
int32_t
*
)
row
,
sdbVersion
,
pTable
->
id
);
break
;
break
;
...
@@ -464,18 +484,12 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) {
...
@@ -464,18 +484,12 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) {
pTable
->
size
+=
real_size
;
pTable
->
size
+=
real_size
;
sdbFinishCommit
(
pTable
);
sdbFinishCommit
(
pTable
);
sdbAddIntoUpdateList
(
pTable
,
SDB_TYPE_INSERT
,
rowMeta
.
row
);
pTable
->
numOfRows
++
;
pTable
->
numOfRows
++
;
switch
(
pTable
->
keyType
)
{
switch
(
pTable
->
keyType
)
{
case
SDB_KEYTYPE_STRING
:
case
SDB_KEYTYPE_STRING
:
sdbTrace
(
"table:%s, a record is inserted:%s, sdbVersion:%"
PRId64
" id:%"
PRId64
" rowSize:%d numOfRows:%d fileSize:%"
PRId64
,
sdbTrace
(
"table:%s, a record is inserted:%s, sdbVersion:%"
PRId64
" id:%"
PRId64
" rowSize:%d numOfRows:%d fileSize:%"
PRId64
,
pTable
->
name
,
(
char
*
)
row
,
sdbVersion
,
rowHead
->
id
,
rowHead
->
rowSize
,
pTable
->
numOfRows
,
pTable
->
size
);
pTable
->
name
,
(
char
*
)
row
,
sdbVersion
,
rowHead
->
id
,
rowHead
->
rowSize
,
pTable
->
numOfRows
,
pTable
->
size
);
break
;
break
;
case
SDB_KEYTYPE_UINT32
:
//dnodes or mnodes
sdbTrace
(
"table:%s, a record is inserted:%s, sdbVersion:%"
PRId64
" id:%"
PRId64
" rowSize:%d numOfRows:%d fileSize:%"
PRId64
,
pTable
->
name
,
taosIpStr
(
*
(
int32_t
*
)
row
),
sdbVersion
,
rowHead
->
id
,
rowHead
->
rowSize
,
pTable
->
numOfRows
,
pTable
->
size
);
break
;
case
SDB_KEYTYPE_AUTO
:
case
SDB_KEYTYPE_AUTO
:
sdbTrace
(
"table:%s, a record is inserted:%d, sdbVersion:%"
PRId64
" id:%"
PRId64
" rowSize:%d numOfRows:%d fileSize:%"
PRId64
,
sdbTrace
(
"table:%s, a record is inserted:%d, sdbVersion:%"
PRId64
" id:%"
PRId64
" rowSize:%d numOfRows:%d fileSize:%"
PRId64
,
pTable
->
name
,
*
(
int32_t
*
)
row
,
sdbVersion
,
rowHead
->
id
,
rowHead
->
rowSize
,
pTable
->
numOfRows
,
pTable
->
size
);
pTable
->
name
,
*
(
int32_t
*
)
row
,
sdbVersion
,
rowHead
->
id
,
rowHead
->
rowSize
,
pTable
->
numOfRows
,
pTable
->
size
);
...
@@ -502,14 +516,14 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) {
...
@@ -502,14 +516,14 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) {
}
}
// row here can be object or null-terminated string
// row here can be object or null-terminated string
int
sdbDeleteRow
(
void
*
handle
,
void
*
row
)
{
int
32_t
sdbDeleteRow
(
void
*
handle
,
void
*
row
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
SRowMeta
*
pMeta
=
NULL
;
SRowMeta
*
pMeta
=
NULL
;
int
code
=
-
1
;
int
32_t
code
=
-
1
;
void
*
pMetaRow
=
NULL
;
void
*
pMetaRow
=
NULL
;
SRowHead
*
rowHead
=
NULL
;
SRowHead
*
rowHead
=
NULL
;
int
rowSize
=
0
;
int
32_t
rowSize
=
0
;
int
total_size
=
0
;
int
32_t
total_size
=
0
;
/* char action = SDB_TYPE_DELETE; */
/* char action = SDB_TYPE_DELETE; */
if
(
pTable
==
NULL
)
return
-
1
;
if
(
pTable
==
NULL
)
return
-
1
;
...
@@ -527,9 +541,6 @@ int sdbDeleteRow(void *handle, void *row) {
...
@@ -527,9 +541,6 @@ int sdbDeleteRow(void *handle, void *row) {
case
SDB_KEYTYPE_STRING
:
case
SDB_KEYTYPE_STRING
:
rowSize
=
strlen
((
char
*
)
row
)
+
1
;
rowSize
=
strlen
((
char
*
)
row
)
+
1
;
break
;
break
;
case
SDB_KEYTYPE_UINT32
:
rowSize
=
sizeof
(
uint32_t
);
break
;
case
SDB_KEYTYPE_AUTO
:
case
SDB_KEYTYPE_AUTO
:
rowSize
=
sizeof
(
uint64_t
);
rowSize
=
sizeof
(
uint64_t
);
break
;
break
;
...
@@ -568,17 +579,12 @@ int sdbDeleteRow(void *handle, void *row) {
...
@@ -568,17 +579,12 @@ int sdbDeleteRow(void *handle, void *row) {
sdbFinishCommit
(
pTable
);
sdbFinishCommit
(
pTable
);
pTable
->
numOfRows
--
;
pTable
->
numOfRows
--
;
// TODO:Change the update list here
sdbAddIntoUpdateList
(
pTable
,
SDB_TYPE_DELETE
,
pMetaRow
);
switch
(
pTable
->
keyType
)
{
switch
(
pTable
->
keyType
)
{
case
SDB_KEYTYPE_STRING
:
case
SDB_KEYTYPE_STRING
:
sdbTrace
(
"table:%s, a record is deleted:%s, sdbVersion:%"
PRId64
" id:%"
PRId64
" numOfRows:%d"
,
sdbTrace
(
"table:%s, a record is deleted:%s, sdbVersion:%"
PRId64
" id:%"
PRId64
" numOfRows:%d"
,
pTable
->
name
,
(
char
*
)
row
,
sdbVersion
,
pTable
->
id
,
pTable
->
numOfRows
);
pTable
->
name
,
(
char
*
)
row
,
sdbVersion
,
pTable
->
id
,
pTable
->
numOfRows
);
break
;
break
;
case
SDB_KEYTYPE_UINT32
:
//dnodes or mnodes
sdbTrace
(
"table:%s, a record is deleted:%s, sdbVersion:%"
PRId64
" id:%"
PRId64
" numOfRows:%d"
,
pTable
->
name
,
taosIpStr
(
*
(
int32_t
*
)
row
),
sdbVersion
,
pTable
->
id
,
pTable
->
numOfRows
);
break
;
case
SDB_KEYTYPE_AUTO
:
case
SDB_KEYTYPE_AUTO
:
sdbTrace
(
"table:%s, a record is deleted:%d, sdbVersion:%"
PRId64
" id:%"
PRId64
" numOfRows:%d"
,
sdbTrace
(
"table:%s, a record is deleted:%d, sdbVersion:%"
PRId64
" id:%"
PRId64
" numOfRows:%d"
,
pTable
->
name
,
*
(
int32_t
*
)
row
,
sdbVersion
,
pTable
->
id
,
pTable
->
numOfRows
);
pTable
->
name
,
*
(
int32_t
*
)
row
,
sdbVersion
,
pTable
->
id
,
pTable
->
numOfRows
);
...
@@ -606,12 +612,12 @@ int sdbDeleteRow(void *handle, void *row) {
...
@@ -606,12 +612,12 @@ int sdbDeleteRow(void *handle, void *row) {
}
}
// row here can be the object or the string info (encoded string)
// row here can be the object or the string info (encoded string)
int
sdbUpdateRow
(
void
*
handle
,
void
*
row
,
in
t
updateSize
,
char
isUpdated
)
{
int
32_t
sdbUpdateRow
(
void
*
handle
,
void
*
row
,
int32_
t
updateSize
,
char
isUpdated
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
SRowMeta
*
pMeta
=
NULL
;
SRowMeta
*
pMeta
=
NULL
;
int
code
=
-
1
;
int
32_t
code
=
-
1
;
int
total_size
=
0
;
int
32_t
total_size
=
0
;
int
real_size
=
0
;
int
32_t
real_size
=
0
;
/* char action = SDB_TYPE_UPDATE; */
/* char action = SDB_TYPE_UPDATE; */
if
(
pTable
==
NULL
||
row
==
NULL
)
return
-
1
;
if
(
pTable
==
NULL
||
row
==
NULL
)
return
-
1
;
...
@@ -622,10 +628,6 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) {
...
@@ -622,10 +628,6 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) {
sdbError
(
"table:%s, failed to update record:%s, record is not there, sdbVersion:%"
PRId64
" id:%"
PRId64
,
sdbError
(
"table:%s, failed to update record:%s, record is not there, sdbVersion:%"
PRId64
" id:%"
PRId64
,
pTable
->
name
,
(
char
*
)
row
,
sdbVersion
,
pTable
->
id
);
pTable
->
name
,
(
char
*
)
row
,
sdbVersion
,
pTable
->
id
);
break
;
break
;
case
SDB_KEYTYPE_UINT32
:
//dnodes or mnodes
sdbError
(
"table:%s, failed to update record:%s, record is not there, sdbVersion:%"
PRId64
" id:%"
PRId64
,
pTable
->
name
,
taosIpStr
(
*
(
int32_t
*
)
row
),
sdbVersion
,
pTable
->
id
);
break
;
case
SDB_KEYTYPE_AUTO
:
case
SDB_KEYTYPE_AUTO
:
sdbError
(
"table:%s, failed to update record:%d, record is not there, sdbVersion:%"
PRId64
" id:%"
PRId64
,
sdbError
(
"table:%s, failed to update record:%d, record is not there, sdbVersion:%"
PRId64
" id:%"
PRId64
,
pTable
->
name
,
*
(
int32_t
*
)
row
,
sdbVersion
,
pTable
->
id
);
pTable
->
name
,
*
(
int32_t
*
)
row
,
sdbVersion
,
pTable
->
id
);
...
@@ -694,10 +696,6 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) {
...
@@ -694,10 +696,6 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) {
sdbTrace
(
"table:%s, a record is updated:%s, sdbVersion:%"
PRId64
" id:%"
PRId64
" numOfRows:%"
PRId64
,
sdbTrace
(
"table:%s, a record is updated:%s, sdbVersion:%"
PRId64
" id:%"
PRId64
" numOfRows:%"
PRId64
,
pTable
->
name
,
(
char
*
)
row
,
sdbVersion
,
pTable
->
id
,
pTable
->
numOfRows
);
pTable
->
name
,
(
char
*
)
row
,
sdbVersion
,
pTable
->
id
,
pTable
->
numOfRows
);
break
;
break
;
case
SDB_KEYTYPE_UINT32
:
//dnodes or mnodes
sdbTrace
(
"table:%s, a record is updated:%s, sdbVersion:%"
PRId64
" id:%"
PRId64
" numOfRows:%"
PRId64
,
pTable
->
name
,
taosIpStr
(
*
(
int32_t
*
)
row
),
sdbVersion
,
pTable
->
id
,
pTable
->
numOfRows
);
break
;
case
SDB_KEYTYPE_AUTO
:
case
SDB_KEYTYPE_AUTO
:
sdbTrace
(
"table:%s, a record is updated:%d, sdbVersion:%"
PRId64
" id:%"
PRId64
" numOfRows:%"
PRId64
,
sdbTrace
(
"table:%s, a record is updated:%d, sdbVersion:%"
PRId64
" id:%"
PRId64
" numOfRows:%"
PRId64
,
pTable
->
name
,
*
(
int32_t
*
)
row
,
sdbVersion
,
pTable
->
id
,
pTable
->
numOfRows
);
pTable
->
name
,
*
(
int32_t
*
)
row
,
sdbVersion
,
pTable
->
id
,
pTable
->
numOfRows
);
...
@@ -708,7 +706,6 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) {
...
@@ -708,7 +706,6 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) {
break
;
break
;
}
}
sdbAddIntoUpdateList
(
pTable
,
SDB_TYPE_UPDATE
,
pMetaRow
);
code
=
0
;
code
=
0
;
}
}
...
@@ -719,79 +716,6 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) {
...
@@ -719,79 +716,6 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) {
return
code
;
return
code
;
}
}
// row here must be the instruction string
int
sdbBatchUpdateRow
(
void
*
handle
,
void
*
row
,
int
rowSize
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
SRowMeta
*
pMeta
=
NULL
;
int
total_size
=
0
;
/* char action = SDB_TYPE_BATCH_UPDATE; */
if
(
pTable
==
NULL
||
row
==
NULL
||
rowSize
<=
0
)
return
-
1
;
pMeta
=
sdbGetRowMeta
(
handle
,
row
);
if
(
pMeta
==
NULL
)
{
sdbTrace
(
"table:%s, record is not there, batch update failed"
,
pTable
->
name
);
return
-
1
;
}
void
*
pMetaRow
=
pMeta
->
row
;
assert
(
pMetaRow
!=
NULL
);
total_size
=
sizeof
(
SRowHead
)
+
pTable
->
maxRowSize
+
sizeof
(
TSCKSUM
);
SRowHead
*
rowHead
=
(
SRowHead
*
)
malloc
(
total_size
);
if
(
rowHead
==
NULL
)
{
sdbError
(
"failed to allocate row head memory, sdb:%s"
,
pTable
->
name
);
return
-
1
;
}
pthread_mutex_lock
(
&
pTable
->
mutex
);
if
(
sdbForwardDbReqToPeer
(
pTable
,
SDB_TYPE_BATCH_UPDATE
,
row
,
rowSize
)
==
0
)
{
/* // write action */
/* write(pTable->fd, &action, sizeof(action)); */
/* pTable->size += sizeof(action); */
(
*
(
pTable
->
appTool
))(
SDB_TYPE_BEFORE_BATCH_UPDATE
,
pMetaRow
,
NULL
,
0
,
NULL
);
void
*
next_row
=
pMetaRow
;
while
(
next_row
!=
NULL
)
{
pTable
->
id
++
;
sdbVersion
++
;
void
*
last_row
=
next_row
;
next_row
=
(
*
(
pTable
->
appTool
))(
SDB_TYPE_BATCH_UPDATE
,
last_row
,
(
char
*
)
row
,
rowSize
,
0
);
memset
(
rowHead
,
0
,
sizeof
(
SRowHead
)
+
pTable
->
maxRowSize
+
sizeof
(
TSCKSUM
));
// update in current layer
pMeta
->
id
=
pTable
->
id
;
pMeta
->
offset
=
pTable
->
size
;
// write to disk
rowHead
->
delimiter
=
SDB_DELIMITER
;
rowHead
->
id
=
pMeta
->
id
;
(
*
(
pTable
->
appTool
))(
SDB_TYPE_ENCODE
,
last_row
,
rowHead
->
data
,
pTable
->
maxRowSize
,
&
(
rowHead
->
rowSize
));
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
rowHead
,
sizeof
(
SRowHead
)
+
rowHead
->
rowSize
+
sizeof
(
TSCKSUM
));
pMeta
->
rowSize
=
rowHead
->
rowSize
;
lseek
(
pTable
->
fd
,
pTable
->
size
,
SEEK_SET
);
twrite
(
pTable
->
fd
,
rowHead
,
sizeof
(
SRowHead
)
+
rowHead
->
rowSize
+
sizeof
(
TSCKSUM
));
pTable
->
size
+=
(
sizeof
(
SRowHead
)
+
rowHead
->
rowSize
+
sizeof
(
TSCKSUM
));
sdbAddIntoUpdateList
(
pTable
,
SDB_TYPE_UPDATE
,
last_row
);
if
(
next_row
!=
NULL
)
{
pMeta
=
sdbGetRowMeta
(
handle
,
next_row
);
}
}
sdbFinishCommit
(
pTable
);
(
*
(
pTable
->
appTool
))(
SDB_TYPE_AFTER_BATCH_UPDATE
,
pMetaRow
,
NULL
,
0
,
NULL
);
}
pthread_mutex_unlock
(
&
pTable
->
mutex
);
tfree
(
rowHead
);
return
0
;
}
void
sdbCloseTable
(
void
*
handle
)
{
void
sdbCloseTable
(
void
*
handle
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
void
*
pNode
=
NULL
;
void
*
pNode
=
NULL
;
...
@@ -814,20 +738,19 @@ void sdbCloseTable(void *handle) {
...
@@ -814,20 +738,19 @@ void sdbCloseTable(void *handle) {
sdbNumOfTables
--
;
sdbNumOfTables
--
;
sdbTrace
(
"table:%s is closed, id:%"
PRId64
" numOfTables:%d"
,
pTable
->
name
,
pTable
->
id
,
sdbNumOfTables
);
sdbTrace
(
"table:%s is closed, id:%"
PRId64
" numOfTables:%d"
,
pTable
->
name
,
pTable
->
id
,
sdbNumOfTables
);
tfree
(
pTable
->
update
);
tfree
(
pTable
);
tfree
(
pTable
);
}
}
void
sdbResetTable
(
SSdbTable
*
pTable
)
{
void
sdbResetTable
(
SSdbTable
*
pTable
)
{
/* SRowHead rowHead; */
/* SRowHead rowHead; */
SRowMeta
rowMeta
;
SRowMeta
rowMeta
;
int
bytes
;
int
32_t
bytes
;
int
total_size
=
0
;
int
32_t
total_size
=
0
;
int
real_size
=
0
;
int
32_t
real_size
=
0
;
SRowHead
*
rowHead
=
NULL
;
SRowHead
*
rowHead
=
NULL
;
void
*
pMetaRow
=
NULL
;
void
*
pMetaRow
=
NULL
;
int64_t
oldId
=
pTable
->
id
;
int64_t
oldId
=
pTable
->
id
;
int
oldNumOfRows
=
pTable
->
numOfRows
;
int
32_t
oldNumOfRows
=
pTable
->
numOfRows
;
if
(
sdbOpenSdbFile
(
pTable
)
<
0
)
return
;
if
(
sdbOpenSdbFile
(
pTable
)
<
0
)
return
;
pTable
->
numOfRows
=
oldNumOfRows
;
pTable
->
numOfRows
=
oldNumOfRows
;
...
@@ -911,24 +834,21 @@ void sdbResetTable(SSdbTable *pTable) {
...
@@ -911,24 +834,21 @@ void sdbResetTable(SSdbTable *pTable) {
}
}
sdbVersion
+=
(
pTable
->
id
-
oldId
);
sdbVersion
+=
(
pTable
->
id
-
oldId
);
pTable
->
numOfUpdates
=
0
;
pTable
->
updatePos
=
0
;
tfree
(
rowHead
);
tfree
(
rowHead
);
sdbPrint
(
"table:%s is updated, sdbVerion:%"
PRId64
" id:%"
PRId64
,
pTable
->
name
,
sdbVersion
,
pTable
->
id
);
sdbPrint
(
"table:%s is updated, sdbVerion:%"
PRId64
" id:%"
PRId64
,
pTable
->
name
,
sdbVersion
,
pTable
->
id
);
}
}
// TODO:A problem here :use snapshot file to sync another node will cause
// TODO:A problem here :use snapshot file to sync another node will cause problem
// problem
void
sdbSaveSnapShot
(
void
*
handle
)
{
void
sdbSaveSnapShot
(
void
*
handle
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
SRowMeta
*
pMeta
;
SRowMeta
*
pMeta
;
void
*
pNode
=
NULL
;
void
*
pNode
=
NULL
;
int
total_size
=
0
;
int
32_t
total_size
=
0
;
int
real_size
=
0
;
int
32_t
real_size
=
0
;
int
size
=
0
;
int
32_t
size
=
0
;
int
numOfRows
=
0
;
int
32_t
numOfRows
=
0
;
uint32_t
sdbEcommit
=
SDB_ENDCOMMIT
;
uint32_t
sdbEcommit
=
SDB_ENDCOMMIT
;
char
*
dirc
=
NULL
;
char
*
dirc
=
NULL
;
char
*
basec
=
NULL
;
char
*
basec
=
NULL
;
...
@@ -942,7 +862,7 @@ void sdbSaveSnapShot(void *handle) {
...
@@ -942,7 +862,7 @@ void sdbSaveSnapShot(void *handle) {
dirc
=
strdup
(
pTable
->
fn
);
dirc
=
strdup
(
pTable
->
fn
);
basec
=
strdup
(
pTable
->
fn
);
basec
=
strdup
(
pTable
->
fn
);
sprintf
(
fn
,
"%s/.%s"
,
dirname
(
dirc
),
basename
(
basec
));
sprintf
(
fn
,
"%s/.%s"
,
dirname
(
dirc
),
basename
(
basec
));
int
fd
=
open
(
fn
,
O_RDWR
|
O_CREAT
,
S_IRWXU
|
S_IRWXG
|
S_IRWXO
);
int
32_t
fd
=
open
(
fn
,
O_RDWR
|
O_CREAT
,
S_IRWXU
|
S_IRWXG
|
S_IRWXO
);
tfree
(
dirc
);
tfree
(
dirc
);
tfree
(
basec
);
tfree
(
basec
);
...
@@ -1011,50 +931,3 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow) {
...
@@ -1011,50 +931,3 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow) {
return
pNode
;
return
pNode
;
}
}
int64_t
sdbGetId
(
void
*
handle
)
{
return
((
SSdbTable
*
)
handle
)
->
id
;
}
int64_t
sdbGetNumOfRows
(
void
*
handle
)
{
return
((
SSdbTable
*
)
handle
)
->
numOfRows
;
}
int32_t
(
*
mpeerInitMnodesFp
)(
char
*
directory
)
=
NULL
;
void
(
*
mpeerCleanUpMnodesFp
)()
=
NULL
;
int32_t
(
*
mpeerForwardRequestFp
)(
SSdbTable
*
pTable
,
char
type
,
void
*
cont
,
int32_t
contLen
)
=
NULL
;
char
*
sdbStatusStr
[]
=
{
"offline"
,
"unsynced"
,
"syncing"
,
"serving"
,
"null"
};
char
*
sdbRoleStr
[]
=
{
"unauthed"
,
"undecided"
,
"master"
,
"slave"
,
"null"
};
int32_t
sdbForwardDbReqToPeer
(
SSdbTable
*
pTable
,
char
type
,
char
*
data
,
int32_t
dataLen
)
{
if
(
mpeerForwardRequestFp
)
{
return
mpeerForwardRequestFp
(
pTable
,
type
,
data
,
dataLen
);
}
else
{
return
0
;
}
}
int32_t
sdbInitPeers
(
char
*
directory
)
{
if
(
mpeerInitMnodesFp
)
{
return
(
*
mpeerInitMnodesFp
)(
directory
);
}
else
{
return
0
;
}
}
void
sdbCleanUpPeers
()
{
if
(
mpeerCleanUpMnodesFp
)
{
(
*
mpeerCleanUpMnodesFp
)();
}
}
src/mnode/src/mgmtShell.c
浏览文件 @
4b7e463b
...
@@ -32,6 +32,7 @@
...
@@ -32,6 +32,7 @@
#include "mgmtMnode.h"
#include "mgmtMnode.h"
#include "mgmtNormalTable.h"
#include "mgmtNormalTable.h"
#include "mgmtProfile.h"
#include "mgmtProfile.h"
#include "mgmtSdb.h"
#include "mgmtShell.h"
#include "mgmtShell.h"
#include "mgmtSuperTable.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtTable.h"
...
@@ -63,7 +64,7 @@ int32_t mgmtInitShell() {
...
@@ -63,7 +64,7 @@ int32_t mgmtInitShell() {
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_HEARTBEAT
,
mgmtProcessHeartBeatMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_HEARTBEAT
,
mgmtProcessHeartBeatMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_CONNECT
,
mgmtProcessConnectMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_CONNECT
,
mgmtProcessConnectMsg
);
tsMgmtTranQhandle
=
taosInitScheduler
(
tsMax
Dnodes
+
tsMax
ShellConns
,
1
,
"mnodeT"
);
tsMgmtTranQhandle
=
taosInitScheduler
(
tsMaxShellConns
,
1
,
"mnodeT"
);
int32_t
numOfThreads
=
tsNumOfCores
*
tsNumOfThreadsPerCore
/
4
.
0
;
int32_t
numOfThreads
=
tsNumOfCores
*
tsNumOfThreadsPerCore
/
4
.
0
;
if
(
numOfThreads
<
1
)
{
if
(
numOfThreads
<
1
)
{
...
@@ -131,7 +132,7 @@ void mgmtAddToShellQueue(SQueuedMsg *queuedMsg) {
...
@@ -131,7 +132,7 @@ void mgmtAddToShellQueue(SQueuedMsg *queuedMsg) {
}
}
static
void
mgmtProcessMsgFromShell
(
SRpcMsg
*
rpcMsg
)
{
static
void
mgmtProcessMsgFromShell
(
SRpcMsg
*
rpcMsg
)
{
if
(
sdbGetRunStatus
()
!=
SDB_STATUS_SERVING
)
{
if
(
!
sdbInServerState
()
)
{
mgmtProcessMsgWhileNotReady
(
rpcMsg
);
mgmtProcessMsgWhileNotReady
(
rpcMsg
);
rpcFreeCont
(
rpcMsg
->
pCont
);
rpcFreeCont
(
rpcMsg
->
pCont
);
return
;
return
;
...
@@ -309,20 +310,10 @@ static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) {
...
@@ -309,20 +310,10 @@ static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) {
return
;
return
;
}
}
pHBRsp
->
ipList
.
inUse
=
0
;
pHBRsp
->
ipList
.
port
=
htons
(
tsMnodeShellPort
);
pHBRsp
->
ipList
.
numOfIps
=
0
;
if
(
pSdbPublicIpList
!=
NULL
&&
pSdbIpList
!=
NULL
)
{
pHBRsp
->
ipList
.
numOfIps
=
htons
(
pSdbPublicIpList
->
numOfIps
);
if
(
connInfo
.
serverIp
==
tsPublicIpInt
)
{
if
(
connInfo
.
serverIp
==
tsPublicIpInt
)
{
for
(
int
i
=
0
;
i
<
pSdbPublicIpList
->
numOfIps
;
++
i
)
{
mgmtGetMnodePublicIpList
(
&
pHBRsp
->
ipList
);
pHBRsp
->
ipList
.
ip
[
i
]
=
htonl
(
pSdbPublicIpList
->
ip
[
i
]);
}
}
else
{
}
else
{
for
(
int
i
=
0
;
i
<
pSdbIpList
->
numOfIps
;
++
i
)
{
mgmtGetMnodePrivateIpList
(
&
pHBRsp
->
ipList
);
pHBRsp
->
ipList
.
ip
[
i
]
=
htonl
(
pSdbIpList
->
ip
[
i
]);
}
}
}
}
/*
/*
...
@@ -411,20 +402,11 @@ static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) {
...
@@ -411,20 +402,11 @@ static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) {
strcpy
(
pConnectRsp
->
serverVersion
,
version
);
strcpy
(
pConnectRsp
->
serverVersion
,
version
);
pConnectRsp
->
writeAuth
=
pUser
->
writeAuth
;
pConnectRsp
->
writeAuth
=
pUser
->
writeAuth
;
pConnectRsp
->
superAuth
=
pUser
->
superAuth
;
pConnectRsp
->
superAuth
=
pUser
->
superAuth
;
pConnectRsp
->
ipList
.
inUse
=
0
;
pConnectRsp
->
ipList
.
port
=
htons
(
tsMnodeShellPort
);
pConnectRsp
->
ipList
.
numOfIps
=
0
;
if
(
pSdbPublicIpList
!=
NULL
&&
pSdbIpList
!=
NULL
)
{
pConnectRsp
->
ipList
.
numOfIps
=
htons
(
pSdbPublicIpList
->
numOfIps
);
if
(
connInfo
.
serverIp
==
tsPublicIpInt
)
{
if
(
connInfo
.
serverIp
==
tsPublicIpInt
)
{
for
(
int
i
=
0
;
i
<
pSdbPublicIpList
->
numOfIps
;
++
i
)
{
mgmtGetMnodePublicIpList
(
&
pConnectRsp
->
ipList
);
pConnectRsp
->
ipList
.
ip
[
i
]
=
htonl
(
pSdbPublicIpList
->
ip
[
i
]);
}
}
else
{
}
else
{
for
(
int
i
=
0
;
i
<
pSdbIpList
->
numOfIps
;
++
i
)
{
mgmtGetMnodePrivateIpList
(
&
pConnectRsp
->
ipList
);
pConnectRsp
->
ipList
.
ip
[
i
]
=
htonl
(
pSdbIpList
->
ip
[
i
]);
}
}
}
}
connect_over:
connect_over:
...
...
src/mnode/src/mgmtSuperTable.c
浏览文件 @
4b7e463b
...
@@ -31,6 +31,7 @@
...
@@ -31,6 +31,7 @@
#include "mgmtGrant.h"
#include "mgmtGrant.h"
#include "mgmtShell.h"
#include "mgmtShell.h"
#include "mgmtSuperTable.h"
#include "mgmtSuperTable.h"
#include "mgmtSdb.h"
#include "mgmtTable.h"
#include "mgmtTable.h"
#include "mgmtUser.h"
#include "mgmtUser.h"
#include "mgmtVgroup.h"
#include "mgmtVgroup.h"
...
@@ -63,7 +64,6 @@ static void mgmtSuperTableActionInit() {
...
@@ -63,7 +64,6 @@ static void mgmtSuperTableActionInit() {
mgmtSuperTableActionFp
[
SDB_TYPE_UPDATE
]
=
mgmtSuperTableActionUpdate
;
mgmtSuperTableActionFp
[
SDB_TYPE_UPDATE
]
=
mgmtSuperTableActionUpdate
;
mgmtSuperTableActionFp
[
SDB_TYPE_ENCODE
]
=
mgmtSuperTableActionEncode
;
mgmtSuperTableActionFp
[
SDB_TYPE_ENCODE
]
=
mgmtSuperTableActionEncode
;
mgmtSuperTableActionFp
[
SDB_TYPE_DECODE
]
=
mgmtSuperTableActionDecode
;
mgmtSuperTableActionFp
[
SDB_TYPE_DECODE
]
=
mgmtSuperTableActionDecode
;
mgmtSuperTableActionFp
[
SDB_TYPE_RESET
]
=
mgmtSuperTableActionReset
;
mgmtSuperTableActionFp
[
SDB_TYPE_DESTROY
]
=
mgmtSuperTableActionDestroy
;
mgmtSuperTableActionFp
[
SDB_TYPE_DESTROY
]
=
mgmtSuperTableActionDestroy
;
}
}
...
@@ -164,7 +164,7 @@ int32_t mgmtInitSuperTables() {
...
@@ -164,7 +164,7 @@ int32_t mgmtInitSuperTables() {
mgmtSuperTableActionInit
();
mgmtSuperTableActionInit
();
tsSuperTableSdb
=
sdbOpenTable
(
tsMaxTables
,
tsSuperTableUpdateSize
+
sizeof
(
SSchema
)
*
TSDB_MAX_COLUMNS
,
tsSuperTableSdb
=
sdbOpenTable
(
TSDB_MAX_SUPER_TABLES
,
tsSuperTableUpdateSize
+
sizeof
(
SSchema
)
*
TSDB_MAX_COLUMNS
,
"stables"
,
SDB_KEYTYPE_STRING
,
tsMnodeDir
,
mgmtSuperTableAction
);
"stables"
,
SDB_KEYTYPE_STRING
,
tsMnodeDir
,
mgmtSuperTableAction
);
if
(
tsSuperTableSdb
==
NULL
)
{
if
(
tsSuperTableSdb
==
NULL
)
{
mError
(
"failed to init stables data"
);
mError
(
"failed to init stables data"
);
...
@@ -201,12 +201,6 @@ void mgmtCleanUpSuperTables() {
...
@@ -201,12 +201,6 @@ void mgmtCleanUpSuperTables() {
}
}
int32_t
mgmtCreateSuperTable
(
SCMCreateTableMsg
*
pCreate
)
{
int32_t
mgmtCreateSuperTable
(
SCMCreateTableMsg
*
pCreate
)
{
int32_t
numOfTables
=
sdbGetNumOfRows
(
tsSuperTableSdb
);
if
(
numOfTables
>=
TSDB_MAX_SUPER_TABLES
)
{
mError
(
"stable:%s, numOfTables:%d exceed maxTables:%d"
,
pCreate
->
tableId
,
numOfTables
,
TSDB_MAX_SUPER_TABLES
);
return
TSDB_CODE_TOO_MANY_TABLES
;
}
SSuperTableObj
*
pStable
=
(
SSuperTableObj
*
)
calloc
(
sizeof
(
SSuperTableObj
),
1
);
SSuperTableObj
*
pStable
=
(
SSuperTableObj
*
)
calloc
(
sizeof
(
SSuperTableObj
),
1
);
if
(
pStable
==
NULL
)
{
if
(
pStable
==
NULL
)
{
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
...
@@ -217,7 +211,7 @@ int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) {
...
@@ -217,7 +211,7 @@ int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) {
pStable
->
createdTime
=
taosGetTimestampMs
();
pStable
->
createdTime
=
taosGetTimestampMs
();
pStable
->
vgId
=
0
;
pStable
->
vgId
=
0
;
pStable
->
sid
=
0
;
pStable
->
sid
=
0
;
pStable
->
uid
=
(((
uint64_t
)
pStable
->
createdTime
)
<<
16
)
+
(
(
uint64_t
)
sdbGetVersion
()
&
((
1ul
<<
16
)
-
1ul
));
pStable
->
uid
=
(((
uint64_t
)
pStable
->
createdTime
)
<<
16
)
+
(
sdbGetVersion
()
&
((
1ul
<<
16
)
-
1ul
));
pStable
->
sversion
=
0
;
pStable
->
sversion
=
0
;
pStable
->
numOfColumns
=
htons
(
pCreate
->
numOfColumns
);
pStable
->
numOfColumns
=
htons
(
pCreate
->
numOfColumns
);
pStable
->
numOfTags
=
htons
(
pCreate
->
numOfTags
);
pStable
->
numOfTags
=
htons
(
pCreate
->
numOfTags
);
...
...
src/mnode/src/mgmtTable.c
浏览文件 @
4b7e463b
...
@@ -24,6 +24,7 @@
...
@@ -24,6 +24,7 @@
#include "mgmtMnode.h"
#include "mgmtMnode.h"
#include "mgmtNormalTable.h"
#include "mgmtNormalTable.h"
#include "mgmtProfile.h"
#include "mgmtProfile.h"
#include "mgmtSdb.h"
#include "mgmtShell.h"
#include "mgmtShell.h"
#include "mgmtSuperTable.h"
#include "mgmtSuperTable.h"
#include "mgmtUser.h"
#include "mgmtUser.h"
...
...
src/mnode/src/mgmtUser.c
浏览文件 @
4b7e463b
...
@@ -21,6 +21,7 @@
...
@@ -21,6 +21,7 @@
#include "mgmtAcct.h"
#include "mgmtAcct.h"
#include "mgmtGrant.h"
#include "mgmtGrant.h"
#include "mgmtMnode.h"
#include "mgmtMnode.h"
#include "mgmtSdb.h"
#include "mgmtShell.h"
#include "mgmtShell.h"
#include "mgmtUser.h"
#include "mgmtUser.h"
...
@@ -59,7 +60,7 @@ int32_t mgmtInitUsers() {
...
@@ -59,7 +60,7 @@ int32_t mgmtInitUsers() {
SUserObj
tObj
;
SUserObj
tObj
;
tsUserUpdateSize
=
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
tsUserUpdateSize
=
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
tsUserSdb
=
sdbOpenTable
(
tsMaxUsers
,
tsUserUpdateSize
,
"users"
,
SDB_KEYTYPE_STRING
,
tsMnodeDir
,
mgmtUserAction
);
tsUserSdb
=
sdbOpenTable
(
TSDB_MAX_USERS
,
tsUserUpdateSize
,
"users"
,
SDB_KEYTYPE_STRING
,
tsMnodeDir
,
mgmtUserAction
);
if
(
tsUserSdb
==
NULL
)
{
if
(
tsUserSdb
==
NULL
)
{
mError
(
"failed to init user data"
);
mError
(
"failed to init user data"
);
return
-
1
;
return
-
1
;
...
@@ -106,12 +107,6 @@ static int32_t mgmtUpdateUser(SUserObj *pUser) {
...
@@ -106,12 +107,6 @@ static int32_t mgmtUpdateUser(SUserObj *pUser) {
}
}
static
int32_t
mgmtCreateUser
(
SAcctObj
*
pAcct
,
char
*
name
,
char
*
pass
)
{
static
int32_t
mgmtCreateUser
(
SAcctObj
*
pAcct
,
char
*
name
,
char
*
pass
)
{
int32_t
numOfUsers
=
sdbGetNumOfRows
(
tsUserSdb
);
if
(
numOfUsers
>=
tsMaxUsers
)
{
mWarn
(
"numOfUsers:%d, exceed tsMaxUsers:%d"
,
numOfUsers
,
tsMaxUsers
);
return
TSDB_CODE_TOO_MANY_USERS
;
}
int32_t
code
=
mgmtCheckUserLimit
(
pAcct
);
int32_t
code
=
mgmtCheckUserLimit
(
pAcct
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
return
code
;
return
code
;
...
@@ -257,7 +252,6 @@ static void mgmtUserActionInit() {
...
@@ -257,7 +252,6 @@ static void mgmtUserActionInit() {
mgmtUserActionFp
[
SDB_TYPE_UPDATE
]
=
mgmtUserActionUpdate
;
mgmtUserActionFp
[
SDB_TYPE_UPDATE
]
=
mgmtUserActionUpdate
;
mgmtUserActionFp
[
SDB_TYPE_ENCODE
]
=
mgmtUserActionEncode
;
mgmtUserActionFp
[
SDB_TYPE_ENCODE
]
=
mgmtUserActionEncode
;
mgmtUserActionFp
[
SDB_TYPE_DECODE
]
=
mgmtUserActionDecode
;
mgmtUserActionFp
[
SDB_TYPE_DECODE
]
=
mgmtUserActionDecode
;
mgmtUserActionFp
[
SDB_TYPE_RESET
]
=
mgmtUserActionReset
;
mgmtUserActionFp
[
SDB_TYPE_DESTROY
]
=
mgmtUserActionDestroy
;
mgmtUserActionFp
[
SDB_TYPE_DESTROY
]
=
mgmtUserActionDestroy
;
}
}
...
...
src/mnode/src/mgmtVgroup.c
浏览文件 @
4b7e463b
...
@@ -25,6 +25,7 @@
...
@@ -25,6 +25,7 @@
#include "mgmtDClient.h"
#include "mgmtDClient.h"
#include "mgmtDnode.h"
#include "mgmtDnode.h"
#include "mgmtProfile.h"
#include "mgmtProfile.h"
#include "mgmtSdb.h"
#include "mgmtShell.h"
#include "mgmtShell.h"
#include "mgmtTable.h"
#include "mgmtTable.h"
#include "mgmtVgroup.h"
#include "mgmtVgroup.h"
...
@@ -58,7 +59,6 @@ static void mgmtVgroupActionInit() {
...
@@ -58,7 +59,6 @@ static void mgmtVgroupActionInit() {
mgmtVgroupActionFp
[
SDB_TYPE_UPDATE
]
=
mgmtVgroupActionUpdate
;
mgmtVgroupActionFp
[
SDB_TYPE_UPDATE
]
=
mgmtVgroupActionUpdate
;
mgmtVgroupActionFp
[
SDB_TYPE_ENCODE
]
=
mgmtVgroupActionEncode
;
mgmtVgroupActionFp
[
SDB_TYPE_ENCODE
]
=
mgmtVgroupActionEncode
;
mgmtVgroupActionFp
[
SDB_TYPE_DECODE
]
=
mgmtVgroupActionDecode
;
mgmtVgroupActionFp
[
SDB_TYPE_DECODE
]
=
mgmtVgroupActionDecode
;
mgmtVgroupActionFp
[
SDB_TYPE_RESET
]
=
mgmtVgroupActionReset
;
mgmtVgroupActionFp
[
SDB_TYPE_DESTROY
]
=
mgmtVgroupActionDestroy
;
mgmtVgroupActionFp
[
SDB_TYPE_DESTROY
]
=
mgmtVgroupActionDestroy
;
}
}
...
@@ -75,7 +75,7 @@ int32_t mgmtInitVgroups() {
...
@@ -75,7 +75,7 @@ int32_t mgmtInitVgroups() {
mgmtVgroupActionInit
();
mgmtVgroupActionInit
();
tsVgroupSdb
=
sdbOpenTable
(
tsMaxVGroups
,
tsVgUpdateSize
,
"vgroups"
,
SDB_KEYTYPE_AUTO
,
tsMnodeDir
,
mgmtVgroupAction
);
tsVgroupSdb
=
sdbOpenTable
(
TSDB_MAX_VGROUPS
,
tsVgUpdateSize
,
"vgroups"
,
SDB_KEYTYPE_AUTO
,
tsMnodeDir
,
mgmtVgroupAction
);
if
(
tsVgroupSdb
==
NULL
)
{
if
(
tsVgroupSdb
==
NULL
)
{
mError
(
"failed to init vgroups data"
);
mError
(
"failed to init vgroups data"
);
return
-
1
;
return
-
1
;
...
...
src/util/inc/tglobalcfg.h
浏览文件 @
4b7e463b
...
@@ -103,13 +103,7 @@ extern int tsReplications;
...
@@ -103,13 +103,7 @@ extern int tsReplications;
extern
int
tsNumOfMPeers
;
extern
int
tsNumOfMPeers
;
extern
int
tsMaxShellConns
;
extern
int
tsMaxShellConns
;
extern
int
tsMaxAccounts
;
extern
int
tsMaxUsers
;
extern
int
tsMaxDbs
;
extern
int
tsMaxTables
;
extern
int
tsMaxTables
;
extern
int
tsMaxDnodes
;
extern
int
tsMaxVGroups
;
extern
char
tsMgmtZone
[];
extern
char
tsLocalIp
[];
extern
char
tsLocalIp
[];
extern
char
tsDefaultDB
[];
extern
char
tsDefaultDB
[];
...
...
src/util/inc/tlog.h
浏览文件 @
4b7e463b
...
@@ -239,6 +239,25 @@ extern uint32_t cdebugFlag;
...
@@ -239,6 +239,25 @@ extern uint32_t cdebugFlag;
#define monitorLWarn(...) taosLogWarn(__VA_ARGS__) monitorWarn(__VA_ARGS__)
#define monitorLWarn(...) taosLogWarn(__VA_ARGS__) monitorWarn(__VA_ARGS__)
#define monitorLPrint(...) taosLogPrint(__VA_ARGS__) monitorPrint(__VA_ARGS__)
#define monitorLPrint(...) taosLogPrint(__VA_ARGS__) monitorPrint(__VA_ARGS__)
#define sdbError(...) \
if (sdbDebugFlag & DEBUG_ERROR) { \
tprintf("ERROR MND-SDB ", 255, __VA_ARGS__); \
}
#define sdbWarn(...) \
if (sdbDebugFlag & DEBUG_WARN) { \
tprintf("WARN MND-SDB ", sdbDebugFlag, __VA_ARGS__); \
}
#define sdbTrace(...) \
if (sdbDebugFlag & DEBUG_TRACE) { \
tprintf("MND-SDB ", sdbDebugFlag, __VA_ARGS__); \
}
#define sdbPrint(...) \
{ tprintf("MND-SDB ", 255, __VA_ARGS__); }
#define sdbLError(...) taosLogError(__VA_ARGS__) sdbError(__VA_ARGS__)
#define sdbLWarn(...) taosLogWarn(__VA_ARGS__) sdbWarn(__VA_ARGS__)
#define sdbLPrint(...) taosLogPrint(__VA_ARGS__) sdbPrint(__VA_ARGS__)
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
src/util/src/tglobalcfg.c
浏览文件 @
4b7e463b
...
@@ -112,13 +112,7 @@ int tsReplications = TSDB_REPLICA_MIN_NUM;
...
@@ -112,13 +112,7 @@ int tsReplications = TSDB_REPLICA_MIN_NUM;
int
tsNumOfMPeers
=
3
;
int
tsNumOfMPeers
=
3
;
int
tsMaxShellConns
=
2000
;
int
tsMaxShellConns
=
2000
;
int
tsMaxAccounts
=
100
;
int
tsMaxTables
=
100000
;
int
tsMaxUsers
=
1000
;
int
tsMaxDbs
=
1000
;
int
tsMaxTables
=
650000
;
int
tsMaxDnodes
=
1000
;
int
tsMaxVGroups
=
1000
;
char
tsMgmtZone
[
16
]
=
"rzone"
;
char
tsLocalIp
[
TSDB_IPv4ADDR_LEN
]
=
{
0
};
char
tsLocalIp
[
TSDB_IPv4ADDR_LEN
]
=
{
0
};
char
tsDefaultDB
[
TSDB_DB_NAME_LEN
]
=
{
0
};
char
tsDefaultDB
[
TSDB_DB_NAME_LEN
]
=
{
0
};
...
@@ -612,27 +606,9 @@ static void doInitGlobalConfig() {
...
@@ -612,27 +606,9 @@ static void doInitGlobalConfig() {
1
,
8640000
,
0
,
TSDB_CFG_UTYPE_SECOND
);
1
,
8640000
,
0
,
TSDB_CFG_UTYPE_SECOND
);
// mgmt configs
// mgmt configs
tsInitConfigOption
(
cfg
++
,
"mgmtZone"
,
tsMgmtZone
,
TSDB_CFG_VTYPE_STRING
,
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
|
TSDB_CFG_CTYPE_B_CLUSTER
,
0
,
0
,
16
,
TSDB_CFG_UTYPE_NONE
);
tsInitConfigOption
(
cfg
++
,
"maxAccounts"
,
&
tsMaxAccounts
,
TSDB_CFG_VTYPE_INT
,
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
|
TSDB_CFG_CTYPE_B_CLUSTER
,
1
,
1000
,
0
,
TSDB_CFG_UTYPE_NONE
);
tsInitConfigOption
(
cfg
++
,
"maxUsers"
,
&
tsMaxUsers
,
TSDB_CFG_VTYPE_INT
,
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
,
1
,
1000
,
0
,
TSDB_CFG_UTYPE_NONE
);
tsInitConfigOption
(
cfg
++
,
"maxDbs"
,
&
tsMaxDbs
,
TSDB_CFG_VTYPE_INT
,
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
,
1
,
10000
,
0
,
TSDB_CFG_UTYPE_NONE
);
tsInitConfigOption
(
cfg
++
,
"maxTables"
,
&
tsMaxTables
,
TSDB_CFG_VTYPE_INT
,
tsInitConfigOption
(
cfg
++
,
"maxTables"
,
&
tsMaxTables
,
TSDB_CFG_VTYPE_INT
,
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
,
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
,
1
,
100000000
,
0
,
TSDB_CFG_UTYPE_NONE
);
1
,
100000000
,
0
,
TSDB_CFG_UTYPE_NONE
);
tsInitConfigOption
(
cfg
++
,
"maxDnodes"
,
&
tsMaxDnodes
,
TSDB_CFG_VTYPE_INT
,
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
|
TSDB_CFG_CTYPE_B_CLUSTER
,
1
,
1000
,
0
,
TSDB_CFG_UTYPE_NONE
);
tsInitConfigOption
(
cfg
++
,
"maxVGroups"
,
&
tsMaxVGroups
,
TSDB_CFG_VTYPE_INT
,
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
,
1
,
1000000
,
0
,
TSDB_CFG_UTYPE_NONE
);
tsInitConfigOption
(
cfg
++
,
"minSlidingTime"
,
&
tsMinSlidingTime
,
TSDB_CFG_VTYPE_INT
,
tsInitConfigOption
(
cfg
++
,
"minSlidingTime"
,
&
tsMinSlidingTime
,
TSDB_CFG_VTYPE_INT
,
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
,
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
,
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录