Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
eb6764be
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
eb6764be
编写于
3月 20, 2020
作者:
S
slguan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-15] refactor sdb
上级
e9a07987
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
445 addition
and
582 deletion
+445
-582
src/mnode/inc/mgmtSdb.h
src/mnode/inc/mgmtSdb.h
+26
-19
src/mnode/src/mgmtChildTable.c
src/mnode/src/mgmtChildTable.c
+42
-68
src/mnode/src/mgmtDb.c
src/mnode/src/mgmtDb.c
+61
-84
src/mnode/src/mgmtNormalTable.c
src/mnode/src/mgmtNormalTable.c
+59
-84
src/mnode/src/mgmtSdb.c
src/mnode/src/mgmtSdb.c
+63
-53
src/mnode/src/mgmtSuperTable.c
src/mnode/src/mgmtSuperTable.c
+51
-77
src/mnode/src/mgmtUser.c
src/mnode/src/mgmtUser.c
+62
-89
src/mnode/src/mgmtVgroup.c
src/mnode/src/mgmtVgroup.c
+81
-108
未找到文件。
src/mnode/inc/mgmtSdb.h
浏览文件 @
eb6764be
...
...
@@ -20,42 +20,49 @@
extern
"C"
{
#endif
enum
_keytype
{
typedef
enum
{
SDB_KEYTYPE_STRING
,
SDB_KEYTYPE_AUTO
,
SDB_KEYTYPE_MAX
}
ESdbKeyType
;
enum
_sdbaction
{
SDB_TYPE_INSERT
,
SDB_TYPE_DELETE
,
SDB_TYPE_UPDATE
,
SDB_TYPE_DECODE
,
SDB_TYPE_ENCODE
,
SDB_TYPE_DESTROY
,
SDB_MAX_ACTION_TYPES
}
ESdbType
;
typedef
enum
{
SDB_OPER_GLOBAL
,
SDB_OPER_LOCAL
,
SDB_OPER_DISK
}
ESdbOper
;
}
ESdbOper
Type
;
uint64_t
sdbGetVersion
();
enum
_sdbaction
{
SDB_TYPE_INSERT
,
SDB_TYPE_DELETE
,
SDB_TYPE_UPDATE
,
}
ESdbForwardType
;
typedef
struct
{
char
*
tableName
;
int32_t
hashSessions
;
int32_t
maxRowSize
;
ESdbKeyType
keyType
;
int32_t
(
*
insertFp
)(
void
*
pObj
);
int32_t
(
*
deleteFp
)(
void
*
pObj
);
int32_t
(
*
updateFp
)(
void
*
pObj
);
int32_t
(
*
encodeFp
)(
void
*
pObj
,
void
*
pData
,
int32_t
maxRowSize
);
void
*
(
*
decodeFp
)(
void
*
pData
);
int32_t
(
*
destroyFp
)(
void
*
pObj
);
}
SSdbTableDesc
;
void
*
sdbOpenTable
(
int32_t
maxRows
,
int32_t
maxRowSize
,
char
*
name
,
uint8_t
keyType
,
char
*
directory
,
void
*
(
*
appTool
)(
char
,
void
*
,
char
*
,
int32_t
,
int32_t
*
));
void
*
sdbOpenTable
(
SSdbTableDesc
*
desc
);
void
sdbCloseTable
(
void
*
handle
);
int32_t
sdbInsertRow
(
void
*
handle
,
void
*
row
,
ESdbOperType
oper
);
int32_t
sdbDeleteRow
(
void
*
handle
,
void
*
key
,
ESdbOperType
oper
);
int32_t
sdbUpdateRow
(
void
*
handle
,
void
*
row
,
int32_t
rowSize
,
ESdbOperType
oper
);
void
*
sdbGetRow
(
void
*
handle
,
void
*
key
);
void
*
sdbFetchRow
(
void
*
handle
,
void
*
pNode
,
void
**
ppRow
);
int64_t
sdbGetId
(
void
*
handle
);
int64_t
sdbGetNumOfRows
(
void
*
handle
);
int32_t
sdbInsertRow
(
void
*
handle
,
void
*
row
,
ESdbOper
oper
);
int32_t
sdbDeleteRow
(
void
*
handle
,
void
*
key
,
ESdbOper
oper
);
int32_t
sdbUpdateRow
(
void
*
handle
,
void
*
row
,
int32_t
rowSize
,
ESdbOper
oper
);
uint64_t
sdbGetVersion
();
#ifdef __cplusplus
}
...
...
src/mnode/src/mgmtChildTable.c
浏览文件 @
eb6764be
...
...
@@ -39,67 +39,43 @@
void
*
tsChildTableSdb
;
int32_t
tsChildTableUpdateSize
;
void
*
(
*
mgmtChildTableActionFp
[
SDB_MAX_ACTION_TYPES
])(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
void
*
mgmtChildTableActionInsert
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
void
*
mgmtChildTableActionDelete
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
void
*
mgmtChildTableActionUpdate
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
void
*
mgmtChildTableActionEncode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
void
*
mgmtChildTableActionDecode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
void
*
mgmtChildTableActionReset
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
void
*
mgmtChildTableActionDestroy
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
mgmtDestroyChildTable
(
SChildTableObj
*
pTable
)
{
free
(
pTable
);
}
static
void
mgmtChildTableActionInit
()
{
mgmtChildTableActionFp
[
SDB_TYPE_INSERT
]
=
mgmtChildTableActionInsert
;
mgmtChildTableActionFp
[
SDB_TYPE_DELETE
]
=
mgmtChildTableActionDelete
;
mgmtChildTableActionFp
[
SDB_TYPE_UPDATE
]
=
mgmtChildTableActionUpdate
;
mgmtChildTableActionFp
[
SDB_TYPE_ENCODE
]
=
mgmtChildTableActionEncode
;
mgmtChildTableActionFp
[
SDB_TYPE_DECODE
]
=
mgmtChildTableActionDecode
;
mgmtChildTableActionFp
[
SDB_TYPE_DESTROY
]
=
mgmtChildTableActionDestroy
;
tfree
(
pTable
);
}
void
*
mgmtChildTableActionReset
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SChildTableObj
*
pTable
=
(
SChildTableObj
*
)
row
;
memcpy
(
pTable
,
str
,
tsChildTableUpdateSize
);
return
NULL
;
}
void
*
mgmtChildTableActionDestroy
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SChildTableObj
*
pTable
=
(
SChildTableObj
*
)
row
;
static
int32_t
mgmtChildTableActionDestroy
(
void
*
pObj
)
{
SChildTableObj
*
pTable
=
(
SChildTableObj
*
)
pObj
;
mgmtDestroyChildTable
(
pTable
);
return
NULL
;
return
TSDB_CODE_SUCCESS
;
}
void
*
mgmtChildTableActionInsert
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SChildTableObj
*
pTable
=
(
SChildTableObj
*
)
row
;
static
int32_t
mgmtChildTableActionInsert
(
void
*
pObj
)
{
SChildTableObj
*
pTable
=
(
SChildTableObj
*
)
pObj
;
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
pTable
->
vgId
);
if
(
pVgroup
==
NULL
)
{
mError
(
"ctable:%s, not in vgroup:%d"
,
pTable
->
tableId
,
pTable
->
vgId
);
return
NULL
;
return
TSDB_CODE_INVALID_VGROUP_ID
;
}
SDbObj
*
pDb
=
mgmtGetDb
(
pVgroup
->
dbName
);
if
(
pDb
==
NULL
)
{
mError
(
"ctable:%s, vgroup:%d not in db:%s"
,
pTable
->
tableId
,
pVgroup
->
vgId
,
pVgroup
->
dbName
);
return
NULL
;
return
TSDB_CODE_INVALID_DB
;
}
SAcctObj
*
pAcct
=
mgmtGetAcct
(
pDb
->
cfg
.
acct
);
if
(
pAcct
==
NULL
)
{
mError
(
"ctable:%s, account:%s not exists"
,
pTable
->
tableId
,
pDb
->
cfg
.
acct
);
return
NULL
;
return
TSDB_CODE_INVALID_ACCT
;
}
if
(
!
mgmtIsMaster
())
{
int32_t
sid
=
taosAllocateId
(
pVgroup
->
idPool
);
if
(
sid
!=
pTable
->
sid
)
{
mError
(
"ctable:%s, sid:%d is not matched from the master:%d"
,
pTable
->
tableId
,
sid
,
pTable
->
sid
);
return
NULL
;
return
TSDB_CODE_INVALID_SESSION_ID
;
}
}
...
...
@@ -114,30 +90,30 @@ void *mgmtChildTableActionInsert(void *row, char *str, int32_t size, int32_t *ss
mgmtMoveVgroupToTail
(
pDb
,
pVgroup
);
}
return
NULL
;
return
TSDB_CODE_SUCCESS
;
}
void
*
mgmtChildTableActionDelete
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SChildTableObj
*
pTable
=
(
SChildTableObj
*
)
row
;
static
int32_t
mgmtChildTableActionDelete
(
void
*
pObj
)
{
SChildTableObj
*
pTable
=
(
SChildTableObj
*
)
pObj
;
if
(
pTable
->
vgId
==
0
)
{
return
NULL
;
return
TSDB_CODE_INVALID_VGROUP_ID
;
}
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
pTable
->
vgId
);
if
(
pVgroup
==
NULL
)
{
return
NULL
;
return
TSDB_CODE_INVALID_VGROUP_ID
;
}
SDbObj
*
pDb
=
mgmtGetDb
(
pVgroup
->
dbName
);
if
(
pDb
==
NULL
)
{
mError
(
"ctable:%s, vgroup:%d not in DB:%s"
,
pTable
->
tableId
,
pVgroup
->
vgId
,
pVgroup
->
dbName
);
return
NULL
;
return
TSDB_CODE_INVALID_DB
;
}
SAcctObj
*
pAcct
=
mgmtGetAcct
(
pDb
->
cfg
.
acct
);
if
(
pAcct
==
NULL
)
{
mError
(
"ctable:%s, account:%s not exists"
,
pTable
->
tableId
,
pDb
->
cfg
.
acct
);
return
NULL
;
return
TSDB_CODE_INVALID_ACCT
;
}
mgmtRestoreTimeSeries
(
pAcct
,
pTable
->
superTable
->
numOfColumns
-
1
);
...
...
@@ -150,56 +126,54 @@ void *mgmtChildTableActionDelete(void *row, char *str, int32_t size, int32_t *ss
mgmtMoveVgroupToHead
(
pDb
,
pVgroup
);
}
return
NULL
;
return
TSDB_CODE_SUCCESS
;
}
void
*
mgmtChildTableActionUpdate
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
return
mgmtChildTableActionReset
(
row
,
str
,
size
,
NULL
)
;
static
int32_t
mgmtChildTableActionUpdate
(
void
*
pObj
)
{
return
TSDB_CODE_SUCCESS
;
}
void
*
mgmtChildTableActionEncode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SChildTableObj
*
pTable
=
(
SChildTableObj
*
)
row
;
assert
(
row
!=
NULL
&&
str
!=
NULL
);
memcpy
(
str
,
pTable
,
tsChildTableUpdateSize
);
*
ssize
=
tsChildTableUpdateSize
;
static
int32_t
mgmtChildTableActionEncode
(
void
*
pObj
,
void
*
pData
,
int32_t
maxRowSize
)
{
SChildTableObj
*
pTable
=
(
SChildTableObj
*
)
pObj
;
assert
(
pObj
!=
NULL
&&
pData
!=
NULL
);
return
NULL
;
memcpy
(
pData
,
pTable
,
tsChildTableUpdateSize
);
return
tsChildTableUpdateSize
;
}
void
*
mgmtChildTableActionDecode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
assert
(
str
!=
NULL
);
static
void
*
mgmtChildTableActionDecode
(
void
*
pData
)
{
assert
(
pData
!=
NULL
);
SChildTableObj
*
pTable
=
(
SChildTableObj
*
)
calloc
(
sizeof
(
SChildTableObj
),
1
);
if
(
pTable
==
NULL
)
return
NULL
;
if
(
size
<
tsChildTableUpdateSize
)
{
mgmtDestroyChildTable
(
pTable
);
return
NULL
;
}
memcpy
(
pTable
,
str
,
tsChildTableUpdateSize
);
memcpy
(
pTable
,
pData
,
tsChildTableUpdateSize
);
return
(
void
*
)
pTable
;
}
void
*
mgmtChildTableAction
(
char
action
,
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
if
(
mgmtChildTableActionFp
[(
uint8_t
)
action
]
!=
NULL
)
{
return
(
*
(
mgmtChildTableActionFp
[(
uint8_t
)
action
]))(
row
,
str
,
size
,
ssize
);
}
return
NULL
;
}
int32_t
mgmtInitChildTables
()
{
void
*
pNode
=
NULL
;
void
*
pLastNode
=
NULL
;
SChildTableObj
*
pTable
=
NULL
;
mgmtChildTableActionInit
();
SChildTableObj
tObj
;
tsChildTableUpdateSize
=
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
tsChildTableSdb
=
sdbOpenTable
(
tsMaxTables
,
tsChildTableUpdateSize
,
"ctables"
,
SDB_KEYTYPE_STRING
,
tsMnodeDir
,
mgmtChildTableAction
);
SSdbTableDesc
tableDesc
=
{
.
tableName
=
"ctables"
,
.
hashSessions
=
tsMaxTables
,
.
maxRowSize
=
tsChildTableUpdateSize
,
.
keyType
=
SDB_KEYTYPE_STRING
,
.
insertFp
=
mgmtChildTableActionInsert
,
.
deleteFp
=
mgmtChildTableActionDelete
,
.
updateFp
=
mgmtChildTableActionUpdate
,
.
encodeFp
=
mgmtChildTableActionEncode
,
.
decodeFp
=
mgmtChildTableActionDecode
,
.
destroyFp
=
mgmtChildTableActionDestroy
,
};
tsChildTableSdb
=
sdbOpenTable
(
&
tableDesc
);
if
(
tsChildTableSdb
==
NULL
)
{
mError
(
"failed to init child table data"
);
return
-
1
;
...
...
src/mnode/src/mgmtDb.c
浏览文件 @
eb6764be
...
...
@@ -48,29 +48,57 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg);
static
void
mgmtProcessAlterDbMsg
(
SQueuedMsg
*
pMsg
);
static
void
mgmtProcessDropDbMsg
(
SQueuedMsg
*
pMsg
);
static
void
*
(
*
mgmtDbActionFp
[
SDB_MAX_ACTION_TYPES
])(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtDbActionInsert
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtDbActionDelete
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtDbActionUpdate
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtDbActionEncode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtDbActionDecode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtDbActionReset
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtDbActionDestroy
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
int32_t
mgmtDbActionDestroy
(
void
*
pObj
)
{
tfree
(
pObj
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtDbActionInsert
(
void
*
pObj
)
{
SDbObj
*
pDb
=
(
SDbObj
*
)
pObj
;
SAcctObj
*
pAcct
=
mgmtGetAcct
(
pDb
->
cfg
.
acct
);
pDb
->
pHead
=
NULL
;
pDb
->
pTail
=
NULL
;
pDb
->
numOfVgroups
=
0
;
pDb
->
numOfTables
=
0
;
mgmtAddDbIntoAcct
(
pAcct
,
pDb
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtDbActionDelete
(
void
*
pObj
)
{
SDbObj
*
pDb
=
(
SDbObj
*
)
pObj
;
SAcctObj
*
pAcct
=
mgmtGetAcct
(
pDb
->
cfg
.
acct
);
mgmtRemoveDbFromAcct
(
pAcct
,
pDb
);
mgmtDropAllNormalTables
(
pDb
);
mgmtDropAllChildTables
(
pDb
);
mgmtDropAllSuperTables
(
pDb
);
static
void
mgmtDbActionInit
()
{
mgmtDbActionFp
[
SDB_TYPE_INSERT
]
=
mgmtDbActionInsert
;
mgmtDbActionFp
[
SDB_TYPE_DELETE
]
=
mgmtDbActionDelete
;
mgmtDbActionFp
[
SDB_TYPE_UPDATE
]
=
mgmtDbActionUpdate
;
mgmtDbActionFp
[
SDB_TYPE_ENCODE
]
=
mgmtDbActionEncode
;
mgmtDbActionFp
[
SDB_TYPE_DECODE
]
=
mgmtDbActionDecode
;
mgmtDbActionFp
[
SDB_TYPE_DESTROY
]
=
mgmtDbActionDestroy
;
return
TSDB_CODE_SUCCESS
;
}
static
void
*
mgmtDbAction
(
char
action
,
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
if
(
mgmtDbActionFp
[(
uint8_t
)
action
]
!=
NULL
)
{
return
(
*
(
mgmtDbActionFp
[(
uint8_t
)
action
]))(
row
,
str
,
size
,
ssize
);
static
int32_t
mgmtDbActionUpdate
(
void
*
pObj
)
{
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtDbActionEncode
(
void
*
pObj
,
void
*
pData
,
int32_t
maxRowSize
)
{
SDbObj
*
pDb
=
(
SDbObj
*
)
pObj
;
if
(
maxRowSize
<
tsDbUpdateSize
)
{
return
-
1
;
}
else
{
memcpy
(
pData
,
pDb
,
tsDbUpdateSize
);
return
tsDbUpdateSize
;
}
return
NULL
;
}
static
void
*
mgmtDbActionDecode
(
void
*
pData
)
{
SDbObj
*
pDb
=
(
SDbObj
*
)
malloc
(
sizeof
(
SDbObj
));
if
(
pDb
==
NULL
)
return
NULL
;
memset
(
pDb
,
0
,
sizeof
(
SDbObj
));
memcpy
(
pDb
,
pData
,
tsDbUpdateSize
);
return
(
void
*
)
pDb
;
}
int32_t
mgmtInitDbs
()
{
...
...
@@ -78,12 +106,23 @@ int32_t mgmtInitDbs() {
SDbObj
*
pDb
=
NULL
;
SAcctObj
*
pAcct
=
NULL
;
mgmtDbActionInit
();
SDbObj
tObj
;
tsDbUpdateSize
=
tObj
.
updateEnd
-
(
char
*
)
&
tObj
;
tsDbSdb
=
sdbOpenTable
(
TSDB_MAX_DBS
,
tsDbUpdateSize
,
"dbs"
,
SDB_KEYTYPE_STRING
,
tsMnodeDir
,
mgmtDbAction
);
SSdbTableDesc
tableDesc
=
{
.
tableName
=
"dbs"
,
.
hashSessions
=
TSDB_MAX_DBS
,
.
maxRowSize
=
tsDbUpdateSize
,
.
keyType
=
SDB_KEYTYPE_STRING
,
.
insertFp
=
mgmtDbActionInsert
,
.
deleteFp
=
mgmtDbActionDelete
,
.
updateFp
=
mgmtDbActionUpdate
,
.
encodeFp
=
mgmtDbActionEncode
,
.
decodeFp
=
mgmtDbActionDecode
,
.
destroyFp
=
mgmtDbActionDestroy
,
};
tsDbSdb
=
sdbOpenTable
(
&
tableDesc
);
if
(
tsDbSdb
==
NULL
)
{
mError
(
"failed to init db data"
);
return
-
1
;
...
...
@@ -683,68 +722,6 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
return
numOfRows
;
}
void
*
mgmtDbActionInsert
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SDbObj
*
pDb
=
(
SDbObj
*
)
row
;
SAcctObj
*
pAcct
=
mgmtGetAcct
(
pDb
->
cfg
.
acct
);
pDb
->
pHead
=
NULL
;
pDb
->
pTail
=
NULL
;
pDb
->
numOfVgroups
=
0
;
pDb
->
numOfTables
=
0
;
mgmtAddDbIntoAcct
(
pAcct
,
pDb
);
return
NULL
;
}
void
*
mgmtDbActionDelete
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SDbObj
*
pDb
=
(
SDbObj
*
)
row
;
SAcctObj
*
pAcct
=
mgmtGetAcct
(
pDb
->
cfg
.
acct
);
mgmtRemoveDbFromAcct
(
pAcct
,
pDb
);
mgmtDropAllNormalTables
(
pDb
);
mgmtDropAllChildTables
(
pDb
);
mgmtDropAllSuperTables
(
pDb
);
return
NULL
;
}
void
*
mgmtDbActionUpdate
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
return
mgmtDbActionReset
(
row
,
str
,
size
,
ssize
);
}
void
*
mgmtDbActionEncode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SDbObj
*
pDb
=
(
SDbObj
*
)
row
;
if
(
size
<
tsDbUpdateSize
)
{
*
ssize
=
-
1
;
}
else
{
memcpy
(
str
,
pDb
,
tsDbUpdateSize
);
*
ssize
=
tsDbUpdateSize
;
}
return
NULL
;
}
void
*
mgmtDbActionDecode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SDbObj
*
pDb
=
(
SDbObj
*
)
malloc
(
sizeof
(
SDbObj
));
if
(
pDb
==
NULL
)
return
NULL
;
memset
(
pDb
,
0
,
sizeof
(
SDbObj
));
memcpy
(
pDb
,
str
,
tsDbUpdateSize
);
return
(
void
*
)
pDb
;
}
void
*
mgmtDbActionReset
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SDbObj
*
pDb
=
(
SDbObj
*
)
row
;
memcpy
(
pDb
,
str
,
tsDbUpdateSize
);
return
NULL
;
}
void
*
mgmtDbActionDestroy
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
tfree
(
row
);
return
NULL
;
}
void
mgmtAddSuperTableIntoDb
(
SDbObj
*
pDb
)
{
atomic_add_fetch_32
(
&
pDb
->
numOfSuperTables
,
1
);
}
...
...
src/mnode/src/mgmtNormalTable.c
浏览文件 @
eb6764be
...
...
@@ -35,75 +35,45 @@
void
*
tsNormalTableSdb
;
int32_t
tsNormalTableUpdateSize
;
void
*
(
*
mgmtNormalTableActionFp
[
SDB_MAX_ACTION_TYPES
])(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
void
*
mgmtNormalTableActionInsert
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
void
*
mgmtNormalTableActionDelete
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
void
*
mgmtNormalTableActionUpdate
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
void
*
mgmtNormalTableActionEncode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
void
*
mgmtNormalTableActionDecode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
void
*
mgmtNormalTableActionReset
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
void
*
mgmtNormalTableActionDestroy
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
mgmtDestroyNormalTable
(
SNormalTableObj
*
pTable
)
{
free
(
pTable
->
schema
);
free
(
pTable
->
sql
);
free
(
pTable
);
}
static
void
mgmtNormalTableActionInit
()
{
mgmtNormalTableActionFp
[
SDB_TYPE_INSERT
]
=
mgmtNormalTableActionInsert
;
mgmtNormalTableActionFp
[
SDB_TYPE_DELETE
]
=
mgmtNormalTableActionDelete
;
mgmtNormalTableActionFp
[
SDB_TYPE_UPDATE
]
=
mgmtNormalTableActionUpdate
;
mgmtNormalTableActionFp
[
SDB_TYPE_ENCODE
]
=
mgmtNormalTableActionEncode
;
mgmtNormalTableActionFp
[
SDB_TYPE_DECODE
]
=
mgmtNormalTableActionDecode
;
mgmtNormalTableActionFp
[
SDB_TYPE_DESTROY
]
=
mgmtNormalTableActionDestroy
;
tfree
(
pTable
->
schema
);
tfree
(
pTable
->
sql
);
tfree
(
pTable
);
}
void
*
mgmtNormalTableActionReset
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SNormalTableObj
*
pTable
=
(
SNormalTableObj
*
)
row
;
memcpy
(
pTable
,
str
,
tsNormalTableUpdateSize
);
int32_t
schemaSize
=
sizeof
(
SSchema
)
*
(
pTable
->
numOfColumns
)
+
pTable
->
sqlLen
;
pTable
->
schema
=
realloc
(
pTable
->
schema
,
schemaSize
);
pTable
->
sql
=
(
char
*
)
pTable
->
schema
+
sizeof
(
SSchema
)
*
(
pTable
->
numOfColumns
);
memcpy
(
pTable
->
schema
,
str
+
tsNormalTableUpdateSize
,
schemaSize
);
return
NULL
;
}
void
*
mgmtNormalTableActionDestroy
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SNormalTableObj
*
pTable
=
(
SNormalTableObj
*
)
row
;
static
int32_t
mgmtNormalTableActionDestroy
(
void
*
pObj
)
{
SNormalTableObj
*
pTable
=
(
SNormalTableObj
*
)
pObj
;
mgmtDestroyNormalTable
(
pTable
);
return
NULL
;
return
TSDB_CODE_SUCCESS
;
}
void
*
mgmtNormalTableActionInsert
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SNormalTableObj
*
pTable
=
(
SNormalTableObj
*
)
row
;
static
int32_t
mgmtNormalTableActionInsert
(
void
*
pObj
)
{
SNormalTableObj
*
pTable
=
(
SNormalTableObj
*
)
pObj
;
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
pTable
->
vgId
);
if
(
pVgroup
==
NULL
)
{
mError
(
"id:%s not in vgroup:%d"
,
pTable
->
tableId
,
pTable
->
vgId
);
return
NULL
;
return
TSDB_CODE_INVALID_VGROUP_ID
;
}
SDbObj
*
pDb
=
mgmtGetDb
(
pVgroup
->
dbName
);
if
(
pDb
==
NULL
)
{
mError
(
"vgroup:%d not in DB:%s"
,
pVgroup
->
vgId
,
pVgroup
->
dbName
);
return
NULL
;
return
TSDB_CODE_INVALID_DB
;
}
SAcctObj
*
pAcct
=
mgmtGetAcct
(
pDb
->
cfg
.
acct
);
if
(
pAcct
==
NULL
)
{
mError
(
"account not exists"
);
return
NULL
;
return
TSDB_CODE_INVALID_ACCT
;
}
if
(
!
mgmtIsMaster
())
{
int32_t
sid
=
taosAllocateId
(
pVgroup
->
idPool
);
if
(
sid
!=
pTable
->
sid
)
{
mError
(
"sid:%d is not matched from the master:%d"
,
sid
,
pTable
->
sid
);
return
NULL
;
return
TSDB_CODE_INVALID_SESSION_ID
;
}
}
...
...
@@ -115,30 +85,30 @@ void *mgmtNormalTableActionInsert(void *row, char *str, int32_t size, int32_t *s
mgmtMoveVgroupToTail
(
pDb
,
pVgroup
);
}
return
NULL
;
return
TSDB_CODE_SUCCESS
;
}
void
*
mgmtNormalTableActionDelete
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SNormalTableObj
*
pTable
=
(
SNormalTableObj
*
)
row
;
static
int32_t
mgmtNormalTableActionDelete
(
void
*
pObj
)
{
SNormalTableObj
*
pTable
=
(
SNormalTableObj
*
)
pObj
;
if
(
pTable
->
vgId
==
0
)
{
return
NULL
;
return
TSDB_CODE_INVALID_VGROUP_ID
;
}
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
pTable
->
vgId
);
if
(
pVgroup
==
NULL
)
{
return
NULL
;
return
TSDB_CODE_INVALID_VGROUP_ID
;
}
SDbObj
*
pDb
=
mgmtGetDb
(
pVgroup
->
dbName
);
if
(
pDb
==
NULL
)
{
mError
(
"vgroup:%d not in DB:%s"
,
pVgroup
->
vgId
,
pVgroup
->
dbName
);
return
NULL
;
return
TSDB_CODE_INVALID_DB
;
}
SAcctObj
*
pAcct
=
mgmtGetAcct
(
pDb
->
cfg
.
acct
);
if
(
pAcct
==
NULL
)
{
mError
(
"account not exists"
);
return
NULL
;
return
TSDB_CODE_INVALID_ACCT
;
}
mgmtRestoreTimeSeries
(
pAcct
,
pTable
->
numOfColumns
-
1
);
...
...
@@ -149,45 +119,46 @@ void *mgmtNormalTableActionDelete(void *row, char *str, int32_t size, int32_t *s
mgmtMoveVgroupToHead
(
pDb
,
pVgroup
);
}
return
NULL
;
return
TSDB_CODE_SUCCESS
;
}
void
*
mgmtNormalTableActionUpdate
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
return
mgmtNormalTableActionReset
(
row
,
str
,
size
,
NULL
);
static
int32_t
mgmtNormalTableActionUpdate
(
void
*
pObj
)
{
// SNormalTableObj *pTable = (SNormalTableObj *) pObj;
// memcpy(pTable, str, tsNormalTableUpdateSize);
// int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns) + pTable->sqlLen;
// pTable->schema = realloc(pTable->schema, schemaSize);
// pTable->sql = (char*)pTable->schema + sizeof(SSchema) * (pTable->numOfColumns);
// memcpy(pTable->schema, str + tsNormalTableUpdateSize, schemaSize);
return
TSDB_CODE_SUCCESS
;
}
void
*
mgmtNormalTableActionEncode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ss
ize
)
{
SNormalTableObj
*
pTable
=
(
SNormalTableObj
*
)
row
;
assert
(
row
!=
NULL
&&
str
!=
NULL
);
static
int32_t
mgmtNormalTableActionEncode
(
void
*
pObj
,
void
*
pData
,
int32_t
maxRowS
ize
)
{
SNormalTableObj
*
pTable
=
(
SNormalTableObj
*
)
pObj
;
assert
(
pObj
!=
NULL
&&
pData
!=
NULL
);
int32_t
schemaSize
=
pTable
->
numOfColumns
*
sizeof
(
SSchema
);
if
(
size
<
tsNormalTableUpdateSize
+
schemaSize
+
1
)
{
*
ssize
=
-
1
;
return
NULL
;
if
(
maxRowSize
<
tsNormalTableUpdateSize
+
schemaSize
+
1
)
{
return
-
1
;
}
memcpy
(
str
,
pTable
,
tsNormalTableUpdateSize
);
memcpy
(
str
+
tsNormalTableUpdateSize
,
pTable
->
schema
,
schemaSize
);
memcpy
(
str
+
tsNormalTableUpdateSize
+
schemaSize
,
pTable
->
sql
,
pTable
->
sqlLen
);
*
ssize
=
tsNormalTableUpdateSize
+
schemaSize
+
pTable
->
sqlLen
;
return
NULL
;
memcpy
(
pData
,
pTable
,
tsNormalTableUpdateSize
);
memcpy
(
pData
+
tsNormalTableUpdateSize
,
pTable
->
schema
,
schemaSize
);
memcpy
(
pData
+
tsNormalTableUpdateSize
+
schemaSize
,
pTable
->
sql
,
pTable
->
sqlLen
);
return
tsNormalTableUpdateSize
+
schemaSize
+
pTable
->
sqlLen
;
}
void
*
mgmtNormalTableActionDecode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
assert
(
str
!=
NULL
);
static
void
*
mgmtNormalTableActionDecode
(
void
*
pData
)
{
assert
(
pData
!=
NULL
);
SNormalTableObj
*
pTable
=
(
SNormalTableObj
*
)
malloc
(
sizeof
(
SNormalTableObj
));
if
(
pTable
==
NULL
)
{
return
NULL
;
}
memset
(
pTable
,
0
,
sizeof
(
SNormalTableObj
));
if
(
size
<
tsNormalTableUpdateSize
)
{
mgmtDestroyNormalTable
(
pTable
);
return
NULL
;
}
memcpy
(
pTable
,
str
,
tsNormalTableUpdateSize
);
memcpy
(
pTable
,
pData
,
tsNormalTableUpdateSize
);
int32_t
schemaSize
=
pTable
->
numOfColumns
*
sizeof
(
SSchema
);
pTable
->
schema
=
(
SSchema
*
)
malloc
(
schemaSize
);
...
...
@@ -196,35 +167,39 @@ void *mgmtNormalTableActionDecode(void *row, char *str, int32_t size, int32_t *s
return
NULL
;
}
memcpy
(
pTable
->
schema
,
str
+
tsNormalTableUpdateSize
,
schemaSize
);
memcpy
(
pTable
->
schema
,
pData
+
tsNormalTableUpdateSize
,
schemaSize
);
pTable
->
sql
=
(
char
*
)
malloc
(
pTable
->
sqlLen
);
if
(
pTable
->
sql
==
NULL
)
{
mgmtDestroyNormalTable
(
pTable
);
return
NULL
;
}
memcpy
(
pTable
->
sql
,
str
+
tsNormalTableUpdateSize
+
schemaSize
,
pTable
->
sqlLen
);
memcpy
(
pTable
->
sql
,
pData
+
tsNormalTableUpdateSize
+
schemaSize
,
pTable
->
sqlLen
);
return
(
void
*
)
pTable
;
}
void
*
mgmtNormalTableAction
(
char
action
,
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
if
(
mgmtNormalTableActionFp
[(
uint8_t
)
action
]
!=
NULL
)
{
return
(
*
(
mgmtNormalTableActionFp
[(
uint8_t
)
action
]))(
row
,
str
,
size
,
ssize
);
}
return
NULL
;
}
int32_t
mgmtInitNormalTables
()
{
void
*
pNode
=
NULL
;
void
*
pLastNode
=
NULL
;
SNormalTableObj
*
pTable
=
NULL
;
mgmtNormalTableActionInit
();
SNormalTableObj
tObj
;
tsNormalTableUpdateSize
=
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
tsNormalTableSdb
=
sdbOpenTable
(
TSDB_MAX_NORMAL_TABLES
,
sizeof
(
SNormalTableObj
)
+
sizeof
(
SSchema
)
*
TSDB_MAX_COLUMNS
,
"ntables"
,
SDB_KEYTYPE_STRING
,
tsMnodeDir
,
mgmtNormalTableAction
);
SSdbTableDesc
tableDesc
=
{
.
tableName
=
"ntables"
,
.
hashSessions
=
TSDB_MAX_NORMAL_TABLES
,
.
maxRowSize
=
sizeof
(
SNormalTableObj
)
+
sizeof
(
SSchema
)
*
TSDB_MAX_COLUMNS
,
.
keyType
=
SDB_KEYTYPE_STRING
,
.
insertFp
=
mgmtNormalTableActionInsert
,
.
deleteFp
=
mgmtNormalTableActionDelete
,
.
updateFp
=
mgmtNormalTableActionUpdate
,
.
encodeFp
=
mgmtNormalTableActionEncode
,
.
decodeFp
=
mgmtNormalTableActionDecode
,
.
destroyFp
=
mgmtNormalTableActionDestroy
,
};
tsNormalTableSdb
=
sdbOpenTable
(
&
tableDesc
);
if
(
tsNormalTableSdb
==
NULL
)
{
mError
(
"failed to init ntables data"
);
return
-
1
;
...
...
src/mnode/src/mgmtSdb.c
浏览文件 @
eb6764be
...
...
@@ -16,8 +16,8 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosdef.h"
#include "tutil.h"
#include "tchecksum.h"
#include "tglobalcfg.h"
#include "tlog.h"
#include "trpc.h"
#include "tutil.h"
...
...
@@ -33,32 +33,37 @@
typedef
struct
{
uint64_t
swVersion
;
int16_t
sdbFileVersion
;
char
reserved
[
6
];
char
reserved
[
2
];
TSCKSUM
checkSum
;
}
SSdbHeader
;
typedef
struct
_SSdbTable
{
SSdbHeader
header
;
int
maxRows
;
int
dbId
;
int32_t
maxRowSize
;
char
name
[
TSDB_DB_NAME_LEN
];
char
fn
[
128
];
int
keyType
;
uint32_t
autoIndex
;
int64_t
numOfRows
;
int64_t
id
;
int64_t
size
;
void
*
iHandle
;
int
fd
;
void
*
(
*
appTool
)(
char
,
void
*
,
char
*
,
int
,
int
*
);
SSdbHeader
header
;
char
name
[
TSDB_DB_NAME_LEN
];
char
fn
[
TSDB_FILENAME_LEN
];
ESdbKeyType
keyType
;
int32_t
dbId
;
int32_t
hashSessions
;
int32_t
maxRowSize
;
uint32_t
autoIndex
;
int64_t
numOfRows
;
int64_t
id
;
int64_t
size
;
void
*
iHandle
;
int32_t
fd
;
int32_t
(
*
insertFp
)(
void
*
pObj
);
int32_t
(
*
deleteFp
)(
void
*
pObj
);
int32_t
(
*
updateFp
)(
void
*
pObj
);
void
*
(
*
decodeFp
)(
void
*
pData
);
// return pObj
int32_t
(
*
encodeFp
)(
void
*
pObj
,
void
*
pData
,
int32_t
maxRowSize
);
// return size of pData
int32_t
(
*
destroyFp
)(
void
*
pObj
);
pthread_mutex_t
mutex
;
}
SSdbTable
;
typedef
struct
{
int64_t
id
;
int64_t
offset
;
int
rowSize
;
int
32_t
rowSize
;
void
*
row
;
}
SRowMeta
;
...
...
@@ -71,9 +76,9 @@ typedef struct {
typedef
struct
{
uint8_t
dbId
;
char
type
;
int8_t
type
;
int16_t
dataLen
;
uint64_t
version
;
short
dataLen
;
char
data
[];
}
SForwardMsg
;
...
...
@@ -283,7 +288,7 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
// TODO: Get rid of the rowMeta.offset and rowSize
rowMeta
.
offset
=
pTable
->
size
;
rowMeta
.
rowSize
=
rowHead
->
rowSize
;
rowMeta
.
row
=
(
*
(
pTable
->
appTool
))(
SDB_TYPE_DECODE
,
NULL
,
rowHead
->
data
,
rowHead
->
rowSize
,
NULL
);
rowMeta
.
row
=
(
*
pTable
->
decodeFp
)(
rowHead
->
data
);
(
*
sdbAddIndexFp
[
pTable
->
keyType
])(
pTable
->
iHandle
,
rowMeta
.
row
,
&
rowMeta
);
if
(
pTable
->
keyType
==
SDB_KEYTYPE_AUTO
)
{
pTable
->
autoIndex
++
;
...
...
@@ -299,7 +304,7 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
if
(
rowHead
->
id
<
0
)
{
// Delete the object
(
*
sdbDeleteIndexFp
[
pTable
->
keyType
])(
pTable
->
iHandle
,
rowHead
->
data
);
(
*
(
pTable
->
appTool
))(
SDB_TYPE_DESTROY
,
pMetaRow
,
NULL
,
0
,
NULL
);
(
*
pTable
->
destroyFp
)(
pMetaRow
);
pTable
->
numOfRows
--
;
numOfDels
++
;
}
else
{
// Reset the object TODO: is it possible to merge reset and
...
...
@@ -322,7 +327,7 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
}
sdbVersion
+=
(
pTable
->
id
-
oldId
);
if
(
numOfDels
>
pTable
->
maxRow
s
/
4
)
sdbSaveSnapShot
(
pTable
);
if
(
numOfDels
>
pTable
->
hashSession
s
/
4
)
sdbSaveSnapShot
(
pTable
);
tfree
(
rowHead
);
return
0
;
...
...
@@ -332,20 +337,25 @@ sdb_exit1:
return
-
1
;
}
void
*
sdbOpenTable
(
int32_t
maxRows
,
int32_t
maxRowSize
,
char
*
name
,
uint8_t
keyType
,
char
*
directory
,
void
*
(
*
appTool
)(
char
,
void
*
,
char
*
,
int32_t
,
int32_t
*
))
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
malloc
(
sizeof
(
SSdbTable
));
void
*
sdbOpenTable
(
SSdbTableDesc
*
pDesc
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
calloc
(
1
,
sizeof
(
SSdbTable
));
if
(
pTable
==
NULL
)
return
NULL
;
memset
(
pTable
,
0
,
sizeof
(
SSdbTable
));
strcpy
(
pTable
->
name
,
name
);
pTable
->
keyType
=
keyType
;
pTable
->
maxRows
=
maxRows
;
pTable
->
maxRowSize
=
maxRowSize
;
pTable
->
appTool
=
appTool
;
sprintf
(
pTable
->
fn
,
"%s/%s.db"
,
directory
,
pTable
->
name
);
if
(
sdbInitIndexFp
[
keyType
]
!=
NULL
)
pTable
->
iHandle
=
(
*
sdbInitIndexFp
[
keyType
])(
maxRows
,
sizeof
(
SRowMeta
));
pTable
->
keyType
=
pDesc
->
keyType
;
pTable
->
hashSessions
=
pDesc
->
hashSessions
;
pTable
->
maxRowSize
=
pDesc
->
maxRowSize
;
pTable
->
insertFp
=
pDesc
->
insertFp
;
pTable
->
deleteFp
=
pDesc
->
deleteFp
;
pTable
->
updateFp
=
pDesc
->
updateFp
;
pTable
->
encodeFp
=
pDesc
->
encodeFp
;
pTable
->
decodeFp
=
pDesc
->
decodeFp
;
pTable
->
destroyFp
=
pDesc
->
destroyFp
;
strcpy
(
pTable
->
name
,
pDesc
->
tableName
);
sprintf
(
pTable
->
fn
,
"%s/%s.db"
,
tsMnodeDir
,
pTable
->
name
);
if
(
sdbInitIndexFp
[
pTable
->
keyType
]
!=
NULL
)
{
pTable
->
iHandle
=
(
*
sdbInitIndexFp
[
pTable
->
keyType
])(
pTable
->
maxRowSize
,
sizeof
(
SRowMeta
));
}
pthread_mutex_init
(
&
pTable
->
mutex
,
NULL
);
...
...
@@ -386,7 +396,7 @@ void *sdbGetRow(void *handle, void *key) {
}
// row here must be encoded string (rowSize > 0) or the object it self (rowSize = 0)
int32_t
sdbInsertRow
(
void
*
handle
,
void
*
row
,
ESdbOper
oper
)
{
int32_t
sdbInsertRow
(
void
*
handle
,
void
*
row
,
ESdbOper
Type
oper
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
SRowMeta
rowMeta
;
void
*
pObj
=
NULL
;
...
...
@@ -424,7 +434,7 @@ int32_t sdbInsertRow(void *handle, void *row, ESdbOper oper) {
if
(
oper
==
SDB_OPER_GLOBAL
)
{
pObj
=
row
;
}
else
{
pObj
=
(
*
(
pTable
->
appTool
))(
SDB_TYPE_DECODE
,
NULL
,
row
,
0
,
NULL
);
pObj
=
(
*
pTable
->
decodeFp
)(
row
);
}
pthread_mutex_lock
(
&
pTable
->
mutex
);
...
...
@@ -439,7 +449,7 @@ int32_t sdbInsertRow(void *handle, void *row, ESdbOper oper) {
}
if
(
oper
==
SDB_OPER_GLOBAL
||
oper
==
SDB_OPER_LOCAL
)
{
(
*
(
pTable
->
appTool
))(
SDB_TYPE_ENCODE
,
pObj
,
rowHead
->
data
,
pTable
->
maxRowSize
,
&
(
rowHead
->
rowSize
)
);
rowHead
->
rowSize
=
(
*
pTable
->
encodeFp
)(
pObj
,
rowHead
->
data
,
pTable
->
maxRowSize
);
assert
(
rowHead
->
rowSize
>
0
&&
rowHead
->
rowSize
<=
pTable
->
maxRowSize
);
real_size
=
sizeof
(
SRowHead
)
+
rowHead
->
rowSize
+
sizeof
(
TSCKSUM
);
...
...
@@ -489,7 +499,7 @@ int32_t sdbInsertRow(void *handle, void *row, ESdbOper oper) {
pthread_mutex_unlock
(
&
pTable
->
mutex
);
(
*
pTable
->
appTool
)(
SDB_TYPE_INSERT
,
pObj
,
NULL
,
0
,
NULL
);
(
*
pTable
->
insertFp
)(
pObj
);
tfree
(
rowHead
);
...
...
@@ -497,7 +507,7 @@ int32_t sdbInsertRow(void *handle, void *row, ESdbOper oper) {
}
// row here can be object or null-terminated string
int32_t
sdbDeleteRow
(
void
*
handle
,
void
*
row
,
ESdbOper
oper
)
{
int32_t
sdbDeleteRow
(
void
*
handle
,
void
*
row
,
ESdbOper
Type
oper
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
SRowMeta
*
pMeta
=
NULL
;
void
*
pMetaRow
=
NULL
;
...
...
@@ -587,13 +597,13 @@ int32_t sdbDeleteRow(void *handle, void *row, ESdbOper oper) {
tfree
(
rowHead
);
(
*
pTable
->
appTool
)(
SDB_TYPE_DELETE
,
pMetaRow
,
NULL
,
0
,
NULL
);
(
*
pTable
->
deleteFp
)(
pMetaRow
);
return
0
;
}
// row here can be the object or the string info (encoded string)
int32_t
sdbUpdateRow
(
void
*
handle
,
void
*
row
,
int32_t
updateSize
,
ESdbOper
oper
)
{
int32_t
sdbUpdateRow
(
void
*
handle
,
void
*
row
,
int32_t
updateSize
,
ESdbOper
Type
oper
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
SRowMeta
*
pMeta
=
NULL
;
int32_t
total_size
=
0
;
...
...
@@ -645,7 +655,7 @@ int32_t sdbUpdateRow(void *handle, void *row, int32_t updateSize, ESdbOper oper)
memcpy
(
rowHead
->
data
,
row
,
updateSize
);
rowHead
->
rowSize
=
updateSize
;
}
else
{
(
*
(
pTable
->
appTool
))(
SDB_TYPE_ENCODE
,
pMetaRow
,
rowHead
->
data
,
pTable
->
maxRowSize
,
&
(
rowHead
->
rowSize
)
);
rowHead
->
rowSize
=
(
*
pTable
->
encodeFp
)(
pMetaRow
,
rowHead
->
data
,
pTable
->
maxRowSize
);
}
real_size
=
sizeof
(
SRowHead
)
+
rowHead
->
rowSize
+
sizeof
(
TSCKSUM
);
...
...
@@ -689,7 +699,7 @@ int32_t sdbUpdateRow(void *handle, void *row, int32_t updateSize, ESdbOper oper)
pthread_mutex_unlock
(
&
pTable
->
mutex
);
(
*
(
pTable
->
appTool
))(
SDB_TYPE_UPDATE
,
pMetaRow
,
row
,
updateSize
,
NULL
);
// update in upper layer
(
*
pTable
->
updateFp
)(
pMetaRow
);
// update in upper layer
tfree
(
rowHead
);
...
...
@@ -706,7 +716,7 @@ void sdbCloseTable(void *handle) {
while
(
1
)
{
pNode
=
sdbFetchRow
(
handle
,
pNode
,
&
row
);
if
(
row
==
NULL
)
break
;
(
*
(
pTable
->
appTool
))(
SDB_TYPE_DESTROY
,
row
,
NULL
,
0
,
NULL
);
(
*
pTable
->
destroyFp
)(
row
);
}
if
(
sdbCleanUpIndexFp
[
pTable
->
keyType
])
(
*
sdbCleanUpIndexFp
[
pTable
->
keyType
])(
pTable
->
iHandle
);
...
...
@@ -724,13 +734,13 @@ void sdbCloseTable(void *handle) {
void
sdbResetTable
(
SSdbTable
*
pTable
)
{
/* SRowHead rowHead; */
SRowMeta
rowMeta
;
int32_t
bytes
;
int32_t
total_size
=
0
;
int32_t
real_size
=
0
;
int32_t
bytes
;
int32_t
total_size
=
0
;
int32_t
real_size
=
0
;
SRowHead
*
rowHead
=
NULL
;
void
*
pMetaRow
=
NULL
;
int64_t
oldId
=
pTable
->
id
;
int32_t
oldNumOfRows
=
pTable
->
numOfRows
;
int32_t
oldNumOfRows
=
pTable
->
numOfRows
;
if
(
sdbOpenSdbFile
(
pTable
)
<
0
)
return
;
pTable
->
numOfRows
=
oldNumOfRows
;
...
...
@@ -792,19 +802,19 @@ void sdbResetTable(SSdbTable *pTable) {
// TODO:Get rid of the rowMeta.offset and rowSize
rowMeta
.
offset
=
pTable
->
size
;
rowMeta
.
rowSize
=
rowHead
->
rowSize
;
rowMeta
.
row
=
(
*
(
pTable
->
appTool
))(
SDB_TYPE_DECODE
,
NULL
,
rowHead
->
data
,
rowHead
->
rowSize
,
NULL
);
rowMeta
.
row
=
(
*
pTable
->
decodeFp
)(
rowHead
->
data
);
(
*
sdbAddIndexFp
[
pTable
->
keyType
])(
pTable
->
iHandle
,
rowMeta
.
row
,
&
rowMeta
);
pTable
->
numOfRows
++
;
(
*
pTable
->
appTool
)(
SDB_TYPE_INSERT
,
rowMeta
.
row
,
NULL
,
0
,
NULL
);
(
*
pTable
->
insertFp
)(
rowMeta
.
row
);
}
}
else
{
// already exists
if
(
rowHead
->
id
<
0
)
{
// Delete the object
(
*
sdbDeleteIndexFp
[
pTable
->
keyType
])(
pTable
->
iHandle
,
rowHead
->
data
);
(
*
(
pTable
->
appTool
))(
SDB_TYPE_DESTROY
,
pMetaRow
,
NULL
,
0
,
NULL
);
(
*
pTable
->
destroyFp
)(
pMetaRow
);
pTable
->
numOfRows
--
;
}
else
{
// update the object
(
*
(
pTable
->
appTool
))(
SDB_TYPE_UPDATE
,
pMetaRow
,
rowHead
->
data
,
rowHead
->
rowSize
,
NULL
);
(
*
pTable
->
updateFp
)(
pMetaRow
);
}
}
}
...
...
@@ -866,7 +876,7 @@ void sdbSaveSnapShot(void *handle) {
rowHead
->
delimiter
=
SDB_DELIMITER
;
rowHead
->
id
=
pMeta
->
id
;
(
*
(
pTable
->
appTool
))(
SDB_TYPE_ENCODE
,
pMeta
->
row
,
rowHead
->
data
,
pTable
->
maxRowSize
,
&
(
rowHead
->
rowSize
)
);
rowHead
->
rowSize
=
(
*
pTable
->
encodeFp
)(
pMeta
->
row
,
rowHead
->
data
,
pTable
->
maxRowSize
);
real_size
=
sizeof
(
SRowHead
)
+
rowHead
->
rowSize
+
sizeof
(
TSCKSUM
);
if
(
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
rowHead
,
real_size
)
<
0
)
{
sdbError
(
"failed to get checksum while save sdb %s snapshot"
,
pTable
->
name
);
...
...
src/mnode/src/mgmtSuperTable.c
浏览文件 @
eb6764be
...
...
@@ -39,105 +39,73 @@
static
void
*
tsSuperTableSdb
;
static
int32_t
tsSuperTableUpdateSize
;
static
void
*
(
*
mgmtSuperTableActionFp
[
SDB_MAX_ACTION_TYPES
])(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtSuperTableActionInsert
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtSuperTableActionDelete
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtSuperTableActionUpdate
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtSuperTableActionEncode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtSuperTableActionDecode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtSuperTableActionReset
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtSuperTableActionDestroy
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
int32_t
mgmtRetrieveShowSuperTables
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
mgmtGetShowSuperTableMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
void
mgmtDestroySuperTable
(
SSuperTableObj
*
pTable
)
{
free
(
pTable
->
schema
);
free
(
pTable
);
t
free
(
pTable
->
schema
);
t
free
(
pTable
);
}
static
void
mgmtSuperTableActionInit
()
{
SSuperTableObj
tObj
;
tsSuperTableUpdateSize
=
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
mgmtSuperTableActionFp
[
SDB_TYPE_INSERT
]
=
mgmtSuperTableActionInsert
;
mgmtSuperTableActionFp
[
SDB_TYPE_DELETE
]
=
mgmtSuperTableActionDelete
;
mgmtSuperTableActionFp
[
SDB_TYPE_UPDATE
]
=
mgmtSuperTableActionUpdate
;
mgmtSuperTableActionFp
[
SDB_TYPE_ENCODE
]
=
mgmtSuperTableActionEncode
;
mgmtSuperTableActionFp
[
SDB_TYPE_DECODE
]
=
mgmtSuperTableActionDecode
;
mgmtSuperTableActionFp
[
SDB_TYPE_DESTROY
]
=
mgmtSuperTableActionDestroy
;
}
void
*
mgmtSuperTableActionReset
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SSuperTableObj
*
pTable
=
(
SSuperTableObj
*
)
row
;
memcpy
(
pTable
,
str
,
tsSuperTableUpdateSize
);
int32_t
schemaSize
=
sizeof
(
SSchema
)
*
(
pTable
->
numOfColumns
+
pTable
->
numOfTags
);
pTable
->
schema
=
realloc
(
pTable
->
schema
,
schemaSize
);
memcpy
(
pTable
->
schema
,
str
+
tsSuperTableUpdateSize
,
schemaSize
);
return
NULL
;
}
void
*
mgmtSuperTableActionDestroy
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SSuperTableObj
*
pTable
=
(
SSuperTableObj
*
)
row
;
static
int32_t
mgmtSuperTableActionDestroy
(
void
*
pObj
)
{
SSuperTableObj
*
pTable
=
(
SSuperTableObj
*
)
pObj
;
mgmtDestroySuperTable
(
pTable
);
return
NULL
;
return
TSDB_CODE_SUCCESS
;
}
void
*
mgmtSuperTableActionInsert
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
STableInfo
*
pTable
=
(
STableInfo
*
)
row
;
static
int32_t
mgmtSuperTableActionInsert
(
void
*
pObj
)
{
STableInfo
*
pTable
=
(
STableInfo
*
)
pObj
;
SDbObj
*
pDb
=
mgmtGetDbByTableId
(
pTable
->
tableId
);
if
(
pDb
)
{
mgmtAddSuperTableIntoDb
(
pDb
);
}
return
NULL
;
return
TSDB_CODE_SUCCESS
;
}
void
*
mgmtSuperTableActionDelete
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
STableInfo
*
pTable
=
(
STableInfo
*
)
row
;
static
int32_t
mgmtSuperTableActionDelete
(
void
*
pObj
)
{
STableInfo
*
pTable
=
(
STableInfo
*
)
pObj
;
SDbObj
*
pDb
=
mgmtGetDbByTableId
(
pTable
->
tableId
);
if
(
pDb
)
{
mgmtRemoveSuperTableFromDb
(
pDb
);
}
return
NULL
;
return
TSDB_CODE_SUCCESS
;
}
void
*
mgmtSuperTableActionUpdate
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
return
mgmtSuperTableActionReset
(
row
,
str
,
size
,
NULL
);
static
int32_t
mgmtSuperTableActionUpdate
(
void
*
pObj
)
{
SSuperTableObj
*
pTable
=
(
SSuperTableObj
*
)
pObj
;
memcpy
(
pTable
,
pObj
,
tsSuperTableUpdateSize
);
int32_t
schemaSize
=
sizeof
(
SSchema
)
*
(
pTable
->
numOfColumns
+
pTable
->
numOfTags
);
pTable
->
schema
=
realloc
(
pTable
->
schema
,
schemaSize
);
memcpy
(
pTable
->
schema
,
pObj
+
tsSuperTableUpdateSize
,
schemaSize
);
return
TSDB_CODE_SUCCESS
;
}
void
*
mgmtSuperTableActionEncode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ss
ize
)
{
SSuperTableObj
*
pTable
=
(
SSuperTableObj
*
)
row
;
assert
(
row
!=
NULL
&&
str
!=
NULL
);
static
int32_t
mgmtSuperTableActionEncode
(
void
*
pObj
,
void
*
pData
,
int32_t
maxRowS
ize
)
{
SSuperTableObj
*
pTable
=
(
SSuperTableObj
*
)
pObj
;
assert
(
pObj
!=
NULL
&&
pData
!=
NULL
);
int32_t
schemaSize
=
sizeof
(
SSchema
)
*
(
pTable
->
numOfColumns
+
pTable
->
numOfTags
);
if
(
size
<
tsSuperTableUpdateSize
+
schemaSize
+
1
)
{
*
ssize
=
-
1
;
return
NULL
;
if
(
maxRowSize
<
tsSuperTableUpdateSize
+
schemaSize
+
1
)
{
return
TSDB_CODE_INVALID_MSG_LEN
;
}
memcpy
(
str
,
pTable
,
tsSuperTableUpdateSize
);
memcpy
(
str
+
tsSuperTableUpdateSize
,
pTable
->
schema
,
schemaSize
);
*
ssize
=
tsSuperTableUpdateSize
+
schemaSize
;
return
NULL
;
memcpy
(
pData
,
pTable
,
tsSuperTableUpdateSize
);
memcpy
(
pData
+
tsSuperTableUpdateSize
,
pTable
->
schema
,
schemaSize
);
return
tsSuperTableUpdateSize
+
schemaSize
;
}
void
*
mgmtSuperTableActionDecode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
assert
(
str
!=
NULL
);
static
void
*
mgmtSuperTableActionDecode
(
void
*
pData
)
{
assert
(
pData
!=
NULL
);
SSuperTableObj
*
pTable
=
(
SSuperTableObj
*
)
malloc
(
sizeof
(
SSuperTableObj
));
if
(
pTable
==
NULL
)
{
return
NULL
;
}
memset
(
pTable
,
0
,
sizeof
(
SSuperTableObj
));
if
(
size
<
tsSuperTableUpdateSize
)
{
mgmtDestroySuperTable
(
pTable
);
return
NULL
;
}
memcpy
(
pTable
,
str
,
tsSuperTableUpdateSize
);
memcpy
(
pTable
,
pData
,
tsSuperTableUpdateSize
);
int32_t
schemaSize
=
sizeof
(
SSchema
)
*
(
pTable
->
numOfColumns
+
pTable
->
numOfTags
);
pTable
->
schema
=
malloc
(
schemaSize
);
...
...
@@ -146,26 +114,32 @@ void *mgmtSuperTableActionDecode(void *row, char *str, int32_t size, int32_t *ss
return
NULL
;
}
memcpy
(
pTable
->
schema
,
str
+
tsSuperTableUpdateSize
,
schemaSize
);
memcpy
(
pTable
->
schema
,
pData
+
tsSuperTableUpdateSize
,
schemaSize
);
return
(
void
*
)
pTable
;
}
void
*
mgmtSuperTableAction
(
char
action
,
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
if
(
mgmtSuperTableActionFp
[(
uint8_t
)
action
]
!=
NULL
)
{
return
(
*
(
mgmtSuperTableActionFp
[(
uint8_t
)
action
]))(
row
,
str
,
size
,
ssize
);
}
return
NULL
;
}
int32_t
mgmtInitSuperTables
()
{
void
*
pNode
=
NULL
;
void
*
pLastNode
=
NULL
;
SSuperTableObj
*
pTable
=
NULL
;
mgmtSuperTableActionInit
();
SSuperTableObj
tObj
;
tsSuperTableUpdateSize
=
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
tsSuperTableSdb
=
sdbOpenTable
(
TSDB_MAX_SUPER_TABLES
,
tsSuperTableUpdateSize
+
sizeof
(
SSchema
)
*
TSDB_MAX_COLUMNS
,
"stables"
,
SDB_KEYTYPE_STRING
,
tsMnodeDir
,
mgmtSuperTableAction
);
SSdbTableDesc
tableDesc
=
{
.
tableName
=
"stables"
,
.
hashSessions
=
TSDB_MAX_SUPER_TABLES
,
.
maxRowSize
=
tsSuperTableUpdateSize
+
sizeof
(
SSchema
)
*
TSDB_MAX_COLUMNS
,
.
keyType
=
SDB_KEYTYPE_STRING
,
.
insertFp
=
mgmtSuperTableActionInsert
,
.
deleteFp
=
mgmtSuperTableActionDelete
,
.
updateFp
=
mgmtSuperTableActionUpdate
,
.
encodeFp
=
mgmtSuperTableActionEncode
,
.
decodeFp
=
mgmtSuperTableActionDecode
,
.
destroyFp
=
mgmtSuperTableActionDestroy
,
};
tsSuperTableSdb
=
sdbOpenTable
(
&
tableDesc
);
if
(
tsSuperTableSdb
==
NULL
)
{
mError
(
"failed to init stables data"
);
return
-
1
;
...
...
@@ -365,8 +339,8 @@ int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pStable, char *oldTagN
return
TSDB_CODE_INVALID_MSG_TYPE
;
}
int32_t
rowSize
=
0
;
uint32_t
len
=
strlen
(
newTagName
);
int32_t
rowSize
=
0
;
uint32_t
len
=
strlen
(
newTagName
);
if
(
col
>=
pStable
->
numOfTags
||
len
>=
TSDB_COL_NAME_LEN
||
mgmtFindSuperTableTagIndex
(
pStable
,
newTagName
)
>=
0
)
{
return
TSDB_CODE_APP_ERROR
;
...
...
@@ -382,7 +356,7 @@ int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pStable, char *oldTagN
if
(
msg
==
NULL
)
return
TSDB_CODE_APP_ERROR
;
memset
(
msg
,
0
,
size
);
mgmtSuperTableActionEncode
(
pStable
,
msg
,
size
,
&
rowSize
);
//
mgmtSuperTableActionEncode(pStable, msg, size, &rowSize);
int32_t
ret
=
sdbUpdateRow
(
tsSuperTableSdb
,
msg
,
tsSuperTableUpdateSize
,
SDB_OPER_GLOBAL
);
tfree
(
msg
);
...
...
src/mnode/src/mgmtUser.c
浏览文件 @
eb6764be
...
...
@@ -18,6 +18,7 @@
#include "trpc.h"
#include "tschemautil.h"
#include "ttime.h"
#include "tutil.h"
#include "mgmtAcct.h"
#include "mgmtGrant.h"
#include "mgmtMnode.h"
...
...
@@ -38,16 +39,53 @@ static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg);
static
void
mgmtProcessAlterUserMsg
(
SQueuedMsg
*
pMsg
);
static
void
mgmtProcessDropUserMsg
(
SQueuedMsg
*
pMsg
);
static
void
*
(
*
mgmtUserActionFp
[
SDB_MAX_ACTION_TYPES
])(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtUserActionInsert
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtUserActionDelete
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtUserActionUpdate
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtUserActionEncode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtUserActionDecode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtUserActionReset
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtUserActionDestroy
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtUserAction
(
char
action
,
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
mgmtUserActionInit
();
static
int32_t
mgmtUserActionDestroy
(
void
*
pObj
)
{
tfree
(
pObj
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtUserActionInsert
(
void
*
pObj
)
{
SUserObj
*
pUser
=
(
SUserObj
*
)
pObj
;
SAcctObj
*
pAcct
=
mgmtGetAcct
(
pUser
->
acct
);
pUser
->
pAcct
=
pAcct
;
mgmtAddUserIntoAcct
(
pAcct
,
pUser
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtUserActionDelete
(
void
*
pObj
)
{
SUserObj
*
pUser
=
(
SUserObj
*
)
pObj
;
SAcctObj
*
pAcct
=
mgmtGetAcct
(
pUser
->
acct
);
mgmtRemoveUserFromAcct
(
pAcct
,
pUser
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtUserActionUpdate
(
void
*
pObj
)
{
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtUserActionEncode
(
void
*
pObj
,
void
*
pData
,
int32_t
maxRowSize
)
{
SUserObj
*
pUser
=
(
SUserObj
*
)
pObj
;
if
(
maxRowSize
<
tsUserUpdateSize
)
{
return
-
1
;
}
else
{
memcpy
(
pData
,
pUser
,
tsUserUpdateSize
);
return
tsUserUpdateSize
;
}
}
static
void
*
mgmtUserActionDecode
(
void
*
pData
)
{
SUserObj
*
pUser
=
(
SUserObj
*
)
malloc
(
sizeof
(
SUserObj
));
if
(
pUser
==
NULL
)
return
NULL
;
memset
(
pUser
,
0
,
sizeof
(
SUserObj
));
memcpy
(
pUser
,
pData
,
tsUserUpdateSize
);
return
pUser
;
}
int32_t
mgmtInitUsers
()
{
void
*
pNode
=
NULL
;
...
...
@@ -55,12 +93,23 @@ int32_t mgmtInitUsers() {
SAcctObj
*
pAcct
=
NULL
;
int32_t
numOfUsers
=
0
;
mgmtUserActionInit
();
SUserObj
tObj
;
tsUserUpdateSize
=
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
tsUserSdb
=
sdbOpenTable
(
TSDB_MAX_USERS
,
tsUserUpdateSize
,
"users"
,
SDB_KEYTYPE_STRING
,
tsMnodeDir
,
mgmtUserAction
);
SSdbTableDesc
tableDesc
=
{
.
tableName
=
"users"
,
.
hashSessions
=
TSDB_MAX_USERS
,
.
maxRowSize
=
tsUserUpdateSize
,
.
keyType
=
SDB_KEYTYPE_STRING
,
.
insertFp
=
mgmtUserActionInsert
,
.
deleteFp
=
mgmtUserActionDelete
,
.
updateFp
=
mgmtUserActionUpdate
,
.
encodeFp
=
mgmtUserActionEncode
,
.
decodeFp
=
mgmtUserActionDecode
,
.
destroyFp
=
mgmtUserActionDestroy
,
};
tsUserSdb
=
sdbOpenTable
(
&
tableDesc
);
if
(
tsUserSdb
==
NULL
)
{
mError
(
"failed to init user data"
);
return
-
1
;
...
...
@@ -246,82 +295,6 @@ static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void
return
numOfRows
;
}
static
void
mgmtUserActionInit
()
{
mgmtUserActionFp
[
SDB_TYPE_INSERT
]
=
mgmtUserActionInsert
;
mgmtUserActionFp
[
SDB_TYPE_DELETE
]
=
mgmtUserActionDelete
;
mgmtUserActionFp
[
SDB_TYPE_UPDATE
]
=
mgmtUserActionUpdate
;
mgmtUserActionFp
[
SDB_TYPE_ENCODE
]
=
mgmtUserActionEncode
;
mgmtUserActionFp
[
SDB_TYPE_DECODE
]
=
mgmtUserActionDecode
;
mgmtUserActionFp
[
SDB_TYPE_DESTROY
]
=
mgmtUserActionDestroy
;
}
static
void
*
mgmtUserAction
(
char
action
,
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
if
(
mgmtUserActionFp
[(
uint8_t
)
action
]
!=
NULL
)
{
return
(
*
(
mgmtUserActionFp
[(
uint8_t
)
action
]))(
row
,
str
,
size
,
ssize
);
}
return
NULL
;
}
static
void
*
mgmtUserActionInsert
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SUserObj
*
pUser
=
(
SUserObj
*
)
row
;
SAcctObj
*
pAcct
=
mgmtGetAcct
(
pUser
->
acct
);
pUser
->
pAcct
=
pAcct
;
mgmtAddUserIntoAcct
(
pAcct
,
pUser
);
return
NULL
;
}
static
void
*
mgmtUserActionDelete
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SUserObj
*
pUser
=
(
SUserObj
*
)
row
;
SAcctObj
*
pAcct
=
mgmtGetAcct
(
pUser
->
acct
);
mgmtRemoveUserFromAcct
(
pAcct
,
pUser
);
return
NULL
;
}
static
void
*
mgmtUserActionUpdate
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
return
mgmtUserActionReset
(
row
,
str
,
size
,
ssize
);
}
static
void
*
mgmtUserActionEncode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SUserObj
*
pUser
=
(
SUserObj
*
)
row
;
if
(
size
<
tsUserUpdateSize
)
{
*
ssize
=
-
1
;
}
else
{
memcpy
(
str
,
pUser
,
tsUserUpdateSize
);
*
ssize
=
tsUserUpdateSize
;
}
return
NULL
;
}
static
void
*
mgmtUserActionDecode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SUserObj
*
pUser
=
(
SUserObj
*
)
malloc
(
sizeof
(
SUserObj
));
if
(
pUser
==
NULL
)
return
NULL
;
memset
(
pUser
,
0
,
sizeof
(
SUserObj
));
memcpy
(
pUser
,
str
,
tsUserUpdateSize
);
return
(
void
*
)
pUser
;
}
static
void
*
mgmtUserActionReset
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SUserObj
*
pUser
=
(
SUserObj
*
)
row
;
memcpy
(
pUser
,
str
,
tsUserUpdateSize
);
return
NULL
;
}
static
void
*
mgmtUserActionDestroy
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
tfree
(
row
);
return
NULL
;
}
SUserObj
*
mgmtGetUserFromConn
(
void
*
pConn
)
{
SRpcConnInfo
connInfo
;
if
(
rpcGetConnInfo
(
pConn
,
&
connInfo
)
==
0
)
{
...
...
src/mnode/src/mgmtVgroup.c
浏览文件 @
eb6764be
...
...
@@ -33,15 +33,6 @@
static
void
*
tsVgroupSdb
=
NULL
;
static
int32_t
tsVgUpdateSize
=
0
;
static
void
*
(
*
mgmtVgroupActionFp
[
SDB_MAX_ACTION_TYPES
])(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtVgroupActionInsert
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtVgroupActionDelete
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtVgroupActionUpdate
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtVgroupActionEncode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtVgroupActionDecode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtVgroupActionReset
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtVgroupActionDestroy
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
int32_t
mgmtGetVgroupMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveVgroups
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
void
mgmtProcessCreateVnodeRsp
(
SRpcMsg
*
rpcMsg
);
...
...
@@ -50,32 +41,98 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg);
static
void
mgmtSendDropVgroupMsg
(
SVgObj
*
pVgroup
,
void
*
ahandle
);
static
void
mgmtSendCreateVgroupMsg
(
SVgObj
*
pVgroup
,
void
*
ahandle
);
static
void
mgmtVgroupActionInit
()
{
SVgObj
tObj
;
tsVgUpdateSize
=
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
static
int32_t
mgmtVgroupActionDestroy
(
void
*
pObj
)
{
SVgObj
*
pVgroup
=
(
SVgObj
*
)
pObj
;
if
(
pVgroup
->
idPool
)
{
taosIdPoolCleanUp
(
pVgroup
->
idPool
);
pVgroup
->
idPool
=
NULL
;
}
if
(
pVgroup
->
tableList
)
tfree
(
pVgroup
->
tableList
);
tfree
(
pObj
);
return
TSDB_CODE_SUCCESS
;
}
mgmtVgroupActionFp
[
SDB_TYPE_INSERT
]
=
mgmtVgroupActionInsert
;
mgmtVgroupActionFp
[
SDB_TYPE_DELETE
]
=
mgmtVgroupActionDelete
;
mgmtVgroupActionFp
[
SDB_TYPE_UPDATE
]
=
mgmtVgroupActionUpdate
;
mgmtVgroupActionFp
[
SDB_TYPE_ENCODE
]
=
mgmtVgroupActionEncode
;
mgmtVgroupActionFp
[
SDB_TYPE_DECODE
]
=
mgmtVgroupActionDecode
;
mgmtVgroupActionFp
[
SDB_TYPE_DESTROY
]
=
mgmtVgroupActionDestroy
;
static
int32_t
mgmtVgroupActionInsert
(
void
*
pObj
)
{
SVgObj
*
pVgroup
=
pObj
;
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
pVgroup
->
vnodeGid
[
i
].
vnode
=
pVgroup
->
vgId
;
}
return
TSDB_CODE_SUCCESS
;
}
static
void
*
mgmtVgroupAction
(
char
action
,
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
if
(
mgmtVgroupActionFp
[(
uint8_t
)
action
]
!=
NULL
)
{
return
(
*
(
mgmtVgroupActionFp
[(
uint8_t
)
action
]))(
row
,
str
,
size
,
ssize
);
static
int32_t
mgmtVgroupActionDelete
(
void
*
pObj
)
{
SVgObj
*
pVgroup
=
pObj
;
SDbObj
*
pDb
=
mgmtGetDb
(
pVgroup
->
dbName
);
if
(
pDb
!=
NULL
)
{
mgmtRemoveVgroupFromDb
(
pDb
,
pVgroup
);
}
return
NULL
;
// mgmtUnSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes);
tfree
(
pVgroup
->
tableList
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtVgroupActionUpdate
(
void
*
pObj
)
{
SVgObj
*
pVgroup
=
(
SVgObj
*
)
pObj
;
int32_t
oldTables
=
taosIdPoolMaxSize
(
pVgroup
->
idPool
);
SDbObj
*
pDb
=
mgmtGetDb
(
pVgroup
->
dbName
);
if
(
pDb
!=
NULL
)
{
if
(
pDb
->
cfg
.
maxSessions
!=
oldTables
)
{
mPrint
(
"vgroup:%d tables change from %d to %d"
,
pVgroup
->
vgId
,
oldTables
,
pDb
->
cfg
.
maxSessions
);
taosUpdateIdPool
(
pVgroup
->
idPool
,
pDb
->
cfg
.
maxSessions
);
int32_t
size
=
sizeof
(
STableInfo
*
)
*
pDb
->
cfg
.
maxSessions
;
pVgroup
->
tableList
=
(
STableInfo
**
)
realloc
(
pVgroup
->
tableList
,
size
);
}
}
mTrace
(
"vgroup:%d update, numOfVnode:%d"
,
pVgroup
->
vgId
,
pVgroup
->
numOfVnodes
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtVgroupActionEncode
(
void
*
pObj
,
void
*
pData
,
int32_t
maxRowSize
)
{
SVgObj
*
pVgroup
=
(
SVgObj
*
)
pObj
;
if
(
maxRowSize
<
tsVgUpdateSize
)
{
return
-
1
;
}
else
{
memcpy
(
pData
,
pVgroup
,
tsVgUpdateSize
);
return
tsVgUpdateSize
;
}
}
static
void
*
mgmtVgroupActionDecode
(
void
*
pObj
)
{
SVgObj
*
pVgroup
=
(
SVgObj
*
)
malloc
(
sizeof
(
SVgObj
));
if
(
pVgroup
==
NULL
)
return
NULL
;
memset
(
pVgroup
,
0
,
sizeof
(
SVgObj
));
memcpy
(
pVgroup
,
pObj
,
tsVgUpdateSize
);
return
pVgroup
;
}
int32_t
mgmtInitVgroups
()
{
void
*
pNode
=
NULL
;
SVgObj
*
pVgroup
=
NULL
;
mgmtVgroupActionInit
();
SVgObj
tObj
;
tsVgUpdateSize
=
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
SSdbTableDesc
tableDesc
=
{
.
tableName
=
"vgroups"
,
.
hashSessions
=
TSDB_MAX_VGROUPS
,
.
maxRowSize
=
tsVgUpdateSize
,
.
keyType
=
SDB_KEYTYPE_AUTO
,
.
insertFp
=
mgmtVgroupActionInsert
,
.
deleteFp
=
mgmtVgroupActionDelete
,
.
updateFp
=
mgmtVgroupActionUpdate
,
.
encodeFp
=
mgmtVgroupActionEncode
,
.
decodeFp
=
mgmtVgroupActionDecode
,
.
destroyFp
=
mgmtVgroupActionDestroy
,
};
tsVgroupSdb
=
sdbOpenTable
(
TSDB_MAX_VGROUPS
,
tsVgUpdateSize
,
"vgroups"
,
SDB_KEYTYPE_AUTO
,
tsMnodeDir
,
mgmtVgroupAction
);
tsVgroupSdb
=
sdbOpenTable
(
&
tableDesc
);
if
(
tsVgroupSdb
==
NULL
)
{
mError
(
"failed to init vgroups data"
);
return
-
1
;
...
...
@@ -389,90 +446,6 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
return
numOfRows
;
}
static
void
*
mgmtVgroupActionInsert
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SVgObj
*
pVgroup
=
row
;
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
pVgroup
->
vnodeGid
[
i
].
vnode
=
pVgroup
->
vgId
;
}
return
NULL
;
}
static
void
*
mgmtVgroupActionDelete
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SVgObj
*
pVgroup
=
row
;
SDbObj
*
pDb
=
mgmtGetDb
(
pVgroup
->
dbName
);
if
(
pDb
!=
NULL
)
{
mgmtRemoveVgroupFromDb
(
pDb
,
pVgroup
);
}
// mgmtUnSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes);
tfree
(
pVgroup
->
tableList
);
return
NULL
;
}
static
void
*
mgmtVgroupActionUpdate
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
mgmtVgroupActionReset
(
row
,
str
,
size
,
ssize
);
SVgObj
*
pVgroup
=
(
SVgObj
*
)
row
;
int32_t
oldTables
=
taosIdPoolMaxSize
(
pVgroup
->
idPool
);
SDbObj
*
pDb
=
mgmtGetDb
(
pVgroup
->
dbName
);
if
(
pDb
!=
NULL
)
{
if
(
pDb
->
cfg
.
maxSessions
!=
oldTables
)
{
mPrint
(
"vgroup:%d tables change from %d to %d"
,
pVgroup
->
vgId
,
oldTables
,
pDb
->
cfg
.
maxSessions
);
taosUpdateIdPool
(
pVgroup
->
idPool
,
pDb
->
cfg
.
maxSessions
);
int32_t
size
=
sizeof
(
STableInfo
*
)
*
pDb
->
cfg
.
maxSessions
;
pVgroup
->
tableList
=
(
STableInfo
**
)
realloc
(
pVgroup
->
tableList
,
size
);
}
}
mTrace
(
"vgroup:%d update, numOfVnode:%d"
,
pVgroup
->
vgId
,
pVgroup
->
numOfVnodes
);
return
NULL
;
}
static
void
*
mgmtVgroupActionEncode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SVgObj
*
pVgroup
=
(
SVgObj
*
)
row
;
if
(
size
<
tsVgUpdateSize
)
{
*
ssize
=
-
1
;
}
else
{
memcpy
(
str
,
pVgroup
,
tsVgUpdateSize
);
*
ssize
=
tsVgUpdateSize
;
}
return
NULL
;
}
static
void
*
mgmtVgroupActionDecode
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SVgObj
*
pVgroup
=
(
SVgObj
*
)
malloc
(
sizeof
(
SVgObj
));
if
(
pVgroup
==
NULL
)
return
NULL
;
memset
(
pVgroup
,
0
,
sizeof
(
SVgObj
));
int32_t
tsVgUpdateSize
=
pVgroup
->
updateEnd
-
(
int8_t
*
)
pVgroup
;
memcpy
(
pVgroup
,
str
,
tsVgUpdateSize
);
return
(
void
*
)
pVgroup
;
}
static
void
*
mgmtVgroupActionReset
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SVgObj
*
pVgroup
=
(
SVgObj
*
)
row
;
memcpy
(
pVgroup
,
str
,
tsVgUpdateSize
);
return
NULL
;
}
static
void
*
mgmtVgroupActionDestroy
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
)
{
SVgObj
*
pVgroup
=
(
SVgObj
*
)
row
;
if
(
pVgroup
->
idPool
)
{
taosIdPoolCleanUp
(
pVgroup
->
idPool
);
pVgroup
->
idPool
=
NULL
;
}
if
(
pVgroup
->
tableList
)
tfree
(
pVgroup
->
tableList
);
tfree
(
row
);
return
NULL
;
}
void
mgmtUpdateVgroup
(
SVgObj
*
pVgroup
)
{
sdbUpdateRow
(
tsVgroupSdb
,
pVgroup
,
tsVgUpdateSize
,
SDB_OPER_LOCAL
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录