Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
70e6e159
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
70e6e159
编写于
6月 01, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'fix/mnode' into fix/tsim
上级
8baf32fc
1035312f
变更
26
隐藏空白更改
内联
并排
Showing
26 changed file
with
170 addition
and
329 deletion
+170
-329
include/util/tdef.h
include/util/tdef.h
+1
-2
source/common/src/systable.c
source/common/src/systable.c
+0
-1
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+14
-65
source/dnode/mnode/impl/inc/mndTrans.h
source/dnode/mnode/impl/inc/mndTrans.h
+3
-3
source/dnode/mnode/impl/src/mndAcct.c
source/dnode/mnode/impl/src/mndAcct.c
+1
-1
source/dnode/mnode/impl/src/mndBnode.c
source/dnode/mnode/impl/src/mndBnode.c
+2
-2
source/dnode/mnode/impl/src/mndCluster.c
source/dnode/mnode/impl/src/mndCluster.c
+2
-5
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+3
-3
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+3
-3
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+3
-7
source/dnode/mnode/impl/src/mndFunc.c
source/dnode/mnode/impl/src/mndFunc.c
+2
-2
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+5
-9
source/dnode/mnode/impl/src/mndOffset.c
source/dnode/mnode/impl/src/mndOffset.c
+1
-1
source/dnode/mnode/impl/src/mndQnode.c
source/dnode/mnode/impl/src/mndQnode.c
+2
-2
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+3
-3
source/dnode/mnode/impl/src/mndSnode.c
source/dnode/mnode/impl/src/mndSnode.c
+2
-2
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+3
-3
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+1
-1
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+6
-7
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+2
-2
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+95
-172
source/dnode/mnode/impl/src/mndUser.c
source/dnode/mnode/impl/src/mndUser.c
+4
-8
source/dnode/mnode/impl/test/trans/trans2.cpp
source/dnode/mnode/impl/test/trans/trans2.cpp
+9
-5
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+0
-1
tests/script/tsim/trans/create_db.sim
tests/script/tsim/trans/create_db.sim
+1
-17
tests/test/c/sdbDump.c
tests/test/c/sdbDump.c
+2
-2
未找到文件。
include/util/tdef.h
浏览文件 @
70e6e159
...
...
@@ -253,8 +253,7 @@ typedef enum ELogicConditionType {
#define TSDB_TRANS_STAGE_LEN 12
#define TSDB_TRANS_TYPE_LEN 16
#define TSDB_TRANS_ERROR_LEN 64
#define TSDB_TRANS_DESC_LEN 128
#define TSDB_TRANS_ERROR_LEN 512
#define TSDB_STEP_NAME_LEN 32
#define TSDB_STEP_DESC_LEN 128
...
...
source/common/src/systable.c
浏览文件 @
70e6e159
...
...
@@ -215,7 +215,6 @@ static const SSysDbTableSchema transSchema[] = {
{.
name
=
"create_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
{.
name
=
"stage"
,
.
bytes
=
TSDB_TRANS_STAGE_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"db"
,
.
bytes
=
SYSTABLE_SCH_DB_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"type"
,
.
bytes
=
TSDB_TRANS_TYPE_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"failed_times"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"last_exec_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
{.
name
=
"last_error"
,
.
bytes
=
(
TSDB_TRANS_ERROR_LEN
-
1
)
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
70e6e159
...
...
@@ -54,9 +54,11 @@ typedef enum {
}
EAuthOp
;
typedef
enum
{
TRN_STEP_LOG
=
1
,
TRN_STEP_ACTION
=
2
,
}
ETrnStep
;
TRN_CONFLICT_NOTHING
=
0
,
TRN_CONFLICT_GLOBAL
=
1
,
TRN_CONFLICT_DB
=
2
,
TRN_CONFLICT_DB_INSIDE
=
3
,
}
ETrnConflct
;
typedef
enum
{
TRN_STAGE_PREPARE
=
0
,
...
...
@@ -68,69 +70,15 @@ typedef enum {
TRN_STAGE_FINISHED
=
6
}
ETrnStage
;
typedef
enum
{
TRN_TYPE_BASIC_SCOPE
=
1000
,
TRN_TYPE_CREATE_ACCT
=
1001
,
TRN_TYPE_CREATE_CLUSTER
=
1002
,
TRN_TYPE_CREATE_USER
=
1003
,
TRN_TYPE_ALTER_USER
=
1004
,
TRN_TYPE_DROP_USER
=
1005
,
TRN_TYPE_CREATE_FUNC
=
1006
,
TRN_TYPE_DROP_FUNC
=
1007
,
TRN_TYPE_CREATE_SNODE
=
1010
,
TRN_TYPE_DROP_SNODE
=
1011
,
TRN_TYPE_CREATE_QNODE
=
1012
,
TRN_TYPE_DROP_QNODE
=
10013
,
TRN_TYPE_CREATE_BNODE
=
1014
,
TRN_TYPE_DROP_BNODE
=
1015
,
TRN_TYPE_CREATE_MNODE
=
1016
,
TRN_TYPE_DROP_MNODE
=
1017
,
TRN_TYPE_CREATE_TOPIC
=
1020
,
TRN_TYPE_DROP_TOPIC
=
1021
,
TRN_TYPE_SUBSCRIBE
=
1022
,
TRN_TYPE_REBALANCE
=
1023
,
TRN_TYPE_COMMIT_OFFSET
=
1024
,
TRN_TYPE_CREATE_STREAM
=
1025
,
TRN_TYPE_DROP_STREAM
=
1026
,
TRN_TYPE_ALTER_STREAM
=
1027
,
TRN_TYPE_CONSUMER_LOST
=
1028
,
TRN_TYPE_CONSUMER_RECOVER
=
1029
,
TRN_TYPE_DROP_CGROUP
=
1030
,
TRN_TYPE_BASIC_SCOPE_END
,
TRN_TYPE_GLOBAL_SCOPE
=
2000
,
TRN_TYPE_CREATE_DNODE
=
2001
,
TRN_TYPE_DROP_DNODE
=
2002
,
TRN_TYPE_GLOBAL_SCOPE_END
,
TRN_TYPE_DB_SCOPE
=
3000
,
TRN_TYPE_CREATE_DB
=
3001
,
TRN_TYPE_ALTER_DB
=
3002
,
TRN_TYPE_DROP_DB
=
3003
,
TRN_TYPE_SPLIT_VGROUP
=
3004
,
TRN_TYPE_MERGE_VGROUP
=
3015
,
TRN_TYPE_DB_SCOPE_END
,
TRN_TYPE_STB_SCOPE
=
4000
,
TRN_TYPE_CREATE_STB
=
4001
,
TRN_TYPE_ALTER_STB
=
4002
,
TRN_TYPE_DROP_STB
=
4003
,
TRN_TYPE_CREATE_SMA
=
4004
,
TRN_TYPE_DROP_SMA
=
4005
,
TRN_TYPE_STB_SCOPE_END
,
}
ETrnType
;
typedef
enum
{
TRN_POLICY_ROLLBACK
=
0
,
TRN_POLICY_RETRY
=
1
,
}
ETrnPolicy
;
typedef
enum
{
TRN_EXEC_P
ARA
LLEL
=
0
,
TRN_EXEC_
NO_PARALLE
L
=
1
,
}
ETrnExec
Type
;
TRN_EXEC_P
RAR
LLEL
=
0
,
TRN_EXEC_
SERIA
L
=
1
,
}
ETrnExec
;
typedef
enum
{
DND_REASON_ONLINE
=
0
,
...
...
@@ -159,8 +107,8 @@ typedef struct {
int32_t
id
;
ETrnStage
stage
;
ETrnPolicy
policy
;
ETrn
Type
type
;
ETrnExec
Type
parallel
;
ETrn
Conflct
conflict
;
ETrnExec
exec
;
int32_t
code
;
int32_t
failedTimes
;
SRpcHandleInfo
rpcInfo
;
...
...
@@ -172,10 +120,11 @@ typedef struct {
SArray
*
commitActions
;
int64_t
createdTime
;
int64_t
lastExecTime
;
int64_t
dbUid
;
int32_t
lastErrorAction
;
int32_t
lastErrorNo
;
tmsg_t
lastErrorMsgType
;
SEpSet
lastErrorEpset
;
char
dbname
[
TSDB_DB_FNAME_LEN
];
char
lastError
[
TSDB_TRANS_ERROR_LEN
];
char
desc
[
TSDB_TRANS_DESC_LEN
];
int32_t
startFunc
;
int32_t
stopFunc
;
int32_t
paramLen
;
...
...
source/dnode/mnode/impl/inc/mndTrans.h
浏览文件 @
70e6e159
...
...
@@ -34,7 +34,7 @@ typedef struct {
int32_t
errCode
;
int32_t
acceptableCode
;
int8_t
stage
;
int8_t
isRaw
;
int8_t
actionType
;
// 0-msg, 1-raw
int8_t
rawWritten
;
int8_t
msgSent
;
int8_t
msgReceived
;
...
...
@@ -52,7 +52,7 @@ void mndCleanupTrans(SMnode *pMnode);
STrans
*
mndAcquireTrans
(
SMnode
*
pMnode
,
int32_t
transId
);
void
mndReleaseTrans
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
STrans
*
mndTransCreate
(
SMnode
*
pMnode
,
ETrnPolicy
policy
,
ETrn
Type
type
,
const
SRpcMsg
*
pReq
);
STrans
*
mndTransCreate
(
SMnode
*
pMnode
,
ETrnPolicy
policy
,
ETrn
Conflct
conflict
,
const
SRpcMsg
*
pReq
);
void
mndTransDrop
(
STrans
*
pTrans
);
int32_t
mndTransAppendRedolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
mndTransAppendUndolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
...
...
@@ -62,7 +62,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void
mndTransSetRpcRsp
(
STrans
*
pTrans
,
void
*
pCont
,
int32_t
contLen
);
void
mndTransSetCb
(
STrans
*
pTrans
,
ETrnFunc
startFunc
,
ETrnFunc
stopFunc
,
void
*
param
,
int32_t
paramLen
);
void
mndTransSetDbInfo
(
STrans
*
pTrans
,
SDbObj
*
pDb
);
void
mndTransSet
NoParalle
l
(
STrans
*
pTrans
);
void
mndTransSet
Seria
l
(
STrans
*
pTrans
);
int32_t
mndTransPrepare
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
void
mndTransProcessRsp
(
SRpcMsg
*
pRsp
);
...
...
source/dnode/mnode/impl/src/mndAcct.c
浏览文件 @
70e6e159
...
...
@@ -80,7 +80,7 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
mDebug
(
"acct:%s, will be created when deploying, raw:%p"
,
acctObj
.
acct
,
pRaw
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_CREATE_ACCT
,
NULL
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_NOTHING
,
NULL
);
if
(
pTrans
==
NULL
)
{
mError
(
"acct:%s, failed to create since %s"
,
acctObj
.
acct
,
terrstr
());
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndBnode.c
浏览文件 @
70e6e159
...
...
@@ -246,7 +246,7 @@ static int32_t mndCreateBnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
bnodeObj
.
createdTime
=
taosGetTimestampMs
();
bnodeObj
.
updateTime
=
bnodeObj
.
createdTime
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE_BNODE
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to create bnode:%d"
,
pTrans
->
id
,
pCreate
->
dnodeId
);
...
...
@@ -363,7 +363,7 @@ static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBn
static
int32_t
mndDropBnode
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SBnodeObj
*
pObj
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_DROP_BNODE
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to drop bnode:%d"
,
pTrans
->
id
,
pObj
->
id
);
...
...
source/dnode/mnode/impl/src/mndCluster.c
浏览文件 @
70e6e159
...
...
@@ -179,10 +179,8 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
mDebug
(
"cluster:%"
PRId64
", will be created when deploying, raw:%p"
,
clusterObj
.
id
,
pRaw
);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_TYPE_CREATE_CLUSTER
,
NULL
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_NOTHING
,
NULL
);
if
(
pTrans
==
NULL
)
{
mError
(
"cluster:%"
PRId64
", failed to create since %s"
,
clusterObj
.
id
,
terrstr
());
return
-
1
;
...
...
@@ -204,7 +202,6 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
mndTransDrop
(
pTrans
);
return
0
;
#endif
}
static
int32_t
mndRetrieveClusters
(
SRpcMsg
*
pMsg
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
)
{
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
70e6e159
...
...
@@ -97,7 +97,7 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
mndReleaseConsumer
(
pMnode
,
pConsumer
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_CONSUMER_LOST
,
pMsg
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_NOTHING
,
pMsg
);
if
(
pTrans
==
NULL
)
goto
FAIL
;
if
(
mndSetConsumerCommitLogs
(
pMnode
,
pTrans
,
pConsumerNew
)
!=
0
)
goto
FAIL
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
FAIL
;
...
...
@@ -121,7 +121,7 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
mndReleaseConsumer
(
pMnode
,
pConsumer
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_CONSUMER_RECOVER
,
pMsg
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_NOTHING
,
pMsg
);
if
(
pTrans
==
NULL
)
goto
FAIL
;
if
(
mndSetConsumerCommitLogs
(
pMnode
,
pTrans
,
pConsumerNew
)
!=
0
)
goto
FAIL
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
FAIL
;
...
...
@@ -403,7 +403,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
int32_t
newTopicNum
=
taosArrayGetSize
(
newSub
);
// check topic existance
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_SUBSCRIBE
,
pMsg
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_NOTHING
,
pMsg
);
if
(
pTrans
==
NULL
)
goto
SUBSCRIBE_OVER
;
for
(
int32_t
i
=
0
;
i
<
newTopicNum
;
i
++
)
{
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
70e6e159
...
...
@@ -545,7 +545,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
}
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE
_DB
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT
_DB
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to create db:%s"
,
pTrans
->
id
,
pCreate
->
db
);
...
...
@@ -775,7 +775,7 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
static
int32_t
mndAlterDb
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SDbObj
*
pOld
,
SDbObj
*
pNew
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_ALTER
_DB
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT
_DB
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to alter db:%s"
,
pTrans
->
id
,
pOld
->
name
);
...
...
@@ -1036,7 +1036,7 @@ static int32_t mndBuildDropDbRsp(SDbObj *pDb, int32_t *pRspLen, void **ppRsp, bo
static
int32_t
mndDropDb
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SDbObj
*
pDb
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_DROP
_DB
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT
_DB
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to drop db:%s"
,
pTrans
->
id
,
pDb
->
name
);
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
70e6e159
...
...
@@ -101,10 +101,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
mDebug
(
"dnode:%d, will be created when deploying, raw:%p"
,
dnodeObj
.
id
,
pRaw
);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_TYPE_CREATE_DNODE
,
NULL
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_GLOBAL
,
NULL
);
if
(
pTrans
==
NULL
)
{
mError
(
"dnode:%s, failed to create since %s"
,
dnodeObj
.
ep
,
terrstr
());
return
-
1
;
...
...
@@ -126,7 +123,6 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
mndTransDrop
(
pTrans
);
return
0
;
#endif
}
static
SSdbRaw
*
mndDnodeActionEncode
(
SDnodeObj
*
pDnode
)
{
...
...
@@ -488,7 +484,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
memcpy
(
dnodeObj
.
fqdn
,
pCreate
->
fqdn
,
TSDB_FQDN_LEN
);
snprintf
(
dnodeObj
.
ep
,
TSDB_EP_LEN
,
"%s:%u"
,
dnodeObj
.
fqdn
,
dnodeObj
.
port
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE_DNODE
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_GLOBAL
,
pReq
);
if
(
pTrans
==
NULL
)
{
mError
(
"dnode:%s, failed to create since %s"
,
dnodeObj
.
ep
,
terrstr
());
return
-
1
;
...
...
@@ -564,7 +560,7 @@ CREATE_DNODE_OVER:
}
static
int32_t
mndDropDnode
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SDnodeObj
*
pDnode
)
{
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_DROP_DNODE
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_GLOBAL
,
pReq
);
if
(
pTrans
==
NULL
)
{
mError
(
"dnode:%d, failed to drop since %s"
,
pDnode
->
id
,
terrstr
());
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndFunc.c
浏览文件 @
70e6e159
...
...
@@ -215,7 +215,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCre
}
memcpy
(
func
.
pCode
,
pCreate
->
pCode
,
func
.
codeSize
);
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE_FUNC
,
pReq
);
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to create func:%s"
,
pTrans
->
id
,
pCreate
->
name
);
...
...
@@ -245,7 +245,7 @@ _OVER:
static
int32_t
mndDropFunc
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SFuncObj
*
pFunc
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_DROP_FUNC
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to drop user:%s"
,
pTrans
->
id
,
pFunc
->
name
);
...
...
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
70e6e159
...
...
@@ -92,10 +92,7 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
mDebug
(
"mnode:%d, will be created when deploying, raw:%p"
,
mnodeObj
.
id
,
pRaw
);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_TYPE_CREATE_DNODE
,
NULL
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_GLOBAL
,
NULL
);
if
(
pTrans
==
NULL
)
{
mError
(
"mnode:%d, failed to create since %s"
,
mnodeObj
.
id
,
terrstr
());
return
-
1
;
...
...
@@ -117,7 +114,6 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
mndTransDrop
(
pTrans
);
return
0
;
#endif
}
static
SSdbRaw
*
mndMnodeActionEncode
(
SMnodeObj
*
pObj
)
{
...
...
@@ -363,11 +359,11 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
mnodeObj
.
createdTime
=
taosGetTimestampMs
();
mnodeObj
.
updateTime
=
mnodeObj
.
createdTime
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_CREATE_MNODE
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_GLOBAL
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to create mnode:%d"
,
pTrans
->
id
,
pCreate
->
dnodeId
);
mndTransSet
NoParalle
l
(
pTrans
);
mndTransSet
Seria
l
(
pTrans
);
if
(
mndSetCreateMnodeRedoLogs
(
pMnode
,
pTrans
,
&
mnodeObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateMnodeCommitLogs
(
pMnode
,
pTrans
,
&
mnodeObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateMnodeRedoActions
(
pMnode
,
pTrans
,
pDnode
,
&
mnodeObj
)
!=
0
)
goto
_OVER
;
...
...
@@ -540,11 +536,11 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
static
int32_t
mndDropMnode
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SMnodeObj
*
pObj
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_DROP_MNODE
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_GLOBAL
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to drop mnode:%d"
,
pTrans
->
id
,
pObj
->
id
);
mndTransSet
NoParalle
l
(
pTrans
);
mndTransSet
Seria
l
(
pTrans
);
if
(
mndSetDropMnodeRedoLogs
(
pMnode
,
pTrans
,
pObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropMnodeCommitLogs
(
pMnode
,
pTrans
,
pObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropMnodeRedoActions
(
pMnode
,
pTrans
,
pObj
->
pDnode
,
pObj
)
!=
0
)
goto
_OVER
;
...
...
source/dnode/mnode/impl/src/mndOffset.c
浏览文件 @
70e6e159
...
...
@@ -179,7 +179,7 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
tDecodeSMqCMCommitOffsetReq
(
&
decoder
,
&
commitOffsetReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_COMMIT_OFFSET
,
pMsg
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pMsg
);
for
(
int32_t
i
=
0
;
i
<
commitOffsetReq
.
num
;
i
++
)
{
SMqOffset
*
pOffset
=
&
commitOffsetReq
.
offsets
[
i
];
...
...
source/dnode/mnode/impl/src/mndQnode.c
浏览文件 @
70e6e159
...
...
@@ -248,7 +248,7 @@ static int32_t mndCreateQnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
qnodeObj
.
createdTime
=
taosGetTimestampMs
();
qnodeObj
.
updateTime
=
qnodeObj
.
createdTime
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE_QNODE
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to create qnode:%d"
,
pTrans
->
id
,
pCreate
->
dnodeId
);
...
...
@@ -365,7 +365,7 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn
static
int32_t
mndDropQnode
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SQnodeObj
*
pObj
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_DROP_QNODE
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to drop qnode:%d"
,
pTrans
->
id
,
pObj
->
id
);
...
...
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
70e6e159
...
...
@@ -502,12 +502,12 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj
.
fixedSinkVgId
=
smaObj
.
dstVgId
;
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_CREATE_SMA
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_DB
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to create sma:%s"
,
pTrans
->
id
,
pCreate
->
name
);
mndTransSetDbInfo
(
pTrans
,
pDb
);
mndTransSet
NoParalle
l
(
pTrans
);
mndTransSet
Seria
l
(
pTrans
);
if
(
mndSetCreateSmaRedoLogs
(
pMnode
,
pTrans
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaVgroupRedoLogs
(
pMnode
,
pTrans
,
&
streamObj
.
fixedSinkVg
)
!=
0
)
goto
_OVER
;
...
...
@@ -747,7 +747,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
pVgroup
=
mndAcquireVgroup
(
pMnode
,
pSma
->
dstVgId
);
if
(
pVgroup
==
NULL
)
goto
_OVER
;
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_DROP_SMA
,
pReq
);
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_DB
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to drop sma:%s"
,
pTrans
->
id
,
pSma
->
name
);
...
...
source/dnode/mnode/impl/src/mndSnode.c
浏览文件 @
70e6e159
...
...
@@ -253,7 +253,7 @@ static int32_t mndCreateSnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
snodeObj
.
createdTime
=
taosGetTimestampMs
();
snodeObj
.
updateTime
=
snodeObj
.
createdTime
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE_SNODE
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to create snode:%d"
,
pTrans
->
id
,
pCreate
->
dnodeId
);
...
...
@@ -372,7 +372,7 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn
static
int32_t
mndDropSnode
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SSnodeObj
*
pObj
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_DROP_SNODE
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to drop snode:%d"
,
pTrans
->
id
,
pObj
->
id
);
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
70e6e159
...
...
@@ -735,7 +735,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCrea
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE_STB
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_DB_INSIDE
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to create stb:%s"
,
pTrans
->
id
,
pCreate
->
name
);
...
...
@@ -1257,7 +1257,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
if
(
code
!=
0
)
goto
_OVER
;
code
=
-
1
;
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_ALTER_STB
,
pReq
);
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_DB_INSIDE
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to alter stb:%s"
,
pTrans
->
id
,
pAlter
->
name
);
...
...
@@ -1403,7 +1403,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
static
int32_t
mndDropStb
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SDbObj
*
pDb
,
SStbObj
*
pStb
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_DROP_STB
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_DB_INSIDE
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to drop stb:%s"
,
pTrans
->
id
,
pStb
->
name
);
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
70e6e159
...
...
@@ -402,7 +402,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
tstrncpy
(
streamObj
.
targetDb
,
pDb
->
name
,
TSDB_DB_FNAME_LEN
);
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE_STREAM
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
{
mError
(
"stream:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
70e6e159
...
...
@@ -394,8 +394,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
mInfo
(
"rebalance calculation completed, rebalanced vg:"
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pOutput
->
rebVgs
);
i
++
)
{
SMqRebOutputVg
*
pOutputRebVg
=
taosArrayGet
(
pOutput
->
rebVgs
,
i
);
mInfo
(
"vg
: %d moved from consumer %ld to consumer %ld"
,
pOutputRebVg
->
pVgEp
->
vgId
,
pOutputRebVg
->
oldConsumer
Id
,
pOutputRebVg
->
newConsumerId
);
mInfo
(
"vg
Id:%d moved from consumer %"
PRId64
" to consumer %"
PRId64
,
pOutputRebVg
->
pVgEp
->
vg
Id
,
pOutputRebVg
->
oldConsumerId
,
pOutputRebVg
->
newConsumerId
);
}
// 9. clear
...
...
@@ -405,10 +405,9 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
}
static
int32_t
mndPersistRebResult
(
SMnode
*
pMnode
,
SRpcMsg
*
pMsg
,
const
SMqRebOutputObj
*
pOutput
)
{
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_TYPE_REBALANCE
,
pMsg
);
if
(
pTrans
==
NULL
)
{
return
-
1
;
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_CONFLICT_NOTHING
,
pMsg
);
if
(
pTrans
==
NULL
)
return
-
1
;
// make txn:
// 1. redo action: action to all vg
const
SArray
*
rebVgs
=
pOutput
->
rebVgs
;
...
...
@@ -625,7 +624,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) {
return
-
1
;
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_DROP_CGROUP
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
{
mError
(
"cgroup: %s on topic:%s, failed to drop since %s"
,
dropReq
.
cgroup
,
dropReq
.
topic
,
terrstr
());
mndReleaseSubscribe
(
pMnode
,
pSub
);
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
70e6e159
...
...
@@ -383,7 +383,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
/*topicObj.withSchema = 1;*/
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE_TOPIC
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
{
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
taosMemoryFreeClear
(
topicObj
.
ast
);
...
...
@@ -551,7 +551,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
}
#endif
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_DROP_TOPIC
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
{
mError
(
"topic:%s, failed to drop since %s"
,
pTopic
->
name
,
terrstr
());
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
70e6e159
...
...
@@ -88,12 +88,12 @@ static int32_t mndTransGetActionsSize(SArray *pArray) {
for
(
int32_t
i
=
0
;
i
<
actionNum
;
++
i
)
{
STransAction
*
pAction
=
taosArrayGet
(
pArray
,
i
);
if
(
pAction
->
isRaw
)
{
if
(
pAction
->
actionType
)
{
rawDataLen
+=
(
sdbGetRawTotalSize
(
pAction
->
pRaw
)
+
sizeof
(
int32_t
));
}
else
{
rawDataLen
+=
(
sizeof
(
STransAction
)
+
pAction
->
contLen
);
}
rawDataLen
+=
sizeof
(
pAction
->
isRaw
);
rawDataLen
+=
sizeof
(
pAction
->
actionType
);
}
return
rawDataLen
;
...
...
@@ -117,8 +117,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTrans
->
id
,
_OVER
)
SDB_SET_INT16
(
pRaw
,
dataPos
,
pTrans
->
stage
,
_OVER
)
SDB_SET_INT16
(
pRaw
,
dataPos
,
pTrans
->
policy
,
_OVER
)
SDB_SET_INT16
(
pRaw
,
dataPos
,
pTrans
->
type
,
_OVER
)
SDB_SET_INT16
(
pRaw
,
dataPos
,
pTrans
->
parallel
,
_OVER
)
SDB_SET_INT16
(
pRaw
,
dataPos
,
pTrans
->
conflict
,
_OVER
)
SDB_SET_INT16
(
pRaw
,
dataPos
,
pTrans
->
exec
,
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pTrans
->
createdTime
,
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTrans
->
dbname
,
TSDB_DB_FNAME_LEN
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTrans
->
redoActionPos
,
_OVER
)
...
...
@@ -135,9 +135,9 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
id
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
errCode
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
acceptableCode
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
isRaw
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
actionType
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
stage
,
_OVER
)
if
(
pAction
->
isRaw
)
{
if
(
pAction
->
actionType
)
{
int32_t
len
=
sdbGetRawTotalSize
(
pAction
->
pRaw
);
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
rawWritten
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
len
,
_OVER
)
...
...
@@ -157,9 +157,9 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
id
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
errCode
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
acceptableCode
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
isRaw
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
actionType
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
stage
,
_OVER
)
if
(
pAction
->
isRaw
)
{
if
(
pAction
->
actionType
)
{
int32_t
len
=
sdbGetRawTotalSize
(
pAction
->
pRaw
);
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
rawWritten
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
len
,
_OVER
)
...
...
@@ -179,9 +179,9 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
id
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
errCode
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
acceptableCode
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
isRaw
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
actionType
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
stage
,
_OVER
)
if
(
pAction
->
isRaw
)
{
if
(
pAction
->
actionType
)
{
int32_t
len
=
sdbGetRawTotalSize
(
pAction
->
pRaw
);
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
rawWritten
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
len
,
_OVER
)
...
...
@@ -250,16 +250,16 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
int16_t
stage
=
0
;
int16_t
policy
=
0
;
int16_t
type
=
0
;
int16_t
parallel
=
0
;
int16_t
conflict
=
0
;
int16_t
exec
=
0
;
SDB_GET_INT16
(
pRaw
,
dataPos
,
&
stage
,
_OVER
)
SDB_GET_INT16
(
pRaw
,
dataPos
,
&
policy
,
_OVER
)
SDB_GET_INT16
(
pRaw
,
dataPos
,
&
type
,
_OVER
)
SDB_GET_INT16
(
pRaw
,
dataPos
,
&
parallel
,
_OVER
)
SDB_GET_INT16
(
pRaw
,
dataPos
,
&
conflict
,
_OVER
)
SDB_GET_INT16
(
pRaw
,
dataPos
,
&
exec
,
_OVER
)
pTrans
->
stage
=
stage
;
pTrans
->
policy
=
policy
;
pTrans
->
type
=
type
;
pTrans
->
parallel
=
parallel
;
pTrans
->
conflict
=
conflict
;
pTrans
->
exec
=
exec
;
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pTrans
->
createdTime
,
_OVER
)
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTrans
->
dbname
,
TSDB_DB_FNAME_LEN
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pTrans
->
redoActionPos
,
_OVER
)
...
...
@@ -279,9 +279,9 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
id
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
errCode
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
acceptableCode
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
isRaw
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
actionType
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
stage
,
_OVER
)
if
(
action
.
isRaw
)
{
if
(
action
.
actionType
)
{
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
rawWritten
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
dataLen
,
_OVER
)
action
.
pRaw
=
taosMemoryMalloc
(
dataLen
);
...
...
@@ -308,9 +308,9 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
id
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
errCode
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
acceptableCode
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
isRaw
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
actionType
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
stage
,
_OVER
)
if
(
action
.
isRaw
)
{
if
(
action
.
actionType
)
{
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
rawWritten
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
dataLen
,
_OVER
)
action
.
pRaw
=
taosMemoryMalloc
(
dataLen
);
...
...
@@ -337,9 +337,9 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
id
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
errCode
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
acceptableCode
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
isRaw
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
actionType
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
stage
,
_OVER
)
if
(
action
.
isRaw
)
{
if
(
action
.
actionType
)
{
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
rawWritten
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
dataLen
,
_OVER
)
action
.
pRaw
=
taosMemoryMalloc
(
dataLen
);
...
...
@@ -408,81 +408,6 @@ static const char *mndTransStr(ETrnStage stage) {
}
}
static
const
char
*
mndTransType
(
ETrnType
type
)
{
switch
(
type
)
{
case
TRN_TYPE_CREATE_USER
:
return
"create-user"
;
case
TRN_TYPE_ALTER_USER
:
return
"alter-user"
;
case
TRN_TYPE_DROP_USER
:
return
"drop-user"
;
case
TRN_TYPE_CREATE_FUNC
:
return
"create-func"
;
case
TRN_TYPE_DROP_FUNC
:
return
"drop-func"
;
case
TRN_TYPE_CREATE_SNODE
:
return
"create-snode"
;
case
TRN_TYPE_DROP_SNODE
:
return
"drop-snode"
;
case
TRN_TYPE_CREATE_QNODE
:
return
"create-qnode"
;
case
TRN_TYPE_DROP_QNODE
:
return
"drop-qnode"
;
case
TRN_TYPE_CREATE_BNODE
:
return
"create-bnode"
;
case
TRN_TYPE_DROP_BNODE
:
return
"drop-bnode"
;
case
TRN_TYPE_CREATE_MNODE
:
return
"create-mnode"
;
case
TRN_TYPE_DROP_MNODE
:
return
"drop-mnode"
;
case
TRN_TYPE_CREATE_TOPIC
:
return
"create-topic"
;
case
TRN_TYPE_DROP_TOPIC
:
return
"drop-topic"
;
case
TRN_TYPE_SUBSCRIBE
:
return
"subscribe"
;
case
TRN_TYPE_REBALANCE
:
return
"rebalance"
;
case
TRN_TYPE_COMMIT_OFFSET
:
return
"commit-offset"
;
case
TRN_TYPE_CREATE_STREAM
:
return
"create-stream"
;
case
TRN_TYPE_DROP_STREAM
:
return
"drop-stream"
;
case
TRN_TYPE_CONSUMER_LOST
:
return
"consumer-lost"
;
case
TRN_TYPE_CONSUMER_RECOVER
:
return
"consumer-recover"
;
case
TRN_TYPE_CREATE_DNODE
:
return
"create-qnode"
;
case
TRN_TYPE_DROP_DNODE
:
return
"drop-qnode"
;
case
TRN_TYPE_CREATE_DB
:
return
"create-db"
;
case
TRN_TYPE_ALTER_DB
:
return
"alter-db"
;
case
TRN_TYPE_DROP_DB
:
return
"drop-db"
;
case
TRN_TYPE_SPLIT_VGROUP
:
return
"split-vgroup"
;
case
TRN_TYPE_MERGE_VGROUP
:
return
"merge-vgroup"
;
case
TRN_TYPE_CREATE_STB
:
return
"create-stb"
;
case
TRN_TYPE_ALTER_STB
:
return
"alter-stb"
;
case
TRN_TYPE_DROP_STB
:
return
"drop-stb"
;
case
TRN_TYPE_CREATE_SMA
:
return
"create-sma"
;
case
TRN_TYPE_DROP_SMA
:
return
"drop-sma"
;
default:
return
"invalid"
;
}
}
static
void
mndTransTestStartFunc
(
SMnode
*
pMnode
,
void
*
param
,
int32_t
paramLen
)
{
mInfo
(
"test trans start, param:%s, len:%d"
,
(
char
*
)
param
,
paramLen
);
}
...
...
@@ -594,7 +519,7 @@ void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
sdbRelease
(
pSdb
,
pTrans
);
}
STrans
*
mndTransCreate
(
SMnode
*
pMnode
,
ETrnPolicy
policy
,
ETrn
Type
type
,
const
SRpcMsg
*
pReq
)
{
STrans
*
mndTransCreate
(
SMnode
*
pMnode
,
ETrnPolicy
policy
,
ETrn
Conflct
conflict
,
const
SRpcMsg
*
pReq
)
{
STrans
*
pTrans
=
taosMemoryCalloc
(
1
,
sizeof
(
STrans
));
if
(
pTrans
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -605,8 +530,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S
pTrans
->
id
=
sdbGetMaxId
(
pMnode
->
pSdb
,
SDB_TRANS
);
pTrans
->
stage
=
TRN_STAGE_PREPARE
;
pTrans
->
policy
=
policy
;
pTrans
->
type
=
type
;
pTrans
->
parallel
=
TRN_EXEC_PARA
LLEL
;
pTrans
->
conflict
=
conflict
;
pTrans
->
exec
=
TRN_EXEC_PRAR
LLEL
;
pTrans
->
createdTime
=
taosGetTimestampMs
();
pTrans
->
redoActions
=
taosArrayInit
(
TRANS_ARRAY_SIZE
,
sizeof
(
STransAction
));
pTrans
->
undoActions
=
taosArrayInit
(
TRANS_ARRAY_SIZE
,
sizeof
(
STransAction
));
...
...
@@ -627,7 +552,7 @@ static void mndTransDropActions(SArray *pArray) {
int32_t
size
=
taosArrayGetSize
(
pArray
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
STransAction
*
pAction
=
taosArrayGet
(
pArray
,
i
);
if
(
pAction
->
isRaw
)
{
if
(
pAction
->
actionType
)
{
taosMemoryFreeClear
(
pAction
->
pRaw
);
}
else
{
taosMemoryFreeClear
(
pAction
->
pCont
);
...
...
@@ -658,17 +583,17 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
}
int32_t
mndTransAppendRedolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
STransAction
action
=
{.
stage
=
TRN_STAGE_REDO_ACTION
,
.
isRaw
=
true
,
.
pRaw
=
pRaw
};
STransAction
action
=
{.
stage
=
TRN_STAGE_REDO_ACTION
,
.
actionType
=
true
,
.
pRaw
=
pRaw
};
return
mndTransAppendAction
(
pTrans
->
redoActions
,
&
action
);
}
int32_t
mndTransAppendUndolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
STransAction
action
=
{.
stage
=
TRN_STAGE_UNDO_ACTION
,
.
isRaw
=
true
,
.
pRaw
=
pRaw
};
STransAction
action
=
{.
stage
=
TRN_STAGE_UNDO_ACTION
,
.
actionType
=
true
,
.
pRaw
=
pRaw
};
return
mndTransAppendAction
(
pTrans
->
undoActions
,
&
action
);
}
int32_t
mndTransAppendCommitlog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
STransAction
action
=
{.
stage
=
TRN_STAGE_COMMIT_ACTION
,
.
isRaw
=
true
,
.
pRaw
=
pRaw
};
STransAction
action
=
{.
stage
=
TRN_STAGE_COMMIT_ACTION
,
.
actionType
=
true
,
.
pRaw
=
pRaw
};
return
mndTransAppendAction
(
pTrans
->
commitActions
,
&
action
);
}
...
...
@@ -698,7 +623,7 @@ void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) {
memcpy
(
pTrans
->
dbname
,
pDb
->
name
,
TSDB_DB_FNAME_LEN
);
}
void
mndTransSet
NoParallel
(
STrans
*
pTrans
)
{
pTrans
->
parallel
=
TRN_EXEC_NO_PARALLE
L
;
}
void
mndTransSet
Serial
(
STrans
*
pTrans
)
{
pTrans
->
exec
=
TRN_EXEC_SERIA
L
;
}
static
int32_t
mndTransSync
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
SSdbRaw
*
pRaw
=
mndTransActionEncode
(
pTrans
);
...
...
@@ -721,76 +646,45 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
return
0
;
}
static
bool
mndIsBasicTrans
(
STrans
*
pTrans
)
{
return
pTrans
->
type
>
TRN_TYPE_BASIC_SCOPE
&&
pTrans
->
type
<
TRN_TYPE_BASIC_SCOPE_END
;
}
static
bool
mndIsGlobalTrans
(
STrans
*
pTrans
)
{
return
pTrans
->
type
>
TRN_TYPE_GLOBAL_SCOPE
&&
pTrans
->
type
<
TRN_TYPE_GLOBAL_SCOPE_END
;
}
static
bool
mndIsDbTrans
(
STrans
*
pTrans
)
{
return
pTrans
->
type
>
TRN_TYPE_DB_SCOPE
&&
pTrans
->
type
<
TRN_TYPE_DB_SCOPE_END
;
}
static
bool
mndIsStbTrans
(
STrans
*
pTrans
)
{
return
pTrans
->
type
>
TRN_TYPE_STB_SCOPE
&&
pTrans
->
type
<
TRN_TYPE_STB_SCOPE_END
;
}
static
bool
mndCheckTransConflict
(
SMnode
*
pMnode
,
STrans
*
pNewTrans
)
{
static
bool
mndCheckTransConflict
(
SMnode
*
pMnode
,
STrans
*
pNew
)
{
STrans
*
pTrans
=
NULL
;
void
*
pIter
=
NULL
;
bool
conflict
=
false
;
if
(
mndIsBasicTrans
(
pNewTrans
)
)
return
conflict
;
if
(
pNew
->
conflict
==
TRN_CONFLICT_NOTHING
)
return
conflict
;
while
(
1
)
{
pIter
=
sdbFetch
(
pMnode
->
pSdb
,
SDB_TRANS
,
pIter
,
(
void
**
)
&
pTrans
);
if
(
pIter
==
NULL
)
break
;
if
(
mndIsGlobalTrans
(
pNewTrans
))
{
if
(
mndIsDbTrans
(
pTrans
)
||
mndIsStbTrans
(
pTrans
))
{
mError
(
"trans:%d, can't execute since trans:%d in progress db:%s"
,
pNewTrans
->
id
,
pTrans
->
id
,
pTrans
->
dbname
);
conflict
=
true
;
}
else
{
}
if
(
pNew
->
conflict
==
TRN_CONFLICT_GLOBAL
)
conflict
=
true
;
if
(
pNew
->
conflict
==
TRN_CONFLICT_DB
)
{
if
(
pTrans
->
conflict
==
TRN_CONFLICT_GLOBAL
)
conflict
=
true
;
if
(
pTrans
->
conflict
==
TRN_CONFLICT_DB
&&
strcmp
(
pNew
->
dbname
,
pTrans
->
dbname
)
==
0
)
conflict
=
true
;
if
(
pTrans
->
conflict
==
TRN_CONFLICT_DB_INSIDE
&&
strcmp
(
pNew
->
dbname
,
pTrans
->
dbname
)
==
0
)
conflict
=
true
;
}
else
if
(
mndIsDbTrans
(
pNewTrans
))
{
if
(
mndIsGlobalTrans
(
pTrans
))
{
mError
(
"trans:%d, can't execute since trans:%d in progress"
,
pNewTrans
->
id
,
pTrans
->
id
);
conflict
=
true
;
}
else
if
(
mndIsDbTrans
(
pTrans
)
||
mndIsStbTrans
(
pTrans
))
{
if
(
strcmp
(
pNewTrans
->
dbname
,
pTrans
->
dbname
)
==
0
)
{
mError
(
"trans:%d, can't execute since trans:%d in progress db:%s"
,
pNewTrans
->
id
,
pTrans
->
id
,
pTrans
->
dbname
);
conflict
=
true
;
}
}
else
{
}
}
else
if
(
mndIsStbTrans
(
pNewTrans
))
{
if
(
mndIsGlobalTrans
(
pTrans
))
{
mError
(
"trans:%d, can't execute since trans:%d in progress"
,
pNewTrans
->
id
,
pTrans
->
id
);
conflict
=
true
;
}
else
if
(
mndIsDbTrans
(
pTrans
))
{
if
(
strcmp
(
pNewTrans
->
dbname
,
pTrans
->
dbname
)
==
0
)
{
mError
(
"trans:%d, can't execute since trans:%d in progress db:%s"
,
pNewTrans
->
id
,
pTrans
->
id
,
pTrans
->
dbname
);
conflict
=
true
;
}
}
else
{
}
if
(
pNew
->
conflict
==
TRN_CONFLICT_DB_INSIDE
)
{
if
(
pTrans
->
conflict
==
TRN_CONFLICT_GLOBAL
)
conflict
=
true
;
if
(
pTrans
->
conflict
==
TRN_CONFLICT_DB
&&
strcmp
(
pNew
->
dbname
,
pTrans
->
dbname
)
==
0
)
conflict
=
true
;
}
sdbRelease
(
pMnode
->
pSdb
,
pTrans
);
}
sdbCancelFetch
(
pMnode
->
pSdb
,
pIter
);
sdbRelease
(
pMnode
->
pSdb
,
pTrans
);
if
(
conflict
)
{
mError
(
"trans:%d, can't execute since conflict with trans:%d, db:%s"
,
pNew
->
id
,
pTrans
->
id
,
pTrans
->
dbname
);
}
return
conflict
;
}
int32_t
mndTransPrepare
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
if
(
pTrans
->
conflict
==
TRN_CONFLICT_DB
||
pTrans
->
conflict
==
TRN_CONFLICT_DB_INSIDE
)
{
if
(
strlen
(
pTrans
->
dbname
)
==
0
)
{
terrno
=
TSDB_CODE_MND_TRANS_CONFLICT
;
mError
(
"trans:%d, failed to prepare conflict db not set"
,
pTrans
->
id
);
return
-
1
;
}
}
if
(
mndCheckTransConflict
(
pMnode
,
pTrans
))
{
terrno
=
TSDB_CODE_MND_TRANS_CONFLICT
;
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
...
...
@@ -921,9 +815,6 @@ void mndTransProcessRsp(SRpcMsg *pRsp) {
if
(
pAction
!=
NULL
)
{
pAction
->
msgReceived
=
1
;
pAction
->
errCode
=
pRsp
->
code
;
if
(
pAction
->
errCode
!=
0
)
{
tstrncpy
(
pTrans
->
lastError
,
tstrerror
(
pAction
->
errCode
),
TSDB_TRANS_ERROR_LEN
);
}
}
mDebug
(
"trans:%d, %s:%d response is received, code:0x%x, accept:0x%x"
,
transId
,
mndTransStr
(
pAction
->
stage
),
action
,
...
...
@@ -1004,7 +895,7 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
}
static
int32_t
mndTransExecSingleAction
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
STransAction
*
pAction
)
{
if
(
pAction
->
isRaw
)
{
if
(
pAction
->
actionType
)
{
return
mndTransWriteSingleLog
(
pMnode
,
pTrans
,
pAction
);
}
else
{
return
mndTransSendSingleMsg
(
pMnode
,
pTrans
,
pAction
);
...
...
@@ -1032,24 +923,36 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
return
-
1
;
}
int32_t
numOfExecuted
=
0
;
int32_t
errCode
=
0
;
int32_t
numOfExecuted
=
0
;
int32_t
errCode
=
0
;
STransAction
*
pErrAction
=
NULL
;
for
(
int32_t
action
=
0
;
action
<
numOfActions
;
++
action
)
{
STransAction
*
pAction
=
taosArrayGet
(
pArray
,
action
);
if
(
pAction
->
msgReceived
||
pAction
->
rawWritten
)
{
numOfExecuted
++
;
if
(
pAction
->
errCode
!=
0
&&
pAction
->
errCode
!=
pAction
->
acceptableCode
)
{
errCode
=
pAction
->
errCode
;
pErrAction
=
pAction
;
}
}
}
if
(
numOfExecuted
==
numOfActions
)
{
if
(
errCode
==
0
)
{
pTrans
->
lastErrorAction
=
0
;
pTrans
->
lastErrorNo
=
0
;
pTrans
->
lastErrorMsgType
=
0
;
memset
(
&
pTrans
->
lastErrorEpset
,
0
,
sizeof
(
pTrans
->
lastErrorEpset
));
mDebug
(
"trans:%d, all %d actions execute successfully"
,
pTrans
->
id
,
numOfActions
);
return
0
;
}
else
{
mError
(
"trans:%d, all %d actions executed, code:0x%x"
,
pTrans
->
id
,
numOfActions
,
errCode
&
0XFFFF
);
if
(
pErrAction
!=
NULL
)
{
pTrans
->
lastErrorMsgType
=
pErrAction
->
msgType
;
pTrans
->
lastErrorAction
=
pErrAction
->
id
;
pTrans
->
lastErrorNo
=
pErrAction
->
errCode
;
pTrans
->
lastErrorEpset
=
pErrAction
->
epSet
;
}
mndTransResetActions
(
pMnode
,
pTrans
,
pArray
);
terrno
=
errCode
;
return
errCode
;
...
...
@@ -1084,7 +987,7 @@ static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans) {
return
code
;
}
static
int32_t
mndTransExecuteRedoActions
NoParalle
l
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
static
int32_t
mndTransExecuteRedoActions
Seria
l
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
0
;
int32_t
numOfActions
=
taosArrayGetSize
(
pTrans
->
redoActions
);
if
(
numOfActions
==
0
)
return
code
;
...
...
@@ -1111,6 +1014,18 @@ static int32_t mndTransExecuteRedoActionsNoParallel(SMnode *pMnode, STrans *pTra
}
}
if
(
code
==
0
)
{
pTrans
->
lastErrorAction
=
0
;
pTrans
->
lastErrorNo
=
0
;
pTrans
->
lastErrorMsgType
=
0
;
memset
(
&
pTrans
->
lastErrorEpset
,
0
,
sizeof
(
pTrans
->
lastErrorEpset
));
}
else
{
pTrans
->
lastErrorMsgType
=
pAction
->
msgType
;
pTrans
->
lastErrorAction
=
action
;
pTrans
->
lastErrorNo
=
pAction
->
errCode
;
pTrans
->
lastErrorEpset
=
pAction
->
epSet
;
}
if
(
code
==
0
)
{
pTrans
->
redoActionPos
++
;
mDebug
(
"trans:%d, %s:%d is executed and need sync to other mnodes"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
...
...
@@ -1144,8 +1059,8 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
bool
continueExec
=
true
;
int32_t
code
=
0
;
if
(
pTrans
->
parallel
==
TRN_EXEC_NO_PARALLE
L
)
{
code
=
mndTransExecuteRedoActions
NoParalle
l
(
pMnode
,
pTrans
);
if
(
pTrans
->
exec
==
TRN_EXEC_SERIA
L
)
{
code
=
mndTransExecuteRedoActions
Seria
l
(
pMnode
,
pTrans
);
}
else
{
code
=
mndTransExecuteRedoActions
(
pMnode
,
pTrans
);
}
...
...
@@ -1455,11 +1370,6 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
dbname
,
false
);
char
type
[
TSDB_TRANS_TYPE_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
STR_WITH_MAXSIZE_TO_VARSTR
(
type
,
mndTransType
(
pTrans
->
type
),
pShow
->
pMeta
->
pSchemas
[
cols
].
bytes
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
type
,
false
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pTrans
->
failedTimes
,
false
);
...
...
@@ -1467,7 +1377,20 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pTrans
->
lastExecTime
,
false
);
char
lastError
[
TSDB_TRANS_ERROR_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
STR_WITH_MAXSIZE_TO_VARSTR
(
lastError
,
pTrans
->
lastError
,
pShow
->
pMeta
->
pSchemas
[
cols
].
bytes
);
char
detail
[
TSDB_TRANS_ERROR_LEN
]
=
{
0
};
if
(
pTrans
->
lastErrorNo
!=
0
)
{
int32_t
len
=
snprintf
(
detail
,
sizeof
(
detail
),
"action:%d errno:0x%x(%s) "
,
pTrans
->
lastErrorAction
,
pTrans
->
lastErrorNo
&
0xFFFF
,
tstrerror
(
pTrans
->
lastErrorNo
));
SEpSet
epset
=
pTrans
->
lastErrorEpset
;
if
(
epset
.
numOfEps
>
0
)
{
len
+=
snprintf
(
detail
+
len
,
sizeof
(
detail
)
-
len
,
"msgType:%s numOfEps:%d inUse:%d "
,
TMSG_INFO
(
pTrans
->
lastErrorMsgType
),
epset
.
numOfEps
,
epset
.
inUse
);
}
for
(
int32_t
i
=
0
;
i
<
pTrans
->
lastErrorEpset
.
numOfEps
;
++
i
)
{
len
+=
snprintf
(
detail
+
len
,
sizeof
(
detail
)
-
len
,
"ep:%d-%s:%u "
,
i
,
epset
.
eps
[
i
].
fqdn
,
epset
.
eps
[
i
].
port
);
}
}
STR_WITH_MAXSIZE_TO_VARSTR
(
lastError
,
detail
,
pShow
->
pMeta
->
pSchemas
[
cols
].
bytes
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
lastError
,
false
);
...
...
source/dnode/mnode/impl/src/mndUser.c
浏览文件 @
70e6e159
...
...
@@ -79,10 +79,7 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char
mDebug
(
"user:%s, will be created when deploying, raw:%p"
,
userObj
.
user
,
pRaw
);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_TYPE_CREATE_USER
,
NULL
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_NOTHING
,
NULL
);
if
(
pTrans
==
NULL
)
{
mError
(
"user:%s, failed to create since %s"
,
userObj
.
user
,
terrstr
());
return
-
1
;
...
...
@@ -104,7 +101,6 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char
mndTransDrop
(
pTrans
);
return
0
;
#endif
}
static
int32_t
mndCreateDefaultUsers
(
SMnode
*
pMnode
)
{
...
...
@@ -291,7 +287,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate
userObj
.
updateTime
=
userObj
.
createdTime
;
userObj
.
superUser
=
pCreate
->
superUser
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE_USER
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
{
mError
(
"user:%s, failed to create since %s"
,
pCreate
->
user
,
terrstr
());
return
-
1
;
...
...
@@ -371,7 +367,7 @@ _OVER:
}
static
int32_t
mndAlterUser
(
SMnode
*
pMnode
,
SUserObj
*
pOld
,
SUserObj
*
pNew
,
SRpcMsg
*
pReq
)
{
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_ALTER_USER
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
{
mError
(
"user:%s, failed to alter since %s"
,
pOld
->
user
,
terrstr
());
return
-
1
;
...
...
@@ -578,7 +574,7 @@ _OVER:
}
static
int32_t
mndDropUser
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SUserObj
*
pUser
)
{
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_DROP_USER
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
{
mError
(
"user:%s, failed to drop since %s"
,
pUser
->
user
,
terrstr
());
return
-
1
;
...
...
source/dnode/mnode/impl/test/trans/trans2.cpp
浏览文件 @
70e6e159
...
...
@@ -11,6 +11,8 @@
#include <gtest/gtest.h>
#if 0
#include "mndTrans.h"
#include "mndUser.h"
#include "tcache.h"
...
...
@@ -103,7 +105,7 @@ class MndTestTrans2 : public ::testing::Test {
void SetUp() override {}
void TearDown() override {}
int32_t
CreateUserLog
(
const
char
*
acct
,
const
char
*
user
,
ETrn
Type
type
,
SDbObj
*
pDb
)
{
int32_t CreateUserLog(const char *acct, const char *user, ETrn
Conflct conflict
, SDbObj *pDb) {
SUserObj userObj = {0};
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
tstrncpy(userObj.user, user, TSDB_USER_LEN);
...
...
@@ -113,7 +115,7 @@ class MndTestTrans2 : public ::testing::Test {
userObj.superUser = 1;
SRpcMsg rpcMsg = {0};
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
type
,
&
rpcMsg
);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,
conflict
, &rpcMsg);
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
mndTransAppendRedolog(pTrans, pRedoRaw);
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
...
...
@@ -135,7 +137,7 @@ class MndTestTrans2 : public ::testing::Test {
return code;
}
int32_t
CreateUserAction
(
const
char
*
acct
,
const
char
*
user
,
bool
hasUndoAction
,
ETrnPolicy
policy
,
ETrn
Type
type
,
int32_t CreateUserAction(const char *acct, const char *user, bool hasUndoAction, ETrnPolicy policy, ETrn
Conflct conflict
,
SDbObj *pDb) {
SUserObj userObj = {0};
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
...
...
@@ -146,7 +148,7 @@ class MndTestTrans2 : public ::testing::Test {
userObj.superUser = 1;
SRpcMsg rpcMsg = {0};
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
policy
,
type
,
&
rpcMsg
);
STrans *pTrans = mndTransCreate(pMnode, policy,
conflict
, &rpcMsg);
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
mndTransAppendRedolog(pTrans, pRedoRaw);
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
...
...
@@ -218,7 +220,7 @@ class MndTestTrans2 : public ::testing::Test {
userObj.superUser = 1;
SRpcMsg rpcMsg = {0};
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE_USER
,
&
rpcMsg
);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_
CONFLICT_NOTHING
, &rpcMsg);
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
mndTransAppendRedolog(pTrans, pRedoRaw);
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
...
...
@@ -528,3 +530,5 @@ TEST_F(MndTestTrans2, 04_Conflict) {
mndReleaseUser(pMnode, pUser);
}
}
#endif
\ No newline at end of file
tests/script/jenkins/basic.txt
浏览文件 @
70e6e159
...
...
@@ -57,7 +57,6 @@
# ---- mnode
./test.sh -f tsim/mnode/basic1.sim
./test.sh -f tsim/mnode/basic2.sim
./test.sh -f tsim/mnode/basic3.sim
# ---- show
./test.sh -f tsim/show/basic.sim
...
...
tests/script/tsim/trans/create_db.sim
浏览文件 @
70e6e159
...
...
@@ -76,14 +76,6 @@ if $data[0][3] != d1 then
return -1
endi
if $data[0][4] != create-db then
return -1
endi
if $data[0][7] != @Unable to establish connection@ then
return -1
endi
sql_error create database d1 vgroups 2;
print =============== start dnode2
...
...
@@ -125,15 +117,7 @@ endi
if $data[0][3] != d2 then
return -1
endi
if $data[0][4] != create-db then
return -1
endi
if $data[0][7] != @Unable to establish connection@ then
return -1
endi
return
sql_error create database d2 vgroups 2;
print =============== kill transaction
...
...
tests/test/c/sdbDump.c
浏览文件 @
70e6e159
...
...
@@ -279,9 +279,9 @@ void dumpTrans(SSdb *pSdb, SJson *json) {
tjsonAddIntegerToObject
(
item
,
"id"
,
pObj
->
id
);
tjsonAddIntegerToObject
(
item
,
"stage"
,
pObj
->
stage
);
tjsonAddIntegerToObject
(
item
,
"policy"
,
pObj
->
policy
);
tjsonAddIntegerToObject
(
item
,
"type"
,
pObj
->
type
);
tjsonAddIntegerToObject
(
item
,
"conflict"
,
pObj
->
conflict
);
tjsonAddIntegerToObject
(
item
,
"exec"
,
pObj
->
exec
);
tjsonAddStringToObject
(
item
,
"createdTime"
,
i642str
(
pObj
->
createdTime
));
tjsonAddStringToObject
(
item
,
"dbUid"
,
i642str
(
pObj
->
dbUid
));
tjsonAddStringToObject
(
item
,
"dbname"
,
pObj
->
dbname
);
tjsonAddIntegerToObject
(
item
,
"commitLogNum"
,
taosArrayGetSize
(
pObj
->
commitActions
));
tjsonAddIntegerToObject
(
item
,
"redoActionNum"
,
taosArrayGetSize
(
pObj
->
redoActions
));
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录