Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1cabd9f3
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
1cabd9f3
编写于
11月 15, 2021
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/vnode
上级
baac49c7
d2f99132
变更
18
隐藏空白更改
内联
并排
Showing
18 changed file
with
1078 addition
and
605 deletion
+1078
-605
include/dnode/mnode/mnode.h
include/dnode/mnode/mnode.h
+3
-3
include/dnode/mnode/sdb/sdb.h
include/dnode/mnode/sdb/sdb.h
+104
-64
include/util/taoserror.h
include/util/taoserror.h
+17
-15
source/dnode/mgmt/src/dnodeMnode.c
source/dnode/mgmt/src/dnodeMnode.c
+6
-6
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+5
-4
source/dnode/mnode/impl/src/mnodeAcct.c
source/dnode/mnode/impl/src/mnodeAcct.c
+45
-57
source/dnode/mnode/impl/src/mnodeSync.c
source/dnode/mnode/impl/src/mnodeSync.c
+2
-2
source/dnode/mnode/impl/src/mnodeUser.c
source/dnode/mnode/impl/src/mnodeUser.c
+37
-53
source/dnode/mnode/sdb/inc/sdbInt.h
source/dnode/mnode/sdb/inc/sdbInt.h
+19
-6
source/dnode/mnode/sdb/src/sdb.c
source/dnode/mnode/sdb/src/sdb.c
+2
-327
source/dnode/mnode/sdb/src/sdbFile.c
source/dnode/mnode/sdb/src/sdbFile.c
+258
-0
source/dnode/mnode/sdb/src/sdbHash.c
source/dnode/mnode/sdb/src/sdbHash.c
+280
-0
source/dnode/mnode/sdb/src/sdbRaw.c
source/dnode/mnode/sdb/src/sdbRaw.c
+196
-0
source/dnode/mnode/sdb/src/sdbRow.c
source/dnode/mnode/sdb/src/sdbRow.c
+38
-0
source/dnode/mnode/transaction/inc/trnInt.h
source/dnode/mnode/transaction/inc/trnInt.h
+9
-9
source/dnode/mnode/transaction/src/trn.c
source/dnode/mnode/transaction/src/trn.c
+52
-57
source/dnode/mnode/transaction/src/trnExec.c
source/dnode/mnode/transaction/src/trnExec.c
+1
-1
source/util/src/terror.c
source/util/src/terror.c
+4
-1
未找到文件。
include/dnode/mnode/mnode.h
浏览文件 @
1cabd9f3
...
...
@@ -55,9 +55,9 @@ typedef struct {
int32_t
mnodeInit
(
SMnodePara
para
);
void
mnodeCleanup
();
int32_t
mnodeDeploy
(
char
*
path
,
SMnodeCfg
*
pCfg
);
void
mnodeUnDeploy
(
char
*
path
);
int32_t
mnodeStart
(
char
*
path
,
SMnodeCfg
*
pCfg
);
int32_t
mnodeDeploy
(
SMnodeCfg
*
pCfg
);
void
mnodeUnDeploy
();
int32_t
mnodeStart
(
SMnodeCfg
*
pCfg
);
int32_t
mnodeAlter
(
SMnodeCfg
*
pCfg
);
void
mnodeStop
();
...
...
include/dnode/mnode/sdb/sdb.h
浏览文件 @
1cabd9f3
...
...
@@ -20,69 +20,105 @@
extern
"C"
{
#endif
#define SDB_GET_BINARY_VAL(pData, dataLen, val, valLen, code) \
if (code == 0) { \
if ((dataLen) >= (valLen)) { \
memcpy((val), (char *)(pData), (valLen)); \
(dataLen) -= (valLen); \
(pData) = (char *)(pData) + (valLen); \
} else { \
code = TSDB_CODE_SDB_INVALID_DATA_LEN; \
} \
#define SDB_GET_INT64(pData, pRow, dataPos, val) \
{ \
if (sdbGetRawInt64(pRaw, dataPos, val) != 0) { \
sdbFreeRow(pRow); \
return NULL; \
} \
dataPos += sizeof(int64_t); \
}
#define SDB_GET_INT32_VAL(pData, dataLen, val, code) \
if (code == 0) { \
if (dataLen >= sizeof(int32_t)) { \
*(int32_t *)(pData) = (int32_t)(val); \
(dataLen) -= sizeof(int32_t); \
(pData) = (char *)(pData) + sizeof(int32_t); \
} else { \
code = TSDB_CODE_SDB_INVALID_DATA_LEN; \
} \
#define SDB_GET_INT32(pData, pRow, dataPos, val) \
{ \
if (sdbGetRawInt32(pRaw, dataPos, val) != 0) { \
sdbFreeRow(pRow); \
return NULL; \
} \
dataPos += sizeof(int32_t); \
}
#define SDB_GET_INT64_VAL(pData, dataLen, val, code) \
if (code == 0) { \
if (dataLen >= sizeof(int64_t)) { \
*(int64_t *)(pData) = (int64_t)(val); \
(dataLen) -= sizeof(int64_t); \
(pData) = (char *)(pData) + sizeof(int64_t); \
} else { \
code = TSDB_CODE_SDB_INVALID_DATA_LEN; \
} \
#define SDB_GET_INT8(pData, pRow, dataPos, val) \
{ \
if (sdbGetRawInt8(pRaw, dataPos, val) != 0) { \
sdbFreeRow(pRow); \
return NULL; \
} \
dataPos += sizeof(int8_t); \
}
#define SDB_SET_INT64_VAL(pData, dataLen, val) \
{ \
*(int64_t *)(pData) = (int64_t)(val); \
(dataLen) += sizeof(int64_t); \
(pData) += sizeof(int64_t); \
#define SDB_GET_BINARY(pRaw, pRow, dataPos, val, valLen) \
{ \
if (sdbGetRawBinary(pRaw, dataPos, val, valLen) != 0) { \
sdbFreeRow(pRow); \
return NULL; \
} \
dataPos += valLen; \
}
#define SDB_SET_INT32_VAL(pData, dataLen, val) \
{ \
*(int32_t *)(pData) = (int32_t)(val); \
(dataLen) += sizeof(int32_t); \
(pData) += sizeof(int32_t); \
#define SDB_SET_INT64(pData, dataPos, val) \
{ \
if (sdbSetRawInt64(pRaw, dataPos, val) != 0) { \
sdbFreeRaw(pRaw); \
return NULL; \
}; \
dataPos += sizeof(int64_t); \
}
#define SDB_SET_BINARY_VAL(pData, dataLen, val, valLen) \
{ \
memcpy((char *)(pData), (val), (valLen)); \
(dataLen) += (valLen); \
(pData) += (valLen); \
#define SDB_SET_INT32(pData, dataPos, val) \
{ \
if (sdbSetRawInt32(pRaw, dataPos, val) != 0) { \
sdbFreeRaw(pRaw); \
return NULL; \
}; \
dataPos += sizeof(int32_t); \
}
#define SDB_SET_INT8(pData, dataPos, val) \
{ \
if (sdbSetRawInt8(pRaw, dataPos, val) != 0) { \
sdbFreeRaw(pRaw); \
return NULL; \
}; \
dataPos += sizeof(int8_t); \
}
#define SDB_SET_BINARY(pRaw, dataPos, val, valLen) \
{ \
if (sdbSetRawBinary(pRaw, dataPos, val, valLen) != 0) { \
sdbFreeRaw(pRaw); \
return NULL; \
}; \
dataPos += valLen; \
}
#define SDB_SET_DATALEN(pRaw, dataLen) \
{ \
if (sdbSetRawDataLen(pRaw, dataLen) != 0) { \
sdbFreeRaw(pRaw); \
return NULL; \
}; \
}
typedef
struct
SSdbRaw
SSdbRaw
;
typedef
struct
SSdbRow
SSdbRow
;
typedef
enum
{
SDB_KEY_BINARY
=
1
,
SDB_KEY_INT32
=
2
,
SDB_KEY_INT64
=
3
}
EKeyType
;
typedef
enum
{
SDB_STATUS_CREATING
=
1
,
SDB_STATUS_READY
=
2
,
SDB_STATUS_DROPPING
=
3
,
SDB_STATUS_DROPPED
=
4
}
ESdbStatus
;
typedef
enum
{
SDB_START
=
0
,
SDB_TRANS
=
1
,
SDB_CLUSTER
=
2
,
SDB_DNODE
=
3
,
SDB_MNODE
=
4
,
SDB_
ACCT
=
5
,
SDB_
USER
=
5
,
SDB_AUTH
=
6
,
SDB_
USER
=
7
,
SDB_
ACCT
=
7
,
SDB_DB
=
8
,
SDB_VGROUP
=
9
,
SDB_STABLE
=
10
,
...
...
@@ -90,26 +126,11 @@ typedef enum {
SDB_MAX
=
12
}
ESdbType
;
typedef
enum
{
SDB_ACTION_INSERT
=
1
,
SDB_ACTION_UPDATE
=
2
,
SDB_ACTION_DELETE
=
3
}
ESdbAction
;
typedef
enum
{
SDB_KEY_BINARY
=
1
,
SDB_KEY_INT32
=
2
,
SDB_KEY_INT64
=
3
}
EKeyType
;
typedef
enum
{
SDB_STATUS_CREATING
=
1
,
SDB_STATUS_READY
=
2
,
SDB_STATUS_DROPPING
=
3
}
ESdbStatus
;
typedef
struct
{
int8_t
type
;
int8_t
sver
;
int8_t
status
;
int8_t
action
;
int8_t
reserved
[
4
];
int32_t
cksum
;
int32_t
dataLen
;
char
data
[];
}
SSdbRaw
;
typedef
int32_t
(
*
SdbInsertFp
)(
void
*
pObj
);
typedef
int32_t
(
*
SdbUpdateFp
)(
void
*
pSrcObj
,
void
*
pDstObj
);
typedef
int32_t
(
*
SdbDeleteFp
)(
void
*
pObj
);
typedef
int32_t
(
*
SdbDeployFp
)();
typedef
void
*
(
*
SdbDecodeFp
)(
SSdbRaw
*
pRaw
);
typedef
SSdbRow
*
(
*
SdbDecodeFp
)(
SSdbRaw
*
pRaw
);
typedef
SSdbRaw
*
(
*
SdbEncodeFp
)(
void
*
pObj
);
typedef
struct
{
...
...
@@ -127,19 +148,38 @@ int32_t sdbInit();
void
sdbCleanup
();
void
sdbSetTable
(
SSdbTable
table
);
int32_t
sdbRead
();
int32_t
sdbOpen
();
void
sdbClose
();
int32_t
sdbWrite
(
SSdbRaw
*
pRaw
);
int32_t
sdbCommit
();
int32_t
sdbDeploy
();
void
sdbUnDeploy
();
void
*
sdbAcquire
(
ESdbType
sdb
,
void
*
pKey
);
void
sdbRelease
(
void
*
pObj
);
void
*
sdbFetch
(
ESdbType
sdb
,
void
*
pIter
);
void
sdbCancelFetch
(
ESdbType
sdb
,
void
*
pIter
);
void
*
sdbFetch
(
ESdbType
sdb
,
void
*
pIter
,
void
**
ppObj
);
void
sdbCancelFetch
(
void
*
pIter
);
int32_t
sdbGetSize
(
ESdbType
sdb
);
SSdbRaw
*
sdbAllocRaw
(
ESdbType
sdb
,
int8_t
sver
,
int32_t
dataLen
);
void
sdbFreeRaw
(
SSdbRaw
*
pRaw
);
int32_t
sdbSetRawInt8
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int8_t
val
);
int32_t
sdbSetRawInt32
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int32_t
val
);
int32_t
sdbSetRawInt64
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int64_t
val
);
int32_t
sdbSetRawBinary
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
const
char
*
pVal
,
int32_t
valLen
);
int32_t
sdbSetRawDataLen
(
SSdbRaw
*
pRaw
,
int32_t
dataLen
);
int32_t
sdbSetRawStatus
(
SSdbRaw
*
pRaw
,
ESdbStatus
status
);
int32_t
sdbGetRawInt8
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int8_t
*
val
);
int32_t
sdbGetRawInt32
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int32_t
*
val
);
int32_t
sdbGetRawInt64
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int64_t
*
val
);
int32_t
sdbGetRawBinary
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
char
*
pVal
,
int32_t
valLen
);
int32_t
sdbGetRawSoftVer
(
SSdbRaw
*
pRaw
,
int8_t
*
sver
);
int32_t
sdbGetRawTotalSize
(
SSdbRaw
*
pRaw
);
SSdbRow
*
sdbAllocRow
(
int32_t
objSize
);
void
sdbFreeRow
(
SSdbRow
*
pRow
);
void
*
sdbGetRowObj
(
SSdbRow
*
pRow
);
#ifdef __cplusplus
}
#endif
...
...
include/util/taoserror.h
浏览文件 @
1cabd9f3
...
...
@@ -66,13 +66,14 @@ int32_t* taosGetErrno();
#define TSDB_CODE_INVALID_PTR TAOS_DEF_ERROR_CODE(0, 0x0103)
#define TSDB_CODE_MEMORY_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0104)
#define TSDB_CODE_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0106)
#define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0107)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0108)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0109)
#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x010A)
#define TSDB_CODE_REF_INVALID_ID TAOS_DEF_ERROR_CODE(0, 0x010B)
#define TSDB_CODE_REF_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x010C)
#define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x010D)
#define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x0107)
#define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0108)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0109)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x010A)
#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x010B)
#define TSDB_CODE_REF_INVALID_ID TAOS_DEF_ERROR_CODE(0, 0x010C)
#define TSDB_CODE_REF_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x010D)
#define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x010E)
//client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) //"Invalid Operation")
...
...
@@ -133,17 +134,18 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_FAILED_TO_CREATE_DIR TAOS_DEF_ERROR_CODE(0, 0x0313) //"failed to create mnode dir")
#define TSDB_CODE_MND_FAILED_TO_INIT_STEP TAOS_DEF_ERROR_CODE(0, 0x0314) //"failed to init components")
#define TSDB_CODE_SDB_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0320)
#define TSDB_CODE_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0321)
#define TSDB_CODE_SDB_OBJ_NOT_THERE TAOS_DEF_ERROR_CODE(0, 0x0322)
#define TSDB_CODE_SDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0323)
#define TSDB_CODE_SDB_INVALID_KEY_TYPE TAOS_DEF_ERROR_CODE(0, 0x0324)
#define TSDB_CODE_SDB_INVALID_ACTION_TYPE TAOS_DEF_ERROR_CODE(0, 0x0325)
#define TSDB_CODE_SDB_INVALID_STATUS_TYPE TAOS_DEF_ERROR_CODE(0, 0x0326)
#define TSDB_CODE_SDB_INVALID_DATA_VER TAOS_DEF_ERROR_CODE(0, 0x0327)
#define TSDB_CODE_SDB_INVALID_DATA_LEN TAOS_DEF_ERROR_CODE(0, 0x0328)
#define TSDB_CODE_SDB_INVALID_META_ROW TAOS_DEF_ERROR_CODE(0, 0x0329)
#define TSDB_CODE_SDB_OBJ_CREATING TAOS_DEF_ERROR_CODE(0, 0x0323)
#define TSDB_CODE_SDB_OBJ_DROPPING TAOS_DEF_ERROR_CODE(0, 0x0324)
#define TSDB_CODE_SDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0325)
#define TSDB_CODE_SDB_INVALID_KEY_TYPE TAOS_DEF_ERROR_CODE(0, 0x0326)
#define TSDB_CODE_SDB_INVALID_ACTION_TYPE TAOS_DEF_ERROR_CODE(0, 0x0327)
#define TSDB_CODE_SDB_INVALID_STATUS_TYPE TAOS_DEF_ERROR_CODE(0, 0x0328)
#define TSDB_CODE_SDB_INVALID_DATA_VER TAOS_DEF_ERROR_CODE(0, 0x0329)
#define TSDB_CODE_SDB_INVALID_DATA_LEN TAOS_DEF_ERROR_CODE(0, 0x032A)
#define TSDB_CODE_SDB_INVALID_DATA_CONTENT TAOS_DEF_ERROR_CODE(0, 0x032B)
#define TSDB_CODE_MND_DNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0330) //"DNode already exists")
#define TSDB_CODE_MND_DNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0331) //"DNode does not exist")
...
...
source/dnode/mgmt/src/dnodeMnode.c
浏览文件 @
1cabd9f3
...
...
@@ -136,8 +136,8 @@ static int32_t dnodeWriteMnodeFile() {
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
deployed
\"
:
\"
%d
\"
,
\n
"
,
tsMnode
.
d
ropp
ed
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dropped
\"
:
\"
%d
\"
,
\n
"
,
tsMnode
.
dropped
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
deployed
\"
:
\"
%d
\"
,
\n
"
,
tsMnode
.
d
eploy
ed
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dropped
\"
:
\"
%d
\"\n
"
,
tsMnode
.
dropped
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
fwrite
(
content
,
1
,
len
,
fp
);
...
...
@@ -180,7 +180,7 @@ static int32_t dnodeStartMnode() {
tsMnode
.
deployed
=
1
;
taosWUnLockLatch
(
&
tsMnode
.
latch
);
return
code
;
return
mnodeStart
(
NULL
)
;
}
static
void
dnodeStopMnode
()
{
...
...
@@ -212,14 +212,14 @@ static int32_t dnodeUnDeployMnode() {
}
dnodeStopMnode
();
mnodeUnDeploy
(
tsMnodeDir
);
mnodeUnDeploy
();
dnodeWriteMnodeFile
();
return
code
;
}
static
int32_t
dnodeDeployMnode
(
SMnodeCfg
*
pCfg
)
{
int32_t
code
=
mnodeDeploy
(
tsMnodeDir
,
pCfg
);
int32_t
code
=
mnodeDeploy
(
pCfg
);
if
(
code
!=
0
)
{
dError
(
"failed to deploy mnode since %s"
,
tstrerror
(
code
));
return
code
;
...
...
@@ -536,7 +536,7 @@ static int32_t dnodeOpenMnode() {
SMnodeCfg
cfg
=
{.
replica
=
1
};
cfg
.
replicas
[
0
].
port
=
tsServerPort
;
tstrncpy
(
cfg
.
replicas
[
0
].
fqdn
,
tsLocalFqdn
,
TSDB_FQDN_LEN
);
return
dnodeDeployMnode
(
&
cfg
);
code
=
dnodeDeployMnode
(
&
cfg
);
}
else
{
dInfo
(
"start to open mnode"
);
return
dnodeStartMnode
();
...
...
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
1cabd9f3
...
...
@@ -134,10 +134,11 @@ static int32_t mnodeAllocInitSteps() {
}
static
int32_t
mnodeAllocStartSteps
()
{
struct
SSteps
*
steps
=
taosStepInit
(
7
,
NULL
);
struct
SSteps
*
steps
=
taosStepInit
(
8
,
NULL
);
if
(
steps
==
NULL
)
return
-
1
;
taosStepAdd
(
steps
,
"mnode-timer"
,
mnodeInitTimer
,
NULL
);
taosStepAdd
(
steps
,
"mnode-sdb-file"
,
sdbOpen
,
sdbClose
);
taosStepAdd
(
steps
,
"mnode-balance"
,
mnodeInitBalance
,
mnodeCleanupBalance
);
taosStepAdd
(
steps
,
"mnode-profile"
,
mnodeInitProfile
,
mnodeCleanupProfile
);
taosStepAdd
(
steps
,
"mnode-show"
,
mnodeInitShow
,
mnodeCleanUpShow
);
...
...
@@ -170,7 +171,7 @@ int32_t mnodeInit(SMnodePara para) {
void
mnodeCleanup
()
{
taosStepCleanup
(
tsMint
.
pInitSteps
);
}
int32_t
mnodeDeploy
(
char
*
path
,
SMnodeCfg
*
pCfg
)
{
int32_t
mnodeDeploy
(
SMnodeCfg
*
pCfg
)
{
if
(
tsMint
.
para
.
dnodeId
<=
0
&&
tsMint
.
para
.
clusterId
<=
0
)
{
if
(
sdbDeploy
()
!=
0
)
{
mError
(
"failed to deploy sdb since %s"
,
terrstr
());
...
...
@@ -182,9 +183,9 @@ int32_t mnodeDeploy(char *path, SMnodeCfg *pCfg) {
return
0
;
}
void
mnodeUnDeploy
(
char
*
path
)
{
sdbUnDeploy
();
}
void
mnodeUnDeploy
()
{
sdbUnDeploy
();
}
int32_t
mnodeStart
(
char
*
path
,
SMnodeCfg
*
pCfg
)
{
return
taosStepExec
(
tsMint
.
pStartSteps
);
}
int32_t
mnodeStart
(
SMnodeCfg
*
pCfg
)
{
return
taosStepExec
(
tsMint
.
pStartSteps
);
}
int32_t
mnodeAlter
(
SMnodeCfg
*
pCfg
)
{
return
0
;
}
...
...
source/dnode/mnode/impl/src/mnodeAcct.c
浏览文件 @
1cabd9f3
...
...
@@ -16,69 +16,56 @@
#define _DEFAULT_SOURCE
#include "mnodeInt.h"
#define ACCT_VER 1
#define
SDB_
ACCT_VER 1
static
SSdbRaw
*
mnodeAcctActionEncode
(
SAcctObj
*
pAcct
)
{
SSdbRaw
*
pRaw
=
calloc
(
1
,
sizeof
(
SAcctObj
)
+
sizeof
(
SSdbRaw
));
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_ACCT
,
SDB_ACCT_VER
,
sizeof
(
SAcctObj
));
if
(
pRaw
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pAcct
->
acct
,
TSDB_USER_LEN
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pAcct
->
createdTime
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pAcct
->
updateTime
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAcct
->
acctId
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAcct
->
status
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAcct
->
cfg
.
maxUsers
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAcct
->
cfg
.
maxDbs
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAcct
->
cfg
.
maxTimeSeries
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAcct
->
cfg
.
maxStreams
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pAcct
->
cfg
.
maxStorage
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAcct
->
cfg
.
accessState
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
);
int32_t
dataLen
=
0
;
char
*
pData
=
pRaw
->
data
;
SDB_SET_BINARY_VAL
(
pData
,
dataLen
,
pAcct
->
acct
,
TSDB_USER_LEN
)
SDB_SET_INT64_VAL
(
pData
,
dataLen
,
pAcct
->
createdTime
)
SDB_SET_INT64_VAL
(
pData
,
dataLen
,
pAcct
->
updateTime
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
acctId
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
status
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
maxUsers
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
maxDbs
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
maxTimeSeries
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
maxStreams
)
SDB_SET_INT64_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
maxStorage
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
accessState
)
pRaw
->
dataLen
=
dataLen
;
pRaw
->
type
=
SDB_ACCT
;
pRaw
->
sver
=
ACCT_VER
;
return
pRaw
;
}
static
SAcctObj
*
mnodeAcctActionDecode
(
SSdbRaw
*
pRaw
)
{
if
(
pRaw
->
sver
!=
ACCT_VER
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
return
NULL
;
}
static
SSdbRow
*
mnodeAcctActionDecode
(
SSdbRaw
*
pRaw
)
{
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
return
NULL
;
SAcctObj
*
pAcct
=
calloc
(
1
,
sizeof
(
SAcctObj
));
if
(
pAcct
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
int32_t
code
=
0
;
int32_t
dataLen
=
pRaw
->
dataLen
;
char
*
pData
=
pRaw
->
data
;
SDB_GET_BINARY_VAL
(
pData
,
dataLen
,
pAcct
->
acct
,
TSDB_USER_LEN
,
code
)
SDB_GET_INT64_VAL
(
pData
,
dataLen
,
pAcct
->
createdTime
,
code
)
SDB_GET_INT64_VAL
(
pData
,
dataLen
,
pAcct
->
updateTime
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
acctId
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
status
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
maxUsers
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
maxDbs
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
maxTimeSeries
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
maxStreams
,
code
)
SDB_GET_INT64_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
maxStorage
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
pAcct
->
cfg
.
accessState
,
code
)
if
(
code
!=
0
)
{
tfree
(
pAcct
);
terrno
=
code
;
if
(
sver
!=
SDB_ACCT_VER
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
return
NULL
;
}
return
pAcct
;
SSdbRow
*
pRow
=
sdbAllocRow
(
sizeof
(
SAcctObj
));
SAcctObj
*
pAcct
=
sdbGetRowObj
(
pRow
);
if
(
pAcct
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pAcct
->
acct
,
TSDB_USER_LEN
)
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pAcct
->
createdTime
)
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pAcct
->
updateTime
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pAcct
->
acctId
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pAcct
->
status
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pAcct
->
cfg
.
maxUsers
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pAcct
->
cfg
.
maxDbs
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pAcct
->
cfg
.
maxTimeSeries
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pAcct
->
cfg
.
maxStreams
)
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pAcct
->
cfg
.
maxStorage
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pAcct
->
cfg
.
accessState
)
return
pRow
;
}
static
int32_t
mnodeAcctActionInsert
(
SAcctObj
*
pAcct
)
{
return
0
;
}
...
...
@@ -86,7 +73,9 @@ static int32_t mnodeAcctActionInsert(SAcctObj *pAcct) { return 0; }
static
int32_t
mnodeAcctActionDelete
(
SAcctObj
*
pAcct
)
{
return
0
;
}
static
int32_t
mnodeAcctActionUpdate
(
SAcctObj
*
pSrcAcct
,
SAcctObj
*
pDstAcct
)
{
memcpy
(
pDstAcct
,
pSrcAcct
,
(
int32_t
)((
char
*
)
&
pDstAcct
->
info
-
(
char
*
)
&
pDstAcct
));
SAcctObj
tObj
;
int32_t
len
=
(
int32_t
)((
int8_t
*
)
&
tObj
.
info
-
(
int8_t
*
)
&
tObj
);
memcpy
(
pDstAcct
,
pSrcAcct
,
len
);
return
0
;
}
...
...
@@ -106,9 +95,8 @@ static int32_t mnodeCreateDefaultAcct() {
.
accessState
=
TSDB_VN_ALL_ACCCESS
};
SSdbRaw
*
pRaw
=
mnodeAcctActionEncode
(
&
acctObj
);
if
(
pRaw
==
NULL
)
{
return
-
1
;
}
if
(
pRaw
==
NULL
)
return
-
1
;
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
return
sdbWrite
(
pRaw
);
}
...
...
source/dnode/mnode/impl/src/mnodeSync.c
浏览文件 @
1cabd9f3
...
...
@@ -21,8 +21,8 @@ int32_t mnodeInitSync() { return 0; }
void
mnodeCleanUpSync
()
{}
int32_t
mnodeSyncPropose
(
SSdbRaw
*
pRaw
,
void
*
pData
)
{
trnApply
(
p
Raw
,
pData
,
0
);
free
(
p
Raw
);
trnApply
(
p
Data
,
pData
,
0
);
free
(
p
Data
);
return
0
;
}
...
...
source/dnode/mnode/impl/src/mnodeUser.c
浏览文件 @
1cabd9f3
...
...
@@ -19,59 +19,46 @@
#include "tglobal.h"
#include "tkey.h"
#define USER_VER 1
#define
SDB_
USER_VER 1
static
SSdbRaw
*
mnodeUserActionEncode
(
SUserObj
*
pUser
)
{
SSdbRaw
*
pRaw
=
calloc
(
1
,
sizeof
(
SUserObj
)
+
sizeof
(
SSdbRaw
));
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_USER
,
SDB_USER_VER
,
sizeof
(
SAcctObj
));
if
(
pRaw
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pUser
->
user
,
TSDB_USER_LEN
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pUser
->
pass
,
TSDB_KEY_LEN
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pUser
->
acct
,
TSDB_USER_LEN
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pUser
->
createdTime
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pUser
->
updateTime
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pUser
->
rootAuth
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
);
int32_t
dataLen
=
0
;
char
*
pData
=
pRaw
->
data
;
SDB_SET_BINARY_VAL
(
pData
,
dataLen
,
pUser
->
user
,
TSDB_USER_LEN
)
SDB_SET_BINARY_VAL
(
pData
,
dataLen
,
pUser
->
pass
,
TSDB_KEY_LEN
)
SDB_SET_BINARY_VAL
(
pData
,
dataLen
,
pUser
->
acct
,
TSDB_KEY_LEN
)
SDB_SET_INT64_VAL
(
pData
,
dataLen
,
pUser
->
createdTime
)
SDB_SET_INT64_VAL
(
pData
,
dataLen
,
pUser
->
updateTime
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
pUser
->
rootAuth
)
pRaw
->
dataLen
=
dataLen
;
pRaw
->
type
=
SDB_USER
;
pRaw
->
sver
=
USER_VER
;
return
pRaw
;
}
static
SUserObj
*
mnodeUserActionDecode
(
SSdbRaw
*
pRaw
)
{
if
(
pRaw
->
sver
!=
USER_VER
)
{
static
SSdbRow
*
mnodeUserActionDecode
(
SSdbRaw
*
pRaw
)
{
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
return
NULL
;
if
(
sver
!=
SDB_USER_VER
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
return
NULL
;
}
SUserObj
*
pUser
=
calloc
(
1
,
sizeof
(
SUserObj
));
if
(
pUser
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
SSdbRow
*
pRow
=
sdbAllocRow
(
sizeof
(
SUserObj
));
SUserObj
*
pUser
=
sdbGetRowObj
(
pRow
);
if
(
pUser
==
NULL
)
return
NULL
;
int32_t
code
=
0
;
int32_t
dataLen
=
pRaw
->
dataLen
;
char
*
pData
=
pRaw
->
data
;
SDB_GET_BINARY_VAL
(
pData
,
dataLen
,
pUser
->
user
,
TSDB_USER_LEN
,
code
)
SDB_GET_BINARY_VAL
(
pData
,
dataLen
,
pUser
->
pass
,
TSDB_KEY_LEN
,
code
)
SDB_GET_BINARY_VAL
(
pData
,
dataLen
,
pUser
->
acct
,
TSDB_USER_LEN
,
code
)
SDB_GET_INT64_VAL
(
pData
,
dataLen
,
pUser
->
createdTime
,
code
)
SDB_GET_INT64_VAL
(
pData
,
dataLen
,
pUser
->
updateTime
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
pUser
->
rootAuth
,
code
)
int32_t
dataPos
=
0
;
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pUser
->
user
,
TSDB_USER_LEN
)
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pUser
->
pass
,
TSDB_KEY_LEN
)
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pUser
->
acct
,
TSDB_USER_LEN
)
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pUser
->
createdTime
)
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pUser
->
updateTime
)
SDB_GET_INT8
(
pRaw
,
pRow
,
dataPos
,
&
pUser
->
rootAuth
)
if
(
code
!=
0
)
{
tfree
(
pUser
);
terrno
=
code
;
return
NULL
;
}
return
pUser
;
return
pRow
;
}
static
int32_t
mnodeUserActionInsert
(
SUserObj
*
pUser
)
{
...
...
@@ -105,7 +92,9 @@ static int32_t mnodeUserActionDelete(SUserObj *pUser) {
}
static
int32_t
mnodeUserActionUpdate
(
SUserObj
*
pSrcUser
,
SUserObj
*
pDstUser
)
{
memcpy
(
pDstUser
,
pSrcUser
,
(
int32_t
)((
char
*
)
&
pDstUser
->
prohibitDbHash
-
(
char
*
)
&
pDstUser
));
SUserObj
tObj
;
int32_t
len
=
(
int32_t
)((
int8_t
*
)
tObj
.
prohibitDbHash
-
(
int8_t
*
)
&
tObj
);
memcpy
(
pDstUser
,
pSrcUser
,
len
);
return
0
;
}
...
...
@@ -122,9 +111,8 @@ static int32_t mnodeCreateDefaultUser(char *acct, char *user, char *pass) {
}
SSdbRaw
*
pRaw
=
mnodeUserActionEncode
(
&
userObj
);
if
(
pRaw
==
NULL
)
{
return
-
1
;
}
if
(
pRaw
==
NULL
)
return
-
1
;
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
return
sdbWrite
(
pRaw
);
}
...
...
@@ -156,32 +144,28 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM
STrans
*
pTrans
=
trnCreate
(
TRN_POLICY_ROLLBACK
);
if
(
pTrans
==
NULL
)
return
-
1
;
trnSetRpcHandle
(
pTrans
,
pMsg
->
rpcMsg
.
handle
);
SSdbRaw
*
pRedoRaw
=
mnodeUserActionEncode
(
&
userObj
);
if
(
pRedoRaw
==
NULL
||
trnAppendRedoLog
(
pTrans
,
pRedoRaw
)
!=
0
)
{
trnDrop
(
pTrans
);
return
-
1
;
}
pRedoRaw
->
status
=
SDB_STATUS_CREATING
;
pRedoRaw
->
action
=
SDB_ACTION_INSERT
;
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_CREATING
);
SSdbRaw
*
pUndoRaw
=
mnodeUserActionEncode
(
&
userObj
);
if
(
pUndoRaw
==
NULL
||
trnAppendUndoLog
(
pTrans
,
pUndoRaw
)
!=
0
)
{
trnDrop
(
pTrans
);
return
-
1
;
}
pUndoRaw
->
status
=
SDB_STATUS_DROPPING
;
pUndoRaw
->
action
=
SDB_ACTION_DELETE
;
sdbSetRawStatus
(
pUndoRaw
,
SDB_STATUS_DROPPED
);
SSdbRaw
*
pCommitRaw
=
mnodeUserActionEncode
(
&
userObj
);
if
(
pCommitRaw
==
NULL
||
trnAppendCommitLog
(
pTrans
,
pCommitRaw
)
!=
0
)
{
trnDrop
(
pTrans
);
return
-
1
;
}
pCommitRaw
->
status
=
SDB_STATUS_READY
;
pCommitRaw
->
action
=
SDB_ACTION_UPDATE
;
trnSetRpcHandle
(
pTrans
,
pMsg
->
rpcMsg
.
handle
);
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
);
if
(
trnPrepare
(
pTrans
,
mnodeSyncPropose
)
!=
0
)
{
trnDrop
(
pTrans
);
...
...
source/dnode/mnode/sdb/inc/sdbInt.h
浏览文件 @
1cabd9f3
...
...
@@ -36,6 +36,22 @@ extern "C" {
#define SDB_MAX_SIZE (32 * 1024)
typedef
struct
SSdbRaw
{
int8_t
sdb
;
int8_t
sver
;
int8_t
status
;
int8_t
reserved
;
int32_t
dataLen
;
char
pData
[];
}
SSdbRaw
;
typedef
struct
SSdbRow
{
ESdbType
sdb
;
ESdbStatus
status
;
int32_t
refCount
;
char
pObj
[];
}
SSdbRow
;
typedef
struct
{
char
*
currDir
;
char
*
syncDir
;
...
...
@@ -53,12 +69,9 @@ typedef struct {
SdbDecodeFp
decodeFps
[
SDB_MAX
];
}
SSdbMgr
;
typedef
struct
{
ESdbStatus
status
;
int32_t
refCount
;
int32_t
dataLen
;
char
pData
[];
}
SSdbRow
;
extern
SSdbMgr
tsSdb
;
int32_t
sdbWriteImp
(
SSdbRaw
*
pRaw
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/sdb/src/sdb.c
浏览文件 @
1cabd9f3
...
...
@@ -17,224 +17,7 @@
#include "sdbInt.h"
#include "tglobal.h"
static
SSdbMgr
tsSdb
=
{
0
};
static
int32_t
sdbCreateDir
()
{
if
(
!
taosMkDir
(
tsSdb
.
currDir
))
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to create dir:%s since %s"
,
tsSdb
.
currDir
,
terrstr
());
return
-
1
;
}
if
(
!
taosMkDir
(
tsSdb
.
syncDir
))
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to create dir:%s since %s"
,
tsSdb
.
syncDir
,
terrstr
());
return
-
1
;
}
if
(
!
taosMkDir
(
tsSdb
.
tmpDir
))
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to create dir:%s since %s"
,
tsSdb
.
tmpDir
,
terrstr
());
return
-
1
;
}
return
0
;
}
static
int32_t
sdbRunDeployFp
()
{
for
(
int32_t
i
=
SDB_START
;
i
<
SDB_MAX
;
++
i
)
{
SdbDeployFp
fp
=
tsSdb
.
deployFps
[
i
];
if
(
fp
==
NULL
)
continue
;
if
((
*
fp
)()
!=
0
)
{
mError
(
"failed to deploy sdb:%d since %s"
,
i
,
terrstr
());
return
-
1
;
}
}
return
0
;
}
static
SHashObj
*
sdbGetHash
(
int32_t
sdb
)
{
if
(
sdb
>=
SDB_MAX
||
sdb
<=
SDB_START
)
{
terrno
=
TSDB_CODE_SDB_INVALID_TABLE_TYPE
;
return
NULL
;
}
SHashObj
*
hash
=
tsSdb
.
hashObjs
[
sdb
];
if
(
hash
==
NULL
)
{
terrno
=
TSDB_CODE_SDB_APP_ERROR
;
return
NULL
;
}
return
hash
;
}
int32_t
sdbWrite
(
SSdbRaw
*
pRaw
)
{
SHashObj
*
hash
=
sdbGetHash
(
pRaw
->
type
);
switch
(
pRaw
->
action
)
{
case
SDB_ACTION_INSERT
:
break
;
case
SDB_ACTION_UPDATE
:
break
;
case
SDB_ACTION_DELETE
:
break
;
default:
break
;
}
return
0
;
}
static
int32_t
sdbWriteVersion
(
FileFd
fd
)
{
return
0
;
}
static
int32_t
sdbReadVersion
(
FileFd
fd
)
{
return
0
;
}
static
int32_t
sdbReadDataFile
()
{
int32_t
code
=
0
;
SSdbRaw
*
pRaw
=
malloc
(
SDB_MAX_SIZE
);
if
(
pRaw
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
char
file
[
PATH_MAX
]
=
{
0
};
snprintf
(
file
,
sizeof
(
file
),
"%ssdb.data"
,
tsSdb
.
currDir
);
FileFd
fd
=
taosOpenFileCreateWrite
(
file
);
if
(
fd
<=
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to open file:%s for read since %s"
,
file
,
tstrerror
(
code
));
return
code
;
}
int64_t
offset
=
0
;
while
(
1
)
{
int32_t
ret
=
(
int32_t
)
taosReadFile
(
fd
,
pRaw
,
sizeof
(
SSdbRaw
));
if
(
ret
==
0
)
break
;
if
(
ret
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to read file:%s since %s"
,
file
,
tstrerror
(
code
));
break
;
}
if
(
ret
<
sizeof
(
SSdbRaw
))
{
code
=
TSDB_CODE_SDB_APP_ERROR
;
mError
(
"failed to read file:%s since %s"
,
file
,
tstrerror
(
code
));
break
;
}
code
=
sdbWrite
(
pRaw
);
if
(
code
!=
0
)
{
mError
(
"failed to read file:%s since %s"
,
file
,
tstrerror
(
code
));
goto
PARSE_SDB_DATA_ERROR
;
}
}
code
=
0
;
PARSE_SDB_DATA_ERROR:
taosCloseFile
(
fd
);
return
code
;
}
static
int32_t
sdbWriteDataFile
()
{
int32_t
code
=
0
;
char
tmpfile
[
PATH_MAX
]
=
{
0
};
snprintf
(
tmpfile
,
sizeof
(
tmpfile
),
"%ssdb.data"
,
tsSdb
.
tmpDir
);
FileFd
fd
=
taosOpenFileCreateWrite
(
tmpfile
);
if
(
fd
<=
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to open file:%s for write since %s"
,
tmpfile
,
tstrerror
(
code
));
return
code
;
}
for
(
int32_t
i
=
SDB_MAX
-
1
;
i
>
SDB_START
;
--
i
)
{
SHashObj
*
hash
=
tsSdb
.
hashObjs
[
i
];
if
(
!
hash
)
continue
;
SdbEncodeFp
encodeFp
=
tsSdb
.
encodeFps
[
i
];
if
(
!
encodeFp
)
continue
;
SSdbRow
*
pRow
=
taosHashIterate
(
hash
,
NULL
);
while
(
pRow
!=
NULL
)
{
if
(
pRow
->
status
==
SDB_STATUS_READY
)
continue
;
SSdbRaw
*
pRaw
=
(
*
encodeFp
)(
pRow
->
pData
);
if
(
pRaw
!=
NULL
)
{
taosWriteFile
(
fd
,
pRaw
,
sizeof
(
SSdbRaw
)
+
pRaw
->
dataLen
);
}
else
{
taosHashCancelIterate
(
hash
,
pRow
);
code
=
TSDB_CODE_SDB_APP_ERROR
;
break
;
}
pRow
=
taosHashIterate
(
hash
,
pRow
);
}
}
if
(
code
==
0
)
{
code
=
sdbWriteVersion
(
fd
);
}
taosCloseFile
(
fd
);
if
(
code
==
0
)
{
code
=
taosFsyncFile
(
fd
);
}
if
(
code
!=
0
)
{
char
curfile
[
PATH_MAX
]
=
{
0
};
snprintf
(
curfile
,
sizeof
(
curfile
),
"%ssdb.data"
,
tsSdb
.
currDir
);
code
=
taosRenameFile
(
tmpfile
,
curfile
);
}
if
(
code
!=
0
)
{
mError
(
"failed to write sdb file since %s"
,
tstrerror
(
code
));
}
else
{
mInfo
(
"write sdb file successfully"
);
}
return
code
;
}
int32_t
sdbRead
()
{
int32_t
code
=
sdbReadDataFile
();
if
(
code
!=
0
)
{
return
code
;
}
mInfo
(
"read sdb file successfully"
);
return
-
1
;
}
int32_t
sdbCommit
()
{
int32_t
code
=
sdbWriteDataFile
();
if
(
code
!=
0
)
{
return
code
;
}
return
0
;
}
int32_t
sdbDeploy
()
{
if
(
sdbCreateDir
()
!=
0
)
{
return
-
1
;
}
if
(
sdbRunDeployFp
()
!=
0
)
{
return
-
1
;
}
if
(
sdbCommit
()
!=
0
)
{
return
-
1
;
}
return
0
;
}
void
sdbUnDeploy
()
{}
SSdbMgr
tsSdb
=
{
0
};
int32_t
sdbInit
()
{
char
path
[
PATH_MAX
+
100
];
...
...
@@ -262,7 +45,7 @@ int32_t sdbInit() {
type
=
TSDB_DATA_TYPE_BINARY
;
}
SHashObj
*
hash
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
type
),
true
,
HASH_NO_LOCK
);
SHashObj
*
hash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
type
),
true
,
HASH_NO_LOCK
);
if
(
hash
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -275,10 +58,6 @@ int32_t sdbInit() {
}
void
sdbCleanup
()
{
if
(
tsSdb
.
curVer
!=
tsSdb
.
lastCommitVer
)
{
sdbCommit
();
}
if
(
tsSdb
.
currDir
!=
NULL
)
{
tfree
(
tsSdb
.
currDir
);
}
...
...
@@ -309,108 +88,4 @@ void sdbSetTable(SSdbTable table) {
tsSdb
.
deployFps
[
sdb
]
=
table
.
deployFp
;
tsSdb
.
encodeFps
[
sdb
]
=
table
.
encodeFp
;
tsSdb
.
decodeFps
[
sdb
]
=
table
.
decodeFp
;
}
#if 0
void *sdbInsertRow(ESdbType sdb, void *p) {
SdbHead *pHead = p;
pHead->type = sdb;
pHead->status = SDB_AVAIL;
char *pKey = (char *)pHead + sizeof(pHead);
int32_t keySize;
EKeyType keyType = tsSdb.keyTypes[pHead->type];
int32_t dataSize = tsSdb.dataSize[pHead->type];
SHashObj *hash = sdbGetHash(pHead->type);
if (hash == NULL) {
return NULL;
}
if (keyType == SDBINT32) {
keySize = sizeof(int32_t);
} else if (keyType == SDB_KEY_BINARY) {
keySize = strlen(pKey) + 1;
} else {
keySize = sizeof(int64_t);
}
taosHashPut(hash, pKey, keySize, pHead, dataSize);
return taosHashGet(hash, pKey, keySize);
}
void sdbDeleteRow(ESdbType sdb, void *p) {
SdbHead *pHead = p;
pHead->status = SDB_STATUS_DROPPED;
}
void *sdbUpdateRow(ESdbType sdb, void *pHead) { return sdbInsertRow(sdb, pHead); }
#endif
void
*
sdbAcquire
(
ESdbType
sdb
,
void
*
pKey
)
{
terrno
=
0
;
SHashObj
*
hash
=
sdbGetHash
(
sdb
);
if
(
hash
==
NULL
)
{
return
NULL
;
}
int32_t
keySize
;
EKeyType
keyType
=
tsSdb
.
keyTypes
[
sdb
];
switch
(
keyType
)
{
case
SDB_KEY_INT32
:
keySize
=
sizeof
(
int32_t
);
break
;
case
SDB_KEY_INT64
:
keySize
=
sizeof
(
int64_t
);
break
;
case
SDB_KEY_BINARY
:
keySize
=
strlen
(
pKey
)
+
1
;
break
;
default:
keySize
=
sizeof
(
int32_t
);
}
SSdbRow
*
pRow
=
taosHashGet
(
hash
,
pKey
,
keySize
);
if
(
pRow
==
NULL
)
return
NULL
;
if
(
pRow
->
status
==
SDB_STATUS_READY
)
{
atomic_add_fetch_32
(
&
pRow
->
refCount
,
1
);
return
pRow
->
pData
;
}
else
{
terrno
=
-
1
;
// todo
return
NULL
;
}
}
void
sdbRelease
(
void
*
pObj
)
{
SSdbRow
*
pRow
=
(
SSdbRow
*
)((
char
*
)
pObj
-
sizeof
(
SSdbRow
));
atomic_sub_fetch_32
(
&
pRow
->
refCount
,
1
);
}
void
*
sdbFetchRow
(
ESdbType
sdb
,
void
*
pIter
)
{
SHashObj
*
hash
=
sdbGetHash
(
sdb
);
if
(
hash
==
NULL
)
{
return
NULL
;
}
return
taosHashIterate
(
hash
,
pIter
);
}
void
sdbCancelFetch
(
ESdbType
sdb
,
void
*
pIter
)
{
SHashObj
*
hash
=
sdbGetHash
(
sdb
);
if
(
hash
==
NULL
)
{
return
;
}
taosHashCancelIterate
(
hash
,
pIter
);
}
int32_t
sdbGetSize
(
ESdbType
sdb
)
{
SHashObj
*
hash
=
sdbGetHash
(
sdb
);
if
(
hash
==
NULL
)
{
return
0
;
}
return
taosHashGetSize
(
hash
);
}
\ No newline at end of file
source/dnode/mnode/sdb/src/sdbFile.c
0 → 100644
浏览文件 @
1cabd9f3
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "sdbInt.h"
#include "tglobal.h"
#include "tchecksum.h"
static
int32_t
sdbCreateDir
()
{
mDebug
(
"start to create mnode at %s"
,
tsMnodeDir
);
if
(
!
taosMkDir
(
tsSdb
.
currDir
))
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to create dir:%s since %s"
,
tsSdb
.
currDir
,
terrstr
());
return
-
1
;
}
if
(
!
taosMkDir
(
tsSdb
.
syncDir
))
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to create dir:%s since %s"
,
tsSdb
.
syncDir
,
terrstr
());
return
-
1
;
}
if
(
!
taosMkDir
(
tsSdb
.
tmpDir
))
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to create dir:%s since %s"
,
tsSdb
.
tmpDir
,
terrstr
());
return
-
1
;
}
return
0
;
}
static
int32_t
sdbRunDeployFp
()
{
mDebug
(
"start to run deploy functions"
);
for
(
int32_t
i
=
SDB_START
;
i
<
SDB_MAX
;
++
i
)
{
SdbDeployFp
fp
=
tsSdb
.
deployFps
[
i
];
if
(
fp
==
NULL
)
continue
;
if
((
*
fp
)()
!=
0
)
{
mError
(
"failed to deploy sdb:%d since %s"
,
i
,
terrstr
());
return
-
1
;
}
}
return
0
;
}
static
int32_t
sdbReadDataFile
()
{
SSdbRaw
*
pRaw
=
malloc
(
SDB_MAX_SIZE
);
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
char
file
[
PATH_MAX
]
=
{
0
};
snprintf
(
file
,
sizeof
(
file
),
"%ssdb.data"
,
tsSdb
.
currDir
);
FileFd
fd
=
taosOpenFileRead
(
file
);
if
(
fd
<=
0
)
{
free
(
pRaw
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to open file:%s for read since %s"
,
file
,
terrstr
());
return
-
1
;
}
int64_t
offset
=
0
;
int32_t
code
=
0
;
int32_t
readLen
=
0
;
int64_t
ret
=
0
;
while
(
1
)
{
readLen
=
sizeof
(
SSdbRaw
);
ret
=
taosReadFile
(
fd
,
pRaw
,
readLen
);
if
(
ret
==
0
)
break
;
if
(
ret
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to read file:%s since %s"
,
file
,
tstrerror
(
code
));
break
;
}
if
(
ret
!=
readLen
)
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
mError
(
"failed to read file:%s since %s"
,
file
,
tstrerror
(
code
));
break
;
}
readLen
=
pRaw
->
dataLen
+
sizeof
(
int32_t
);
ret
=
taosReadFile
(
fd
,
pRaw
->
pData
,
readLen
);
if
(
ret
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to read file:%s since %s"
,
file
,
tstrerror
(
code
));
break
;
}
if
(
ret
!=
readLen
)
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
mError
(
"failed to read file:%s since %s"
,
file
,
tstrerror
(
code
));
break
;
}
int32_t
totalLen
=
sizeof
(
SSdbRaw
)
+
pRaw
->
dataLen
+
sizeof
(
int32_t
);
if
(
!
taosCheckChecksumWhole
((
const
uint8_t
*
)
pRaw
,
totalLen
)
!=
0
)
{
code
=
TSDB_CODE_CHECKSUM_ERROR
;
mError
(
"failed to read file:%s since %s"
,
file
,
tstrerror
(
code
));
break
;
}
code
=
sdbWriteImp
(
pRaw
);
if
(
code
!=
0
)
{
mError
(
"failed to read file:%s since %s"
,
file
,
terrstr
());
goto
PARSE_SDB_DATA_ERROR
;
}
}
code
=
0
;
PARSE_SDB_DATA_ERROR:
taosCloseFile
(
fd
);
sdbFreeRaw
(
pRaw
);
terrno
=
code
;
return
code
;
}
static
int32_t
sdbWriteDataFile
()
{
char
tmpfile
[
PATH_MAX
]
=
{
0
};
snprintf
(
tmpfile
,
sizeof
(
tmpfile
),
"%ssdb.data"
,
tsSdb
.
tmpDir
);
FileFd
fd
=
taosOpenFileCreateWrite
(
tmpfile
);
if
(
fd
<=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to open file:%s for write since %s"
,
tmpfile
,
terrstr
());
return
-
1
;
}
int32_t
code
=
0
;
for
(
int32_t
i
=
SDB_MAX
-
1
;
i
>
SDB_START
;
--
i
)
{
SdbEncodeFp
encodeFp
=
tsSdb
.
encodeFps
[
i
];
if
(
encodeFp
==
NULL
)
continue
;
SHashObj
*
hash
=
tsSdb
.
hashObjs
[
i
];
SRWLatch
*
pLock
=
&
tsSdb
.
locks
[
i
];
taosWLockLatch
(
pLock
);
SSdbRow
**
ppRow
=
taosHashIterate
(
hash
,
NULL
);
while
(
ppRow
!=
NULL
)
{
SSdbRow
*
pRow
=
*
ppRow
;
if
(
pRow
==
NULL
||
pRow
->
status
!=
SDB_STATUS_READY
)
{
ppRow
=
taosHashIterate
(
hash
,
ppRow
);
continue
;
}
SSdbRaw
*
pRaw
=
(
*
encodeFp
)(
pRow
->
pObj
);
if
(
pRaw
!=
NULL
)
{
pRaw
->
status
=
pRow
->
status
;
int32_t
writeLen
=
sizeof
(
SSdbRaw
)
+
pRaw
->
dataLen
;
if
(
taosWriteFile
(
fd
,
pRaw
,
writeLen
)
!=
writeLen
)
{
code
=
TAOS_SYSTEM_ERROR
(
terrno
);
taosHashCancelIterate
(
hash
,
ppRow
);
break
;
}
int32_t
cksum
=
taosCalcChecksum
(
0
,
(
const
uint8_t
*
)
pRaw
,
sizeof
(
SSdbRaw
)
+
pRaw
->
dataLen
);
if
(
taosWriteFile
(
fd
,
&
cksum
,
sizeof
(
int32_t
))
!=
sizeof
(
int32_t
))
{
code
=
TAOS_SYSTEM_ERROR
(
terrno
);
taosHashCancelIterate
(
hash
,
ppRow
);
break
;
}
}
else
{
code
=
TSDB_CODE_SDB_APP_ERROR
;
taosHashCancelIterate
(
hash
,
ppRow
);
break
;
}
ppRow
=
taosHashIterate
(
hash
,
ppRow
);
}
taosWUnLockLatch
(
pLock
);
}
if
(
code
==
0
)
{
code
=
taosFsyncFile
(
fd
);
}
taosCloseFile
(
fd
);
if
(
code
==
0
)
{
char
curfile
[
PATH_MAX
]
=
{
0
};
snprintf
(
curfile
,
sizeof
(
curfile
),
"%ssdb.data"
,
tsSdb
.
currDir
);
code
=
taosRenameFile
(
tmpfile
,
curfile
);
}
if
(
code
!=
0
)
{
terrno
=
code
;
mError
(
"failed to write sdb file since %s"
,
terrstr
());
}
else
{
mDebug
(
"write sdb file successfully"
);
}
return
code
;
}
int32_t
sdbOpen
()
{
mDebug
(
"start to read mnode file"
);
if
(
sdbReadDataFile
()
!=
0
)
{
return
-
1
;
}
return
0
;
}
void
sdbClose
()
{
if
(
tsSdb
.
curVer
!=
tsSdb
.
lastCommitVer
)
{
mDebug
(
"start to write mnode file"
);
sdbWriteDataFile
();
}
for
(
int32_t
i
=
0
;
i
<
SDB_MAX
;
++
i
)
{
SHashObj
*
hash
=
tsSdb
.
hashObjs
[
i
];
if
(
hash
!=
NULL
)
{
taosHashClear
(
hash
);
}
}
}
int32_t
sdbDeploy
()
{
if
(
sdbCreateDir
()
!=
0
)
{
return
-
1
;
}
if
(
sdbRunDeployFp
()
!=
0
)
{
return
-
1
;
}
if
(
sdbWriteDataFile
()
!=
0
)
{
return
-
1
;
}
sdbClose
();
return
0
;
}
void
sdbUnDeploy
()
{
mDebug
(
"start to undeploy mnode"
);
taosRemoveDir
(
tsMnodeDir
);
}
source/dnode/mnode/sdb/src/sdbHash.c
0 → 100644
浏览文件 @
1cabd9f3
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "sdbInt.h"
#include "tglobal.h"
static
SHashObj
*
sdbGetHash
(
int32_t
sdb
)
{
if
(
sdb
>=
SDB_MAX
||
sdb
<=
SDB_START
)
{
terrno
=
TSDB_CODE_SDB_INVALID_TABLE_TYPE
;
return
NULL
;
}
SHashObj
*
hash
=
tsSdb
.
hashObjs
[
sdb
];
if
(
hash
==
NULL
)
{
terrno
=
TSDB_CODE_SDB_APP_ERROR
;
return
NULL
;
}
return
hash
;
}
static
int32_t
sdbGetkeySize
(
ESdbType
sdb
,
void
*
pKey
)
{
int32_t
keySize
;
EKeyType
keyType
=
tsSdb
.
keyTypes
[
sdb
];
if
(
keyType
==
SDB_KEY_INT32
)
{
keySize
=
sizeof
(
int32_t
);
}
else
if
(
keyType
==
SDB_KEY_BINARY
)
{
keySize
=
strlen
(
pKey
)
+
1
;
}
else
{
keySize
=
sizeof
(
int64_t
);
}
return
keySize
;
}
static
int32_t
sdbInsertRow
(
SHashObj
*
hash
,
SSdbRaw
*
pRaw
,
SSdbRow
*
pRow
,
int32_t
keySize
)
{
SRWLatch
*
pLock
=
&
tsSdb
.
locks
[
pRow
->
sdb
];
taosWLockLatch
(
pLock
);
SSdbRow
*
pDstRow
=
taosHashGet
(
hash
,
pRow
->
pObj
,
keySize
);
if
(
pDstRow
!=
NULL
)
{
terrno
=
TSDB_CODE_SDB_OBJ_ALREADY_THERE
;
taosWUnLockLatch
(
pLock
);
sdbFreeRow
(
pRow
);
return
-
1
;
}
pRow
->
refCount
=
1
;
pRow
->
status
=
pRaw
->
status
;
if
(
taosHashPut
(
hash
,
pRow
->
pObj
,
keySize
,
&
pRow
,
sizeof
(
void
*
))
!=
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
taosWUnLockLatch
(
pLock
);
sdbFreeRow
(
pRow
);
return
-
1
;
}
taosWUnLockLatch
(
pLock
);
SdbInsertFp
insertFp
=
tsSdb
.
insertFps
[
pRow
->
sdb
];
if
(
insertFp
!=
NULL
)
{
if
((
*
insertFp
)(
pRow
->
pObj
)
!=
0
)
{
taosWLockLatch
(
pLock
);
taosHashRemove
(
hash
,
pRow
->
pObj
,
keySize
);
taosWUnLockLatch
(
pLock
);
sdbFreeRow
(
pRow
);
return
-
1
;
}
}
return
0
;
}
static
int32_t
sdbUpdateRow
(
SHashObj
*
hash
,
SSdbRaw
*
pRaw
,
SSdbRow
*
pRow
,
int32_t
keySize
)
{
SRWLatch
*
pLock
=
&
tsSdb
.
locks
[
pRow
->
sdb
];
taosRLockLatch
(
pLock
);
SSdbRow
**
ppDstRow
=
taosHashGet
(
hash
,
pRow
->
pObj
,
keySize
);
if
(
ppDstRow
==
NULL
||
*
ppDstRow
==
NULL
)
{
taosRUnLockLatch
(
pLock
);
return
sdbInsertRow
(
hash
,
pRaw
,
pRow
,
keySize
);
}
SSdbRow
*
pDstRow
=
*
ppDstRow
;
pRow
->
status
=
pRaw
->
status
;
taosRUnLockLatch
(
pLock
);
SdbUpdateFp
updateFp
=
tsSdb
.
updateFps
[
pRow
->
sdb
];
if
(
updateFp
!=
NULL
)
{
(
*
updateFp
)(
pRow
->
pObj
,
pDstRow
->
pObj
);
}
sdbFreeRow
(
pRow
);
return
0
;
}
static
int32_t
sdbDeleteRow
(
SHashObj
*
hash
,
SSdbRaw
*
pRaw
,
SSdbRow
*
pRow
,
int32_t
keySize
)
{
SRWLatch
*
pLock
=
&
tsSdb
.
locks
[
pRow
->
sdb
];
taosWLockLatch
(
pLock
);
SSdbRow
**
ppDstRow
=
taosHashGet
(
hash
,
pRow
->
pObj
,
keySize
);
if
(
ppDstRow
==
NULL
||
*
ppDstRow
==
NULL
)
{
terrno
=
TSDB_CODE_SDB_OBJ_NOT_THERE
;
taosWUnLockLatch
(
pLock
);
sdbFreeRow
(
pRow
);
return
-
1
;
}
SSdbRow
*
pDstRow
=
*
ppDstRow
;
pDstRow
->
status
=
pRaw
->
status
;
taosHashRemove
(
hash
,
pDstRow
->
pObj
,
keySize
);
taosWUnLockLatch
(
pLock
);
SdbDeleteFp
deleteFp
=
tsSdb
.
deleteFps
[
pDstRow
->
sdb
];
if
(
deleteFp
!=
NULL
)
{
(
void
)(
*
deleteFp
)(
pDstRow
->
pObj
);
}
sdbRelease
(
pDstRow
->
pObj
);
sdbFreeRow
(
pRow
);
return
0
;
}
int32_t
sdbWriteImp
(
SSdbRaw
*
pRaw
)
{
SHashObj
*
hash
=
sdbGetHash
(
pRaw
->
sdb
);
if
(
hash
==
NULL
)
return
-
1
;
SdbDecodeFp
decodeFp
=
tsSdb
.
decodeFps
[
pRaw
->
sdb
];
SSdbRow
*
pRow
=
(
*
decodeFp
)(
pRaw
);
if
(
pRow
==
NULL
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_CONTENT
;
return
-
1
;
}
pRow
->
sdb
=
pRaw
->
sdb
;
int32_t
keySize
=
sdbGetkeySize
(
pRow
->
sdb
,
pRow
->
pObj
);
int32_t
code
=
-
1
;
switch
(
pRaw
->
status
)
{
case
SDB_STATUS_CREATING
:
code
=
sdbInsertRow
(
hash
,
pRaw
,
pRow
,
keySize
);
break
;
case
SDB_STATUS_READY
:
case
SDB_STATUS_DROPPING
:
code
=
sdbUpdateRow
(
hash
,
pRaw
,
pRow
,
keySize
);
break
;
case
SDB_STATUS_DROPPED
:
code
=
sdbDeleteRow
(
hash
,
pRaw
,
pRow
,
keySize
);
break
;
default:
terrno
=
TSDB_CODE_SDB_INVALID_ACTION_TYPE
;
break
;
}
return
code
;
}
int32_t
sdbWrite
(
SSdbRaw
*
pRaw
)
{
int32_t
code
=
sdbWriteImp
(
pRaw
);
sdbFreeRaw
(
pRaw
);
return
code
;
}
void
*
sdbAcquire
(
ESdbType
sdb
,
void
*
pKey
)
{
SHashObj
*
hash
=
sdbGetHash
(
sdb
);
if
(
hash
==
NULL
)
return
NULL
;
void
*
pRet
=
NULL
;
int32_t
keySize
=
sdbGetkeySize
(
sdb
,
pKey
);
SRWLatch
*
pLock
=
&
tsSdb
.
locks
[
sdb
];
taosRLockLatch
(
pLock
);
SSdbRow
**
ppRow
=
taosHashGet
(
hash
,
pKey
,
keySize
);
if
(
ppRow
==
NULL
||
*
ppRow
==
NULL
)
{
terrno
=
TSDB_CODE_SDB_OBJ_NOT_THERE
;
taosRUnLockLatch
(
pLock
);
return
NULL
;
}
SSdbRow
*
pRow
=
*
ppRow
;
switch
(
pRow
->
status
)
{
case
SDB_STATUS_READY
:
atomic_add_fetch_32
(
&
pRow
->
refCount
,
1
);
pRet
=
pRow
->
pObj
;
break
;
case
SDB_STATUS_CREATING
:
terrno
=
TSDB_CODE_SDB_OBJ_CREATING
;
break
;
case
SDB_STATUS_DROPPING
:
terrno
=
TSDB_CODE_SDB_OBJ_DROPPING
;
break
;
default:
terrno
=
TSDB_CODE_SDB_APP_ERROR
;
break
;
}
taosRUnLockLatch
(
pLock
);
return
pRet
;
}
void
sdbRelease
(
void
*
pObj
)
{
if
(
pObj
==
NULL
)
return
;
SSdbRow
*
pRow
=
(
SSdbRow
*
)((
char
*
)
pObj
-
sizeof
(
SSdbRow
));
if
(
pRow
->
sdb
>=
SDB_MAX
||
pRow
->
sdb
<=
SDB_START
)
return
;
SRWLatch
*
pLock
=
&
tsSdb
.
locks
[
pRow
->
sdb
];
taosRLockLatch
(
pLock
);
int32_t
ref
=
atomic_sub_fetch_32
(
&
pRow
->
refCount
,
1
);
if
(
ref
<=
0
&&
pRow
->
status
==
SDB_STATUS_DROPPED
)
{
sdbFreeRow
(
pRow
);
}
taosRUnLockLatch
(
pLock
);
}
void
*
sdbFetch
(
ESdbType
sdb
,
void
*
pIter
,
void
**
ppObj
)
{
SHashObj
*
hash
=
sdbGetHash
(
sdb
);
if
(
hash
==
NULL
)
return
NULL
;
SRWLatch
*
pLock
=
&
tsSdb
.
locks
[
sdb
];
taosRLockLatch
(
pLock
);
SSdbRow
**
ppRow
=
taosHashIterate
(
hash
,
ppRow
);
while
(
ppRow
!=
NULL
)
{
SSdbRow
*
pRow
=
*
ppRow
;
if
(
pRow
==
NULL
||
pRow
->
status
!=
SDB_STATUS_READY
)
{
ppRow
=
taosHashIterate
(
hash
,
ppRow
);
continue
;
}
atomic_add_fetch_32
(
&
pRow
->
refCount
,
1
);
*
ppObj
=
pRow
->
pObj
;
break
;
}
taosRUnLockLatch
(
pLock
);
return
ppRow
;
}
void
sdbCancelFetch
(
void
*
pIter
)
{
if
(
pIter
==
NULL
)
return
;
SSdbRow
*
pRow
=
*
(
SSdbRow
**
)
pIter
;
SHashObj
*
hash
=
sdbGetHash
(
pRow
->
sdb
);
if
(
hash
==
NULL
)
return
;
SRWLatch
*
pLock
=
&
tsSdb
.
locks
[
pRow
->
sdb
];
taosRLockLatch
(
pLock
);
taosHashCancelIterate
(
hash
,
pIter
);
taosRUnLockLatch
(
pLock
);
}
int32_t
sdbGetSize
(
ESdbType
sdb
)
{
SHashObj
*
hash
=
sdbGetHash
(
sdb
);
if
(
hash
==
NULL
)
return
0
;
SRWLatch
*
pLock
=
&
tsSdb
.
locks
[
sdb
];
taosRLockLatch
(
pLock
);
int32_t
size
=
taosHashGetSize
(
hash
);
taosRUnLockLatch
(
pLock
);
return
size
;
}
source/dnode/mnode/sdb/src/sdbRaw.c
0 → 100644
浏览文件 @
1cabd9f3
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "sdbInt.h"
SSdbRaw
*
sdbAllocRaw
(
ESdbType
sdb
,
int8_t
sver
,
int32_t
dataLen
)
{
SSdbRaw
*
pRaw
=
calloc
(
1
,
dataLen
+
sizeof
(
SSdbRaw
));
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pRaw
->
sdb
=
sdb
;
pRaw
->
sver
=
sver
;
pRaw
->
dataLen
=
dataLen
;
return
pRaw
;
}
void
sdbFreeRaw
(
SSdbRaw
*
pRaw
)
{
free
(
pRaw
);
}
int32_t
sdbSetRawInt8
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int8_t
val
)
{
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
-
1
;
}
if
(
dataPos
+
sizeof
(
int8_t
)
>
pRaw
->
dataLen
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_LEN
;
return
-
1
;
}
*
(
int8_t
*
)(
pRaw
->
pData
+
dataPos
)
=
val
;
return
0
;
}
int32_t
sdbSetRawInt32
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int32_t
val
)
{
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
-
1
;
}
if
(
dataPos
+
sizeof
(
int32_t
)
>
pRaw
->
dataLen
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_LEN
;
return
-
1
;
}
*
(
int32_t
*
)(
pRaw
->
pData
+
dataPos
)
=
val
;
return
0
;
}
int32_t
sdbSetRawInt64
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int64_t
val
)
{
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
-
1
;
}
if
(
dataPos
+
sizeof
(
int64_t
)
>
pRaw
->
dataLen
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_LEN
;
return
-
1
;
}
*
(
int64_t
*
)(
pRaw
->
pData
+
dataPos
)
=
val
;
return
0
;
}
int32_t
sdbSetRawBinary
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
const
char
*
pVal
,
int32_t
valLen
)
{
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
-
1
;
}
if
(
dataPos
+
valLen
>
pRaw
->
dataLen
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_LEN
;
return
-
1
;
}
memcpy
(
pRaw
->
pData
+
dataPos
,
pVal
,
valLen
);
return
0
;
}
int32_t
sdbSetRawDataLen
(
SSdbRaw
*
pRaw
,
int32_t
dataLen
)
{
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
-
1
;
}
if
(
dataLen
>
pRaw
->
dataLen
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_LEN
;
return
-
1
;
}
pRaw
->
dataLen
=
dataLen
;
return
0
;
}
int32_t
sdbSetRawStatus
(
SSdbRaw
*
pRaw
,
ESdbStatus
status
)
{
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
-
1
;
}
pRaw
->
status
=
status
;
return
0
;
}
int32_t
sdbGetRawInt8
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int8_t
*
val
)
{
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
-
1
;
}
if
(
dataPos
+
sizeof
(
int8_t
)
>
pRaw
->
dataLen
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_LEN
;
return
-
1
;
}
*
val
=
*
(
int8_t
*
)(
pRaw
->
pData
+
dataPos
);
return
0
;
}
int32_t
sdbGetRawInt32
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int32_t
*
val
)
{
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
-
1
;
}
if
(
dataPos
+
sizeof
(
int32_t
)
>
pRaw
->
dataLen
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_LEN
;
return
-
1
;
}
*
val
=
*
(
int32_t
*
)(
pRaw
->
pData
+
dataPos
);
return
0
;
}
int32_t
sdbGetRawInt64
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int64_t
*
val
)
{
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
-
1
;
}
if
(
dataPos
+
sizeof
(
int64_t
)
>
pRaw
->
dataLen
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_LEN
;
return
-
1
;
}
*
val
=
*
(
int64_t
*
)(
pRaw
->
pData
+
dataPos
);
return
0
;
}
int32_t
sdbGetRawBinary
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
char
*
pVal
,
int32_t
valLen
)
{
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
-
1
;
}
if
(
dataPos
+
valLen
>
pRaw
->
dataLen
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_LEN
;
return
-
1
;
}
memcpy
(
pVal
,
pRaw
->
pData
+
dataPos
,
valLen
);
return
0
;
}
int32_t
sdbGetRawSoftVer
(
SSdbRaw
*
pRaw
,
int8_t
*
sver
)
{
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
-
1
;
}
*
sver
=
pRaw
->
sver
;
return
0
;
}
int32_t
sdbGetRawTotalSize
(
SSdbRaw
*
pRaw
)
{
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
-
1
;
}
return
sizeof
(
SSdbRaw
)
+
pRaw
->
dataLen
;
}
\ No newline at end of file
source/dnode/mnode/sdb/src/sdbRow.c
0 → 100644
浏览文件 @
1cabd9f3
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "sdbInt.h"
SSdbRow
*
sdbAllocRow
(
int32_t
objSize
)
{
SSdbRow
*
pRow
=
calloc
(
1
,
objSize
+
sizeof
(
SSdbRow
));
if
(
pRow
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
return
pRow
;
}
void
*
sdbGetRowObj
(
SSdbRow
*
pRow
)
{
if
(
pRow
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
return
pRow
->
pObj
;
}
void
sdbFreeRow
(
SSdbRow
*
pRow
)
{
free
(
pRow
);
}
source/dnode/mnode/transaction/inc/trnInt.h
浏览文件 @
1cabd9f3
...
...
@@ -45,15 +45,15 @@ typedef enum {
}
ETrnStage
;
typedef
struct
STrans
{
int32_t
id
;
ETrnStage
stage
;
ETrnPolicy
policy
;
void
*
rpcHandle
;
SArray
*
redoLogs
;
SArray
*
undoLogs
;
SArray
*
commitLogs
;
SArray
*
redoActions
;
SArray
*
undoActions
;
int32_t
id
;
int8_t
stage
;
int8_t
policy
;
void
*
rpcHandle
;
SArray
*
redoLogs
;
SArray
*
undoLogs
;
SArray
*
commitLogs
;
SArray
*
redoActions
;
SArray
*
undoActions
;
}
STrans
;
SSdbRaw
*
trnActionEncode
(
STrans
*
pTrans
);
...
...
source/dnode/mnode/transaction/src/trn.c
浏览文件 @
1cabd9f3
...
...
@@ -16,8 +16,10 @@
#define _DEFAULT_SOURCE
#include "trnInt.h"
#define SDB_TRANS_VER 1
SSdbRaw
*
trnActionEncode
(
STrans
*
pTrans
)
{
int32_t
rawDataLen
=
5
*
sizeof
(
int32_t
);
int32_t
rawDataLen
=
10
*
sizeof
(
int32_t
);
int32_t
redoLogNum
=
taosArrayGetSize
(
pTrans
->
redoLogs
);
int32_t
undoLogNum
=
taosArrayGetSize
(
pTrans
->
undoLogs
);
int32_t
commitLogNum
=
taosArrayGetSize
(
pTrans
->
commitLogs
);
...
...
@@ -25,91 +27,84 @@ SSdbRaw *trnActionEncode(STrans *pTrans) {
int32_t
undoActionNum
=
taosArrayGetSize
(
pTrans
->
undoActions
);
for
(
int32_t
index
=
0
;
index
<
redoLogNum
;
++
index
)
{
SSdbRaw
*
pRaw
Data
=
taosArrayGet
(
pTrans
->
redoLogs
,
index
);
rawDataLen
+=
(
sizeof
(
SSdbRaw
)
+
pRawData
->
dataLen
);
SSdbRaw
*
pRaw
=
taosArrayGet
(
pTrans
->
redoLogs
,
index
);
rawDataLen
+=
sdbGetRawTotalSize
(
pRaw
);
}
for
(
int32_t
index
=
0
;
index
<
undoLogNum
;
++
index
)
{
SSdbRaw
*
pRaw
Data
=
taosArrayGet
(
pTrans
->
undoLogs
,
index
);
rawDataLen
+=
(
sizeof
(
SSdbRaw
)
+
pRawData
->
dataLen
);
SSdbRaw
*
pRaw
=
taosArrayGet
(
pTrans
->
undoLogs
,
index
);
rawDataLen
+=
sdbGetRawTotalSize
(
pRaw
);
}
for
(
int32_t
index
=
0
;
index
<
commitLogNum
;
++
index
)
{
SSdbRaw
*
pRaw
Data
=
taosArrayGet
(
pTrans
->
commitLogs
,
index
);
rawDataLen
+=
(
sizeof
(
SSdbRaw
)
+
pRawData
->
dataLen
);
SSdbRaw
*
pRaw
=
taosArrayGet
(
pTrans
->
commitLogs
,
index
);
rawDataLen
+=
sdbGetRawTotalSize
(
pRaw
);
}
SSdbRaw
*
pRaw
=
calloc
(
1
,
rawDataLen
+
sizeof
(
SSdbRaw
));
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_TRANS
,
SDB_TRANS_VER
,
rawDataLen
);
if
(
pRaw
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
SDB_SET_INT32
(
pData
,
dataPos
,
pTrans
->
id
)
SDB_SET_INT8
(
pData
,
dataPos
,
pTrans
->
stage
)
SDB_SET_INT8
(
pData
,
dataPos
,
pTrans
->
policy
)
SDB_SET_INT32
(
pData
,
dataPos
,
redoLogNum
)
SDB_SET_INT32
(
pData
,
dataPos
,
undoLogNum
)
SDB_SET_INT32
(
pData
,
dataPos
,
commitLogNum
)
SDB_SET_INT32
(
pData
,
dataPos
,
redoActionNum
)
SDB_SET_INT32
(
pData
,
dataPos
,
undoActionNum
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
);
int32_t
dataLen
=
0
;
char
*
pData
=
pRaw
->
data
;
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
redoLogNum
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
undoLogNum
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
commitLogNum
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
redoActionNum
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
undoActionNum
)
pRaw
->
dataLen
=
dataLen
;
pRaw
->
type
=
SDB_TRANS
;
pRaw
->
sver
=
TRN_VER
;
return
pRaw
;
}
STrans
*
trnActionDecode
(
SSdbRaw
*
pRaw
)
{
if
(
pRaw
->
sver
!=
TRN_VER
)
{
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
return
NULL
;
if
(
sver
!=
SDB_TRANS_VER
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
return
NULL
;
}
STrans
*
pTrans
=
NULL
;
if
(
pTrans
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
SSdbRow
*
pRow
=
sdbAllocRow
(
sizeof
(
STrans
));
STrans
*
pTrans
=
sdbGetRowObj
(
pRow
);
if
(
pTrans
==
NULL
)
return
NULL
;
int32_t
redoLogNum
=
0
;
int32_t
undoLogNum
=
0
;
int32_t
commitLogNum
=
0
;
int32_t
redoActionNum
=
0
;
int32_t
undoActionNum
=
0
;
SSdbRaw
*
pTmp
=
malloc
(
sizeof
(
SSdbRaw
));
int32_t
code
=
0
;
int32_t
dataLen
=
pRaw
->
dataLen
;
char
*
pData
=
pRaw
->
data
;
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
redoLogNum
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
undoLogNum
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
commitLogNum
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
redoActionNum
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
undoActionNum
,
code
)
int32_t
dataPos
=
0
;
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pTrans
->
id
)
SDB_GET_INT8
(
pRaw
,
pRow
,
dataPos
,
&
pTrans
->
stage
)
SDB_GET_INT8
(
pRaw
,
pRow
,
dataPos
,
&
pTrans
->
policy
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
redoLogNum
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
undoLogNum
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
commitLogNum
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
redoActionNum
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
undoActionNum
)
for
(
int32_t
index
=
0
;
index
<
redoLogNum
;
++
index
)
{
SDB_GET_BINARY_VAL
(
pData
,
dataLen
,
pTmp
,
sizeof
(
SSdbRaw
),
code
);
if
(
code
==
0
&&
pTmp
->
dataLen
>
0
)
{
SSdbRaw
*
pRead
=
malloc
(
sizeof
(
SSdbRaw
)
+
pTmp
->
dataLen
);
if
(
pRead
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
break
;
}
memcpy
(
pRead
,
pTmp
,
sizeof
(
SSdbRaw
));
SDB_GET_BINARY_VAL
(
pData
,
dataLen
,
pRead
->
data
,
pRead
->
dataLen
,
code
);
void
*
ret
=
taosArrayPush
(
pTrans
->
redoLogs
,
&
pRead
);
if
(
ret
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
break
;
}
int32_t
dataLen
=
0
;
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
dataLen
)
char
*
pData
=
malloc
(
dataLen
);
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pData
,
dataLen
);
void
*
ret
=
taosArrayPush
(
pTrans
->
redoLogs
,
pData
);
if
(
ret
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
break
;
}
}
if
(
code
!=
0
)
{
trnDrop
(
pTrans
);
terrno
=
code
;
return
NULL
;
}
//
if (code != 0) {
//
trnDrop(pTrans);
//
terrno = code;
//
return NULL;
//
}
return
pTrans
;
}
...
...
source/dnode/mnode/transaction/src/trnExec.c
浏览文件 @
1cabd9f3
...
...
@@ -52,7 +52,7 @@ int32_t trnApply(SSdbRaw *pRaw, void *pData, int32_t code) {
return
0
;
}
if
(
sdbWrite
(
p
Raw
)
!=
0
)
{
if
(
sdbWrite
(
p
Data
)
!=
0
)
{
code
=
terrno
;
trnSendRpcRsp
(
pData
,
code
);
terrno
=
code
;
...
...
source/util/src/terror.c
浏览文件 @
1cabd9f3
...
...
@@ -76,6 +76,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RANGE, "Out of range")
TAOS_DEFINE_ERROR
(
TSDB_CODE_INVALID_PTR
,
"Invalid pointer"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MEMORY_CORRUPTED
,
"Memory corrupted"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_FILE_CORRUPTED
,
"Data file corrupted"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_CHECKSUM_ERROR
,
"Checksum error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_INVALID_MSG
,
"Invalid config message"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_REF_NO_MEMORY
,
"Ref out of memory"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_REF_FULL
,
"too many Ref Objs"
)
...
...
@@ -147,13 +148,15 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_FAILED_TO_INIT_STEP, "failed to init compon
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_APP_ERROR
,
"Unexpected generic error in sdb"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_OBJ_ALREADY_THERE
,
"Object already there"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_OBJ_NOT_THERE
,
"Object not there"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_OBJ_CREATING
,
"Object is creating"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_OBJ_DROPPING
,
"Object is dropping"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_INVALID_TABLE_TYPE
,
"Invalid table type"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_INVALID_KEY_TYPE
,
"Invalid key type"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_INVALID_ACTION_TYPE
,
"Invalid action type"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_INVALID_STATUS_TYPE
,
"Invalid status type"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_INVALID_DATA_VER
,
"Invalid raw data version"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_INVALID_DATA_LEN
,
"Invalid raw data len"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_INVALID_
META_ROW
,
"Invalid meta row
"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_INVALID_
DATA_CONTENT
,
"Invalid raw data content
"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_DNODE_ALREADY_EXIST
,
"DNode already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_DNODE_NOT_EXIST
,
"DNode does not exist"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录