Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8454979b
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
8454979b
编写于
12月 14, 2021
作者:
S
Shengliang Guan
提交者:
GitHub
12月 14, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #9087 from taosdata/feature/dnode3
Feature/dnode3
上级
d5342f03
d9762010
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
400 addition
and
376 deletion
+400
-376
include/dnode/mnode/sdb/sdb.h
include/dnode/mnode/sdb/sdb.h
+6
-53
source/dnode/mgmt/impl/test/dnode/dnode.cpp
source/dnode/mgmt/impl/test/dnode/dnode.cpp
+34
-34
source/dnode/mgmt/impl/test/sut/deploy.cpp
source/dnode/mgmt/impl/test/sut/deploy.cpp
+1
-1
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+3
-2
source/dnode/mnode/impl/inc/mndSync.h
source/dnode/mnode/impl/inc/mndSync.h
+1
-1
source/dnode/mnode/impl/inc/mndTrans.h
source/dnode/mnode/impl/inc/mndTrans.h
+3
-3
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+3
-3
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+4
-36
source/dnode/mnode/impl/src/mndFunc.c
source/dnode/mnode/impl/src/mndFunc.c
+2
-2
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+2
-2
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+2
-2
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+7
-7
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+321
-180
source/dnode/mnode/impl/src/mndUser.c
source/dnode/mnode/impl/src/mndUser.c
+5
-45
source/dnode/mnode/sdb/src/sdbHash.c
source/dnode/mnode/sdb/src/sdbHash.c
+6
-5
未找到文件。
include/dnode/mnode/sdb/sdb.h
浏览文件 @
8454979b
...
@@ -144,9 +144,10 @@ typedef struct SSdbRow SSdbRow;
...
@@ -144,9 +144,10 @@ typedef struct SSdbRow SSdbRow;
typedef
enum
{
SDB_KEY_BINARY
=
1
,
SDB_KEY_INT32
=
2
,
SDB_KEY_INT64
=
3
}
EKeyType
;
typedef
enum
{
SDB_KEY_BINARY
=
1
,
SDB_KEY_INT32
=
2
,
SDB_KEY_INT64
=
3
}
EKeyType
;
typedef
enum
{
typedef
enum
{
SDB_STATUS_CREATING
=
1
,
SDB_STATUS_CREATING
=
1
,
SDB_STATUS_
READY
=
2
,
SDB_STATUS_
UPDATING
=
2
,
SDB_STATUS_DROPPING
=
3
,
SDB_STATUS_DROPPING
=
3
,
SDB_STATUS_DROPPED
=
4
SDB_STATUS_READY
=
4
,
SDB_STATUS_DROPPED
=
5
}
ESdbStatus
;
}
ESdbStatus
;
typedef
enum
{
typedef
enum
{
...
@@ -174,67 +175,19 @@ typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
...
@@ -174,67 +175,19 @@ typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
typedef
SSdbRaw
*
(
*
SdbEncodeFp
)(
void
*
pObj
);
typedef
SSdbRaw
*
(
*
SdbEncodeFp
)(
void
*
pObj
);
typedef
struct
{
typedef
struct
{
/**
ESdbType
sdbType
;
* @brief The sdb type of the table.
EKeyType
keyType
;
*
*/
ESdbType
sdbType
;
/**
* @brief The key type of the table.
*
*/
EKeyType
keyType
;
/**
* @brief The callback function when the table is first deployed.
*
*/
SdbDeployFp
deployFp
;
SdbDeployFp
deployFp
;
/**
* @brief Encode one row of the table into rawdata.
*
*/
SdbEncodeFp
encodeFp
;
SdbEncodeFp
encodeFp
;
/**
* @brief Decode one row of the table from rawdata.
*
*/
SdbDecodeFp
decodeFp
;
SdbDecodeFp
decodeFp
;
/**
* @brief The callback function when insert a row to sdb.
*
*/
SdbInsertFp
insertFp
;
SdbInsertFp
insertFp
;
/**
* @brief The callback function when undate a row in sdb.
*
*/
SdbUpdateFp
updateFp
;
SdbUpdateFp
updateFp
;
/**
* @brief The callback function when delete a row from sdb.
*
*/
SdbDeleteFp
deleteFp
;
SdbDeleteFp
deleteFp
;
}
SSdbTable
;
}
SSdbTable
;
typedef
struct
SSdbOpt
{
typedef
struct
SSdbOpt
{
/**
* @brief The path of the sdb file.
*
*/
const
char
*
path
;
const
char
*
path
;
SMnode
*
pMnode
;
/**
* @brief The mnode object.
*
*/
SMnode
*
pMnode
;
}
SSdbOpt
;
}
SSdbOpt
;
/**
/**
...
...
source/dnode/mgmt/impl/test/dnode/dnode.cpp
浏览文件 @
8454979b
...
@@ -379,41 +379,41 @@ TEST_F(DndTestDnode, RestartDnode_01) {
...
@@ -379,41 +379,41 @@ TEST_F(DndTestDnode, RestartDnode_01) {
const
char
*
fqdn
=
"localhost"
;
const
char
*
fqdn
=
"localhost"
;
const
char
*
firstEp
=
"localhost:9521"
;
const
char
*
firstEp
=
"localhost:9521"
;
pServer1
=
startServer
(
"/tmp/dndTestDnode1"
,
fqdn
,
9521
,
firstEp
);
pServer1
=
startServer
(
"/tmp/dndTestDnode1"
,
fqdn
,
9521
,
firstEp
);
// pServer1
= startServer("/tmp/dndTestDnode3", fqdn, 9523, firstEp);
pServer3
=
startServer
(
"/tmp/dndTestDnode3"
,
fqdn
,
9523
,
firstEp
);
// pServer1
= startServer("/tmp/dndTestDnode4", fqdn, 9524, firstEp);
pServer4
=
startServer
(
"/tmp/dndTestDnode4"
,
fqdn
,
9524
,
firstEp
);
// pServer1
= startServer("/tmp/dndTestDnode5", fqdn, 9525, firstEp);
pServer5
=
startServer
(
"/tmp/dndTestDnode5"
,
fqdn
,
9525
,
firstEp
);
uInfo
(
"all server is running"
);
uInfo
(
"all server is running"
);
//
taosMsleep(1300);
taosMsleep
(
1300
);
//
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
SendTheCheckShowMetaMsg
(
TSDB_MGMT_TABLE_DNODE
,
"show dnodes"
,
7
);
//
SendThenCheckShowRetrieveMsg(4);
SendThenCheckShowRetrieveMsg
(
4
);
//
CheckInt16(1);
CheckInt16
(
1
);
//
CheckInt16(3);
CheckInt16
(
3
);
//
CheckInt16(4);
CheckInt16
(
4
);
//
CheckInt16(5);
CheckInt16
(
5
);
//
CheckBinary("localhost:9521", TSDB_EP_LEN);
CheckBinary
(
"localhost:9521"
,
TSDB_EP_LEN
);
//
CheckBinary("localhost:9523", TSDB_EP_LEN);
CheckBinary
(
"localhost:9523"
,
TSDB_EP_LEN
);
//
CheckBinary("localhost:9524", TSDB_EP_LEN);
CheckBinary
(
"localhost:9524"
,
TSDB_EP_LEN
);
//
CheckBinary("localhost:9525", TSDB_EP_LEN);
CheckBinary
(
"localhost:9525"
,
TSDB_EP_LEN
);
//
CheckInt16(0);
CheckInt16
(
0
);
//
CheckInt16(0);
CheckInt16
(
0
);
//
CheckInt16(0);
CheckInt16
(
0
);
//
CheckInt16(0);
CheckInt16
(
0
);
//
CheckInt16(1);
CheckInt16
(
1
);
//
CheckInt16(1);
CheckInt16
(
1
);
//
CheckInt16(1);
CheckInt16
(
1
);
//
CheckInt16(1);
CheckInt16
(
1
);
//
CheckBinary("ready", 10);
CheckBinary
(
"ready"
,
10
);
//
CheckBinary("ready", 10);
CheckBinary
(
"ready"
,
10
);
//
CheckBinary("ready", 10);
CheckBinary
(
"ready"
,
10
);
//
CheckBinary("ready", 10);
CheckBinary
(
"ready"
,
10
);
//
CheckTimestamp();
CheckTimestamp
();
//
CheckTimestamp();
CheckTimestamp
();
//
CheckTimestamp();
CheckTimestamp
();
//
CheckTimestamp();
CheckTimestamp
();
//
CheckBinary("", 24);
CheckBinary
(
""
,
24
);
//
CheckBinary("", 24);
CheckBinary
(
""
,
24
);
//
CheckBinary("", 24);
CheckBinary
(
""
,
24
);
//
CheckBinary("", 24);
CheckBinary
(
""
,
24
);
}
}
source/dnode/mgmt/impl/test/sut/deploy.cpp
浏览文件 @
8454979b
...
@@ -18,7 +18,7 @@
...
@@ -18,7 +18,7 @@
void
initLog
(
const
char
*
path
)
{
void
initLog
(
const
char
*
path
)
{
dDebugFlag
=
143
;
dDebugFlag
=
143
;
vDebugFlag
=
0
;
vDebugFlag
=
0
;
mDebugFlag
=
143
;
mDebugFlag
=
207
;
cDebugFlag
=
0
;
cDebugFlag
=
0
;
jniDebugFlag
=
0
;
jniDebugFlag
=
0
;
tmrDebugFlag
=
0
;
tmrDebugFlag
=
0
;
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
8454979b
...
@@ -73,7 +73,8 @@ typedef enum {
...
@@ -73,7 +73,8 @@ typedef enum {
TRN_STAGE_EXECUTE
=
2
,
TRN_STAGE_EXECUTE
=
2
,
TRN_STAGE_COMMIT
=
3
,
TRN_STAGE_COMMIT
=
3
,
TRN_STAGE_ROLLBACK
=
4
,
TRN_STAGE_ROLLBACK
=
4
,
TRN_STAGE_RETRY
=
5
TRN_STAGE_RETRY
=
5
,
TRN_STAGE_OVER
=
6
,
}
ETrnStage
;
}
ETrnStage
;
typedef
enum
{
TRN_POLICY_ROLLBACK
=
1
,
TRN_POLICY_RETRY
=
2
}
ETrnPolicy
;
typedef
enum
{
TRN_POLICY_ROLLBACK
=
1
,
TRN_POLICY_RETRY
=
2
}
ETrnPolicy
;
...
@@ -103,7 +104,6 @@ typedef struct STrans {
...
@@ -103,7 +104,6 @@ typedef struct STrans {
int32_t
id
;
int32_t
id
;
ETrnStage
stage
;
ETrnStage
stage
;
ETrnPolicy
policy
;
ETrnPolicy
policy
;
SMnode
*
pMnode
;
void
*
rpcHandle
;
void
*
rpcHandle
;
SArray
*
redoLogs
;
SArray
*
redoLogs
;
SArray
*
undoLogs
;
SArray
*
undoLogs
;
...
@@ -304,6 +304,7 @@ typedef struct SMnodeMsg {
...
@@ -304,6 +304,7 @@ typedef struct SMnodeMsg {
typedef
struct
{
typedef
struct
{
int32_t
id
;
int32_t
id
;
int32_t
code
;
void
*
rpcHandle
;
void
*
rpcHandle
;
}
STransMsg
;
}
STransMsg
;
...
...
source/dnode/mnode/impl/inc/mndSync.h
浏览文件 @
8454979b
...
@@ -25,7 +25,7 @@ extern "C" {
...
@@ -25,7 +25,7 @@ extern "C" {
int32_t
mndInitSync
(
SMnode
*
pMnode
);
int32_t
mndInitSync
(
SMnode
*
pMnode
);
void
mndCleanupSync
(
SMnode
*
pMnode
);
void
mndCleanupSync
(
SMnode
*
pMnode
);
bool
mndIsMaster
(
SMnode
*
pMnode
);
bool
mndIsMaster
(
SMnode
*
pMnode
);
int32_t
mndSyncPropose
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
,
STransMsg
*
pMsg
);
int32_t
mndSyncPropose
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/dnode/mnode/impl/inc/mndTrans.h
浏览文件 @
8454979b
...
@@ -32,10 +32,10 @@ int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
...
@@ -32,10 +32,10 @@ int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t
mndTransAppendCommitlog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
mndTransAppendCommitlog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
mndTransAppendRedoAction
(
STrans
*
pTrans
,
SEpSet
*
,
void
*
pMsg
);
int32_t
mndTransAppendRedoAction
(
STrans
*
pTrans
,
SEpSet
*
,
void
*
pMsg
);
int32_t
mndTransAppendUndoAction
(
STrans
*
pTrans
,
SEpSet
*
,
void
*
pMsg
);
int32_t
mndTransAppendUndoAction
(
STrans
*
pTrans
,
SEpSet
*
,
void
*
pMsg
);
int32_t
mndTransPrepare
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
int32_t
mndTransPrepare
(
STrans
*
pTrans
);
void
mndTransApply
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
,
STransMsg
*
pMsg
,
int32_t
code
);
void
mndTransApply
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
,
STransMsg
*
pMsg
,
int32_t
code
);
int32_t
mndTransExecute
(
SSdb
*
pSdb
,
int32_t
tranId
);
char
*
mndTransStageStr
(
ETrnStage
stage
);
char
*
mndTransPolicyStr
(
ETrnPolicy
policy
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
8454979b
...
@@ -357,7 +357,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat
...
@@ -357,7 +357,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat
}
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
);
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
...
@@ -491,7 +491,7 @@ static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbO
...
@@ -491,7 +491,7 @@ static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbO
}
}
sdbSetRawStatus
(
pUndoRaw
,
SDB_STATUS_READY
);
sdbSetRawStatus
(
pUndoRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
...
@@ -571,7 +571,7 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) {
...
@@ -571,7 +571,7 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) {
}
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
);
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
8454979b
...
@@ -413,25 +413,9 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *
...
@@ -413,25 +413,9 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
}
}
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_
CREATING
);
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_
READY
);
SSdbRaw
*
pUndoRaw
=
mndDnodeActionEncode
(
&
dnodeObj
);
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
if
(
pUndoRaw
==
NULL
||
mndTransAppendUndolog
(
pTrans
,
pUndoRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to append undo log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
sdbSetRawStatus
(
pUndoRaw
,
SDB_STATUS_DROPPED
);
SSdbRaw
*
pCommitRaw
=
mndDnodeActionEncode
(
&
dnodeObj
);
if
(
pCommitRaw
==
NULL
||
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to append commit log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
...
@@ -485,25 +469,9 @@ static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode)
...
@@ -485,25 +469,9 @@ static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode)
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
}
}
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_DROPPING
);
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_DROPPED
);
SSdbRaw
*
pUndoRaw
=
mndDnodeActionEncode
(
pDnode
);
if
(
pUndoRaw
==
NULL
||
mndTransAppendUndolog
(
pTrans
,
pUndoRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to append undo log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
sdbSetRawStatus
(
pUndoRaw
,
SDB_STATUS_READY
);
SSdbRaw
*
pCommitRaw
=
mndDnodeActionEncode
(
pDnode
);
if
(
pCommitRaw
==
NULL
||
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to append commit log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndFunc.c
浏览文件 @
8454979b
...
@@ -183,7 +183,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncMsg *pC
...
@@ -183,7 +183,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncMsg *pC
}
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
);
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
...
@@ -226,7 +226,7 @@ static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pMsg, SFuncObj *pFunc) {
...
@@ -226,7 +226,7 @@ static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pMsg, SFuncObj *pFunc) {
}
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
);
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
8454979b
...
@@ -238,7 +238,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateMnodeMsg *
...
@@ -238,7 +238,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateMnodeMsg *
}
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
);
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
...
@@ -313,7 +313,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pMnodeOb
...
@@ -313,7 +313,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pMnodeOb
}
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
);
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
8454979b
...
@@ -285,7 +285,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre
...
@@ -285,7 +285,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre
}
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
);
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
...
@@ -433,7 +433,7 @@ static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pStb) {
...
@@ -433,7 +433,7 @@ static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pStb) {
}
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
);
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
8454979b
...
@@ -21,16 +21,16 @@
...
@@ -21,16 +21,16 @@
int32_t
mndInitSync
(
SMnode
*
pMnode
)
{
return
0
;
}
int32_t
mndInitSync
(
SMnode
*
pMnode
)
{
return
0
;
}
void
mndCleanupSync
(
SMnode
*
pMnode
)
{}
void
mndCleanupSync
(
SMnode
*
pMnode
)
{}
int32_t
mndSyncPropose
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
,
STransMsg
*
pMsg
)
{
int32_t
mndSyncPropose
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
int32_t
len
=
sdbGetRawTotalSize
(
pRaw
);
//
int32_t len = sdbGetRawTotalSize(pRaw);
SSdbRaw
*
pReceived
=
calloc
(
1
,
len
);
//
SSdbRaw *pReceived = calloc(1, len);
memcpy
(
pReceived
,
pRaw
,
len
);
//
memcpy(pReceived, pRaw, len);
mDebug
(
"trans:%d, data:%p recv from sync, code:0x%x pMsg:%p"
,
pMsg
->
id
,
pReceived
,
code
&
0xFFFF
,
pMsg
);
//
mDebug("trans:%d, data:%p recv from sync, code:0x%x pMsg:%p", pMsg->id, pReceived, code & 0xFFFF, pMsg);
mndTransApply
(
pMnode
,
pReceived
,
pMsg
,
code
);
// mndTransApply(pMnode, pReceived
, code);
return
0
;
return
code
;
}
}
bool
mndIsMaster
(
SMnode
*
pMnode
)
{
return
true
;
}
bool
mndIsMaster
(
SMnode
*
pMnode
)
{
return
true
;
}
\ No newline at end of file
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
8454979b
...
@@ -17,8 +17,9 @@
...
@@ -17,8 +17,9 @@
#include "mndTrans.h"
#include "mndTrans.h"
#include "mndSync.h"
#include "mndSync.h"
#define SDB_TRANS_VER 1
#define TSDB_TRANS_VER 1
#define TRN_DEFAULT_ARRAY_SIZE 8
#define TSDB_TRN_ARRAY_SIZE 8
#define TSDB_TRN_RESERVE_SIZE 64
static
SSdbRaw
*
mndTransActionEncode
(
STrans
*
pTrans
);
static
SSdbRaw
*
mndTransActionEncode
(
STrans
*
pTrans
);
static
SSdbRow
*
mndTransActionDecode
(
SSdbRaw
*
pRaw
);
static
SSdbRow
*
mndTransActionDecode
(
SSdbRaw
*
pRaw
);
...
@@ -26,6 +27,22 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
...
@@ -26,6 +27,22 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
static
int32_t
mndTransActionUpdate
(
SSdb
*
pSdb
,
STrans
*
OldTrans
,
STrans
*
pOldTrans
);
static
int32_t
mndTransActionUpdate
(
SSdb
*
pSdb
,
STrans
*
OldTrans
,
STrans
*
pOldTrans
);
static
int32_t
mndTransActionDelete
(
SSdb
*
pSdb
,
STrans
*
pTrans
);
static
int32_t
mndTransActionDelete
(
SSdb
*
pSdb
,
STrans
*
pTrans
);
static
void
mndTransSetRpcHandle
(
STrans
*
pTrans
,
void
*
rpcHandle
);
static
void
mndTransSendRpcRsp
(
STrans
*
pTrans
,
int32_t
code
);
static
int32_t
mndTransAppendArray
(
SArray
*
pArray
,
SSdbRaw
*
pRaw
);
static
void
mndTransDropArray
(
SArray
*
pArray
);
static
int32_t
mndTransExecuteArray
(
SMnode
*
pMnode
,
SArray
*
pArray
);
static
int32_t
mndTransExecuteRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransExecuteUndoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransExecuteCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransExecuteRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransExecuteUndoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransPerformPrepareStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransPerformExecuteStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransPerformCommitStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransPerformRollbackStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
void
mndTransExecute
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
int32_t
mndInitTrans
(
SMnode
*
pMnode
)
{
int32_t
mndInitTrans
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_TRANS
,
SSdbTable
table
=
{.
sdbType
=
SDB_TRANS
,
.
keyType
=
SDB_KEY_INT32
,
.
keyType
=
SDB_KEY_INT32
,
...
@@ -41,7 +58,7 @@ int32_t mndInitTrans(SMnode *pMnode) {
...
@@ -41,7 +58,7 @@ int32_t mndInitTrans(SMnode *pMnode) {
void
mndCleanupTrans
(
SMnode
*
pMnode
)
{}
void
mndCleanupTrans
(
SMnode
*
pMnode
)
{}
static
SSdbRaw
*
mndTransActionEncode
(
STrans
*
pTrans
)
{
static
SSdbRaw
*
mndTransActionEncode
(
STrans
*
pTrans
)
{
int32_t
rawDataLen
=
16
*
sizeof
(
int32_t
);
int32_t
rawDataLen
=
16
*
sizeof
(
int32_t
)
+
TSDB_TRN_RESERVE_SIZE
;
int32_t
redoLogNum
=
taosArrayGetSize
(
pTrans
->
redoLogs
);
int32_t
redoLogNum
=
taosArrayGetSize
(
pTrans
->
redoLogs
);
int32_t
undoLogNum
=
taosArrayGetSize
(
pTrans
->
undoLogs
);
int32_t
undoLogNum
=
taosArrayGetSize
(
pTrans
->
undoLogs
);
int32_t
commitLogNum
=
taosArrayGetSize
(
pTrans
->
commitLogs
);
int32_t
commitLogNum
=
taosArrayGetSize
(
pTrans
->
commitLogs
);
...
@@ -63,7 +80,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
...
@@ -63,7 +80,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
rawDataLen
+=
sdbGetRawTotalSize
(
pTmp
);
rawDataLen
+=
sdbGetRawTotalSize
(
pTmp
);
}
}
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_TRANS
,
SDB_TRANS_VER
,
rawDataLen
);
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_TRANS
,
T
SDB_TRANS_VER
,
rawDataLen
);
if
(
pRaw
==
NULL
)
{
if
(
pRaw
==
NULL
)
{
mError
(
"trans:%d, failed to alloc raw since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to alloc raw since %s"
,
pTrans
->
id
,
terrstr
());
return
NULL
;
return
NULL
;
...
@@ -71,7 +88,6 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
...
@@ -71,7 +88,6 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
int32_t
dataPos
=
0
;
int32_t
dataPos
=
0
;
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTrans
->
id
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTrans
->
id
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pTrans
->
stage
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pTrans
->
policy
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pTrans
->
policy
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
redoLogNum
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
redoLogNum
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
undoLogNum
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
undoLogNum
)
...
@@ -100,6 +116,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
...
@@ -100,6 +116,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
pTmp
,
len
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
pTmp
,
len
)
}
}
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
TSDB_TRN_RESERVE_SIZE
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
);
mTrace
(
"trans:%d, encode to raw:%p, len:%d"
,
pTrans
->
id
,
pRaw
,
dataPos
);
mTrace
(
"trans:%d, encode to raw:%p, len:%d"
,
pTrans
->
id
,
pRaw
,
dataPos
);
return
pRaw
;
return
pRaw
;
}
}
...
@@ -113,7 +131,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
...
@@ -113,7 +131,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
return
NULL
;
return
NULL
;
}
}
if
(
sver
!=
SDB_TRANS_VER
)
{
if
(
sver
!=
T
SDB_TRANS_VER
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
mError
(
"failed to get check soft ver from raw:%p since %s"
,
pRaw
,
terrstr
());
mError
(
"failed to get check soft ver from raw:%p since %s"
,
pRaw
,
terrstr
());
return
NULL
;
return
NULL
;
...
@@ -126,11 +144,11 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
...
@@ -126,11 +144,11 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
return
NULL
;
return
NULL
;
}
}
pTrans
->
redoLogs
=
taosArrayInit
(
T
RN_DEFAULT
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoLogs
=
taosArrayInit
(
T
SDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoLogs
=
taosArrayInit
(
T
RN_DEFAULT
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoLogs
=
taosArrayInit
(
T
SDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
commitLogs
=
taosArrayInit
(
T
RN_DEFAULT
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
commitLogs
=
taosArrayInit
(
T
SDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoActions
=
taosArrayInit
(
T
RN_DEFAULT
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoActions
=
taosArrayInit
(
T
SDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoActions
=
taosArrayInit
(
T
RN_DEFAULT
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoActions
=
taosArrayInit
(
T
SDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
if
(
pTrans
->
redoLogs
==
NULL
||
pTrans
->
undoLogs
==
NULL
||
pTrans
->
commitLogs
==
NULL
||
if
(
pTrans
->
redoLogs
==
NULL
||
pTrans
->
undoLogs
==
NULL
||
pTrans
->
commitLogs
==
NULL
||
pTrans
->
redoActions
==
NULL
||
pTrans
->
undoActions
==
NULL
)
{
pTrans
->
redoActions
==
NULL
||
pTrans
->
undoActions
==
NULL
)
{
...
@@ -147,7 +165,6 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
...
@@ -147,7 +165,6 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
int32_t
dataPos
=
0
;
int32_t
dataPos
=
0
;
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pTrans
->
id
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pTrans
->
id
)
SDB_GET_INT8
(
pRaw
,
pRow
,
dataPos
,
(
int8_t
*
)
&
pTrans
->
stage
)
SDB_GET_INT8
(
pRaw
,
pRow
,
dataPos
,
(
int8_t
*
)
&
pTrans
->
policy
)
SDB_GET_INT8
(
pRaw
,
pRow
,
dataPos
,
(
int8_t
*
)
&
pTrans
->
policy
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
redoLogNum
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
redoLogNum
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
undoLogNum
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
undoLogNum
)
...
@@ -197,6 +214,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
...
@@ -197,6 +214,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
}
}
}
}
SDB_GET_RESERVE
(
pRaw
,
pRow
,
dataPos
,
TSDB_TRN_RESERVE_SIZE
)
TRANS_DECODE_OVER:
TRANS_DECODE_OVER:
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to parse from raw:%p since %s"
,
pTrans
->
id
,
pRaw
,
tstrerror
(
errno
));
mError
(
"trans:%d, failed to parse from raw:%p since %s"
,
pTrans
->
id
,
pRaw
,
tstrerror
(
errno
));
...
@@ -210,64 +229,72 @@ TRANS_DECODE_OVER:
...
@@ -210,64 +229,72 @@ TRANS_DECODE_OVER:
}
}
static
int32_t
mndTransActionInsert
(
SSdb
*
pSdb
,
STrans
*
pTrans
)
{
static
int32_t
mndTransActionInsert
(
SSdb
*
pSdb
,
STrans
*
pTrans
)
{
mTrace
(
"trans:%d, perform insert action, stage:%d"
,
pTrans
->
id
,
pTrans
->
stage
);
pTrans
->
stage
=
TRN_STAGE_PREPARE
;
mTrace
(
"trans:%d, perform insert action, stage:%s"
,
pTrans
->
id
,
mndTransStageStr
(
pTrans
->
stage
));
SArray
*
pArray
=
pTrans
->
redoLogs
;
int32_t
arraySize
=
taosArrayGetSize
(
pArray
);
for
(
int32_t
i
=
0
;
i
<
arraySize
;
++
i
)
{
SSdbRaw
*
pRaw
=
taosArrayGetP
(
pArray
,
i
);
int32_t
code
=
sdbWrite
(
pSdb
,
pRaw
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to write raw:%p to sdb since %s"
,
pTrans
->
id
,
pRaw
,
terrstr
());
return
code
;
}
}
return
0
;
return
0
;
}
}
static
int32_t
mndTransActionDelete
(
SSdb
*
pSdb
,
STrans
*
pTrans
)
{
static
int32_t
mndTransActionDelete
(
SSdb
*
pSdb
,
STrans
*
pTrans
)
{
mTrace
(
"trans:%d, perform delete action, stage:%
d"
,
pTrans
->
id
,
pTrans
->
stage
);
mTrace
(
"trans:%d, perform delete action, stage:%
s"
,
pTrans
->
id
,
mndTransStageStr
(
pTrans
->
stage
)
);
SArray
*
pArray
=
pTrans
->
undoLogs
;
mndTransDropArray
(
pTrans
->
redoLogs
);
int32_t
arraySize
=
taosArrayGetSize
(
pArray
);
mndTransDropArray
(
pTrans
->
undoLogs
);
mndTransDropArray
(
pTrans
->
commitLogs
);
for
(
int32_t
i
=
0
;
i
<
arraySize
;
++
i
)
{
mndTransDropArray
(
pTrans
->
redoActions
);
SSdbRaw
*
pRaw
=
taosArrayGetP
(
pArray
,
i
);
mndTransDropArray
(
pTrans
->
undoActions
);
int32_t
code
=
sdbWrite
(
pSdb
,
pRaw
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to write raw:%p to sdb since %s"
,
pTrans
->
id
,
pRaw
,
terrstr
());
return
code
;
}
}
return
0
;
return
0
;
}
}
static
int32_t
mndTransActionUpdate
(
SSdb
*
pSdb
,
STrans
*
pOldTrans
,
STrans
*
pNewTrans
)
{
static
int32_t
mndTransActionUpdate
(
SSdb
*
pSdb
,
STrans
*
pOldTrans
,
STrans
*
pNewTrans
)
{
mTrace
(
"trans:%d, perform update action, stage:%d"
,
pOldTrans
->
id
,
pNewTrans
->
stage
);
mTrace
(
"trans:%d, perform update action, stage:%s"
,
pOldTrans
->
id
,
mndTransStageStr
(
pNewTrans
->
stage
));
SArray
*
pArray
=
pOldTrans
->
commitLogs
;
int32_t
arraySize
=
taosArrayGetSize
(
pArray
);
for
(
int32_t
i
=
0
;
i
<
arraySize
;
++
i
)
{
SSdbRaw
*
pRaw
=
taosArrayGetP
(
pArray
,
i
);
int32_t
code
=
sdbWrite
(
pSdb
,
pRaw
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to write raw:%p to sdb since %s"
,
pOldTrans
->
id
,
pRaw
,
terrstr
());
return
code
;
}
}
pOldTrans
->
stage
=
pNewTrans
->
stage
;
pOldTrans
->
stage
=
pNewTrans
->
stage
;
return
0
;
return
0
;
}
}
static
int32_t
trnGenerateTransId
()
{
STrans
*
mndAcquireTrans
(
SMnode
*
pMnode
,
int32_t
transId
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
return
sdbAcquire
(
pSdb
,
SDB_TRANS
,
&
transId
);
}
void
mndReleaseTrans
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbRelease
(
pSdb
,
pTrans
);
}
static
int32_t
mndGenerateTransId
()
{
static
int32_t
tmp
=
0
;
static
int32_t
tmp
=
0
;
return
++
tmp
;
return
++
tmp
;
}
}
char
*
mndTransStageStr
(
ETrnStage
stage
)
{
switch
(
stage
)
{
case
TRN_STAGE_PREPARE
:
return
"prepare"
;
case
TRN_STAGE_EXECUTE
:
return
"execute"
;
case
TRN_STAGE_COMMIT
:
return
"commit"
;
case
TRN_STAGE_ROLLBACK
:
return
"rollback"
;
case
TRN_STAGE_RETRY
:
return
"retry"
;
default:
return
"undefined"
;
}
}
char
*
mndTransPolicyStr
(
ETrnPolicy
policy
)
{
switch
(
policy
)
{
case
TRN_POLICY_ROLLBACK
:
return
"prepare"
;
case
TRN_POLICY_RETRY
:
return
"retry"
;
default:
return
"undefined"
;
}
}
STrans
*
mndTransCreate
(
SMnode
*
pMnode
,
ETrnPolicy
policy
,
void
*
rpcHandle
)
{
STrans
*
mndTransCreate
(
SMnode
*
pMnode
,
ETrnPolicy
policy
,
void
*
rpcHandle
)
{
STrans
*
pTrans
=
calloc
(
1
,
sizeof
(
STrans
));
STrans
*
pTrans
=
calloc
(
1
,
sizeof
(
STrans
));
if
(
pTrans
==
NULL
)
{
if
(
pTrans
==
NULL
)
{
...
@@ -276,16 +303,15 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
...
@@ -276,16 +303,15 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
return
NULL
;
return
NULL
;
}
}
pTrans
->
id
=
trn
GenerateTransId
();
pTrans
->
id
=
mnd
GenerateTransId
();
pTrans
->
stage
=
TRN_STAGE_PREPARE
;
pTrans
->
stage
=
TRN_STAGE_PREPARE
;
pTrans
->
policy
=
policy
;
pTrans
->
policy
=
policy
;
pTrans
->
pMnode
=
pMnode
;
pTrans
->
rpcHandle
=
rpcHandle
;
pTrans
->
rpcHandle
=
rpcHandle
;
pTrans
->
redoLogs
=
taosArrayInit
(
T
RN_DEFAULT
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoLogs
=
taosArrayInit
(
T
SDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoLogs
=
taosArrayInit
(
T
RN_DEFAULT
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoLogs
=
taosArrayInit
(
T
SDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
commitLogs
=
taosArrayInit
(
T
RN_DEFAULT
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
commitLogs
=
taosArrayInit
(
T
SDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoActions
=
taosArrayInit
(
T
RN_DEFAULT
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoActions
=
taosArrayInit
(
T
SDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoActions
=
taosArrayInit
(
T
RN_DEFAULT
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoActions
=
taosArrayInit
(
T
SDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
if
(
pTrans
->
redoLogs
==
NULL
||
pTrans
->
undoLogs
==
NULL
||
pTrans
->
commitLogs
==
NULL
||
if
(
pTrans
->
redoLogs
==
NULL
||
pTrans
->
undoLogs
==
NULL
||
pTrans
->
commitLogs
==
NULL
||
pTrans
->
redoActions
==
NULL
||
pTrans
->
undoActions
==
NULL
)
{
pTrans
->
redoActions
==
NULL
||
pTrans
->
undoActions
==
NULL
)
{
...
@@ -298,7 +324,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
...
@@ -298,7 +324,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
return
pTrans
;
return
pTrans
;
}
}
static
void
trn
DropArray
(
SArray
*
pArray
)
{
static
void
mndTrans
DropArray
(
SArray
*
pArray
)
{
for
(
int32_t
i
=
0
;
i
<
pArray
->
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pArray
->
size
;
++
i
)
{
SSdbRaw
*
pRaw
=
taosArrayGetP
(
pArray
,
i
);
SSdbRaw
*
pRaw
=
taosArrayGetP
(
pArray
,
i
);
tfree
(
pRaw
);
tfree
(
pRaw
);
...
@@ -308,17 +334,17 @@ static void trnDropArray(SArray *pArray) {
...
@@ -308,17 +334,17 @@ static void trnDropArray(SArray *pArray) {
}
}
void
mndTransDrop
(
STrans
*
pTrans
)
{
void
mndTransDrop
(
STrans
*
pTrans
)
{
trn
DropArray
(
pTrans
->
redoLogs
);
mndTrans
DropArray
(
pTrans
->
redoLogs
);
trn
DropArray
(
pTrans
->
undoLogs
);
mndTrans
DropArray
(
pTrans
->
undoLogs
);
trn
DropArray
(
pTrans
->
commitLogs
);
mndTrans
DropArray
(
pTrans
->
commitLogs
);
trn
DropArray
(
pTrans
->
redoActions
);
mndTrans
DropArray
(
pTrans
->
redoActions
);
trn
DropArray
(
pTrans
->
undoActions
);
mndTrans
DropArray
(
pTrans
->
undoActions
);
mDebug
(
"trans:%d, data:%p is dropped"
,
pTrans
->
id
,
pTrans
);
mDebug
(
"trans:%d, data:%p is dropped"
,
pTrans
->
id
,
pTrans
);
tfree
(
pTrans
);
tfree
(
pTrans
);
}
}
void
mndTransSetRpcHandle
(
STrans
*
pTrans
,
void
*
rpcHandle
)
{
static
void
mndTransSetRpcHandle
(
STrans
*
pTrans
,
void
*
rpcHandle
)
{
pTrans
->
rpcHandle
=
rpcHandle
;
pTrans
->
rpcHandle
=
rpcHandle
;
mTrace
(
"trans:%d, set rpc handle:%p"
,
pTrans
->
id
,
rpcHandle
);
mTrace
(
"trans:%d, set rpc handle:%p"
,
pTrans
->
id
,
rpcHandle
);
}
}
...
@@ -340,19 +366,19 @@ static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) {
...
@@ -340,19 +366,19 @@ static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) {
int32_t
mndTransAppendRedolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
int32_t
mndTransAppendRedolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
int32_t
code
=
mndTransAppendArray
(
pTrans
->
redoLogs
,
pRaw
);
int32_t
code
=
mndTransAppendArray
(
pTrans
->
redoLogs
,
pRaw
);
mTrace
(
"trans:%d, raw:%p append to redo logs, code:
%d
"
,
pTrans
->
id
,
pRaw
,
code
);
mTrace
(
"trans:%d, raw:%p append to redo logs, code:
0x%x
"
,
pTrans
->
id
,
pRaw
,
code
);
return
code
;
return
code
;
}
}
int32_t
mndTransAppendUndolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
int32_t
mndTransAppendUndolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
int32_t
code
=
mndTransAppendArray
(
pTrans
->
undoLogs
,
pRaw
);
int32_t
code
=
mndTransAppendArray
(
pTrans
->
undoLogs
,
pRaw
);
mTrace
(
"trans:%d, raw:%p append to undo logs, code:
%d
"
,
pTrans
->
id
,
pRaw
,
code
);
mTrace
(
"trans:%d, raw:%p append to undo logs, code:
0x%x
"
,
pTrans
->
id
,
pRaw
,
code
);
return
code
;
return
code
;
}
}
int32_t
mndTransAppendCommitlog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
int32_t
mndTransAppendCommitlog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
int32_t
code
=
mndTransAppendArray
(
pTrans
->
commitLogs
,
pRaw
);
int32_t
code
=
mndTransAppendArray
(
pTrans
->
commitLogs
,
pRaw
);
mTrace
(
"trans:%d, raw:%p append to commit logs, code:
%d
"
,
pTrans
->
id
,
pRaw
,
code
);
mTrace
(
"trans:%d, raw:%p append to commit logs, code:
0x%x
"
,
pTrans
->
id
,
pRaw
,
code
);
return
code
;
return
code
;
}
}
...
@@ -368,7 +394,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {
...
@@ -368,7 +394,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {
return
code
;
return
code
;
}
}
int32_t
mndTransPrepare
(
STrans
*
pTrans
)
{
int32_t
mndTransPrepare
(
S
Mnode
*
pMnode
,
S
Trans
*
pTrans
)
{
mDebug
(
"trans:%d, prepare transaction"
,
pTrans
->
id
);
mDebug
(
"trans:%d, prepare transaction"
,
pTrans
->
id
);
SSdbRaw
*
pRaw
=
mndTransActionEncode
(
pTrans
);
SSdbRaw
*
pRaw
=
mndTransActionEncode
(
pTrans
);
...
@@ -376,180 +402,295 @@ int32_t mndTransPrepare(STrans *pTrans) {
...
@@ -376,180 +402,295 @@ int32_t mndTransPrepare(STrans *pTrans) {
mError
(
"trans:%d, failed to decode trans since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to decode trans since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
return
-
1
;
}
}
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_CREATING
);
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
mTrace
(
"trans:%d, start sync"
,
pTrans
->
id
);
int32_t
code
=
mndSyncPropose
(
pMnode
,
pRaw
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to sync since %s"
,
pTrans
->
id
,
terrstr
());
sdbFreeRaw
(
pRaw
);
return
-
1
;
}
if
(
sdbWriteNotFree
(
pTrans
->
pMnode
->
pSdb
,
pRaw
)
!=
0
)
{
mTrace
(
"trans:%d, sync finished"
,
pTrans
->
id
);
mError
(
"trans:%d, failed to write trans since %s"
,
pTrans
->
id
,
terrstr
());
code
=
sdbWrite
(
pMnode
->
pSdb
,
pRaw
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to write sdb since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
}
STrans
*
pNewTrans
=
mndAcquireTrans
(
pMnode
,
pTrans
->
id
);
if
(
pNewTrans
==
NULL
)
{
mError
(
"trans:%d, failed to ready from sdb since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
}
mDebug
(
"trans:%d, prepare finished"
,
pNewTrans
->
id
);
pNewTrans
->
rpcHandle
=
pTrans
->
rpcHandle
;
mndTransExecute
(
pMnode
,
pNewTrans
);
mndReleaseTrans
(
pMnode
,
pNewTrans
);
return
0
;
}
int32_t
mndTransCommit
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
mDebug
(
"trans:%d, commit transaction"
,
pTrans
->
id
);
SSdbRaw
*
pRaw
=
mndTransActionEncode
(
pTrans
);
if
(
pRaw
==
NULL
)
{
mError
(
"trans:%d, failed to decode trans since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
return
-
1
;
}
}
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_DROPPED
);
if
(
taosArrayGetSize
(
pTrans
->
commitLogs
)
!=
0
)
{
mTrace
(
"trans:%d, start sync"
,
pTrans
->
id
);
int32_t
code
=
mndSyncPropose
(
pMnode
,
pRaw
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to sync since %s"
,
pTrans
->
id
,
terrstr
());
sdbFreeRaw
(
pRaw
);
return
-
1
;
}
STransMsg
*
pMsg
=
calloc
(
1
,
sizeof
(
STransMsg
));
mTrace
(
"trans:%d, sync finished"
,
pTrans
->
id
);
pMsg
->
id
=
pTrans
->
id
;
code
=
sdbWrite
(
pMnode
->
pSdb
,
pRaw
);
pMsg
->
rpcHandle
=
pTrans
->
rpcHandle
;
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to write sdb since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
}
}
mDebug
(
"trans:%d, start sync, RPC:%p pMsg:%p"
,
pTrans
->
id
,
pTrans
->
rpcHandle
,
pMsg
);
mDebug
(
"trans:%d, commit finished"
,
pTrans
->
id
);
if
(
mndSyncPropose
(
pTrans
->
pMnode
,
pRaw
,
pMsg
)
!=
0
)
{
return
0
;
}
int32_t
mndTransRollback
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
mDebug
(
"trans:%d, rollback transaction"
,
pTrans
->
id
);
SSdbRaw
*
pRaw
=
mndTransActionEncode
(
pTrans
);
if
(
pRaw
==
NULL
)
{
mError
(
"trans:%d, failed to decode trans since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
}
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_DROPPED
);
mTrace
(
"trans:%d, start sync"
,
pTrans
->
id
);
int32_t
code
=
mndSyncPropose
(
pMnode
,
pRaw
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to sync since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to sync since %s"
,
pTrans
->
id
,
terrstr
());
free
(
pMsg
);
sdbFreeRaw
(
pRaw
);
sdbFreeRaw
(
pRaw
);
return
-
1
;
return
-
1
;
}
}
sdbFreeRaw
(
pRaw
);
mTrace
(
"trans:%d, sync finished"
,
pTrans
->
id
);
code
=
sdbWrite
(
pMnode
->
pSdb
,
pRaw
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to write sdb since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
}
mDebug
(
"trans:%d, rollback finished"
,
pTrans
->
id
);
return
0
;
return
0
;
}
}
static
void
trnSendRpcRsp
(
STransMsg
*
pMsg
,
int32_t
code
)
{
static
void
mndTransSendRpcRsp
(
STrans
*
pTrans
,
int32_t
code
)
{
mDebug
(
"trans:%d, send rpc rsp, RPC:%p code:0x%x pMsg:%p"
,
pMsg
->
id
,
pMsg
->
rpcHandle
,
code
&
0xFFFF
,
pMsg
);
if
(
code
==
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
return
;
if
(
pMsg
->
rpcHandle
!=
NULL
)
{
mDebug
(
"trans:%d, send rpc rsp, RPC:%p code:0x%x"
,
pTrans
->
id
,
pTrans
->
rpcHandle
,
code
&
0xFFFF
);
SRpcMsg
rspMsg
=
{.
handle
=
pMsg
->
rpcHandle
,
.
code
=
code
};
if
(
pTrans
->
rpcHandle
!=
NULL
)
{
SRpcMsg
rspMsg
=
{.
handle
=
pTrans
->
rpcHandle
,
.
code
=
code
};
rpcSendResponse
(
&
rspMsg
);
rpcSendResponse
(
&
rspMsg
);
}
}
free
(
pMsg
);
}
}
void
mndTransApply
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
,
STransMsg
*
pMsg
,
int32_t
code
)
{
void
mndTransApply
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
,
STransMsg
*
pMsg
,
int32_t
code
)
{
if
(
code
==
0
)
{
// todo
mDebug
(
"trans:%d, commit transaction"
,
pMsg
->
id
);
}
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
if
(
sdbWrite
(
pMnode
->
pSdb
,
pRaw
)
!=
0
)
{
static
int32_t
mndTransExecuteArray
(
SMnode
*
pMnode
,
SArray
*
pArray
)
{
code
=
terrno
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
mError
(
"trans:%d, failed to write sdb while commit since %s"
,
pMsg
->
id
,
terrstr
());
int32_t
arraySize
=
taosArrayGetSize
(
pArray
);
for
(
int32_t
i
=
0
;
i
<
arraySize
;
++
i
)
{
SSdbRaw
*
pRaw
=
taosArrayGetP
(
pArray
,
i
);
int32_t
code
=
sdbWriteNotFree
(
pSdb
,
pRaw
);
if
(
code
!=
0
)
{
return
code
;
}
}
trnSendRpcRsp
(
pMsg
,
code
);
}
}
else
{
mDebug
(
"trans:%d, rollback transaction"
,
pMsg
->
id
);
return
0
;
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_DROPPED
);
}
if
(
sdbWrite
(
pMnode
->
pSdb
,
pRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to write sdb while rollback since %s"
,
pMsg
->
id
,
terrstr
());
static
int32_t
mndTransExecuteRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
0
;
if
(
taosArrayGetSize
(
pTrans
->
redoLogs
)
!=
0
)
{
code
=
mndTransExecuteArray
(
pMnode
,
pTrans
->
redoLogs
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to execute redo logs since %s"
,
pTrans
->
id
,
terrstr
())
}
else
{
mTrace
(
"trans:%d, execute redo logs finished"
,
pTrans
->
id
)
}
}
trnSendRpcRsp
(
pMsg
,
code
);
}
}
return
code
;
}
}
static
int32_t
trnExecuteArray
(
SMnode
*
pMnode
,
SArray
*
pArray
)
{
static
int32_t
mndTransExecuteUndoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
for
(
int32_t
i
=
0
;
i
<
pArray
->
size
;
++
i
)
{
int32_t
code
=
0
;
SSdbRaw
*
pRaw
=
taosArrayGetP
(
pArray
,
i
);
if
(
taosArrayGetSize
(
pTrans
->
undoLogs
)
!=
0
)
{
if
(
sdbWrite
(
pMnode
->
pSdb
,
pRaw
)
!=
0
)
{
code
=
mndTransExecuteArray
(
pMnode
,
pTrans
->
undoLogs
);
return
-
1
;
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to execute undo logs since %s"
,
pTrans
->
id
,
terrstr
())
}
else
{
mTrace
(
"trans:%d, execute undo logs finished"
,
pTrans
->
id
)
}
}
}
}
return
0
;
return
code
;
}
}
static
int32_t
trnExecuteRedoLogs
(
STrans
*
pTrans
)
{
return
trnExecuteArray
(
pTrans
->
pMnode
,
pTrans
->
redoLogs
);
}
static
int32_t
mndTransExecuteCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
0
;
if
(
taosArrayGetSize
(
pTrans
->
commitLogs
)
!=
0
)
{
code
=
mndTransExecuteArray
(
pMnode
,
pTrans
->
commitLogs
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to execute commit logs since %s"
,
pTrans
->
id
,
terrstr
())
}
else
{
mTrace
(
"trans:%d, execute commit logs finished"
,
pTrans
->
id
)
}
}
static
int32_t
trnExecuteUndoLogs
(
STrans
*
pTrans
)
{
return
trnExecuteArray
(
pTrans
->
pMnode
,
pTrans
->
undoLogs
);
}
return
code
;
}
static
int32_t
trnExecuteCommitLogs
(
STrans
*
pTrans
)
{
return
trnExecuteArray
(
pTrans
->
pMnode
,
pTrans
->
commitLogs
);
}
static
int32_t
mndTransExecuteRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
if
(
taosArrayGetSize
(
pTrans
->
redoActions
)
!=
0
)
{
mTrace
(
"trans:%d, execute redo actions finished"
,
pTrans
->
id
);
}
return
0
;
}
static
int32_t
trnExecuteRedoActions
(
STrans
*
pTrans
)
{
return
trnExecuteArray
(
pTrans
->
pMnode
,
pTrans
->
redoActions
);
}
static
int32_t
mndTransExecuteUndoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
if
(
taosArrayGetSize
(
pTrans
->
undoActions
)
!=
0
)
{
mTrace
(
"trans:%d, execute undo actions finished"
,
pTrans
->
id
);
}
return
0
;
}
static
int32_t
trnExecuteUndoActions
(
STrans
*
pTrans
)
{
return
trnExecuteArray
(
pTrans
->
pMnode
,
pTrans
->
undoActions
);
}
static
int32_t
mndTransPerformPrepareStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
mndTransExecuteRedoLogs
(
pMnode
,
pTrans
);
static
int32_t
trnPerformPrepareStage
(
STrans
*
pTrans
)
{
if
(
code
==
0
)
{
if
(
trnExecuteRedoLogs
(
pTrans
)
==
0
)
{
pTrans
->
stage
=
TRN_STAGE_EXECUTE
;
pTrans
->
stage
=
TRN_STAGE_EXECUTE
;
return
0
;
mTrace
(
"trans:%d, stage from prepare to execute"
,
pTrans
->
id
)
;
}
else
{
}
else
{
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
return
-
1
;
mError
(
"trans:%d, stage from prepare to rollback since %s"
,
pTrans
->
id
,
terrstr
())
;
}
}
return
0
;
}
}
static
int32_t
trnPerformExecuteStage
(
STrans
*
pTrans
)
{
static
int32_t
mndTransPerformExecuteStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
trnExecuteRedoActions
(
pTrans
);
int32_t
code
=
mndTransExecuteRedoActions
(
pMnode
,
pTrans
);
if
(
code
==
0
)
{
if
(
code
==
0
)
{
pTrans
->
stage
=
TRN_STAGE_COMMIT
;
pTrans
->
stage
=
TRN_STAGE_COMMIT
;
return
0
;
mTrace
(
"trans:%d, stage from execute to commit"
,
pTrans
->
id
)
;
}
else
if
(
code
==
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
}
else
if
(
code
==
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
return
-
1
;
mTrace
(
"trans:%d, stage keep on execute since %s"
,
pTrans
->
id
,
terrstr
(
code
));
return
code
;
}
else
{
}
else
{
if
(
pTrans
->
policy
==
TRN_POLICY_RETRY
)
{
if
(
pTrans
->
policy
==
TRN_POLICY_ROLLBACK
)
{
pTrans
->
stage
=
TRN_STAGE_RETRY
;
}
else
{
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
mError
(
"trans:%d, stage from execute to rollback since %s"
,
pTrans
->
id
,
terrstr
());
}
else
{
pTrans
->
stage
=
TRN_STAGE_RETRY
;
mError
(
"trans:%d, stage from execute to retry since %s"
,
pTrans
->
id
,
terrstr
());
}
}
return
0
;
}
}
}
static
int32_t
trnPerformCommitStage
(
STrans
*
pTrans
)
{
return
0
;
if
(
trnExecuteCommitLogs
(
pTrans
)
==
0
)
{
pTrans
->
stage
=
TRN_STAGE_EXECUTE
;
return
0
;
}
else
{
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
return
-
1
;
}
}
}
static
int32_t
trnPerformRollbackStage
(
STrans
*
pTrans
)
{
static
int32_t
mndTransPerformCommitStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
if
(
trnExecuteCommitLogs
(
pTrans
)
==
0
)
{
int32_t
code
=
mndTransExecuteCommitLogs
(
pMnode
,
pTrans
);
pTrans
->
stage
=
TRN_STAGE_EXECUTE
;
return
0
;
if
(
code
==
0
)
{
pTrans
->
stage
=
TRN_STAGE_OVER
;
mTrace
(
"trans:%d, commit stage finished"
,
pTrans
->
id
);
}
else
{
}
else
{
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
if
(
pTrans
->
policy
==
TRN_POLICY_ROLLBACK
)
{
return
-
1
;
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
mError
(
"trans:%d, stage from commit to rollback since %s"
,
pTrans
->
id
,
terrstr
());
}
else
{
pTrans
->
stage
=
TRN_STAGE_RETRY
;
mError
(
"trans:%d, stage from commit to retry since %s"
,
pTrans
->
id
,
terrstr
());
}
}
}
return
code
;
}
}
static
int32_t
trnPerformRetryStage
(
STrans
*
pTrans
)
{
static
int32_t
mndTransPerformRollbackStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
if
(
trnExecuteCommitLogs
(
pTrans
)
==
0
)
{
int32_t
code
=
mndTransExecuteUndoActions
(
pMnode
,
pTrans
);
pTrans
->
stage
=
TRN_STAGE_EXECUTE
;
return
0
;
if
(
code
==
0
)
{
mTrace
(
"trans:%d, rollbacked"
,
pTrans
->
id
);
}
else
{
}
else
{
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
return
-
1
;
mError
(
"trans:%d, stage keep on rollback since %s"
,
pTrans
->
id
,
terrstr
())
;
}
}
}
int32_t
mndTransExecute
(
SSdb
*
pSdb
,
int32_t
tranId
)
{
int32_t
code
=
0
;
STrans
*
pTrans
=
sdbAcquire
(
pSdb
,
SDB_TRANS
,
&
tranId
);
return
code
;
if
(
pTrans
==
NULL
)
{
}
return
-
1
;
}
if
(
pTrans
->
stage
==
TRN_STAGE_PREPARE
)
{
static
int32_t
mndTransPerformRetryStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
if
(
trnPerformPrepareStage
(
pTrans
)
!=
0
)
{
int32_t
code
=
mndTransExecuteRedoActions
(
pMnode
,
pTrans
);
sdbRelease
(
pSdb
,
pTrans
);
return
-
1
;
}
}
if
(
pTrans
->
stage
==
TRN_STAGE_EXECUTE
)
{
if
(
code
==
0
)
{
if
(
trnPerformExecuteStage
(
pTrans
)
!=
0
)
{
pTrans
->
stage
=
TRN_STAGE_COMMIT
;
sdbRelease
(
pSdb
,
pTrans
);
mTrace
(
"trans:%d, stage from retry to commit"
,
pTrans
->
id
);
return
-
1
;
}
else
{
}
pTrans
->
stage
=
TRN_STAGE_RETRY
;
mError
(
"trans:%d, stage keep on retry since %s"
,
pTrans
->
id
,
terrstr
());
}
}
if
(
pTrans
->
stage
==
TRN_STAGE_COMMIT
)
{
return
code
;
if
(
trnPerformCommitStage
(
pTrans
)
!=
0
)
{
}
sdbRelease
(
pSdb
,
pTrans
);
return
-
1
;
}
}
if
(
pTrans
->
stage
==
TRN_STAGE_ROLLBACK
)
{
static
void
mndTransExecute
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
if
(
trnPerformRollbackStage
(
pTrans
)
!=
0
)
{
int32_t
code
=
0
;
sdbRelease
(
pSdb
,
pTrans
);
return
-
1
;
}
}
if
(
pTrans
->
stage
==
TRN_STAGE_RETRY
)
{
while
(
code
==
0
)
{
if
(
trnPerformRetryStage
(
pTrans
)
!=
0
)
{
switch
(
pTrans
->
stage
)
{
sdbRelease
(
pSdb
,
pTrans
);
case
TRN_STAGE_PREPARE
:
return
-
1
;
code
=
mndTransPerformPrepareStage
(
pMnode
,
pTrans
);
break
;
case
TRN_STAGE_EXECUTE
:
code
=
mndTransPerformExecuteStage
(
pMnode
,
pTrans
);
break
;
case
TRN_STAGE_COMMIT
:
code
=
mndTransCommit
(
pMnode
,
pTrans
);
if
(
code
==
0
)
{
code
=
mndTransPerformCommitStage
(
pMnode
,
pTrans
);
}
break
;
case
TRN_STAGE_ROLLBACK
:
code
=
mndTransPerformRollbackStage
(
pMnode
,
pTrans
);
if
(
code
==
0
)
{
code
=
mndTransRollback
(
pMnode
,
pTrans
);
}
break
;
case
TRN_STAGE_RETRY
:
code
=
mndTransPerformRetryStage
(
pMnode
,
pTrans
);
break
;
default:
mndTransSendRpcRsp
(
pTrans
,
0
);
return
;
}
}
}
}
sdbRelease
(
pSdb
,
pTrans
);
mndTransSendRpcRsp
(
pTrans
,
code
);
return
0
;
}
}
\ No newline at end of file
source/dnode/mnode/impl/src/mndUser.c
浏览文件 @
8454979b
...
@@ -207,25 +207,9 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass,
...
@@ -207,25 +207,9 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass,
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
}
}
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_CREATING
);
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_READY
);
SSdbRaw
*
pUndoRaw
=
mndUserActionEncode
(
&
userObj
);
if
(
pUndoRaw
==
NULL
||
mndTransAppendUndolog
(
pTrans
,
pUndoRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to append undo log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
sdbSetRawStatus
(
pUndoRaw
,
SDB_STATUS_DROPPED
);
SSdbRaw
*
pCommitRaw
=
mndUserActionEncode
(
&
userObj
);
if
(
pCommitRaw
==
NULL
||
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to append commit log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
...
@@ -251,15 +235,7 @@ static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOldUser, SUserObj *pNewU
...
@@ -251,15 +235,7 @@ static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOldUser, SUserObj *pNewU
}
}
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_READY
);
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_READY
);
SSdbRaw
*
pUndoRaw
=
mndUserActionEncode
(
pOldUser
);
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
if
(
pUndoRaw
==
NULL
||
mndTransAppendUndolog
(
pTrans
,
pUndoRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to append undo log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
sdbSetRawStatus
(
pUndoRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
...
@@ -283,25 +259,9 @@ static int32_t mndDropUser(SMnode *pMnode, SMnodeMsg *pMsg, SUserObj *pUser) {
...
@@ -283,25 +259,9 @@ static int32_t mndDropUser(SMnode *pMnode, SMnodeMsg *pMsg, SUserObj *pUser) {
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
}
}
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_DROPPING
);
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_DROPPED
);
SSdbRaw
*
pUndoRaw
=
mndUserActionEncode
(
pUser
);
if
(
pUndoRaw
==
NULL
||
mndTransAppendUndolog
(
pTrans
,
pUndoRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to append undo log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
sdbSetRawStatus
(
pUndoRaw
,
SDB_STATUS_READY
);
SSdbRaw
*
pCommitRaw
=
mndUserActionEncode
(
pUser
);
if
(
pCommitRaw
==
NULL
||
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to append commit log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
...
...
source/dnode/mnode/sdb/src/sdbHash.c
浏览文件 @
8454979b
...
@@ -132,11 +132,6 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
...
@@ -132,11 +132,6 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosHashRemove
(
hash
,
pOldRow
->
pObj
,
keySize
);
taosHashRemove
(
hash
,
pOldRow
->
pObj
,
keySize
);
taosWUnLockLatch
(
pLock
);
taosWUnLockLatch
(
pLock
);
SdbDeleteFp
deleteFp
=
pSdb
->
deleteFps
[
pOldRow
->
type
];
if
(
deleteFp
!=
NULL
)
{
code
=
(
*
deleteFp
)(
pSdb
,
pOldRow
->
pObj
);
}
sdbRelease
(
pSdb
,
pOldRow
->
pObj
);
sdbRelease
(
pSdb
,
pOldRow
->
pObj
);
sdbFreeRow
(
pRow
);
sdbFreeRow
(
pRow
);
return
code
;
return
code
;
...
@@ -161,6 +156,7 @@ int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw) {
...
@@ -161,6 +156,7 @@ int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw) {
case
SDB_STATUS_CREATING
:
case
SDB_STATUS_CREATING
:
code
=
sdbInsertRow
(
pSdb
,
hash
,
pRaw
,
pRow
,
keySize
);
code
=
sdbInsertRow
(
pSdb
,
hash
,
pRaw
,
pRow
,
keySize
);
break
;
break
;
case
SDB_STATUS_UPDATING
:
case
SDB_STATUS_READY
:
case
SDB_STATUS_READY
:
case
SDB_STATUS_DROPPING
:
case
SDB_STATUS_DROPPING
:
code
=
sdbUpdateRow
(
pSdb
,
hash
,
pRaw
,
pRow
,
keySize
);
code
=
sdbUpdateRow
(
pSdb
,
hash
,
pRaw
,
pRow
,
keySize
);
...
@@ -228,6 +224,11 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
...
@@ -228,6 +224,11 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
int32_t
ref
=
atomic_sub_fetch_32
(
&
pRow
->
refCount
,
1
);
int32_t
ref
=
atomic_sub_fetch_32
(
&
pRow
->
refCount
,
1
);
if
(
ref
<=
0
&&
pRow
->
status
==
SDB_STATUS_DROPPED
)
{
if
(
ref
<=
0
&&
pRow
->
status
==
SDB_STATUS_DROPPED
)
{
SdbDeleteFp
deleteFp
=
pSdb
->
deleteFps
[
pRow
->
type
];
if
(
deleteFp
!=
NULL
)
{
(
*
deleteFp
)(
pSdb
,
pRow
->
pObj
);
}
sdbFreeRow
(
pRow
);
sdbFreeRow
(
pRow
);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录