Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
79519165
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
79519165
编写于
11月 09, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add sdb interface
上级
6d6b55ea
变更
11
显示空白变更内容
内联
并排
Showing
11 changed file
with
570 addition
and
447 deletion
+570
-447
include/dnode/mnode/sdb/sdb.h
include/dnode/mnode/sdb/sdb.h
+106
-38
include/util/taoserror.h
include/util/taoserror.h
+11
-8
source/dnode/mnode/impl/inc/mnodeDef.h
source/dnode/mnode/impl/inc/mnodeDef.h
+3
-11
source/dnode/mnode/impl/src/mnodeAcct.c
source/dnode/mnode/impl/src/mnodeAcct.c
+87
-103
source/dnode/mnode/impl/src/mnodeUser.c
source/dnode/mnode/impl/src/mnodeUser.c
+109
-78
source/dnode/mnode/impl/src/mnodeWorker.c
source/dnode/mnode/impl/src/mnodeWorker.c
+5
-5
source/dnode/mnode/impl/src/mondeInt.c
source/dnode/mnode/impl/src/mondeInt.c
+2
-2
source/dnode/mnode/sdb/inc/sdbInt.h
source/dnode/mnode/sdb/inc/sdbInt.h
+30
-4
source/dnode/mnode/sdb/src/sdb.c
source/dnode/mnode/sdb/src/sdb.c
+205
-189
source/libs/transport/src/rpcMain.c
source/libs/transport/src/rpcMain.c
+1
-1
source/util/src/terror.c
source/util/src/terror.c
+11
-8
未找到文件。
include/dnode/mnode/sdb/sdb.h
浏览文件 @
79519165
...
...
@@ -16,62 +16,130 @@
#ifndef _TD_SDB_H_
#define _TD_SDB_H_
#include "cJSON.h"
#ifdef __cplusplus
extern
"C"
{
#endif
#define SDB_GET_BINARY_VAL(pData, dataLen, val, valLen, code) \
{ \
if ((dataLen) >= (valLen)) { \
memcpy((val), (char *)(pData), (valLen)); \
(dataLen) -= (valLen); \
(pData) = (char *)(pData) + (valLen); \
} else { \
code = TSDB_CODE_SDB_INVAID_RAW_DATA_LEN; \
} \
}
#define SDB_GET_INT32_VAL(pData, dataLen, val, code) \
{ \
if (dataLen >= sizeof(int32_t)) { \
*(int32_t *)(pData) = (int32_t)(val); \
(dataLen) -= sizeof(int32_t); \
(pData) = (char *)(pData) + sizeof(int32_t); \
} else { \
code = TSDB_CODE_SDB_INVAID_RAW_DATA_LEN; \
} \
}
#define SDB_GET_INT64_VAL(pData, dataLen, val, code) \
{ \
if (dataLen >= sizeof(int64_t)) { \
*(int64_t *)(pData) = (int64_t)(val); \
(dataLen) -= sizeof(int64_t); \
(pData) = (char *)(pData) + sizeof(int64_t); \
} else { \
code = TSDB_CODE_SDB_INVAID_RAW_DATA_LEN; \
} \
}
#define SDB_SET_INT64_VAL(pData, dataLen, val) \
{ \
*(int64_t *)(pData) = (int64_t)(val); \
(dataLen) += sizeof(int64_t); \
(pData) += sizeof(int64_t); \
}
#define SDB_SET_INT32_VAL(pData, dataLen, val) \
{ \
*(int32_t *)(pData) = (int32_t)(val); \
(dataLen) += sizeof(int32_t); \
(pData) += sizeof(int32_t); \
}
#define SDB_SET_BINARY_VAL(pData, dataLen, val, valLen) \
{ \
memcpy((char *)(pData), (val), (valLen)); \
(dataLen) += (valLen); \
(pData) += (valLen); \
}
typedef
enum
{
MN_SDB_START
=
0
,
MN_SDB_CLUSTER
=
1
,
MN_SDB_DNODE
=
2
,
MN_SDB_MNODE
=
3
,
MN_SDB_ACCT
=
4
,
MN_SDB_AUTH
=
5
,
MN_SDB_USER
=
6
,
MN_SDB_DB
=
7
,
MN_SDB_VGROUP
=
8
,
MN_SDB_STABLE
=
9
,
MN_SDB_FUNC
=
10
,
MN_SDB_OPER
=
11
,
MN_SDB_MAX
=
12
}
EMnSdb
;
typedef
enum
{
MN_OP_START
=
0
,
MN_OP_INSERT
=
1
,
MN_OP_UPDATE
=
2
,
MN_OP_DELETE
=
3
,
MN_OP_MAX
=
4
}
EMnOp
;
typedef
enum
{
MN_KEY_START
=
0
,
MN_KEY_BINARY
=
1
,
MN_KEY_INT32
=
2
,
MN_KEY_INT64
=
3
,
MN_KEY_MAX
}
EMnKey
;
typedef
enum
{
MN_SDB_STAT_AVAIL
=
0
,
MN_SDB_STAT_DROPPED
=
1
}
EMnSdbStat
;
SDB_START
=
0
,
SDB_VERSION
=
1
,
SDB_CLUSTER
=
2
,
SDB_DNODE
=
3
,
SDB_MNODE
=
4
,
SDB_ACCT
=
5
,
SDB_AUTH
=
6
,
SDB_USER
=
7
,
SDB_DB
=
8
,
SDB_VGROUP
=
9
,
SDB_STABLE
=
10
,
SDB_FUNC
=
11
,
SDB_OPER
=
12
,
SDB_MAX
=
13
}
ESdbType
;
typedef
enum
{
SDB_ACTION_INSERT
=
1
,
SDB_ACTION_UPDATE
=
2
,
SDB_ACTION_DELETE
=
3
}
ESdbAction
;
typedef
enum
{
SDB_KEY_BINARY
=
1
,
SDB_KEY_INT32
=
2
,
SDB_KEY_INT64
=
3
}
EKeyType
;
typedef
enum
{
SDB_STATUS_CREATING
=
1
,
SDB_STATUS_READY
,
SDB_STATUS_DROPPING
,
SDB_STATUS_DROPPED
}
ESdbStatus
;
typedef
struct
{
int8_t
type
;
int8_t
sver
;
int8_t
status
;
int8_t
align
[
6
];
}
SdbHead
;
int8_t
action
;
int8_t
reserved
[
4
];
int32_t
cksum
;
int32_t
dataLen
;
char
data
[];
}
SSdbRawData
;
typedef
int32_t
(
*
SdbInsertFp
)(
void
*
pObj
);
typedef
int32_t
(
*
SdbUpdateFp
)(
void
*
pSrcObj
,
void
*
pDstObj
);
typedef
int32_t
(
*
SdbDeleteFp
)(
void
*
pObj
);
typedef
int32_t
(
*
SdbDeployFp
)();
typedef
void
*
(
*
SdbDecodeFp
)(
SSdbRawData
*
pRaw
);
typedef
SSdbRawData
*
(
*
SdbEncodeFp
)(
void
*
pObj
);
typedef
void
(
*
SdbDeployFp
)();
typedef
void
*
(
*
SdbDecodeFp
)(
cJSON
*
root
);
typedef
int32_t
(
*
SdbEncodeFp
)(
void
*
pHead
,
char
*
buf
,
int32_t
maxLen
);
typedef
struct
{
ESdbType
sdbType
;
EKeyType
keyType
;
SdbDeployFp
deployFp
;
SdbEncodeFp
encodeFp
;
SdbDecodeFp
decodeFp
;
SdbInsertFp
insertFp
;
SdbUpdateFp
updateFp
;
SdbDeleteFp
deleteFp
;
}
SSdbDesc
;
int32_t
sdbInit
();
void
sdbCleanup
();
void
sdbSetHandler
(
SSdbDesc
desc
);
int32_t
sdbRead
();
int32_t
sdbWrite
(
SSdbRawData
*
pRawData
);
int32_t
sdbCommit
();
int32_t
sdbDeploy
();
void
sdbUnDeploy
();
void
*
sdbInsertRow
(
EMnSdb
sdb
,
void
*
pObj
);
void
sdbDeleteRow
(
EMnSdb
sdb
,
void
*
pHead
);
void
*
sdbUpdateRow
(
EMnSdb
sdb
,
void
*
pHead
);
void
*
sdbGetRow
(
EMnSdb
sdb
,
void
*
pKey
);
void
*
sdbFetchRow
(
EMnSdb
sdb
,
void
*
pIter
);
void
sdbCancelFetch
(
EMnSdb
sdb
,
void
*
pIter
);
int32_t
sdbGetCount
(
EMnSdb
sdb
);
void
sdbSetFp
(
EMnSdb
,
EMnKey
,
SdbDeployFp
,
SdbEncodeFp
,
SdbDecodeFp
,
int32_t
dataSize
);
void
*
sdbAcquire
(
ESdbType
sdb
,
void
*
pKey
);
void
sdbRelease
(
ESdbType
sdb
,
void
*
pObj
);
void
*
sdbFetch
(
ESdbType
sdb
,
void
*
pIter
);
void
sdbCancelFetch
(
ESdbType
sdb
,
void
*
pIter
);
int32_t
sdbGetSize
(
ESdbType
sdb
);
#ifdef __cplusplus
}
...
...
include/util/taoserror.h
浏览文件 @
79519165
...
...
@@ -131,12 +131,15 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_FAILED_TO_CREATE_DIR TAOS_DEF_ERROR_CODE(0, 0x0313) //"failed to create mnode dir")
#define TSDB_CODE_MND_FAILED_TO_INIT_STEP TAOS_DEF_ERROR_CODE(0, 0x0314) //"failed to init components")
#define TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0320) //"Object already there")
#define TSDB_CODE_MND_SDB_ERROR TAOS_DEF_ERROR_CODE(0, 0x0321) //"Unexpected generic error in sdb")
#define TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0322) //"Invalid table type")
#define TSDB_CODE_MND_SDB_OBJ_NOT_THERE TAOS_DEF_ERROR_CODE(0, 0x0323) //"Object not there")
#define TSDB_CODE_MND_SDB_INVAID_META_ROW TAOS_DEF_ERROR_CODE(0, 0x0324) //"Invalid meta row")
#define TSDB_CODE_MND_SDB_INVAID_KEY_TYPE TAOS_DEF_ERROR_CODE(0, 0x0325) //"Invalid key type")
#define TSDB_CODE_SDB_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x0320)
#define TSDB_CODE_SDB_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0321)
#define TSDB_CODE_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0322)
#define TSDB_CODE_SDB_OBJ_NOT_THERE TAOS_DEF_ERROR_CODE(0, 0x0323)
#define TSDB_CODE_SDB_INVAID_RAW_DATA_VER TAOS_DEF_ERROR_CODE(0, 0x0324)
#define TSDB_CODE_SDB_INVAID_RAW_DATA_LEN TAOS_DEF_ERROR_CODE(0, 0x0325)
#define TSDB_CODE_SDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0326)
#define TSDB_CODE_SDB_INVAID_META_ROW TAOS_DEF_ERROR_CODE(0, 0x0327)
#define TSDB_CODE_SDB_INVAID_KEY_TYPE TAOS_DEF_ERROR_CODE(0, 0x0328)
#define TSDB_CODE_MND_DNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0330) //"DNode already exists")
#define TSDB_CODE_MND_DNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0331) //"DNode does not exist")
...
...
@@ -155,12 +158,12 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED TAOS_DEF_ERROR_CODE(0, 0x033E) //"Dnode Ep not configured")
#define TSDB_CODE_MND_ACCT_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0340) //"Account already exists")
#define TSDB_CODE_MND_
INVALID_ACCT
TAOS_DEF_ERROR_CODE(0, 0x0341) //"Invalid account")
#define TSDB_CODE_MND_
ACCT_NOT_EXIST
TAOS_DEF_ERROR_CODE(0, 0x0341) //"Invalid account")
#define TSDB_CODE_MND_INVALID_ACCT_OPTION TAOS_DEF_ERROR_CODE(0, 0x0342) //"Invalid account options")
#define TSDB_CODE_MND_ACCT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0343) //"Account authorization has expired")
#define TSDB_CODE_MND_USER_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0350) //"User already exists")
#define TSDB_CODE_MND_
INVALID_USER
TAOS_DEF_ERROR_CODE(0, 0x0351) //"Invalid user")
#define TSDB_CODE_MND_
USER_NOT_EXIST
TAOS_DEF_ERROR_CODE(0, 0x0351) //"Invalid user")
#define TSDB_CODE_MND_INVALID_USER_FORMAT TAOS_DEF_ERROR_CODE(0, 0x0352) //"Invalid user format")
#define TSDB_CODE_MND_INVALID_PASS_FORMAT TAOS_DEF_ERROR_CODE(0, 0x0353) //"Invalid password format")
#define TSDB_CODE_MND_NO_USER_FROM_CONN TAOS_DEF_ERROR_CODE(0, 0x0354) //"Can not get user from conn")
...
...
source/dnode/mnode/impl/inc/mnodeDef.h
浏览文件 @
79519165
...
...
@@ -81,7 +81,6 @@ typedef enum {
typedef
struct
SClusterObj
{
SdbHead
head
;
int64_t
id
;
char
uid
[
TSDB_CLUSTER_ID_LEN
];
int64_t
createdTime
;
...
...
@@ -89,7 +88,6 @@ typedef struct SClusterObj {
}
SClusterObj
;
typedef
struct
SDnodeObj
{
SdbHead
head
;
int32_t
id
;
int32_t
vnodes
;
int64_t
createdTime
;
...
...
@@ -106,7 +104,6 @@ typedef struct SDnodeObj {
}
SDnodeObj
;
typedef
struct
SMnodeObj
{
SdbHead
head
;
int32_t
id
;
int8_t
status
;
int8_t
role
;
...
...
@@ -123,7 +120,7 @@ typedef struct {
int32_t
maxTimeSeries
;
int32_t
maxStreams
;
int64_t
maxStorage
;
// In unit of GB
int
8_t
accessState
;
// Configured only by command
int
32_t
accessState
;
// Configured only by command
}
SAcctCfg
;
typedef
struct
{
...
...
@@ -136,18 +133,16 @@ typedef struct {
}
SAcctInfo
;
typedef
struct
SAcctObj
{
SdbHead
head
;
char
acct
[
TSDB_USER_LEN
];
int64_t
createdTime
;
int64_t
updateTime
;
int32_t
acctId
;
int
8_t
status
;
int
32_t
status
;
SAcctCfg
cfg
;
SAcctInfo
info
;
}
SAcctObj
;
typedef
struct
SUserObj
{
SdbHead
head
;
char
user
[
TSDB_USER_LEN
];
char
pass
[
TSDB_KEY_LEN
];
char
acct
[
TSDB_USER_LEN
];
...
...
@@ -182,7 +177,6 @@ typedef struct {
}
SDbCfg
;
typedef
struct
SDbObj
{
SdbHead
head
;
char
name
[
TSDB_FULL_DB_NAME_LEN
];
char
acct
[
TSDB_USER_LEN
];
int64_t
createdTime
;
...
...
@@ -226,7 +220,6 @@ typedef struct SVgObj {
}
SVgObj
;
typedef
struct
SSTableObj
{
SdbHead
head
;
char
tableId
[
TSDB_TABLE_NAME_LEN
];
uint64_t
uid
;
int64_t
createdTime
;
...
...
@@ -237,7 +230,6 @@ typedef struct SSTableObj {
}
SSTableObj
;
typedef
struct
SFuncObj
{
SdbHead
head
;
char
name
[
TSDB_FUNC_NAME_LEN
];
char
path
[
128
];
int32_t
contLen
;
...
...
source/dnode/mnode/impl/src/mnodeAcct.c
浏览文件 @
79519165
...
...
@@ -14,133 +14,117 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "mnodeInt.h"
static
void
mnodeCreateDefaultAcct
()
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
#define ACCT_VER 1
SAcctObj
acctObj
=
{
0
};
tstrncpy
(
acctObj
.
acct
,
TSDB_DEFAULT_USER
,
TSDB_USER_LEN
);
acctObj
.
cfg
=
(
SAcctCfg
){.
maxUsers
=
128
,
.
maxDbs
=
128
,
.
maxTimeSeries
=
INT32_MAX
,
.
maxStreams
=
1000
,
.
maxStorage
=
INT64_MAX
,
.
accessState
=
TSDB_VN_ALL_ACCCESS
};
acctObj
.
acctId
=
1
;
acctObj
.
createdTime
=
taosGetTimestampMs
();
acctObj
.
updateTime
=
taosGetTimestampMs
();
static
SSdbRawData
*
mnodeAcctActionEncode
(
SAcctObj
*
pAcct
)
{
SSdbRawData
*
pRaw
=
calloc
(
1
,
sizeof
(
SAcctObj
)
+
sizeof
(
SSdbRawData
));
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_MND_OUT_OF_MEMORY
;
return
NULL
;
}
sdbInsertRow
(
MN_SDB_ACCT
,
&
acctObj
);
int32_t
dataLen
=
0
;
char
*
pData
=
pRaw
->
data
;
SDB_SET_BINARY_VAL
(
pData
,
dataLen
,
pAcct
->
acct
,
TSDB_USER_LEN
)
SDB_SET_INT64_VAL
(
pData
,
dataLen
,
pAcct
->
createdTime
)
SDB_SET_INT64_VAL
(
pData
,
dataLen
,
pAcct
->
updateTime
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
acctId
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
status
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
maxUsers
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
maxDbs
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
maxTimeSeries
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
maxStreams
)
SDB_SET_INT64_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
maxStorage
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
accessState
)
pRaw
->
dataLen
=
dataLen
;
pRaw
->
type
=
SDB_ACCT
;
pRaw
->
sver
=
ACCT_VER
;
return
pRaw
;
}
int32_t
mnodeEncodeAcct
(
SAcctObj
*
pAcct
,
char
*
buf
,
int32_t
maxLen
)
{
int32_t
len
=
0
;
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"{
\"
type
\"
:%d, "
,
MN_SDB_ACCT
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
acct
\"
:
\"
%s
\"
, "
,
pAcct
->
acct
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
acctId
\"
:
\"
%d
\"
, "
,
pAcct
->
acctId
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
maxUsers
\"
:
\"
%d
\"
, "
,
pAcct
->
cfg
.
maxUsers
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
maxDbs
\"
:
\"
%d
\"
, "
,
pAcct
->
cfg
.
maxDbs
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
maxTimeSeries
\"
:
\"
%d
\"
, "
,
pAcct
->
cfg
.
maxTimeSeries
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
maxStreams
\"
:
\"
%d
\"
, "
,
pAcct
->
cfg
.
maxStreams
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
maxStorage
\"
:
\"
%"
PRIu64
"
\"
, "
,
pAcct
->
cfg
.
maxStorage
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
accessState
\"
:
\"
%d
\"
, "
,
pAcct
->
cfg
.
accessState
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
createdTime
\"
:
\"
%"
PRIu64
"
\"
, "
,
pAcct
->
createdTime
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
updateTime
\"
:
\"
%"
PRIu64
"
\"
}
\n
"
,
pAcct
->
updateTime
);
return
len
;
}
static
SAcctObj
*
mnodeAcctActionDecode
(
SSdbRawData
*
pRaw
)
{
if
(
pRaw
->
sver
!=
ACCT_VER
)
{
terrno
=
TSDB_CODE_SDB_INVAID_RAW_DATA_VER
;
return
NULL
;
}
SAcctObj
*
mnodeDecodeAcct
(
cJSON
*
root
)
{
int32_t
code
=
-
1
;
SAcctObj
*
pAcct
=
calloc
(
1
,
sizeof
(
SAcctObj
));
cJSON
*
acct
=
cJSON_GetObjectItem
(
root
,
"acct"
);
if
(
!
acct
||
acct
->
type
!=
cJSON_String
)
{
mError
(
"failed to parse acct since acct not found"
);
goto
DECODE_ACCT_OVER
;
if
(
pAcct
==
NULL
)
{
terrno
=
TSDB_CODE_MND_OUT_OF_MEMORY
;
return
NULL
;
}
tstrncpy
(
pAcct
->
acct
,
acct
->
valuestring
,
TSDB_USER_LEN
);
cJSON
*
acctId
=
cJSON_GetObjectItem
(
root
,
"acctId"
);
if
(
!
acctId
||
acctId
->
type
!=
cJSON_String
)
{
mError
(
"acct:%s, failed to parse since acctId not found"
,
pAcct
->
acct
);
goto
DECODE_ACCT_OVER
;
}
pAcct
->
acctId
=
atol
(
acctId
->
valuestring
);
int32_t
code
=
0
;
int32_t
dataLen
=
pRaw
->
dataLen
;
char
*
pData
=
pRaw
->
data
;
SDB_GET_BINARY_VAL
(
pData
,
dataLen
,
pAcct
->
acct
,
TSDB_USER_LEN
,
code
)
SDB_GET_INT64_VAL
(
pData
,
dataLen
,
pAcct
->
createdTime
,
code
)
SDB_GET_INT64_VAL
(
pData
,
dataLen
,
pAcct
->
updateTime
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
acctId
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
status
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
maxUsers
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
maxDbs
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
maxTimeSeries
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
maxStreams
,
code
)
SDB_GET_INT64_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
maxStorage
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
accessState
,
code
)
cJSON
*
maxUsers
=
cJSON_GetObjectItem
(
root
,
"maxUsers"
);
if
(
!
maxUsers
||
maxUsers
->
type
!=
cJSON_String
)
{
mError
(
"acct:%s, failed to parse since maxUsers not found"
,
pAcct
->
acct
)
;
goto
DECODE_ACCT_OVER
;
if
(
code
!=
0
)
{
tfree
(
pAcct
);
terrno
=
code
;
return
NULL
;
}
pAcct
->
cfg
.
maxUsers
=
atol
(
maxUsers
->
valuestring
);
cJSON
*
maxDbs
=
cJSON_GetObjectItem
(
root
,
"maxDbs"
);
if
(
!
maxDbs
||
maxDbs
->
type
!=
cJSON_String
)
{
mError
(
"acct:%s, failed to parse since maxDbs not found"
,
pAcct
->
acct
);
goto
DECODE_ACCT_OVER
;
}
pAcct
->
cfg
.
maxDbs
=
atol
(
maxDbs
->
valuestring
);
return
pAcct
;
}
cJSON
*
maxTimeSeries
=
cJSON_GetObjectItem
(
root
,
"maxTimeSeries"
);
if
(
!
maxTimeSeries
||
maxTimeSeries
->
type
!=
cJSON_String
)
{
mError
(
"acct:%s, failed to parse since maxTimeSeries not found"
,
pAcct
->
acct
);
goto
DECODE_ACCT_OVER
;
}
pAcct
->
cfg
.
maxTimeSeries
=
atol
(
maxTimeSeries
->
valuestring
);
static
int32_t
mnodeAcctActionInsert
(
SAcctObj
*
pAcct
)
{
return
0
;
}
cJSON
*
maxStreams
=
cJSON_GetObjectItem
(
root
,
"maxStreams"
);
if
(
!
maxStreams
||
maxStreams
->
type
!=
cJSON_String
)
{
mError
(
"acct:%s, failed to parse since maxStreams not found"
,
pAcct
->
acct
);
goto
DECODE_ACCT_OVER
;
}
pAcct
->
cfg
.
maxStreams
=
atol
(
maxStreams
->
valuestring
);
static
int32_t
mnodeAcctActionDelete
(
SAcctObj
*
pAcct
)
{
return
0
;
}
cJSON
*
maxStorage
=
cJSON_GetObjectItem
(
root
,
"maxStorage"
);
if
(
!
maxStorage
||
maxStorage
->
type
!=
cJSON_String
)
{
mError
(
"acct:%s, failed to parse since maxStorage not found"
,
pAcct
->
acct
);
goto
DECODE_ACCT_OVER
;
}
pAcct
->
cfg
.
maxStorage
=
atoll
(
maxStorage
->
valuestring
);
static
int32_t
mnodeAcctActionUpdate
(
SAcctObj
*
pSrcAcct
,
SAcctObj
*
pDstAcct
)
{
memcpy
(
pDstAcct
,
pSrcAcct
,
(
int32_t
)((
char
*
)
&
pDstAcct
->
info
-
(
char
*
)
&
pDstAcct
));
return
0
;
}
cJSON
*
accessState
=
cJSON_GetObjectItem
(
root
,
"accessState"
);
if
(
!
accessState
||
accessState
->
type
!=
cJSON_String
)
{
mError
(
"acct:%s, failed to parse since accessState not found"
,
pAcct
->
acct
);
goto
DECODE_ACCT_OVER
;
}
pAcct
->
cfg
.
accessState
=
atol
(
accessState
->
valuestring
);
static
int32_t
mnodeCreateDefaultAcct
()
{
int32_t
code
=
0
;
cJSON
*
createdTime
=
cJSON_GetObjectItem
(
root
,
"createdTime"
);
if
(
!
createdTime
||
createdTime
->
type
!=
cJSON_String
)
{
mError
(
"acct:%s, failed to parse since createdTime not found"
,
pAcct
->
acct
);
goto
DECODE_ACCT_OVER
;
}
pAcct
->
createdTime
=
atol
(
createdTime
->
valuestring
);
SAcctObj
acctObj
=
{
0
};
tstrncpy
(
acctObj
.
acct
,
TSDB_DEFAULT_USER
,
TSDB_USER_LEN
);
acctObj
.
createdTime
=
taosGetTimestampMs
();
acctObj
.
updateTime
=
taosGetTimestampMs
();
acctObj
.
acctId
=
1
;
acctObj
.
cfg
=
(
SAcctCfg
){.
maxUsers
=
128
,
.
maxDbs
=
128
,
.
maxTimeSeries
=
INT32_MAX
,
.
maxStreams
=
1000
,
.
maxStorage
=
INT64_MAX
,
.
accessState
=
TSDB_VN_ALL_ACCCESS
};
cJSON
*
updateTime
=
cJSON_GetObjectItem
(
root
,
"updateTime"
);
if
(
!
updateTime
||
updateTime
->
type
!=
cJSON_String
)
{
mError
(
"acct:%s, failed to parse since updateTime not found"
,
pAcct
->
acct
);
goto
DECODE_ACCT_OVER
;
SSdbRawData
*
pRaw
=
mnodeAcctActionEncode
(
&
acctObj
);
if
(
pRaw
!=
NULL
)
{
code
=
sdbWrite
(
pRaw
);
}
else
{
code
=
terrno
;
}
pAcct
->
updateTime
=
atol
(
updateTime
->
valuestring
);
code
=
0
;
mTrace
(
"acct:%s, parse success"
,
pAcct
->
acct
);
DECODE_ACCT_OVER:
if
(
code
!=
0
)
{
free
(
pAcct
);
pAcct
=
NULL
;
}
return
pAcct
;
return
code
;
}
int32_t
mnodeInitAcct
()
{
sdbSetFp
(
MN_SDB_ACCT
,
MN_KEY_BINARY
,
mnodeCreateDefaultAcct
,
(
SdbEncodeFp
)
mnodeEncodeAcct
,
(
SdbDecodeFp
)(
mnodeDecodeAcct
),
sizeof
(
SAcctObj
));
SSdbDesc
desc
=
{.
sdbType
=
SDB_ACCT
,
.
keyType
=
SDB_KEY_BINARY
,
.
deployFp
=
(
SdbDeployFp
)
mnodeCreateDefaultAcct
,
.
encodeFp
=
(
SdbEncodeFp
)
mnodeAcctActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mnodeAcctActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mnodeAcctActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mnodeAcctActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mnodeAcctActionDelete
};
sdbSetHandler
(
desc
);
return
0
;
}
...
...
source/dnode/mnode/impl/src/mnodeUser.c
浏览文件 @
79519165
...
...
@@ -19,111 +19,142 @@
#include "tglobal.h"
#include "mnodeInt.h"
static
int32_t
mnodeCreateDefaultUser
(
char
*
acct
,
char
*
user
,
char
*
pass
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
#define USER_VER 1
SUserObj
userObj
=
{
0
};
tstrncpy
(
userObj
.
user
,
user
,
TSDB_USER_LEN
);
tstrncpy
(
userObj
.
acct
,
acct
,
TSDB_USER_LEN
);
taosEncryptPass
((
uint8_t
*
)
pass
,
strlen
(
pass
),
userObj
.
pass
);
userObj
.
createdTime
=
taosGetTimestampMs
();
userObj
.
updateTime
=
taosGetTimestampMs
();
if
(
strcmp
(
user
,
TSDB_DEFAULT_USER
)
==
0
)
{
userObj
.
rootAuth
=
1
;
static
SSdbRawData
*
mnodeUserActionEncode
(
SUserObj
*
pUser
)
{
SSdbRawData
*
pRaw
=
calloc
(
1
,
sizeof
(
SUserObj
)
+
sizeof
(
SSdbRawData
));
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_MND_OUT_OF_MEMORY
;
return
NULL
;
}
sdbInsertRow
(
MN_SDB_USER
,
&
userObj
);
int32_t
dataLen
=
0
;
char
*
pData
=
pRaw
->
data
;
SDB_SET_BINARY_VAL
(
pData
,
dataLen
,
pUser
->
user
,
TSDB_USER_LEN
)
SDB_SET_BINARY_VAL
(
pData
,
dataLen
,
pUser
->
pass
,
TSDB_KEY_LEN
)
SDB_SET_BINARY_VAL
(
pData
,
dataLen
,
pUser
->
acct
,
TSDB_KEY_LEN
)
SDB_SET_INT64_VAL
(
pData
,
dataLen
,
pUser
->
createdTime
)
SDB_SET_INT64_VAL
(
pData
,
dataLen
,
pUser
->
updateTime
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
pUser
->
rootAuth
)
pRaw
->
dataLen
=
dataLen
;
pRaw
->
type
=
SDB_USER
;
pRaw
->
sver
=
USER_VER
;
return
pRaw
;
}
static
void
mnodeCreateDefaultUsers
(
)
{
mnodeCreateDefaultUser
(
TSDB_DEFAULT_USER
,
TSDB_DEFAULT_USER
,
TSDB_DEFAULT_PASS
);
mnodeCreateDefaultUser
(
TSDB_DEFAULT_USER
,
"monitor"
,
tsInternalPass
)
;
mnodeCreateDefaultUser
(
TSDB_DEFAULT_USER
,
"_"
TSDB_DEFAULT_USER
,
tsInternalPass
)
;
}
static
SUserObj
*
mnodeUserActionDecode
(
SSdbRawData
*
pRaw
)
{
if
(
pRaw
->
sver
!=
USER_VER
)
{
terrno
=
TSDB_CODE_SDB_INVAID_RAW_DATA_VER
;
return
NULL
;
}
int32_t
mnodeEncodeUser
(
SUserObj
*
pUser
,
char
*
buf
,
int32_t
maxLen
)
{
int32_t
len
=
0
;
char
*
base64
=
base64_encode
((
const
unsigned
char
*
)
pUser
->
pass
,
TSDB_KEY_LEN
);
SUserObj
*
pUser
=
calloc
(
1
,
sizeof
(
SUserObj
));
if
(
pUser
==
NULL
)
{
terrno
=
TSDB_CODE_MND_OUT_OF_MEMORY
;
return
NULL
;
}
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"{
\"
type
\"
:%d, "
,
MN_SDB_USER
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
user
\"
:
\"
%s
\"
, "
,
pUser
->
user
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
auth
\"
:
\"
%24s
\"
, "
,
base64
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
acct
\"
:
\"
%s
\"
, "
,
pUser
->
acct
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
createdTime
\"
:
\"
%"
PRIu64
"
\"
, "
,
pUser
->
createdTime
);
len
+=
snprintf
(
buf
+
len
,
maxLen
-
len
,
"
\"
updateTime
\"
:
\"
%"
PRIu64
"
\"
}
\n
"
,
pUser
->
updateTime
);
int32_t
code
=
0
;
int32_t
dataLen
=
pRaw
->
dataLen
;
char
*
pData
=
pRaw
->
data
;
SDB_GET_BINARY_VAL
(
pData
,
dataLen
,
pUser
->
user
,
TSDB_USER_LEN
,
code
)
SDB_GET_BINARY_VAL
(
pData
,
dataLen
,
pUser
->
pass
,
TSDB_KEY_LEN
,
code
)
SDB_GET_BINARY_VAL
(
pData
,
dataLen
,
pUser
->
acct
,
TSDB_USER_LEN
,
code
)
SDB_GET_INT64_VAL
(
pData
,
dataLen
,
pUser
->
createdTime
,
code
)
SDB_GET_INT64_VAL
(
pData
,
dataLen
,
pUser
->
updateTime
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
pUser
->
rootAuth
,
code
)
free
(
base64
);
return
len
;
}
if
(
code
!=
0
)
{
tfree
(
pUser
);
terrno
=
code
;
return
NULL
;
}
SUserObj
*
mnodeDecodeUser
(
cJSON
*
root
)
{
int32_t
code
=
-
1
;
SUserObj
*
pUser
=
calloc
(
1
,
sizeof
(
SUserObj
));
return
pUser
;
}
cJSON
*
user
=
cJSON_GetObjectItem
(
root
,
"user"
);
if
(
!
user
||
user
->
type
!=
cJSON_String
)
{
mError
(
"failed to parse user since user not found"
);
goto
DECODE_USER_OVER
;
static
int32_t
mnodeUserActionInsert
(
SUserObj
*
pUser
)
{
pUser
->
prohibitDbHash
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
pUser
->
prohibitDbHash
==
NULL
)
{
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
}
tstrncpy
(
pUser
->
user
,
user
->
valuestring
,
TSDB_USER_LEN
);
if
(
strcmp
(
pUser
->
user
,
TSDB_DEFAULT_USER
)
==
0
)
{
pUser
->
rootAuth
=
1
;
pUser
->
pAcct
=
sdbAcquire
(
SDB_ACCT
,
pUser
->
acct
);
if
(
pUser
->
pAcct
==
NULL
)
{
return
TSDB_CODE_MND_ACCT_NOT_EXIST
;
}
cJSON
*
pass
=
cJSON_GetObjectItem
(
root
,
"auth"
);
if
(
!
pass
||
pass
->
type
!=
cJSON_String
)
{
mError
(
"user:%s, failed to parse since auth not found"
,
pUser
->
user
);
goto
DECODE_USER_OVER
;
}
return
0
;
}
int32_t
outlen
=
0
;
char
*
base64
=
(
char
*
)
base64_decode
(
pass
->
valuestring
,
strlen
(
pass
->
valuestring
),
&
outlen
);
if
(
outlen
!=
TSDB_KEY_LEN
)
{
mError
(
"user:%s, failed to parse since invalid auth format"
,
pUser
->
user
);
free
(
base64
);
goto
DECODE_USER_OVER
;
}
else
{
memcpy
(
pUser
->
pass
,
base64
,
outlen
);
free
(
base64
);
static
int32_t
mnodeUserActionDelete
(
SUserObj
*
pUser
)
{
if
(
pUser
->
prohibitDbHash
)
{
taosHashCleanup
(
pUser
->
prohibitDbHash
);
pUser
->
prohibitDbHash
=
NULL
;
}
cJSON
*
acct
=
cJSON_GetObjectItem
(
root
,
"acct"
);
if
(
!
acct
||
acct
->
type
!=
cJSON_String
)
{
mError
(
"user:%s, failed to parse since acct not found"
,
pUser
->
user
);
goto
DECODE_USER_OVER
;
if
(
pUser
->
acct
!=
NULL
)
{
sdbRelease
(
SDB_ACCT
,
pUser
->
pAcct
);
pUser
->
pAcct
=
NULL
;
}
tstrncpy
(
pUser
->
acct
,
acct
->
valuestring
,
TSDB_USER_LEN
);
cJSON
*
createdTime
=
cJSON_GetObjectItem
(
root
,
"createdTime"
);
if
(
!
createdTime
||
createdTime
->
type
!=
cJSON_String
)
{
mError
(
"user:%s, failed to parse since createdTime not found"
,
pUser
->
user
);
goto
DECODE_USER_OVER
;
return
0
;
}
static
int32_t
mnodeUserActionUpdate
(
SUserObj
*
pSrcUser
,
SUserObj
*
pDstUser
)
{
memcpy
(
pDstUser
,
pSrcUser
,
(
int32_t
)((
char
*
)
&
pDstUser
->
prohibitDbHash
-
(
char
*
)
&
pDstUser
));
return
0
;
}
static
int32_t
mnodeCreateDefaultUser
(
char
*
acct
,
char
*
user
,
char
*
pass
)
{
int32_t
code
=
0
;
SUserObj
userObj
=
{
0
};
tstrncpy
(
userObj
.
user
,
user
,
TSDB_USER_LEN
);
tstrncpy
(
userObj
.
acct
,
acct
,
TSDB_USER_LEN
);
taosEncryptPass
((
uint8_t
*
)
pass
,
strlen
(
pass
),
userObj
.
pass
);
userObj
.
createdTime
=
taosGetTimestampMs
();
userObj
.
updateTime
=
taosGetTimestampMs
();
if
(
strcmp
(
user
,
TSDB_DEFAULT_USER
)
==
0
)
{
userObj
.
rootAuth
=
1
;
}
pUser
->
createdTime
=
atol
(
createdTime
->
valuestring
);
cJSON
*
updateTime
=
cJSON_GetObjectItem
(
root
,
"updateTime"
);
if
(
!
updateTime
||
updateTime
->
type
!=
cJSON_String
)
{
mError
(
"user:%s, failed to parse since updateTime not found"
,
pUser
->
user
);
goto
DECODE_USER_OVER
;
SSdbRawData
*
pRaw
=
mnodeUserActionEncode
(
&
userObj
);
if
(
pRaw
!=
NULL
)
{
code
=
sdbWrite
(
pRaw
);
}
else
{
code
=
terrno
;
}
pUser
->
updateTime
=
atol
(
updateTime
->
valuestring
);
code
=
0
;
mTrace
(
"user:%s, parse success"
,
pUser
->
user
);
return
code
;
}
DECODE_USER_OVER:
if
(
code
!=
0
)
{
free
(
pUser
);
pUser
=
NULL
;
}
return
pUser
;
static
int32_t
mnodeCreateDefaultUsers
()
{
int32_t
code
=
mnodeCreateDefaultUser
(
TSDB_DEFAULT_USER
,
TSDB_DEFAULT_USER
,
TSDB_DEFAULT_PASS
);
if
(
code
!=
0
)
return
code
;
code
=
mnodeCreateDefaultUser
(
TSDB_DEFAULT_USER
,
"monitor"
,
tsInternalPass
);
if
(
code
!=
0
)
return
code
;
code
=
mnodeCreateDefaultUser
(
TSDB_DEFAULT_USER
,
"_"
TSDB_DEFAULT_USER
,
tsInternalPass
);
if
(
code
!=
0
)
return
code
;
return
code
;
}
int32_t
mnodeInitUser
()
{
sdbSetFp
(
MN_SDB_USER
,
MN_KEY_BINARY
,
mnodeCreateDefaultUsers
,
(
SdbEncodeFp
)
mnodeEncodeUser
,
(
SdbDecodeFp
)(
mnodeDecodeUser
),
sizeof
(
SUserObj
));
SSdbDesc
desc
=
{.
sdbType
=
SDB_USER
,
.
keyType
=
SDB_KEY_BINARY
,
.
deployFp
=
(
SdbDeployFp
)
mnodeCreateDefaultUsers
,
.
encodeFp
=
(
SdbEncodeFp
)
mnodeUserActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mnodeUserActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mnodeUserActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mnodeUserActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mnodeUserActionDelete
};
sdbSetHandler
(
desc
);
return
0
;
}
...
...
source/dnode/mnode/impl/src/mnodeWorker.c
浏览文件 @
79519165
...
...
@@ -50,7 +50,7 @@ static SMnMsg *mnodeInitMsg2(SRpcMsg *pRpcMsg) {
SRpcConnInfo
connInfo
=
{
0
};
if
(
rpcGetConnInfo
(
pMsg
->
rpcMsg
.
handle
,
&
connInfo
)
==
0
)
{
pMsg
->
pUser
=
sdb
GetRow
(
MN_
SDB_USER
,
connInfo
.
user
);
pMsg
->
pUser
=
sdb
Acquire
(
SDB_USER
,
connInfo
.
user
);
}
if
(
pMsg
->
pUser
==
NULL
)
{
...
...
@@ -77,7 +77,7 @@ static void mnodeDispatchToWriteQueue(SRpcMsg *pRpcMsg) {
}
else
{
SMnMsg
*
pMsg
=
mnodeInitMsg2
(
pRpcMsg
);
if
(
pMsg
==
NULL
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pRpcMsg
->
handle
,
.
code
=
TSDB_CODE_MND_
INVALID_USER
};
SRpcMsg
rpcRsp
=
{.
handle
=
pRpcMsg
->
handle
,
.
code
=
TSDB_CODE_MND_
USER_NOT_EXIST
};
rpcSendResponse
(
&
rpcRsp
);
}
else
{
mTrace
(
"msg:%p, app:%p type:%s is put into wqueue"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
]);
...
...
@@ -103,7 +103,7 @@ static void mnodeDispatchToReadQueue(SRpcMsg *pRpcMsg) {
}
else
{
SMnMsg
*
pMsg
=
mnodeInitMsg2
(
pRpcMsg
);
if
(
pMsg
==
NULL
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pRpcMsg
->
handle
,
.
code
=
TSDB_CODE_MND_
INVALID_USER
};
SRpcMsg
rpcRsp
=
{.
handle
=
pRpcMsg
->
handle
,
.
code
=
TSDB_CODE_MND_
USER_NOT_EXIST
};
rpcSendResponse
(
&
rpcRsp
);
}
else
{
mTrace
(
"msg:%p, app:%p type:%s is put into rqueue"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
]);
...
...
@@ -120,7 +120,7 @@ static void mnodeDispatchToPeerQueue(SRpcMsg *pRpcMsg) {
}
else
{
SMnMsg
*
pMsg
=
mnodeInitMsg2
(
pRpcMsg
);
if
(
pMsg
==
NULL
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pRpcMsg
->
handle
,
.
code
=
TSDB_CODE_MND_
INVALID_USER
};
SRpcMsg
rpcRsp
=
{.
handle
=
pRpcMsg
->
handle
,
.
code
=
TSDB_CODE_MND_
USER_NOT_EXIST
};
rpcSendResponse
(
&
rpcRsp
);
}
else
{
mTrace
(
"msg:%p, app:%p type:%s is put into peer req queue"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
...
...
@@ -135,7 +135,7 @@ static void mnodeDispatchToPeerQueue(SRpcMsg *pRpcMsg) {
void
mnodeDispatchToPeerRspQueue
(
SRpcMsg
*
pRpcMsg
)
{
SMnMsg
*
pMsg
=
mnodeInitMsg2
(
pRpcMsg
);
if
(
pMsg
==
NULL
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pRpcMsg
->
handle
,
.
code
=
TSDB_CODE_MND_
INVALID_USER
};
SRpcMsg
rpcRsp
=
{.
handle
=
pRpcMsg
->
handle
,
.
code
=
TSDB_CODE_MND_
USER_NOT_EXIST
};
rpcSendResponse
(
&
rpcRsp
);
}
else
{
mTrace
(
"msg:%p, app:%p type:%s is put into peer rsp queue"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
...
...
source/dnode/mnode/impl/src/mondeInt.c
浏览文件 @
79519165
...
...
@@ -161,7 +161,7 @@ int32_t mnodeDeploy(char *path, SMnodeCfg *pCfg) {
if
(
code
!=
0
)
{
mError
(
"failed to deploy mnode since init step1 error"
);
tsMint
.
state
=
MN_STATUS_UNINIT
;
return
TSDB_CODE_MND_
SDB
_ERROR
;
return
TSDB_CODE_MND_
APP
_ERROR
;
}
code
=
mnodeInitStep2
();
...
...
@@ -169,7 +169,7 @@ int32_t mnodeDeploy(char *path, SMnodeCfg *pCfg) {
mnodeCleanupStep1
();
mError
(
"failed to deploy mnode since init step2 error"
);
tsMint
.
state
=
MN_STATUS_UNINIT
;
return
TSDB_CODE_MND_
SDB
_ERROR
;
return
TSDB_CODE_MND_
APP
_ERROR
;
}
mDebug
(
"mnode is deployed and waiting for raft to confirm"
);
...
...
source/dnode/mnode/sdb/inc/sdbInt.h
浏览文件 @
79519165
...
...
@@ -17,17 +17,17 @@
#define _TD_SDB_INT_H_
#include "os.h"
#include "sdb.h"
#include "taosmsg.h"
#include "tlog.h"
#include "thash.h"
#include "tglobal.h"
#include "sdb.h"
#include "thash.h"
#include "tlockfree.h"
#include "tlog.h"
#ifdef __cplusplus
extern
"C"
{
#endif
// mnode log function
#define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", 255, __VA_ARGS__); }}
#define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", 255, __VA_ARGS__); }}
#define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", 255, __VA_ARGS__); }}
...
...
@@ -35,6 +35,32 @@ extern "C" {
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }}
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }}
#define SDB_MAX_SIZE (32*1024)
typedef
struct
{
char
*
currDir
;
char
*
syncDir
;
char
*
tmpDir
;
int64_t
lastCommitVer
;
int64_t
curVer
;
EKeyType
keyTypes
[
SDB_MAX
];
SHashObj
*
hashObjs
[
SDB_MAX
];
SRWLatch
locks
[
SDB_MAX
];
SdbInsertFp
insertFps
[
SDB_MAX
];
SdbUpdateFp
updateFps
[
SDB_MAX
];
SdbDeleteFp
deleteFps
[
SDB_MAX
];
SdbDeployFp
deployFps
[
SDB_MAX
];
SdbEncodeFp
encodeFps
[
SDB_MAX
];
SdbDecodeFp
decodeFps
[
SDB_MAX
];
}
SSdbObj
;
typedef
struct
{
ESdbStatus
status
;
int32_t
refCount
;
int32_t
dataLen
;
char
*
data
[];
}
SSdbRow
;
#ifdef __cplusplus
}
#endif
...
...
source/dnode/mnode/sdb/src/sdb.c
浏览文件 @
79519165
...
...
@@ -14,21 +14,9 @@
*/
#define _DEFAULT_SOURCE
#include "cJSON.h"
#include "sdbInt.h"
static
struct
{
char
currDir
[
PATH_MAX
];
char
backDir
[
PATH_MAX
];
char
tmpDir
[
PATH_MAX
];
int64_t
version
;
EMnKey
hashKey
[
MN_SDB_MAX
];
int32_t
dataSize
[
MN_SDB_MAX
];
SHashObj
*
hashObj
[
MN_SDB_MAX
];
SdbDeployFp
deployFp
[
MN_SDB_MAX
];
SdbEncodeFp
encodeFp
[
MN_SDB_MAX
];
SdbDecodeFp
decodeFp
[
MN_SDB_MAX
];
}
tsSdb
=
{
0
};
static
SSdbObj
tsSdb
=
{
0
};
static
int32_t
sdbCreateDir
()
{
if
(
!
taosMkDir
(
tsSdb
.
currDir
))
{
...
...
@@ -36,8 +24,8 @@ static int32_t sdbCreateDir() {
return
TAOS_SYSTEM_ERROR
(
errno
);
}
if
(
!
taosMkDir
(
tsSdb
.
back
Dir
))
{
mError
(
"failed to create dir:%s"
,
tsSdb
.
back
Dir
);
if
(
!
taosMkDir
(
tsSdb
.
sync
Dir
))
{
mError
(
"failed to create dir:%s"
,
tsSdb
.
sync
Dir
);
return
-
1
;
}
...
...
@@ -50,8 +38,8 @@ static int32_t sdbCreateDir() {
}
static
int32_t
sdbRunDeployFp
()
{
for
(
int32_t
i
=
MN_SDB_START
;
i
<
MN_
SDB_MAX
;
++
i
)
{
SdbDeployFp
fp
=
tsSdb
.
deployFp
[
i
];
for
(
int32_t
i
=
SDB_START
;
i
<
SDB_MAX
;
++
i
)
{
SdbDeployFp
fp
=
tsSdb
.
deployFp
s
[
i
];
if
(
fp
)
{
(
*
fp
)();
}
...
...
@@ -60,152 +48,147 @@ static int32_t sdbRunDeployFp() {
return
0
;
}
static
int32_t
sdbReadVersion
(
cJSON
*
root
)
{
cJSON
*
ver
=
cJSON_GetObjectItem
(
root
,
"version"
);
if
(
!
ver
||
ver
->
type
!=
cJSON_String
)
{
mError
(
"failed to parse version since version not found"
);
return
-
1
;
static
SHashObj
*
sdbGetHash
(
int32_t
sdb
)
{
if
(
sdb
>=
SDB_MAX
||
sdb
<=
SDB_START
)
{
return
NULL
;
}
tsSdb
.
version
=
(
int64_t
)
atoll
(
ver
->
valuestring
);
mTrace
(
"parse version success, version:%"
PRIu64
,
tsSdb
.
version
);
return
0
;
}
SHashObj
*
hash
=
tsSdb
.
hashObjs
[
sdb
];
if
(
hash
==
NULL
)
{
return
NULL
;
}
static
void
sdbWriteVersion
(
FileFd
fd
)
{
char
content
[
128
];
int32_t
len
=
snprintf
(
content
,
sizeof
(
content
),
"{
\"
type
\"
:0,
\"
version
\"
:
\"
%"
PRIu64
"
\"
,
\"
updateTime
\"
:
\"
%"
PRIu64
"
\"
}
\n
"
,
tsSdb
.
version
,
taosGetTimestampMs
());
taosWriteFile
(
fd
,
content
,
len
);
return
hash
;
}
static
int32_t
sdbReadDataFile
()
{
ssize_t
_bytes
=
0
;
size_t
len
=
4096
;
char
*
line
=
calloc
(
1
,
len
);
int32_t
code
=
-
1
;
FILE
*
fp
=
NULL
;
cJSON
*
root
=
NULL
;
char
file
[
PATH_MAX
+
20
];
snprintf
(
file
,
sizeof
(
file
),
"%ssdb.data"
,
tsSdb
.
currDir
);
fp
=
fopen
(
file
,
"r"
);
if
(
!
fp
)
{
mDebug
(
"failed to open file:%s for read since %s"
,
file
,
strerror
(
errno
));
goto
PARSE_SDB_DATA_ERROR
;
}
int32_t
sdbWrite
(
SSdbRawData
*
pRaw
)
{
SHashObj
*
hash
=
sdbGetHash
(
pRaw
->
type
);
switch
(
pRaw
->
action
)
{
case
SDB_ACTION_INSERT
:
break
;
case
SDB_ACTION_UPDATE
:
break
;
case
SDB_ACTION_DELETE
:
break
;
while
(
!
feof
(
fp
))
{
memset
(
line
,
0
,
len
);
_bytes
=
tgetline
(
&
line
,
&
len
,
fp
);
if
(
_bytes
<
0
)
{
default:
break
;
}
line
[
len
-
1
]
=
0
;
if
(
len
<=
10
)
continue
;
return
0
;
}
root
=
cJSON_Parse
(
line
);
if
(
root
==
NULL
)
{
mError
(
"failed to parse since invalid json format, %s"
,
line
);
goto
PARSE_SDB_DATA_ERROR
;
}
static
int32_t
sdbWriteVersion
(
FileFd
fd
)
{
return
0
;
}
cJSON
*
type
=
cJSON_GetObjectItem
(
root
,
"type"
);
if
(
!
type
||
type
->
type
!=
cJSON_Number
)
{
mError
(
"failed to parse since invalid type not found, %s"
,
line
);
goto
PARSE_SDB_DATA_ERROR
;
static
int32_t
sdbReadVersion
(
FileFd
fd
)
{
return
0
;
}
static
int32_t
sdbReadDataFile
()
{
int32_t
code
=
0
;
SSdbRawData
*
pRaw
=
malloc
(
SDB_MAX_SIZE
);
if
(
pRaw
==
NULL
)
{
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
}
if
(
type
->
valueint
>=
MN_SDB_MAX
||
type
->
valueint
<
MN_SDB_START
)
{
mError
(
"failed to parse since invalid type, %s"
,
line
);
goto
PARSE_SDB_DATA_ERROR
;
char
file
[
PATH_MAX
]
=
{
0
};
snprintf
(
file
,
sizeof
(
file
),
"%ssdb.data"
,
tsSdb
.
currDir
);
FileFd
fd
=
taosOpenFileCreateWrite
(
file
);
if
(
fd
<=
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to open file:%s for read since %s"
,
file
,
tstrerror
(
code
));
return
code
;
}
if
(
type
->
valueint
==
MN_SDB_START
)
{
if
(
sdbReadVersion
(
root
)
!=
0
)
{
mError
(
"failed to parse version, %s"
,
line
);
goto
PARSE_SDB_DATA_ERROR
;
int64_t
offset
=
0
;
while
(
1
)
{
int32_t
ret
=
(
int32_t
)
taosReadFile
(
fd
,
pRaw
,
sizeof
(
SSdbRawData
));
if
(
ret
==
0
)
break
;
if
(
ret
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to read file:%s since %s"
,
file
,
tstrerror
(
code
));
break
;
}
cJSON_Delete
(
root
);
root
=
NULL
;
continue
;
if
(
ret
<
sizeof
(
SSdbRawData
))
{
code
=
TSDB_CODE_SDB_INTERNAL_ERROR
;
mError
(
"failed to read file:%s since %s"
,
file
,
tstrerror
(
code
));
break
;
}
SdbDecodeFp
decodeFp
=
tsSdb
.
decodeFp
[
type
->
valueint
];
SdbHead
*
pHead
=
(
*
decodeFp
)(
root
);
if
(
pHead
==
NULL
)
{
mError
(
"failed to parse since decode error, %s"
,
line
);
code
=
sdbWrite
(
pRaw
);
if
(
code
!=
0
)
{
mError
(
"failed to read file:%s since %s"
,
file
,
tstrerror
(
code
));
goto
PARSE_SDB_DATA_ERROR
;
}
pHead
->
type
=
type
->
valueint
;
pHead
->
status
=
MN_SDB_STAT_AVAIL
;
sdbInsertRow
(
pHead
->
type
,
pHead
);
free
(
pHead
);
cJSON_Delete
(
root
);
root
=
NULL
;
}
code
=
0
;
PARSE_SDB_DATA_ERROR:
if
(
line
)
free
(
line
);
if
(
fp
)
fclose
(
fp
);
if
(
root
)
cJSON_Delete
(
root
);
taosCloseFile
(
fd
);
return
code
;
}
static
int32_t
sdbWriteDataFile
()
{
char
file
[
PATH_MAX
+
20
]
=
{
0
};
snprintf
(
file
,
sizeof
(
file
),
"%ssdb.data"
,
tsSdb
.
currDir
);
FileFd
fd
=
taosOpenFileCreateWrite
(
file
);
int32_t
code
=
0
;
char
tmpfile
[
PATH_MAX
]
=
{
0
};
snprintf
(
tmpfile
,
sizeof
(
tmpfile
),
"%ssdb.data"
,
tsSdb
.
tmpDir
);
FileFd
fd
=
taosOpenFileCreateWrite
(
tmpfile
);
if
(
fd
<=
0
)
{
mError
(
"failed to open file:%s for write since %s"
,
file
,
strerror
(
errno
));
return
-
1
;
code
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to open file:%s for write since %s"
,
tmpfile
,
tstrerror
(
code
));
return
code
;
}
int32_t
len
;
int32_t
maxLen
=
10240
;
char
*
buf
=
malloc
(
maxLen
);
for
(
int32_t
i
=
MN_SDB_START
;
i
<
MN_SDB_MAX
;
++
i
)
{
SHashObj
*
hash
=
tsSdb
.
hashObj
[
i
];
for
(
int32_t
i
=
SDB_START
;
i
<
SDB_MAX
;
++
i
)
{
SHashObj
*
hash
=
tsSdb
.
hashObjs
[
i
];
if
(
!
hash
)
continue
;
SdbEncodeFp
encodeFp
=
tsSdb
.
encodeFp
[
i
];
SdbEncodeFp
encodeFp
=
tsSdb
.
encodeFp
s
[
i
];
if
(
!
encodeFp
)
continue
;
SdbHead
*
pHead
=
taosHashIterate
(
hash
,
NULL
);
while
(
pHead
!=
NULL
)
{
len
=
(
*
encodeFp
)(
pHead
,
buf
,
maxLen
);
if
(
len
>=
0
)
{
taosWriteFile
(
fd
,
buf
,
len
);
SSdbRow
*
pRow
=
taosHashIterate
(
hash
,
NULL
);
while
(
pRow
!=
NULL
)
{
if
(
pRow
->
status
==
SDB_STATUS_READY
)
continue
;
SSdbRawData
*
pRaw
=
(
*
encodeFp
)(
pRow
->
data
);
if
(
pRaw
!=
NULL
)
{
taosWriteFile
(
fd
,
pRaw
,
sizeof
(
SSdbRawData
)
+
pRaw
->
dataLen
);
}
else
{
taosHashCancelIterate
(
hash
,
pRow
);
code
=
TSDB_CODE_SDB_INTERNAL_ERROR
;
break
;
}
pHead
=
taosHashIterate
(
hash
,
pHead
);
pRow
=
taosHashIterate
(
hash
,
pRow
);
}
}
if
(
code
==
0
)
{
code
=
sdbWriteVersion
(
fd
);
}
sdbWriteVersion
(
fd
);
taosFsyncFile
(
fd
);
taosCloseFile
(
fd
);
mInfo
(
"write file:%s successfully"
,
file
);
return
0
;
}
if
(
code
==
0
)
{
code
=
taosFsyncFile
(
fd
)
;
}
int32_t
sdbCommit
()
{
int32_t
code
=
sdbWriteDataFile
();
if
(
code
!=
0
)
{
return
code
;
char
curfile
[
PATH_MAX
]
=
{
0
};
snprintf
(
curfile
,
sizeof
(
curfile
),
"%ssdb.data"
,
tsSdb
.
currDir
);
code
=
taosRenameFile
(
tmpfile
,
curfile
);
}
return
0
;
if
(
code
!=
0
)
{
mError
(
"failed to write sdb file since %s"
,
tstrerror
(
code
));
}
else
{
mInfo
(
"write sdb file successfully"
);
}
return
code
;
}
int32_t
sdbRead
()
{
...
...
@@ -218,6 +201,15 @@ int32_t sdbRead() {
return
-
1
;
}
int32_t
sdbCommit
()
{
int32_t
code
=
sdbWriteDataFile
();
if
(
code
!=
0
)
{
return
code
;
}
return
0
;
}
int32_t
sdbDeploy
()
{
if
(
sdbCreateDir
()
!=
0
)
{
return
-
1
;
...
...
@@ -231,38 +223,32 @@ int32_t sdbDeploy() {
return
-
1
;
}
// if (!taosMkDir())
// if (pMinfos == NULL) { // first deploy
// tsMint.dnodeId = 1;
// bool getuid = taosGetSystemUid(tsMint.clusterId);
// if (!getuid) {
// strcpy(tsMint.clusterId, "tdengine3.0");
// mError("deploy new mnode but failed to get uid, set to default val %s", tsMint.clusterId);
// } else {
// mDebug("deploy new mnode and uid is %s", tsMint.clusterId);
// }
// } else { // todo
// }
// if (mkdir(tsMnodeDir, 0755) != 0 && errno != EEXIST) {
// mError("failed to init mnode dir:%s, reason:%s", tsMnodeDir, strerror(errno));
// return -1;
// }
return
0
;
}
void
sdbUnDeploy
()
{}
int32_t
sdbInit
()
{
snprintf
(
tsSdb
.
currDir
,
PATH_MAX
,
"%s%scurrent%s"
,
tsMnodeDir
,
TD_DIRSEP
,
TD_DIRSEP
);
snprintf
(
tsSdb
.
backDir
,
PATH_MAX
,
"%s%sbackup%s"
,
tsMnodeDir
,
TD_DIRSEP
,
TD_DIRSEP
);
snprintf
(
tsSdb
.
tmpDir
,
PATH_MAX
,
"%s%stmp%s"
,
tsMnodeDir
,
TD_DIRSEP
,
TD_DIRSEP
);
char
path
[
PATH_MAX
+
100
];
snprintf
(
path
,
PATH_MAX
+
100
,
"%s%scurrent%s"
,
tsMnodeDir
,
TD_DIRSEP
,
TD_DIRSEP
);
tsSdb
.
currDir
=
strdup
(
path
);
snprintf
(
path
,
PATH_MAX
+
100
,
"%s%ssync%s"
,
tsMnodeDir
,
TD_DIRSEP
,
TD_DIRSEP
);
tsSdb
.
syncDir
=
strdup
(
path
);
snprintf
(
path
,
PATH_MAX
+
100
,
"%s%stmp%s"
,
tsMnodeDir
,
TD_DIRSEP
,
TD_DIRSEP
);
tsSdb
.
tmpDir
=
strdup
(
path
);
for
(
int32_t
i
=
0
;
i
<
MN_SDB_MAX
;
++
i
)
{
if
(
tsSdb
.
currDir
==
NULL
||
tsSdb
.
currDir
==
NULL
||
tsSdb
.
currDir
==
NULL
)
{
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
}
for
(
int32_t
i
=
0
;
i
<
SDB_MAX
;
++
i
)
{
int32_t
type
;
if
(
tsSdb
.
hashKey
[
i
]
==
MN
_KEY_INT32
)
{
if
(
tsSdb
.
keyTypes
[
i
]
==
SDB
_KEY_INT32
)
{
type
=
TSDB_DATA_TYPE_INT
;
}
else
if
(
tsSdb
.
hashKey
[
i
]
==
MN
_KEY_INT64
)
{
}
else
if
(
tsSdb
.
keyTypes
[
i
]
==
SDB
_KEY_INT64
)
{
type
=
TSDB_DATA_TYPE_BIGINT
;
}
else
{
type
=
TSDB_DATA_TYPE_BINARY
;
...
...
@@ -270,55 +256,62 @@ int32_t sdbInit() {
SHashObj
*
hash
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
type
),
true
,
HASH_NO_LOCK
);
if
(
hash
==
NULL
)
{
return
-
1
;
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
}
tsSdb
.
hashObj
[
i
]
=
hash
;
tsSdb
.
hashObjs
[
i
]
=
hash
;
taosInitRWLatch
(
&
tsSdb
.
locks
[
i
]);
}
return
0
;
}
void
sdbCleanup
()
{
for
(
int32_t
i
=
0
;
i
<
MN_SDB_MAX
;
++
i
)
{
SHashObj
*
hash
=
tsSdb
.
hashObj
[
i
];
if
(
hash
!=
NULL
)
{
taosHashCleanup
(
hash
);
if
(
tsSdb
.
curVer
!=
tsSdb
.
lastCommitVer
)
{
sdbCommit
();
}
tsSdb
.
hashObj
[
i
]
=
NULL
;
if
(
tsSdb
.
currDir
!=
NULL
)
{
tfree
(
tsSdb
.
currDir
);
}
}
void
sdbSetFp
(
EMnSdb
sdb
,
EMnKey
keyType
,
SdbDeployFp
deployFp
,
SdbEncodeFp
encodeFp
,
SdbDecodeFp
decodeFp
,
int32_t
dataSize
)
{
tsSdb
.
deployFp
[
sdb
]
=
deployFp
;
tsSdb
.
encodeFp
[
sdb
]
=
encodeFp
;
tsSdb
.
decodeFp
[
sdb
]
=
decodeFp
;
tsSdb
.
dataSize
[
sdb
]
=
dataSize
;
tsSdb
.
hashKey
[
sdb
]
=
keyType
;
}
if
(
tsSdb
.
syncDir
!=
NULL
)
{
tfree
(
tsSdb
.
syncDir
);
}
static
SHashObj
*
sdbGetHash
(
int32_t
sdb
)
{
if
(
sdb
>=
MN_SDB_MAX
||
sdb
<=
MN_SDB_START
)
{
return
NULL
;
if
(
tsSdb
.
tmpDir
!=
NULL
)
{
tfree
(
tsSdb
.
tmpDir
);
}
SHashObj
*
hash
=
tsSdb
.
hashObj
[
sdb
];
if
(
hash
==
NULL
)
{
return
NULL
;
for
(
int32_t
i
=
0
;
i
<
SDB_MAX
;
++
i
)
{
SHashObj
*
hash
=
tsSdb
.
hashObjs
[
i
];
if
(
hash
!=
NULL
)
{
taosHashCleanup
(
hash
);
}
tsSdb
.
hashObjs
[
i
]
=
NULL
;
}
}
return
hash
;
void
sdbSetHandler
(
SSdbDesc
desc
)
{
ESdbType
sdb
=
desc
.
sdbType
;
tsSdb
.
keyTypes
[
sdb
]
=
desc
.
keyType
;
tsSdb
.
insertFps
[
sdb
]
=
desc
.
insertFp
;
tsSdb
.
updateFps
[
sdb
]
=
desc
.
updateFp
;
tsSdb
.
deleteFps
[
sdb
]
=
desc
.
deleteFp
;
tsSdb
.
deployFps
[
sdb
]
=
desc
.
deployFp
;
tsSdb
.
encodeFps
[
sdb
]
=
desc
.
encodeFp
;
tsSdb
.
decodeFps
[
sdb
]
=
desc
.
decodeFp
;
}
void
*
sdbInsertRow
(
EMnSdb
sdb
,
void
*
p
)
{
#if 0
void *sdbInsertRow(ESdbType sdb, void *p) {
SdbHead *pHead = p;
pHead->type = sdb;
pHead
->
status
=
MN_SDB_STAT
_AVAIL
;
pHead->status =
SDB
_AVAIL;
char *pKey = (char *)pHead + sizeof(pHead);
int32_t keySize;
E
MnKey
keyType
=
tsSdb
.
hashKey
[
pHead
->
type
];
E
KeyType keyType = tsSdb.keyTypes
[pHead->type];
int32_t dataSize = tsSdb.dataSize[pHead->type];
SHashObj *hash = sdbGetHash(pHead->type);
...
...
@@ -326,9 +319,9 @@ void *sdbInsertRow(EMnSdb sdb, void *p) {
return NULL;
}
if
(
keyType
==
MN_KEY_
INT32
)
{
if (keyType ==
SDB
INT32) {
keySize = sizeof(int32_t);
}
else
if
(
keyType
==
MN
_KEY_BINARY
)
{
} else if (keyType ==
SDB
_KEY_BINARY) {
keySize = strlen(pKey) + 1;
} else {
keySize = sizeof(int64_t);
...
...
@@ -338,34 +331,58 @@ void *sdbInsertRow(EMnSdb sdb, void *p) {
return taosHashGet(hash, pKey, keySize);
}
void
sdbDeleteRow
(
E
MnSdb
sdb
,
void
*
p
)
{
void sdbDeleteRow(E
SdbType
sdb, void *p) {
SdbHead *pHead = p;
pHead
->
status
=
MN_SDB_STAT
_DROPPED
;
pHead->status =
SDB_STATUS
_DROPPED;
}
void
*
sdbUpdateRow
(
EMnSdb
sdb
,
void
*
pHead
)
{
return
sdbInsertRow
(
sdb
,
pHead
);
}
void *sdbUpdateRow(ESdbType sdb, void *pHead) { return sdbInsertRow(sdb, pHead); }
#endif
void
*
sdbAcquire
(
ESdbType
sdb
,
void
*
pKey
)
{
terrno
=
0
;
void
*
sdbGetRow
(
EMnSdb
sdb
,
void
*
pKey
)
{
SHashObj
*
hash
=
sdbGetHash
(
sdb
);
if
(
hash
==
NULL
)
{
return
NULL
;
}
int32_t
keySize
;
E
MnKey
keyType
=
tsSdb
.
hashKey
[
sdb
];
E
KeyType
keyType
=
tsSdb
.
keyTypes
[
sdb
];
if
(
keyType
==
MN_KEY_INT32
)
{
switch
(
keyType
)
{
case
SDB_KEY_INT32
:
keySize
=
sizeof
(
int32_t
);
}
else
if
(
keyType
==
MN_KEY_BINARY
)
{
break
;
case
SDB_KEY_INT64
:
keySize
=
sizeof
(
int64_t
);
break
;
case
SDB_KEY_BINARY
:
keySize
=
strlen
(
pKey
)
+
1
;
break
;
default:
keySize
=
sizeof
(
int32_t
);
}
SSdbRow
*
pRow
=
taosHashGet
(
hash
,
pKey
,
keySize
);
if
(
pRow
==
NULL
)
return
NULL
;
if
(
pRow
->
status
==
SDB_STATUS_READY
)
{
atomic_add_fetch_32
(
&
pRow
->
refCount
,
1
);
return
pRow
->
data
;
}
else
{
keySize
=
sizeof
(
int64_t
);
terrno
=
-
1
;
// todo
return
NULL
;
}
}
return
taosHashGet
(
hash
,
pKey
,
keySize
);
void
sdbRelease
(
ESdbType
sdb
,
void
*
pObj
)
{
SSdbRow
*
pRow
=
(
SSdbRow
*
)((
char
*
)
pObj
-
sizeof
(
SSdbRow
));
atomic_sub_fetch_32
(
&
pRow
->
refCount
,
1
);
}
void
*
sdbFetchRow
(
E
MnSdb
sdb
,
void
*
pIter
)
{
void
*
sdbFetchRow
(
E
SdbType
sdb
,
void
*
pIter
)
{
SHashObj
*
hash
=
sdbGetHash
(
sdb
);
if
(
hash
==
NULL
)
{
return
NULL
;
...
...
@@ -374,16 +391,15 @@ void *sdbFetchRow(EMnSdb sdb, void *pIter) {
return
taosHashIterate
(
hash
,
pIter
);
}
void
sdbCancelFetch
(
E
MnSdb
sdb
,
void
*
pIter
)
{
void
sdbCancelFetch
(
E
SdbType
sdb
,
void
*
pIter
)
{
SHashObj
*
hash
=
sdbGetHash
(
sdb
);
if
(
hash
==
NULL
)
{
return
;
}
taosHashCancelIterate
(
hash
,
pIter
);
}
int32_t
sdbGet
Count
(
EMnSdb
sdb
)
{
int32_t
sdbGet
Size
(
ESdbType
sdb
)
{
SHashObj
*
hash
=
sdbGetHash
(
sdb
);
if
(
hash
==
NULL
)
{
return
0
;
...
...
source/libs/transport/src/rpcMain.c
浏览文件 @
79519165
...
...
@@ -1582,7 +1582,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
code
=
htonl
(
pHead
->
code
);
if
(
code
==
TSDB_CODE_RPC_INVALID_TIME_STAMP
||
code
==
TSDB_CODE_RPC_AUTH_FAILURE
||
code
==
TSDB_CODE_RPC_INVALID_VERSION
||
code
==
TSDB_CODE_RPC_AUTH_REQUIRED
||
code
==
TSDB_CODE_MND_
INVALID_USER
||
code
==
TSDB_CODE_RPC_NOT_READY
)
{
code
==
TSDB_CODE_RPC_AUTH_REQUIRED
||
code
==
TSDB_CODE_MND_
USER_NOT_EXIST
||
code
==
TSDB_CODE_RPC_NOT_READY
)
{
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
pHead
->
msgLen
);
// tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
return
0
;
...
...
source/util/src/terror.c
浏览文件 @
79519165
...
...
@@ -142,12 +142,15 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_FAILED_TO_START_SYNC, "failed to start sync"
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_FAILED_TO_CREATE_DIR
,
"failed to create mnode dir"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_FAILED_TO_INIT_STEP
,
"failed to init components"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE
,
"Object already there"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_SDB_ERROR
,
"Unexpected generic error in sdb"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE
,
"Invalid table type"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_SDB_OBJ_NOT_THERE
,
"Object not there"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_SDB_INVAID_META_ROW
,
"Invalid meta row"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_SDB_INVAID_KEY_TYPE
,
"Invalid key type"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_INTERNAL_ERROR
,
"Unexpected generic error in sdb"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_OUT_OF_MEMORY
,
"Out of memory in sdb"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_OBJ_ALREADY_THERE
,
"Object already there"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_OBJ_NOT_THERE
,
"Object not there"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_INVAID_RAW_DATA_VER
,
"Invalid raw data version"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_INVAID_RAW_DATA_LEN
,
"Invalid raw data len"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_INVALID_TABLE_TYPE
,
"Invalid table type"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_INVAID_META_ROW
,
"Invalid meta row"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_INVAID_KEY_TYPE
,
"Invalid key type"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_DNODE_ALREADY_EXIST
,
"DNode already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_DNODE_NOT_EXIST
,
"DNode does not exist"
)
...
...
@@ -166,12 +169,12 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED, "Dnode Id not configur
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED
,
"Dnode Ep not configured"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_ACCT_ALREADY_EXIST
,
"Account already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_
INVALID_ACCT
,
"Invalid account"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_
ACCT_NOT_EXIST
,
"Invalid account"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_ACCT_OPTION
,
"Invalid account options"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_ACCT_EXPIRED
,
"Account authorization has expired"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_USER_ALREADY_EXIST
,
"User already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_
INVALID_USER
,
"Invalid user"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_
USER_NOT_EXIST
,
"Invalid user"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_USER_FORMAT
,
"Invalid user format"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_PASS_FORMAT
,
"Invalid password format"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_NO_USER_FROM_CONN
,
"Can not get user from conn"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录