Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2348b6e2
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2348b6e2
编写于
12月 14, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-10431 refact trans
上级
43f600e0
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
331 addition
and
255 deletion
+331
-255
include/dnode/mnode/sdb/sdb.h
include/dnode/mnode/sdb/sdb.h
+6
-53
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+1
-1
source/dnode/mnode/impl/inc/mndSync.h
source/dnode/mnode/impl/inc/mndSync.h
+1
-1
source/dnode/mnode/impl/inc/mndTrans.h
source/dnode/mnode/impl/inc/mndTrans.h
+3
-3
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+3
-3
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+2
-2
source/dnode/mnode/impl/src/mndFunc.c
source/dnode/mnode/impl/src/mndFunc.c
+2
-2
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+2
-2
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+2
-2
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+7
-7
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+299
-176
source/dnode/mnode/impl/src/mndUser.c
source/dnode/mnode/impl/src/mndUser.c
+3
-3
未找到文件。
include/dnode/mnode/sdb/sdb.h
浏览文件 @
2348b6e2
...
...
@@ -144,9 +144,10 @@ typedef struct SSdbRow SSdbRow;
typedef
enum
{
SDB_KEY_BINARY
=
1
,
SDB_KEY_INT32
=
2
,
SDB_KEY_INT64
=
3
}
EKeyType
;
typedef
enum
{
SDB_STATUS_CREATING
=
1
,
SDB_STATUS_
READY
=
2
,
SDB_STATUS_
UPDATING
=
2
,
SDB_STATUS_DROPPING
=
3
,
SDB_STATUS_DROPPED
=
4
SDB_STATUS_READY
=
4
,
SDB_STATUS_DROPPED
=
5
}
ESdbStatus
;
typedef
enum
{
...
...
@@ -174,67 +175,19 @@ typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
typedef
SSdbRaw
*
(
*
SdbEncodeFp
)(
void
*
pObj
);
typedef
struct
{
/**
* @brief The sdb type of the table.
*
*/
ESdbType
sdbType
;
/**
* @brief The key type of the table.
*
*/
EKeyType
keyType
;
/**
* @brief The callback function when the table is first deployed.
*
*/
ESdbType
sdbType
;
EKeyType
keyType
;
SdbDeployFp
deployFp
;
/**
* @brief Encode one row of the table into rawdata.
*
*/
SdbEncodeFp
encodeFp
;
/**
* @brief Decode one row of the table from rawdata.
*
*/
SdbDecodeFp
decodeFp
;
/**
* @brief The callback function when insert a row to sdb.
*
*/
SdbInsertFp
insertFp
;
/**
* @brief The callback function when undate a row in sdb.
*
*/
SdbUpdateFp
updateFp
;
/**
* @brief The callback function when delete a row from sdb.
*
*/
SdbDeleteFp
deleteFp
;
}
SSdbTable
;
typedef
struct
SSdbOpt
{
/**
* @brief The path of the sdb file.
*
*/
const
char
*
path
;
/**
* @brief The mnode object.
*
*/
SMnode
*
pMnode
;
SMnode
*
pMnode
;
}
SSdbOpt
;
/**
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
2348b6e2
...
...
@@ -103,7 +103,6 @@ typedef struct STrans {
int32_t
id
;
ETrnStage
stage
;
ETrnPolicy
policy
;
SMnode
*
pMnode
;
void
*
rpcHandle
;
SArray
*
redoLogs
;
SArray
*
undoLogs
;
...
...
@@ -306,6 +305,7 @@ typedef struct SMnodeMsg {
typedef
struct
{
int32_t
id
;
int32_t
code
;
void
*
rpcHandle
;
}
STransMsg
;
...
...
source/dnode/mnode/impl/inc/mndSync.h
浏览文件 @
2348b6e2
...
...
@@ -25,7 +25,7 @@ extern "C" {
int32_t
mndInitSync
(
SMnode
*
pMnode
);
void
mndCleanupSync
(
SMnode
*
pMnode
);
bool
mndIsMaster
(
SMnode
*
pMnode
);
int32_t
mndSyncPropose
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
,
STransMsg
*
pMsg
);
int32_t
mndSyncPropose
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/inc/mndTrans.h
浏览文件 @
2348b6e2
...
...
@@ -32,10 +32,10 @@ int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t
mndTransAppendCommitlog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
mndTransAppendRedoAction
(
STrans
*
pTrans
,
SEpSet
*
,
void
*
pMsg
);
int32_t
mndTransAppendUndoAction
(
STrans
*
pTrans
,
SEpSet
*
,
void
*
pMsg
);
int32_t
mndTransPrepare
(
STrans
*
pTrans
);
int32_t
mndTransPrepare
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
void
mndTransApply
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
,
STransMsg
*
pMsg
,
int32_t
code
);
int32_t
mndTransExecute
(
SSdb
*
pSdb
,
int32_t
tranId
);
char
*
mndTransStageStr
(
ETrnStage
stage
);
char
*
mndTransPolicyStr
(
ETrnPolicy
policy
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
2348b6e2
...
...
@@ -357,7 +357,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
...
...
@@ -491,7 +491,7 @@ static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbO
}
sdbSetRawStatus
(
pUndoRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
...
...
@@ -571,7 +571,7 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) {
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
2348b6e2
...
...
@@ -431,7 +431,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
...
...
@@ -503,7 +503,7 @@ static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode)
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndFunc.c
浏览文件 @
2348b6e2
...
...
@@ -183,7 +183,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncMsg *pC
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
...
...
@@ -226,7 +226,7 @@ static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pMsg, SFuncObj *pFunc) {
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
2348b6e2
...
...
@@ -238,7 +238,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateMnodeMsg *
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
...
...
@@ -313,7 +313,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pMnodeOb
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
2348b6e2
...
...
@@ -285,7 +285,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
...
...
@@ -433,7 +433,7 @@ static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pStb) {
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
2348b6e2
...
...
@@ -21,16 +21,16 @@
int32_t
mndInitSync
(
SMnode
*
pMnode
)
{
return
0
;
}
void
mndCleanupSync
(
SMnode
*
pMnode
)
{}
int32_t
mndSyncPropose
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
,
STransMsg
*
pMsg
)
{
int32_t
mndSyncPropose
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
)
{
int32_t
code
=
0
;
int32_t
len
=
sdbGetRawTotalSize
(
pRaw
);
SSdbRaw
*
pReceived
=
calloc
(
1
,
len
);
memcpy
(
pReceived
,
pRaw
,
len
);
mDebug
(
"trans:%d, data:%p recv from sync, code:0x%x pMsg:%p"
,
pMsg
->
id
,
pReceived
,
code
&
0xFFFF
,
pMsg
);
//
int32_t len = sdbGetRawTotalSize(pRaw);
//
SSdbRaw *pReceived = calloc(1, len);
//
memcpy(pReceived, pRaw, len);
//
mDebug("trans:%d, data:%p recv from sync, code:0x%x pMsg:%p", pMsg->id, pReceived, code & 0xFFFF, pMsg);
mndTransApply
(
pMnode
,
pReceived
,
pMsg
,
code
);
return
0
;
// mndTransApply(pMnode, pReceived
, code);
return
code
;
}
bool
mndIsMaster
(
SMnode
*
pMnode
)
{
return
true
;
}
\ No newline at end of file
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
2348b6e2
...
...
@@ -17,8 +17,9 @@
#include "mndTrans.h"
#include "mndSync.h"
#define SDB_TRANS_VER 1
#define TRN_DEFAULT_ARRAY_SIZE 8
#define TSDB_TRANS_VER 1
#define TSDB_TRN_ARRAY_SIZE 8
#define TSDB_TRN_RESERVE_SIZE 64
static
SSdbRaw
*
mndTransActionEncode
(
STrans
*
pTrans
);
static
SSdbRow
*
mndTransActionDecode
(
SSdbRaw
*
pRaw
);
...
...
@@ -26,6 +27,22 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
static
int32_t
mndTransActionUpdate
(
SSdb
*
pSdb
,
STrans
*
OldTrans
,
STrans
*
pOldTrans
);
static
int32_t
mndTransActionDelete
(
SSdb
*
pSdb
,
STrans
*
pTrans
);
static
void
mndTransSetRpcHandle
(
STrans
*
pTrans
,
void
*
rpcHandle
);
static
void
mndTransSendRpcRsp
(
STrans
*
pTrans
,
int32_t
code
);
static
int32_t
mndTransAppendArray
(
SArray
*
pArray
,
SSdbRaw
*
pRaw
);
static
void
mndTransDropArray
(
SArray
*
pArray
);
static
int32_t
mndTransExecuteArray
(
SMnode
*
pMnode
,
SArray
*
pArray
);
static
int32_t
mndTransExecuteRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransExecuteUndoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransExecuteCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransExecuteRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransExecuteUndoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransPerformPrepareStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransPerformExecuteStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransPerformCommitStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransPerformRollbackStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
void
mndTransExecute
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
int32_t
mndInitTrans
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_TRANS
,
.
keyType
=
SDB_KEY_INT32
,
...
...
@@ -63,7 +80,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
rawDataLen
+=
sdbGetRawTotalSize
(
pTmp
);
}
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_TRANS
,
SDB_TRANS_VER
,
rawDataLen
);
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_TRANS
,
T
SDB_TRANS_VER
,
rawDataLen
);
if
(
pRaw
==
NULL
)
{
mError
(
"trans:%d, failed to alloc raw since %s"
,
pTrans
->
id
,
terrstr
());
return
NULL
;
...
...
@@ -100,6 +117,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
pTmp
,
len
)
}
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
TSDB_TRN_RESERVE_SIZE
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
);
mTrace
(
"trans:%d, encode to raw:%p, len:%d"
,
pTrans
->
id
,
pRaw
,
dataPos
);
return
pRaw
;
}
...
...
@@ -113,7 +132,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
return
NULL
;
}
if
(
sver
!=
SDB_TRANS_VER
)
{
if
(
sver
!=
T
SDB_TRANS_VER
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
mError
(
"failed to get check soft ver from raw:%p since %s"
,
pRaw
,
terrstr
());
return
NULL
;
...
...
@@ -126,11 +145,11 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
return
NULL
;
}
pTrans
->
redoLogs
=
taosArrayInit
(
T
RN_DEFAULT
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoLogs
=
taosArrayInit
(
T
RN_DEFAULT
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
commitLogs
=
taosArrayInit
(
T
RN_DEFAULT
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoActions
=
taosArrayInit
(
T
RN_DEFAULT
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoActions
=
taosArrayInit
(
T
RN_DEFAULT
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoLogs
=
taosArrayInit
(
T
SDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoLogs
=
taosArrayInit
(
T
SDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
commitLogs
=
taosArrayInit
(
T
SDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoActions
=
taosArrayInit
(
T
SDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoActions
=
taosArrayInit
(
T
SDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
if
(
pTrans
->
redoLogs
==
NULL
||
pTrans
->
undoLogs
==
NULL
||
pTrans
->
commitLogs
==
NULL
||
pTrans
->
redoActions
==
NULL
||
pTrans
->
undoActions
==
NULL
)
{
...
...
@@ -197,6 +216,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
}
}
SDB_GET_RESERVE
(
pRaw
,
pRow
,
dataPos
,
TSDB_TRN_RESERVE_SIZE
)
TRANS_DECODE_OVER:
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to parse from raw:%p since %s"
,
pTrans
->
id
,
pRaw
,
tstrerror
(
errno
));
...
...
@@ -210,64 +231,71 @@ TRANS_DECODE_OVER:
}
static
int32_t
mndTransActionInsert
(
SSdb
*
pSdb
,
STrans
*
pTrans
)
{
mTrace
(
"trans:%d, perform insert action, stage:%d"
,
pTrans
->
id
,
pTrans
->
stage
);
SArray
*
pArray
=
pTrans
->
redoLogs
;
int32_t
arraySize
=
taosArrayGetSize
(
pArray
);
for
(
int32_t
i
=
0
;
i
<
arraySize
;
++
i
)
{
SSdbRaw
*
pRaw
=
taosArrayGetP
(
pArray
,
i
);
int32_t
code
=
sdbWrite
(
pSdb
,
pRaw
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to write raw:%p to sdb since %s"
,
pTrans
->
id
,
pRaw
,
terrstr
());
return
code
;
}
}
mTrace
(
"trans:%d, perform insert action, stage:%s"
,
pTrans
->
id
,
mndTransStageStr
(
pTrans
->
stage
));
return
0
;
}
static
int32_t
mndTransActionDelete
(
SSdb
*
pSdb
,
STrans
*
pTrans
)
{
mTrace
(
"trans:%d, perform delete action, stage:%
d"
,
pTrans
->
id
,
pTrans
->
stage
);
mTrace
(
"trans:%d, perform delete action, stage:%
s"
,
pTrans
->
id
,
mndTransStageStr
(
pTrans
->
stage
)
);
SArray
*
pArray
=
pTrans
->
undoLogs
;
int32_t
arraySize
=
taosArrayGetSize
(
pArray
);
for
(
int32_t
i
=
0
;
i
<
arraySize
;
++
i
)
{
SSdbRaw
*
pRaw
=
taosArrayGetP
(
pArray
,
i
);
int32_t
code
=
sdbWrite
(
pSdb
,
pRaw
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to write raw:%p to sdb since %s"
,
pTrans
->
id
,
pRaw
,
terrstr
());
return
code
;
}
}
mndTransDropArray
(
pTrans
->
redoLogs
);
mndTransDropArray
(
pTrans
->
undoLogs
);
mndTransDropArray
(
pTrans
->
commitLogs
);
mndTransDropArray
(
pTrans
->
redoActions
);
mndTransDropArray
(
pTrans
->
undoActions
);
return
0
;
}
static
int32_t
mndTransActionUpdate
(
SSdb
*
pSdb
,
STrans
*
pOldTrans
,
STrans
*
pNewTrans
)
{
mTrace
(
"trans:%d, perform update action, stage:%d"
,
pOldTrans
->
id
,
pNewTrans
->
stage
);
SArray
*
pArray
=
pOldTrans
->
commitLogs
;
int32_t
arraySize
=
taosArrayGetSize
(
pArray
);
for
(
int32_t
i
=
0
;
i
<
arraySize
;
++
i
)
{
SSdbRaw
*
pRaw
=
taosArrayGetP
(
pArray
,
i
);
int32_t
code
=
sdbWrite
(
pSdb
,
pRaw
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to write raw:%p to sdb since %s"
,
pOldTrans
->
id
,
pRaw
,
terrstr
());
return
code
;
}
}
mTrace
(
"trans:%d, perform update action, stage:%s"
,
pOldTrans
->
id
,
mndTransStageStr
(
pNewTrans
->
stage
));
pOldTrans
->
stage
=
pNewTrans
->
stage
;
return
0
;
}
STrans
*
mndAcquireTrans
(
SMnode
*
pMnode
,
int32_t
transId
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
return
sdbAcquire
(
pSdb
,
SDB_TRANS
,
&
transId
);
}
void
mndReleaseTrans
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbRelease
(
pSdb
,
pTrans
);
}
static
int32_t
trnGenerateTransId
()
{
static
int32_t
tmp
=
0
;
return
++
tmp
;
}
char
*
mndTransStageStr
(
ETrnStage
stage
)
{
switch
(
stage
)
{
case
TRN_STAGE_PREPARE
:
return
"prepare"
;
case
TRN_STAGE_EXECUTE
:
return
"execute"
;
case
TRN_STAGE_COMMIT
:
return
"commit"
;
case
TRN_STAGE_ROLLBACK
:
return
"rollback"
;
case
TRN_STAGE_RETRY
:
return
"retry"
;
default:
return
"undefined"
;
}
}
char
*
mndTransPolicyStr
(
ETrnPolicy
policy
)
{
switch
(
policy
)
{
case
TRN_POLICY_ROLLBACK
:
return
"prepare"
;
case
TRN_POLICY_RETRY
:
return
"retry"
;
default:
return
"undefined"
;
}
}
STrans
*
mndTransCreate
(
SMnode
*
pMnode
,
ETrnPolicy
policy
,
void
*
rpcHandle
)
{
STrans
*
pTrans
=
calloc
(
1
,
sizeof
(
STrans
));
if
(
pTrans
==
NULL
)
{
...
...
@@ -279,13 +307,12 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
pTrans
->
id
=
trnGenerateTransId
();
pTrans
->
stage
=
TRN_STAGE_PREPARE
;
pTrans
->
policy
=
policy
;
pTrans
->
pMnode
=
pMnode
;
pTrans
->
rpcHandle
=
rpcHandle
;
pTrans
->
redoLogs
=
taosArrayInit
(
T
RN_DEFAULT
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoLogs
=
taosArrayInit
(
T
RN_DEFAULT
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
commitLogs
=
taosArrayInit
(
T
RN_DEFAULT
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoActions
=
taosArrayInit
(
T
RN_DEFAULT
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoActions
=
taosArrayInit
(
T
RN_DEFAULT
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoLogs
=
taosArrayInit
(
T
SDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoLogs
=
taosArrayInit
(
T
SDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
commitLogs
=
taosArrayInit
(
T
SDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoActions
=
taosArrayInit
(
T
SDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoActions
=
taosArrayInit
(
T
SDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
if
(
pTrans
->
redoLogs
==
NULL
||
pTrans
->
undoLogs
==
NULL
||
pTrans
->
commitLogs
==
NULL
||
pTrans
->
redoActions
==
NULL
||
pTrans
->
undoActions
==
NULL
)
{
...
...
@@ -298,7 +325,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
return
pTrans
;
}
static
void
trn
DropArray
(
SArray
*
pArray
)
{
static
void
mndTrans
DropArray
(
SArray
*
pArray
)
{
for
(
int32_t
i
=
0
;
i
<
pArray
->
size
;
++
i
)
{
SSdbRaw
*
pRaw
=
taosArrayGetP
(
pArray
,
i
);
tfree
(
pRaw
);
...
...
@@ -308,17 +335,17 @@ static void trnDropArray(SArray *pArray) {
}
void
mndTransDrop
(
STrans
*
pTrans
)
{
trn
DropArray
(
pTrans
->
redoLogs
);
trn
DropArray
(
pTrans
->
undoLogs
);
trn
DropArray
(
pTrans
->
commitLogs
);
trn
DropArray
(
pTrans
->
redoActions
);
trn
DropArray
(
pTrans
->
undoActions
);
mndTrans
DropArray
(
pTrans
->
redoLogs
);
mndTrans
DropArray
(
pTrans
->
undoLogs
);
mndTrans
DropArray
(
pTrans
->
commitLogs
);
mndTrans
DropArray
(
pTrans
->
redoActions
);
mndTrans
DropArray
(
pTrans
->
undoActions
);
mDebug
(
"trans:%d, data:%p is dropped"
,
pTrans
->
id
,
pTrans
);
tfree
(
pTrans
);
}
void
mndTransSetRpcHandle
(
STrans
*
pTrans
,
void
*
rpcHandle
)
{
static
void
mndTransSetRpcHandle
(
STrans
*
pTrans
,
void
*
rpcHandle
)
{
pTrans
->
rpcHandle
=
rpcHandle
;
mTrace
(
"trans:%d, set rpc handle:%p"
,
pTrans
->
id
,
rpcHandle
);
}
...
...
@@ -340,19 +367,19 @@ static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) {
int32_t
mndTransAppendRedolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
int32_t
code
=
mndTransAppendArray
(
pTrans
->
redoLogs
,
pRaw
);
mTrace
(
"trans:%d, raw:%p append to redo logs, code:
%d
"
,
pTrans
->
id
,
pRaw
,
code
);
mTrace
(
"trans:%d, raw:%p append to redo logs, code:
0x%x
"
,
pTrans
->
id
,
pRaw
,
code
);
return
code
;
}
int32_t
mndTransAppendUndolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
int32_t
code
=
mndTransAppendArray
(
pTrans
->
undoLogs
,
pRaw
);
mTrace
(
"trans:%d, raw:%p append to undo logs, code:
%d
"
,
pTrans
->
id
,
pRaw
,
code
);
mTrace
(
"trans:%d, raw:%p append to undo logs, code:
0x%x
"
,
pTrans
->
id
,
pRaw
,
code
);
return
code
;
}
int32_t
mndTransAppendCommitlog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
int32_t
code
=
mndTransAppendArray
(
pTrans
->
commitLogs
,
pRaw
);
mTrace
(
"trans:%d, raw:%p append to commit logs, code:
%d
"
,
pTrans
->
id
,
pRaw
,
code
);
mTrace
(
"trans:%d, raw:%p append to commit logs, code:
0x%x
"
,
pTrans
->
id
,
pRaw
,
code
);
return
code
;
}
...
...
@@ -368,7 +395,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {
return
code
;
}
int32_t
mndTransPrepare
(
STrans
*
pTrans
)
{
int32_t
mndTransPrepare
(
S
Mnode
*
pMnode
,
S
Trans
*
pTrans
)
{
mDebug
(
"trans:%d, prepare transaction"
,
pTrans
->
id
);
SSdbRaw
*
pRaw
=
mndTransActionEncode
(
pTrans
);
...
...
@@ -376,180 +403,276 @@ int32_t mndTransPrepare(STrans *pTrans) {
mError
(
"trans:%d, failed to decode trans since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
}
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_
CREATING
);
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_
READY
);
if
(
sdbWriteNotFree
(
pTrans
->
pMnode
->
pSdb
,
pRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to write trans since %s"
,
pTrans
->
id
,
terrstr
());
mTrace
(
"trans:%d, start sync"
,
pTrans
->
id
);
int32_t
code
=
mndSyncPropose
(
pMnode
,
pRaw
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to sync since %s"
,
pTrans
->
id
,
terrstr
());
sdbFreeRaw
(
pRaw
);
return
-
1
;
}
STransMsg
*
pMsg
=
calloc
(
1
,
sizeof
(
STransMsg
));
pMsg
->
id
=
pTrans
->
id
;
pMsg
->
rpcHandle
=
pTrans
->
rpcHandle
;
mTrace
(
"trans:%d, sync finished"
,
pTrans
->
id
);
mDebug
(
"trans:%d, start sync, RPC:%p pMsg:%p"
,
pTrans
->
id
,
pTrans
->
rpcHandle
,
pMsg
);
if
(
mndSyncPropose
(
pTrans
->
pMnode
,
pRaw
,
pMsg
)
!=
0
)
{
code
=
sdbWrite
(
pMnode
->
pSdb
,
pRaw
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to write sdb since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
}
STrans
*
pNewTrans
=
mndAcquireTrans
(
pMnode
,
pTrans
->
id
);
if
(
pNewTrans
==
NULL
)
{
mError
(
"trans:%d, failed to ready from sdb since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
}
mDebug
(
"trans:%d, prepare finished"
,
pNewTrans
->
id
);
mndTransExecute
(
pMnode
,
pNewTrans
);
mndReleaseTrans
(
pMnode
,
pNewTrans
);
return
0
;
}
int32_t
mndTransCommit
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
mDebug
(
"trans:%d, commit transaction"
,
pTrans
->
id
);
SSdbRaw
*
pRaw
=
mndTransActionEncode
(
pTrans
);
if
(
pRaw
==
NULL
)
{
mError
(
"trans:%d, failed to decode trans since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
}
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_DROPPED
);
mTrace
(
"trans:%d, start sync"
,
pTrans
->
id
);
int32_t
code
=
mndSyncPropose
(
pMnode
,
pRaw
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to sync since %s"
,
pTrans
->
id
,
terrstr
());
free
(
pMsg
);
sdbFreeRaw
(
pRaw
);
return
-
1
;
}
sdbFreeRaw
(
pRaw
);
mTrace
(
"trans:%d, sync finished"
,
pTrans
->
id
);
code
=
sdbWrite
(
pMnode
->
pSdb
,
pRaw
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to write sdb since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
}
mDebug
(
"trans:%d, commit finished"
,
pTrans
->
id
);
return
0
;
}
static
void
trnSendRpcRsp
(
STransMsg
*
pMsg
,
int32_t
code
)
{
mDebug
(
"trans:%d, send rpc rsp, RPC:%p code:0x%x pMsg:%p"
,
pMsg
->
id
,
pMsg
->
rpcHandle
,
code
&
0xFFFF
,
pMsg
);
if
(
pMsg
->
rpcHandle
!=
NULL
)
{
SRpcMsg
rspMsg
=
{.
handle
=
pMsg
->
rpcHandle
,
.
code
=
code
};
rpcSendResponse
(
&
rspMsg
);
int32_t
mndTransRollback
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
mDebug
(
"trans:%d, rollback transaction"
,
pTrans
->
id
);
SSdbRaw
*
pRaw
=
mndTransActionEncode
(
pTrans
);
if
(
pRaw
==
NULL
)
{
mError
(
"trans:%d, failed to decode trans since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
}
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_DROPPED
);
mTrace
(
"trans:%d, start sync"
,
pTrans
->
id
);
int32_t
code
=
mndSyncPropose
(
pMnode
,
pRaw
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to sync since %s"
,
pTrans
->
id
,
terrstr
());
sdbFreeRaw
(
pRaw
);
return
-
1
;
}
mTrace
(
"trans:%d, sync finished"
,
pTrans
->
id
);
code
=
sdbWrite
(
pMnode
->
pSdb
,
pRaw
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to write sdb since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
}
free
(
pMsg
);
mDebug
(
"trans:%d, rollback finished"
,
pTrans
->
id
);
return
0
;
}
void
mndTransApply
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
,
STransMsg
*
pMsg
,
int32_t
code
)
{
if
(
code
==
0
)
{
mDebug
(
"trans:%d, commit transaction"
,
pMsg
->
id
);
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
if
(
sdbWrite
(
pMnode
->
pSdb
,
pRaw
)
!=
0
)
{
code
=
terrno
;
mError
(
"trans:%d, failed to write sdb while commit since %s"
,
pMsg
->
id
,
terrstr
());
}
trnSendRpcRsp
(
pMsg
,
code
);
}
else
{
mDebug
(
"trans:%d, rollback transaction"
,
pMsg
->
id
);
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_DROPPED
);
if
(
sdbWrite
(
pMnode
->
pSdb
,
pRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to write sdb while rollback since %s"
,
pMsg
->
id
,
terrstr
());
}
trnSendRpcRsp
(
pMsg
,
code
);
static
void
mndTransSendRpcRsp
(
STrans
*
pTrans
,
int32_t
code
)
{
if
(
code
==
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
return
;
mDebug
(
"trans:%d, send rpc rsp, RPC:%p code:0x%x"
,
pTrans
->
id
,
pTrans
->
rpcHandle
,
code
&
0xFFFF
);
if
(
pTrans
->
rpcHandle
!=
NULL
)
{
SRpcMsg
rspMsg
=
{.
handle
=
pTrans
->
rpcHandle
,
.
code
=
code
};
rpcSendResponse
(
&
rspMsg
);
}
}
static
int32_t
trnExecuteArray
(
SMnode
*
pMnode
,
SArray
*
pArray
)
{
for
(
int32_t
i
=
0
;
i
<
pArray
->
size
;
++
i
)
{
void
mndTransApply
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
,
STransMsg
*
pMsg
,
int32_t
code
)
{
// todo
}
static
int32_t
mndTransExecuteArray
(
SMnode
*
pMnode
,
SArray
*
pArray
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
arraySize
=
taosArrayGetSize
(
pArray
);
for
(
int32_t
i
=
0
;
i
<
arraySize
;
++
i
)
{
SSdbRaw
*
pRaw
=
taosArrayGetP
(
pArray
,
i
);
if
(
sdbWrite
(
pMnode
->
pSdb
,
pRaw
)
!=
0
)
{
return
-
1
;
int32_t
code
=
sdbWriteNotFree
(
pSdb
,
pRaw
);
if
(
code
!=
0
)
{
return
code
;
}
}
return
0
;
}
static
int32_t
trnExecuteRedoLogs
(
STrans
*
pTrans
)
{
return
trnExecuteArray
(
pTrans
->
pMnode
,
pTrans
->
redoLogs
);
}
static
int32_t
mndTransExecuteRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
mndTransExecuteArray
(
pMnode
,
pTrans
->
redoLogs
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to execute redo logs since %s"
,
pTrans
->
id
,
terrstr
())
}
else
{
mTrace
(
"trans:%d, execute redo logs finished"
,
pTrans
->
id
)
}
return
code
;
}
static
int32_t
mndTransExecuteUndoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
mndTransExecuteArray
(
pMnode
,
pTrans
->
undoLogs
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to execute undo logs since %s"
,
pTrans
->
id
,
terrstr
())
}
else
{
mTrace
(
"trans:%d, execute undo logs finished"
,
pTrans
->
id
)
}
return
code
;
}
static
int32_t
mndTransExecuteCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
mndTransExecuteArray
(
pMnode
,
pTrans
->
commitLogs
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to execute commit logs since %s"
,
pTrans
->
id
,
terrstr
())
}
else
{
mTrace
(
"trans:%d, execute commit logs finished"
,
pTrans
->
id
)
}
static
int32_t
trnExecuteUndoLogs
(
STrans
*
pTrans
)
{
return
trnExecuteArray
(
pTrans
->
pMnode
,
pTrans
->
undoLogs
);
}
return
code
;
}
static
int32_t
trnExecuteCommitLogs
(
STrans
*
pTrans
)
{
return
trnExecuteArray
(
pTrans
->
pMnode
,
pTrans
->
commitLogs
);
}
static
int32_t
mndTransExecuteRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
mTrace
(
"trans:%d, execute redo actions finished"
,
pTrans
->
id
);
return
0
;
}
static
int32_t
trnExecuteRedoActions
(
STrans
*
pTrans
)
{
return
trnExecuteArray
(
pTrans
->
pMnode
,
pTrans
->
redoActions
);
}
static
int32_t
mndTransExecuteUndoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
mTrace
(
"trans:%d, execute undo actions finished"
,
pTrans
->
id
);
return
0
;
}
static
int32_t
trnExecuteUndoActions
(
STrans
*
pTrans
)
{
return
trnExecuteArray
(
pTrans
->
pMnode
,
pTrans
->
undoActions
);
}
static
int32_t
mndTransPerformPrepareStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
mndTransExecuteRedoLogs
(
pMnode
,
pTrans
);
static
int32_t
trnPerformPrepareStage
(
STrans
*
pTrans
)
{
if
(
trnExecuteRedoLogs
(
pTrans
)
==
0
)
{
if
(
code
==
0
)
{
pTrans
->
stage
=
TRN_STAGE_EXECUTE
;
return
0
;
mTrace
(
"trans:%d, stage from prepare to execute"
,
pTrans
->
id
)
;
}
else
{
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
return
-
1
;
mError
(
"trans:%d, stage from prepare to rollback since %s"
,
pTrans
->
id
,
terrstr
())
;
}
return
0
;
}
static
int32_t
trnPerformExecuteStage
(
STrans
*
pTrans
)
{
int32_t
code
=
trnExecuteRedoActions
(
pTrans
);
static
int32_t
mndTransPerformExecuteStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
mndTransExecuteRedoActions
(
pMnode
,
pTrans
);
if
(
code
==
0
)
{
pTrans
->
stage
=
TRN_STAGE_COMMIT
;
return
0
;
mTrace
(
"trans:%d, stage from execute to commit"
,
pTrans
->
id
)
;
}
else
if
(
code
==
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
return
-
1
;
mTrace
(
"trans:%d, stage keep on execute since %s"
,
pTrans
->
id
,
terrstr
(
code
));
return
code
;
}
else
{
if
(
pTrans
->
policy
==
TRN_POLICY_RETRY
)
{
pTrans
->
stage
=
TRN_STAGE_RETRY
;
}
else
{
if
(
pTrans
->
policy
==
TRN_POLICY_ROLLBACK
)
{
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
mError
(
"trans:%d, stage from execute to rollback since %s"
,
pTrans
->
id
,
terrstr
());
}
else
{
pTrans
->
stage
=
TRN_STAGE_RETRY
;
mError
(
"trans:%d, stage from execute to retry since %s"
,
pTrans
->
id
,
terrstr
());
}
return
0
;
}
}
static
int32_t
trnPerformCommitStage
(
STrans
*
pTrans
)
{
if
(
trnExecuteCommitLogs
(
pTrans
)
==
0
)
{
pTrans
->
stage
=
TRN_STAGE_EXECUTE
;
return
0
;
}
else
{
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
return
-
1
;
}
return
0
;
}
static
int32_t
trnPerformRollbackStage
(
STrans
*
pTrans
)
{
if
(
trnExecuteCommitLogs
(
pTrans
)
==
0
)
{
pTrans
->
stage
=
TRN_STAGE_EXECUTE
;
return
0
;
static
int32_t
mndTransPerformCommitStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
mndTransExecuteCommitLogs
(
pMnode
,
pTrans
);
if
(
code
==
0
)
{
pTrans
->
stage
=
TRN_STAGE_COMMIT
;
mTrace
(
"trans:%d, commit stage finished"
,
pTrans
->
id
);
}
else
{
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
return
-
1
;
if
(
pTrans
->
policy
==
TRN_POLICY_ROLLBACK
)
{
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
mError
(
"trans:%d, stage from commit to rollback since %s"
,
pTrans
->
id
,
terrstr
());
}
else
{
pTrans
->
stage
=
TRN_STAGE_RETRY
;
mError
(
"trans:%d, stage from commit to retry since %s"
,
pTrans
->
id
,
terrstr
());
}
}
return
code
;
}
static
int32_t
trnPerformRetryStage
(
STrans
*
pTrans
)
{
if
(
trnExecuteCommitLogs
(
pTrans
)
==
0
)
{
pTrans
->
stage
=
TRN_STAGE_EXECUTE
;
return
0
;
static
int32_t
mndTransPerformRollbackStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
mndTransExecuteUndoActions
(
pMnode
,
pTrans
);
if
(
code
==
0
)
{
mTrace
(
"trans:%d, rollbacked"
,
pTrans
->
id
);
}
else
{
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
return
-
1
;
mError
(
"trans:%d, stage keep on rollback since %s"
,
pTrans
->
id
,
terrstr
())
;
}
}
int32_t
mndTransExecute
(
SSdb
*
pSdb
,
int32_t
tranId
)
{
int32_t
code
=
0
;
STrans
*
pTrans
=
sdbAcquire
(
pSdb
,
SDB_TRANS
,
&
tranId
);
if
(
pTrans
==
NULL
)
{
return
-
1
;
}
return
code
;
}
if
(
pTrans
->
stage
==
TRN_STAGE_PREPARE
)
{
if
(
trnPerformPrepareStage
(
pTrans
)
!=
0
)
{
sdbRelease
(
pSdb
,
pTrans
);
return
-
1
;
}
}
static
int32_t
mndTransPerformRetryStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
mndTransExecuteRedoActions
(
pMnode
,
pTrans
);
if
(
pTrans
->
stage
==
TRN_STAGE_EXECUTE
)
{
if
(
trnPerformExecuteStage
(
pTrans
)
!=
0
)
{
sdbRelease
(
pSdb
,
pTrans
);
return
-
1
;
}
if
(
code
==
0
)
{
pTrans
->
stage
=
TRN_STAGE_COMMIT
;
mTrace
(
"trans:%d, stage from retry to commit"
,
pTrans
->
id
);
}
else
{
pTrans
->
stage
=
TRN_STAGE_RETRY
;
mError
(
"trans:%d, stage keep on retry since %s"
,
pTrans
->
id
,
terrstr
());
}
if
(
pTrans
->
stage
==
TRN_STAGE_COMMIT
)
{
if
(
trnPerformCommitStage
(
pTrans
)
!=
0
)
{
sdbRelease
(
pSdb
,
pTrans
);
return
-
1
;
}
}
return
code
;
}
if
(
pTrans
->
stage
==
TRN_STAGE_ROLLBACK
)
{
if
(
trnPerformRollbackStage
(
pTrans
)
!=
0
)
{
sdbRelease
(
pSdb
,
pTrans
);
return
-
1
;
}
}
static
void
mndTransExecute
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
0
;
if
(
pTrans
->
stage
==
TRN_STAGE_RETRY
)
{
if
(
trnPerformRetryStage
(
pTrans
)
!=
0
)
{
sdbRelease
(
pSdb
,
pTrans
);
return
-
1
;
while
(
code
==
0
)
{
switch
(
pTrans
->
stage
)
{
case
TRN_STAGE_PREPARE
:
code
=
mndTransPerformPrepareStage
(
pMnode
,
pTrans
);
break
;
case
TRN_STAGE_EXECUTE
:
code
=
mndTransPerformExecuteStage
(
pMnode
,
pTrans
);
break
;
case
TRN_STAGE_COMMIT
:
code
=
mndTransCommit
(
pMnode
,
pTrans
);
if
(
code
==
0
)
{
code
=
mndTransPerformCommitStage
(
pMnode
,
pTrans
);
}
break
;
case
TRN_STAGE_ROLLBACK
:
code
=
mndTransPerformRollbackStage
(
pMnode
,
pTrans
);
if
(
code
==
0
)
{
code
=
mndTransRollback
(
pMnode
,
pTrans
);
}
break
;
case
TRN_STAGE_RETRY
:
code
=
mndTransPerformRetryStage
(
pMnode
,
pTrans
);
break
;
}
}
sdbRelease
(
pSdb
,
pTrans
);
return
0
;
}
\ No newline at end of file
mndTransSendRpcRsp
(
pTrans
,
code
);
}
source/dnode/mnode/impl/src/mndUser.c
浏览文件 @
2348b6e2
...
...
@@ -235,7 +235,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass,
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
...
...
@@ -269,7 +269,7 @@ static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOldUser, SUserObj *pNewU
}
sdbSetRawStatus
(
pUndoRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
...
...
@@ -311,7 +311,7 @@ static int32_t mndDropUser(SMnode *pMnode, SMnodeMsg *pMsg, SUserObj *pUser) {
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
p
Mnode
,
p
Trans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录