Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
86f8bf6c
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
86f8bf6c
编写于
5月 31, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: make trans support multi steps
上级
53def5b7
变更
17
隐藏空白更改
内联
并排
Showing
17 changed file
with
194 addition
and
137 deletion
+194
-137
source/dnode/mgmt/node_mgmt/src/dmTransport.c
source/dnode/mgmt/node_mgmt/src/dmTransport.c
+1
-1
source/dnode/mnode/impl/inc/mndTrans.h
source/dnode/mnode/impl/inc/mndTrans.h
+7
-6
source/dnode/mnode/impl/src/mndAcct.c
source/dnode/mnode/impl/src/mndAcct.c
+2
-6
source/dnode/mnode/impl/src/mndCluster.c
source/dnode/mnode/impl/src/mndCluster.c
+2
-2
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+1
-1
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+6
-4
source/dnode/mnode/impl/src/mndMain.c
source/dnode/mnode/impl/src/mndMain.c
+1
-1
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+1
-1
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+1
-1
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+2
-2
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+130
-84
source/dnode/mnode/impl/src/mndUser.c
source/dnode/mnode/impl/src/mndUser.c
+1
-1
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+2
-2
source/dnode/mnode/sdb/inc/sdb.h
source/dnode/mnode/sdb/inc/sdb.h
+1
-0
source/dnode/mnode/sdb/src/sdb.c
source/dnode/mnode/sdb/src/sdb.c
+1
-0
source/dnode/mnode/sdb/src/sdbFile.c
source/dnode/mnode/sdb/src/sdbFile.c
+30
-24
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+5
-1
未找到文件。
source/dnode/mgmt/node_mgmt/src/dmTransport.c
浏览文件 @
86f8bf6c
...
...
@@ -130,7 +130,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
_OVER:
if
(
code
!=
0
)
{
d
Error
(
"msg:%p, failed to process since %s"
,
pMsg
,
terrstr
(
));
d
Trace
(
"msg:%p, failed to process since %s, type:%s"
,
pMsg
,
terrstr
(),
TMSG_INFO
(
pRpc
->
msgType
));
if
(
terrno
!=
0
)
code
=
terrno
;
if
(
IsReq
(
pRpc
))
{
...
...
source/dnode/mnode/impl/inc/mndTrans.h
浏览文件 @
86f8bf6c
...
...
@@ -31,17 +31,18 @@ typedef enum {
typedef
struct
{
int32_t
id
;
tmsg_t
msgTyp
e
;
int
8_t
msgSent
;
int8_t
msgReceived
;
int32_t
errCod
e
;
int
32_t
acceptableCode
;
int8_t
stage
;
int8_t
isRaw
;
int8_t
rawWritten
;
SSdbRaw
*
pRaw
;
int8_t
msgSent
;
int8_t
msgReceived
;
tmsg_t
msgType
;
SEpSet
epSet
;
int32_t
errCode
;
int32_t
acceptableCode
;
int32_t
contLen
;
void
*
pCont
;
SSdbRaw
*
pRaw
;
}
STransAction
;
typedef
void
(
*
TransCbFp
)(
SMnode
*
pMnode
,
void
*
param
,
int32_t
paramLen
);
...
...
source/dnode/mnode/impl/src/mndAcct.c
浏览文件 @
86f8bf6c
...
...
@@ -78,10 +78,8 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
if
(
pRaw
==
NULL
)
return
-
1
;
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
mDebug
(
"acct:%s, will be created while deploy sdb, raw:%p"
,
acctObj
.
acct
,
pRaw
);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
mDebug
(
"acct:%s, will be created when deploying, raw:%p"
,
acctObj
.
acct
,
pRaw
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_TYPE_CREATE_ACCT
,
NULL
);
if
(
pTrans
==
NULL
)
{
mError
(
"acct:%s, failed to create since %s"
,
acctObj
.
acct
,
terrstr
());
...
...
@@ -94,7 +92,6 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
mndTransDrop
(
pTrans
);
return
-
1
;
}
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
...
...
@@ -104,7 +101,6 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
mndTransDrop
(
pTrans
);
return
0
;
#endif
}
static
SSdbRaw
*
mndAcctActionEncode
(
SAcctObj
*
pAcct
)
{
...
...
source/dnode/mnode/impl/src/mndCluster.c
浏览文件 @
86f8bf6c
...
...
@@ -172,13 +172,13 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
clusterObj
.
id
=
mndGenerateUid
(
clusterObj
.
name
,
TSDB_CLUSTER_ID_LEN
);
clusterObj
.
id
=
(
clusterObj
.
id
>=
0
?
clusterObj
.
id
:
-
clusterObj
.
id
);
pMnode
->
clusterId
=
clusterObj
.
id
;
m
Debug
(
"cluster:%"
PRId64
", name is %s"
,
clusterObj
.
id
,
clusterObj
.
name
);
m
Info
(
"cluster:%"
PRId64
", name is %s"
,
clusterObj
.
id
,
clusterObj
.
name
);
SSdbRaw
*
pRaw
=
mndClusterActionEncode
(
&
clusterObj
);
if
(
pRaw
==
NULL
)
return
-
1
;
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
mDebug
(
"cluster:%"
PRId64
", will be created wh
ile deploy sdb
, raw:%p"
,
clusterObj
.
id
,
pRaw
);
mDebug
(
"cluster:%"
PRId64
", will be created wh
en deploying
, raw:%p"
,
clusterObj
.
id
,
pRaw
);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
86f8bf6c
...
...
@@ -1314,7 +1314,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
pDbVgVersion
->
dbFName
);
if
(
pDb
==
NULL
)
{
m
Debug
(
"db:%s, no exist"
,
pDbVgVersion
->
dbFName
);
m
Trace
(
"db:%s, no exist"
,
pDbVgVersion
->
dbFName
);
memcpy
(
usedbRsp
.
db
,
pDbVgVersion
->
dbFName
,
TSDB_DB_FNAME_LEN
);
usedbRsp
.
uid
=
pDbVgVersion
->
dbId
;
usedbRsp
.
vgVersion
=
-
1
;
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
86f8bf6c
...
...
@@ -98,7 +98,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
if
(
pRaw
==
NULL
)
return
-
1
;
if
(
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
)
!=
0
)
return
-
1
;
mDebug
(
"dnode:%d, will be created wh
ile deploy sdb
, raw:%p"
,
dnodeObj
.
id
,
pRaw
);
mDebug
(
"dnode:%d, will be created wh
en deploying
, raw:%p"
,
dnodeObj
.
id
,
pRaw
);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
...
...
@@ -388,9 +388,10 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
mndReleaseMnode
(
pMnode
,
pObj
);
}
int64_t
dnodeVer
=
sdbGetTableVer
(
pMnode
->
pSdb
,
SDB_DNODE
)
+
sdbGetTableVer
(
pMnode
->
pSdb
,
SDB_MNODE
);
int64_t
curMs
=
taosGetTimestampMs
();
bool
online
=
mndIsDnodeOnline
(
pMnode
,
pDnode
,
curMs
);
bool
dnodeChanged
=
(
statusReq
.
dnodeVer
!=
sdbGetTableVer
(
pMnode
->
pSdb
,
SDB_DNODE
)
);
bool
dnodeChanged
=
(
statusReq
.
dnodeVer
!=
dnodeVer
);
bool
reboot
=
(
pDnode
->
rebootTime
!=
statusReq
.
rebootTime
);
bool
needCheck
=
!
online
||
dnodeChanged
||
reboot
;
...
...
@@ -433,7 +434,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
if
(
!
online
)
{
mInfo
(
"dnode:%d, from offline to online"
,
pDnode
->
id
);
}
else
{
mDebug
(
"dnode:%d, send dnode eps"
,
pDnode
->
id
);
mDebug
(
"dnode:%d, send dnode epset, online:%d ver:% "
PRId64
":%"
PRId64
" reboot:%d"
,
pDnode
->
id
,
online
,
statusReq
.
dnodeVer
,
dnodeVer
,
reboot
);
}
pDnode
->
rebootTime
=
statusReq
.
rebootTime
;
...
...
@@ -441,7 +443,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
pDnode
->
numOfSupportVnodes
=
statusReq
.
numOfSupportVnodes
;
SStatusRsp
statusRsp
=
{
0
};
statusRsp
.
dnodeVer
=
sdbGetTableVer
(
pMnode
->
pSdb
,
SDB_DNODE
)
+
sdbGetTableVer
(
pMnode
->
pSdb
,
SDB_MNODE
)
;
statusRsp
.
dnodeVer
=
dnodeVer
;
statusRsp
.
dnodeCfg
.
dnodeId
=
pDnode
->
id
;
statusRsp
.
dnodeCfg
.
clusterId
=
pMnode
->
clusterId
;
statusRsp
.
pDnodeEps
=
taosArrayInit
(
mndGetDnodeSize
(
pMnode
),
sizeof
(
SDnodeEp
));
...
...
source/dnode/mnode/impl/src/mndMain.c
浏览文件 @
86f8bf6c
...
...
@@ -472,7 +472,7 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
}
else
if
(
code
==
0
)
{
mTrace
(
"msg:%p, successfully processed and response"
,
pMsg
);
}
else
{
m
Error
(
"msg:%p, failed to process since %s, app:%p type:%s"
,
pMsg
,
terrstr
(),
pMsg
->
info
.
ahandle
,
m
Debug
(
"msg:%p, failed to process since %s, app:%p type:%s"
,
pMsg
,
terrstr
(),
pMsg
->
info
.
ahandle
,
TMSG_INFO
(
pMsg
->
msgType
));
}
...
...
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
86f8bf6c
...
...
@@ -90,7 +90,7 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
if
(
pRaw
==
NULL
)
return
-
1
;
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
mDebug
(
"mnode:%d, will be created wh
ile deploy sdb
, raw:%p"
,
mnodeObj
.
id
,
pRaw
);
mDebug
(
"mnode:%d, will be created wh
en deploying
, raw:%p"
,
mnodeObj
.
id
,
pRaw
);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
86f8bf6c
...
...
@@ -1597,7 +1597,7 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) {
pReq
->
info
.
rspLen
=
rspLen
;
code
=
0
;
m
Debug
(
"stb:
%s.%s, meta is retrieved"
,
infoReq
.
dbFName
,
infoReq
.
tbName
);
m
Trace
(
"
%s.%s, meta is retrieved"
,
infoReq
.
dbFName
,
infoReq
.
tbName
);
_OVER:
if
(
code
!=
0
)
{
...
...
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
86f8bf6c
...
...
@@ -65,7 +65,7 @@ int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
void
mndRestoreFinish
(
struct
SSyncFSM
*
pFsm
)
{
SMnode
*
pMnode
=
pFsm
->
data
;
if
(
!
pMnode
->
deploy
)
{
mInfo
(
"mnode sync restore finished"
);
mInfo
(
"mnode sync restore finished
, and will handle outstanding transactions
"
);
mndTransPullup
(
pMnode
);
mndSetRestore
(
pMnode
,
true
);
}
else
{
...
...
@@ -244,7 +244,7 @@ void mndSyncStart(SMnode *pMnode) {
}
else
{
syncStart
(
pMgmt
->
sync
);
}
mDebug
(
"
sync:%"
PRId64
" is started,
standby:%d"
,
pMgmt
->
sync
,
pMgmt
->
standby
);
mDebug
(
"
mnode sync started, id:%"
PRId64
"
standby:%d"
,
pMgmt
->
sync
,
pMgmt
->
standby
);
}
void
mndSyncStop
(
SMnode
*
pMnode
)
{}
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
86f8bf6c
...
...
@@ -37,7 +37,6 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
static
void
mndTransDropLogs
(
SArray
*
pArray
);
static
void
mndTransDropActions
(
SArray
*
pArray
);
static
void
mndTransDropData
(
STrans
*
pTrans
);
static
int32_t
mndTransExecuteLogs
(
SMnode
*
pMnode
,
SArray
*
pArray
);
static
int32_t
mndTransExecuteActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SArray
*
pArray
);
static
int32_t
mndTransExecuteRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransExecuteUndoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
...
...
@@ -133,15 +132,21 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
for
(
int32_t
i
=
0
;
i
<
redoActionNum
;
++
i
)
{
STransAction
*
pAction
=
taosArrayGet
(
pTrans
->
redoActions
,
i
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
id
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
errCode
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
acceptableCode
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
isRaw
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
stage
,
_OVER
)
if
(
pAction
->
isRaw
)
{
int32_t
len
=
sdbGetRawTotalSize
(
pAction
->
pRaw
);
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
rawWritten
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
len
,
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
pAction
->
pRaw
,
len
,
_OVER
)
}
else
{
SDB_SET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
&
pAction
->
epSet
,
sizeof
(
SEpSet
),
_OVER
)
SDB_SET_INT16
(
pRaw
,
dataPos
,
pAction
->
msgType
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
acceptableCode
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
msgSent
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
msgReceived
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
contLen
,
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pAction
->
pCont
,
pAction
->
contLen
,
_OVER
)
}
...
...
@@ -149,15 +154,21 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
for
(
int32_t
i
=
0
;
i
<
undoActionNum
;
++
i
)
{
STransAction
*
pAction
=
taosArrayGet
(
pTrans
->
undoActions
,
i
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
id
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
errCode
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
acceptableCode
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
isRaw
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
stage
,
_OVER
)
if
(
pAction
->
isRaw
)
{
int32_t
len
=
sdbGetRawTotalSize
(
pAction
->
pRaw
);
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
rawWritten
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
len
,
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
pAction
->
pRaw
,
len
,
_OVER
)
}
else
{
SDB_SET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
&
pAction
->
epSet
,
sizeof
(
SEpSet
),
_OVER
)
SDB_SET_INT16
(
pRaw
,
dataPos
,
pAction
->
msgType
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
acceptableCode
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
msgSent
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
msgReceived
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
contLen
,
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pAction
->
pCont
,
pAction
->
contLen
,
_OVER
)
}
...
...
@@ -165,15 +176,21 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
for
(
int32_t
i
=
0
;
i
<
commitActionNum
;
++
i
)
{
STransAction
*
pAction
=
taosArrayGet
(
pTrans
->
commitActions
,
i
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
id
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
errCode
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
acceptableCode
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
isRaw
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
stage
,
_OVER
)
if
(
pAction
->
isRaw
)
{
int32_t
len
=
sdbGetRawTotalSize
(
pAction
->
pRaw
);
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
rawWritten
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
len
,
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
pAction
->
pRaw
,
len
,
_OVER
)
}
else
{
SDB_SET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
&
pAction
->
epSet
,
sizeof
(
SEpSet
),
_OVER
)
SDB_SET_INT16
(
pRaw
,
dataPos
,
pAction
->
msgType
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
acceptableCode
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
msgSent
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
msgReceived
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
contLen
,
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pAction
->
pCont
,
pAction
->
contLen
,
_OVER
)
}
...
...
@@ -259,19 +276,25 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
if
(
pTrans
->
commitActions
==
NULL
)
goto
_OVER
;
for
(
int32_t
i
=
0
;
i
<
redoActionNum
;
++
i
)
{
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
id
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
errCode
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
acceptableCode
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
isRaw
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
stage
,
_OVER
)
if
(
action
.
isRaw
)
{
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
rawWritten
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
dataLen
,
_OVER
)
pData
=
taosMemoryMalloc
(
dataLen
);
if
(
pData
==
NULL
)
goto
_OVER
;
action
.
pRaw
=
taosMemoryMalloc
(
dataLen
);
if
(
action
.
pRaw
==
NULL
)
goto
_OVER
;
mTrace
(
"raw:%p, is created"
,
pData
);
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pData
,
dataLen
,
_OVER
);
if
(
taosArrayPush
(
pTrans
->
redoActions
,
&
pData
)
==
NULL
)
goto
_OVER
;
pData
=
NULL
;
SDB_GET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
action
.
pRaw
,
dataLen
,
_OVER
);
if
(
taosArrayPush
(
pTrans
->
redoActions
,
&
action
)
==
NULL
)
goto
_OVER
;
action
.
pRaw
=
NULL
;
}
else
{
SDB_GET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
&
action
.
epSet
,
sizeof
(
SEpSet
),
_OVER
);
SDB_GET_INT16
(
pRaw
,
dataPos
,
&
action
.
msgType
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
acceptableCode
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
msgSent
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
msgReceived
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
contLen
,
_OVER
)
action
.
pCont
=
taosMemoryMalloc
(
action
.
contLen
);
if
(
action
.
pCont
==
NULL
)
goto
_OVER
;
...
...
@@ -282,19 +305,25 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
}
for
(
int32_t
i
=
0
;
i
<
undoActionNum
;
++
i
)
{
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
id
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
errCode
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
acceptableCode
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
isRaw
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
stage
,
_OVER
)
if
(
action
.
isRaw
)
{
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
rawWritten
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
dataLen
,
_OVER
)
pData
=
taosMemoryMalloc
(
dataLen
);
if
(
pData
==
NULL
)
goto
_OVER
;
action
.
pRaw
=
taosMemoryMalloc
(
dataLen
);
if
(
action
.
pRaw
==
NULL
)
goto
_OVER
;
mTrace
(
"raw:%p, is created"
,
pData
);
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pData
,
dataLen
,
_OVER
);
if
(
taosArrayPush
(
pTrans
->
undoActions
,
&
pData
)
==
NULL
)
goto
_OVER
;
pData
=
NULL
;
SDB_GET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
action
.
pRaw
,
dataLen
,
_OVER
);
if
(
taosArrayPush
(
pTrans
->
undoActions
,
&
action
)
==
NULL
)
goto
_OVER
;
action
.
pRaw
=
NULL
;
}
else
{
SDB_GET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
&
action
.
epSet
,
sizeof
(
SEpSet
),
_OVER
);
SDB_GET_INT16
(
pRaw
,
dataPos
,
&
action
.
msgType
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
acceptableCode
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
msgSent
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
msgReceived
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
contLen
,
_OVER
)
action
.
pCont
=
taosMemoryMalloc
(
action
.
contLen
);
if
(
action
.
pCont
==
NULL
)
goto
_OVER
;
...
...
@@ -305,19 +334,25 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
}
for
(
int32_t
i
=
0
;
i
<
commitActionNum
;
++
i
)
{
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
id
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
errCode
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
acceptableCode
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
isRaw
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
stage
,
_OVER
)
if
(
action
.
isRaw
)
{
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
rawWritten
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
dataLen
,
_OVER
)
pData
=
taosMemoryMalloc
(
dataLen
);
if
(
pData
==
NULL
)
goto
_OVER
;
action
.
pRaw
=
taosMemoryMalloc
(
dataLen
);
if
(
action
.
pRaw
==
NULL
)
goto
_OVER
;
mTrace
(
"raw:%p, is created"
,
pData
);
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pData
,
dataLen
,
_OVER
);
if
(
taosArrayPush
(
pTrans
->
commitActions
,
&
pData
)
==
NULL
)
goto
_OVER
;
pData
=
NULL
;
SDB_GET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
action
.
pRaw
,
dataLen
,
_OVER
);
if
(
taosArrayPush
(
pTrans
->
commitActions
,
&
action
)
==
NULL
)
goto
_OVER
;
action
.
pRaw
=
NULL
;
}
else
{
SDB_GET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
&
action
.
epSet
,
sizeof
(
SEpSet
),
_OVER
);
SDB_GET_INT16
(
pRaw
,
dataPos
,
&
action
.
msgType
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
acceptableCode
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
msgSent
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
action
.
msgReceived
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
action
.
contLen
,
_OVER
)
action
.
pCont
=
taosMemoryMalloc
(
action
.
contLen
);
if
(
action
.
pCont
==
NULL
)
goto
_OVER
;
...
...
@@ -344,7 +379,6 @@ _OVER:
mError
(
"trans:%d, failed to parse from raw:%p since %s"
,
pTrans
->
id
,
pRaw
,
terrstr
());
mndTransDropData
(
pTrans
);
taosMemoryFreeClear
(
pRow
);
taosMemoryFreeClear
(
pData
);
taosMemoryFreeClear
(
action
.
pCont
);
return
NULL
;
}
...
...
@@ -502,7 +536,7 @@ static void mndTransDropData(STrans *pTrans) {
}
static
int32_t
mndTransActionDelete
(
SSdb
*
pSdb
,
STrans
*
pTrans
,
bool
callFunc
)
{
m
Debug
(
"trans:%d, perform delete action, row:%p stage:%s callfunc:%d"
,
pTrans
->
id
,
pTrans
,
mndTransStr
(
pTrans
->
stage
),
m
Trace
(
"trans:%d, perform delete action, row:%p stage:%s callfunc:%d"
,
pTrans
->
id
,
pTrans
,
mndTransStr
(
pTrans
->
stage
),
callFunc
);
if
(
pTrans
->
stopFunc
>
0
&&
callFunc
)
{
TransCbFp
fp
=
mndTransGetCbFp
(
pTrans
->
stopFunc
);
...
...
@@ -515,20 +549,34 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
return
0
;
}
static
int32_t
mndTransActionUpdate
(
SSdb
*
pSdb
,
STrans
*
pOld
,
STrans
*
pNew
)
{
if
(
pNew
->
stage
==
TRN_STAGE_COMMIT
)
{
pNew
->
stage
=
TRN_STAGE_COMMIT_ACTION
;
mTrace
(
"trans:%d, stage from %s to %s"
,
pNew
->
id
,
mndTransStr
(
TRN_STAGE_COMMIT
),
mndTransStr
(
TRN_STAGE_COMMIT_ACTION
));
}
if
(
pNew
->
stage
==
TRN_STAGE_ROLLBACK
)
{
pNew
->
stage
=
TRN_STAGE_FINISHED
;
mTrace
(
"trans:%d, stage from %s to %s"
,
pNew
->
id
,
mndTransStr
(
TRN_STAGE_ROLLBACK
),
mndTransStr
(
TRN_STAGE_FINISHED
));
static
void
mndTransUpdateActions
(
SArray
*
pOldArray
,
SArray
*
pNewArray
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pOldArray
);
++
i
)
{
STransAction
*
pOldAction
=
taosArrayGet
(
pOldArray
,
i
);
STransAction
*
pNewAction
=
taosArrayGet
(
pNewArray
,
i
);
pOldAction
->
rawWritten
=
pNewAction
->
rawWritten
;
pOldAction
->
msgSent
=
pNewAction
->
msgSent
;
pOldAction
->
msgReceived
=
pNewAction
->
msgReceived
;
pOldAction
->
errCode
=
pNewAction
->
errCode
;
}
}
static
int32_t
mndTransActionUpdate
(
SSdb
*
pSdb
,
STrans
*
pOld
,
STrans
*
pNew
)
{
mTrace
(
"trans:%d, perform update action, old row:%p stage:%s, new row:%p stage:%s"
,
pOld
->
id
,
pOld
,
mndTransStr
(
pOld
->
stage
),
pNew
,
mndTransStr
(
pNew
->
stage
));
mndTransUpdateActions
(
pOld
->
redoActions
,
pNew
->
redoActions
);
mndTransUpdateActions
(
pOld
->
undoActions
,
pNew
->
undoActions
);
mndTransUpdateActions
(
pOld
->
commitActions
,
pNew
->
commitActions
);
pOld
->
stage
=
pNew
->
stage
;
if
(
pOld
->
stage
==
TRN_STAGE_COMMIT
)
{
pOld
->
stage
=
TRN_STAGE_COMMIT_ACTION
;
mTrace
(
"trans:%d, stage from commit to commitAction"
,
pNew
->
id
);
}
if
(
pOld
->
stage
==
TRN_STAGE_ROLLBACK
)
{
pOld
->
stage
=
TRN_STAGE_FINISHED
;
mTrace
(
"trans:%d, stage from rollback to finished"
,
pNew
->
id
);
}
return
0
;
}
...
...
@@ -557,8 +605,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S
pTrans
->
stage
=
TRN_STAGE_PREPARE
;
pTrans
->
policy
=
policy
;
pTrans
->
type
=
type
;
pTrans
->
parallel
=
TRN_EXEC_PARALLEL
;
pTrans
->
createdTime
=
taosGetTimestampMs
();
if
(
pReq
!=
NULL
)
pTrans
->
rpcInfo
=
pReq
->
info
;
pTrans
->
redoActions
=
taosArrayInit
(
TRANS_ARRAY_SIZE
,
sizeof
(
STransAction
));
pTrans
->
undoActions
=
taosArrayInit
(
TRANS_ARRAY_SIZE
,
sizeof
(
STransAction
));
pTrans
->
commitActions
=
taosArrayInit
(
TRANS_ARRAY_SIZE
,
sizeof
(
STransAction
));
...
...
@@ -569,7 +617,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S
return
NULL
;
}
mDebug
(
"trans:%d, local object is created, data:%p"
,
pTrans
->
id
,
pTrans
);
if
(
pReq
!=
NULL
)
pTrans
->
rpcInfo
=
pReq
->
info
;
mTrace
(
"trans:%d, local object is created, data:%p"
,
pTrans
->
id
,
pTrans
);
return
pTrans
;
}
...
...
@@ -578,7 +627,7 @@ static void mndTransDropActions(SArray *pArray) {
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
STransAction
*
pAction
=
taosArrayGet
(
pArray
,
i
);
if
(
pAction
->
isRaw
)
{
sdbFreeRaw
(
pAction
->
pRaw
);
taosMemoryFreeClear
(
pAction
->
pRaw
);
}
else
{
taosMemoryFreeClear
(
pAction
->
pCont
);
}
...
...
@@ -590,12 +639,14 @@ static void mndTransDropActions(SArray *pArray) {
void
mndTransDrop
(
STrans
*
pTrans
)
{
if
(
pTrans
!=
NULL
)
{
mndTransDropData
(
pTrans
);
m
Debug
(
"trans:%d, local object is freed, data:%p"
,
pTrans
->
id
,
pTrans
);
m
Trace
(
"trans:%d, local object is freed, data:%p"
,
pTrans
->
id
,
pTrans
);
taosMemoryFreeClear
(
pTrans
);
}
}
static
int32_t
mndTransAppendAction
(
SArray
*
pArray
,
STransAction
*
pAction
)
{
pAction
->
id
=
taosArrayGetSize
(
pArray
);
void
*
ptr
=
taosArrayPush
(
pArray
,
pAction
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -606,25 +657,27 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
}
int32_t
mndTransAppendRedolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
STransAction
action
=
{.
isRaw
=
true
,
.
pRaw
=
pRaw
};
STransAction
action
=
{.
stage
=
TRN_STAGE_REDO_ACTION
,
.
isRaw
=
true
,
.
pRaw
=
pRaw
};
return
mndTransAppendAction
(
pTrans
->
redoActions
,
&
action
);
}
int32_t
mndTransAppendUndolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
STransAction
action
=
{.
isRaw
=
true
,
.
pRaw
=
pRaw
};
STransAction
action
=
{.
stage
=
TRN_STAGE_UNDO_ACTION
,
.
isRaw
=
true
,
.
pRaw
=
pRaw
};
return
mndTransAppendAction
(
pTrans
->
undoActions
,
&
action
);
}
int32_t
mndTransAppendCommitlog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
STransAction
action
=
{.
isRaw
=
true
,
.
pRaw
=
pRaw
};
STransAction
action
=
{.
stage
=
TRN_STAGE_COMMIT_ACTION
,
.
isRaw
=
true
,
.
pRaw
=
pRaw
};
return
mndTransAppendAction
(
pTrans
->
commitActions
,
&
action
);
}
int32_t
mndTransAppendRedoAction
(
STrans
*
pTrans
,
STransAction
*
pAction
)
{
pAction
->
stage
=
TRN_STAGE_REDO_ACTION
;
return
mndTransAppendAction
(
pTrans
->
redoActions
,
pAction
);
}
int32_t
mndTransAppendUndoAction
(
STrans
*
pTrans
,
STransAction
*
pAction
)
{
pAction
->
stage
=
TRN_STAGE_UNDO_ACTION
;
return
mndTransAppendAction
(
pTrans
->
undoActions
,
pAction
);
}
...
...
@@ -821,7 +874,8 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
taosMemoryFree
(
pTrans
->
rpcRsp
);
mDebug
(
"trans:%d, send rsp, code:0x%x stage:%d app:%p"
,
pTrans
->
id
,
code
,
pTrans
->
stage
,
pTrans
->
rpcInfo
.
ahandle
);
mDebug
(
"trans:%d, send rsp, code:0x%x stage:%s app:%p"
,
pTrans
->
id
,
code
,
mndTransStr
(
pTrans
->
stage
),
pTrans
->
rpcInfo
.
ahandle
);
SRpcMsg
rspMsg
=
{
.
code
=
code
,
.
pCont
=
rpcCont
,
...
...
@@ -877,55 +931,46 @@ void mndTransProcessRsp(SRpcMsg *pRsp) {
}
}
mDebug
(
"trans:%d,
action:%d response is received, code:0x%x, accept:0x%04x"
,
transId
,
action
,
pRsp
->
code
,
pAction
->
acceptableCode
);
mDebug
(
"trans:%d,
%s:%d response is received, code:0x%x, accept:0x%x"
,
transId
,
mndTransStr
(
pAction
->
stage
),
action
,
p
Rsp
->
code
,
p
Action
->
acceptableCode
);
mndTransExecute
(
pMnode
,
pTrans
);
_OVER:
mndReleaseTrans
(
pMnode
,
pTrans
);
}
static
int32_t
mndTransExecuteLogs
(
SMnode
*
pMnode
,
SArray
*
pArray
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
arraySize
=
taosArrayGetSize
(
pArray
);
if
(
arraySize
==
0
)
return
0
;
int32_t
code
=
0
;
for
(
int32_t
i
=
0
;
i
<
arraySize
;
++
i
)
{
SSdbRaw
*
pRaw
=
taosArrayGetP
(
pArray
,
i
);
if
(
sdbWriteWithoutFree
(
pSdb
,
pRaw
)
!=
0
)
{
code
=
((
terrno
!=
0
)
?
terrno
:
-
1
);
}
}
terrno
=
code
;
return
code
;
}
static
void
mndTransResetActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SArray
*
pArray
)
{
int32_t
numOfActions
=
taosArrayGetSize
(
pArray
);
for
(
int32_t
action
=
0
;
action
<
numOfActions
;
++
action
)
{
STransAction
*
pAction
=
taosArrayGet
(
pArray
,
action
);
if
(
pAction
==
NULL
)
continue
;
if
(
pAction
->
msgSent
&&
pAction
->
msgReceived
&&
pAction
->
errCode
==
0
)
continue
;
if
(
pAction
->
rawWritten
&&
pAction
->
errCode
==
0
)
continue
;
if
(
pAction
->
msgSent
&&
pAction
->
msgReceived
&&
(
pAction
->
errCode
==
0
||
pAction
->
errCode
==
pAction
->
acceptableCode
))
continue
;
if
(
pAction
->
rawWritten
&&
(
pAction
->
errCode
==
0
||
pAction
->
errCode
==
pAction
->
acceptableCode
))
continue
;
pAction
->
rawWritten
=
0
;
pAction
->
msgSent
=
0
;
pAction
->
msgReceived
=
0
;
pAction
->
errCode
=
0
;
mDebug
(
"trans:%d,
action:%d execute status is reset"
,
pTrans
->
id
,
action
);
mDebug
(
"trans:%d,
%s:%d execute status is reset"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
)
,
action
);
}
}
static
int32_t
mndTransWriteSingleLog
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
STransAction
*
pAction
)
{
if
(
pAction
->
rawWritten
)
return
0
;
int32_t
code
=
sdbWriteWithoutFree
(
pMnode
->
pSdb
,
pAction
->
pRaw
);
if
(
code
==
0
)
{
mDebug
(
"trans:%d, action:%d write to sdb"
,
pTrans
->
id
,
pAction
->
id
);
if
(
code
==
0
||
terrno
==
TSDB_CODE_SDB_OBJ_NOT_THERE
)
{
pAction
->
rawWritten
=
true
;
pAction
->
errCode
=
0
;
code
=
0
;
mDebug
(
"trans:%d, %s:%d write to sdb"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
);
}
else
{
mError
(
"trans:%d, action:%d failed to write sdb since %s"
,
pTrans
->
id
,
pAction
->
id
,
terrstr
());
pAction
->
errCode
=
(
terrno
!=
0
)
?
terrno
:
code
;
mError
(
"trans:%d, %s:%d failed to write sdb since %s"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
,
terrstr
());
}
return
code
;
...
...
@@ -952,13 +997,13 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
pAction
->
msgSent
=
1
;
pAction
->
msgReceived
=
0
;
pAction
->
errCode
=
0
;
mDebug
(
"trans:%d,
action:%d is sent to %s:%u"
,
pTrans
->
id
,
pAction
->
id
,
mDebug
(
"trans:%d,
%s:%d is sent to %s:%u"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
)
,
pAction
->
id
,
pAction
->
epSet
.
eps
[
pAction
->
epSet
.
inUse
].
fqdn
,
pAction
->
epSet
.
eps
[
pAction
->
epSet
.
inUse
].
port
);
}
else
{
pAction
->
msgSent
=
0
;
pAction
->
msgReceived
=
0
;
pAction
->
errCode
=
(
terrno
!=
0
)
?
terrno
:
code
;
mError
(
"trans:%d,
action:%d not send since %s"
,
pTrans
->
id
,
pAction
->
id
,
terrstr
());
mError
(
"trans:%d,
%s:%d not send since %s"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
)
,
pAction
->
id
,
terrstr
());
}
return
code
;
...
...
@@ -995,20 +1040,20 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
return
-
1
;
}
int32_t
numOf
Receiv
ed
=
0
;
int32_t
numOf
Execut
ed
=
0
;
int32_t
errCode
=
0
;
for
(
int32_t
action
=
0
;
action
<
numOfActions
;
++
action
)
{
STransAction
*
pAction
=
taosArrayGet
(
pArray
,
action
);
if
(
pAction
==
NULL
)
continue
;
if
(
pAction
->
msgSent
&&
pAction
->
msgReceived
)
{
numOf
Receiv
ed
++
;
if
(
(
pAction
->
msgSent
&&
pAction
->
msgReceived
)
||
pAction
->
rawWritten
)
{
numOf
Execut
ed
++
;
if
(
pAction
->
errCode
!=
0
&&
pAction
->
errCode
!=
pAction
->
acceptableCode
)
{
errCode
=
pAction
->
errCode
;
}
}
}
if
(
numOf
Receiv
ed
==
numOfActions
)
{
if
(
numOf
Execut
ed
==
numOfActions
)
{
if
(
errCode
==
0
)
{
mDebug
(
"trans:%d, all %d actions execute successfully"
,
pTrans
->
id
,
numOfActions
);
return
0
;
...
...
@@ -1019,7 +1064,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
return
errCode
;
}
}
else
{
mDebug
(
"trans:%d, %d of %d actions executed"
,
pTrans
->
id
,
numOf
Receiv
ed
,
numOfActions
);
mDebug
(
"trans:%d, %d of %d actions executed"
,
pTrans
->
id
,
numOf
Execut
ed
,
numOfActions
);
return
TSDB_CODE_ACTION_IN_PROGRESS
;
}
}
...
...
@@ -1041,7 +1086,7 @@ static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
}
static
int32_t
mndTransExecuteCommitActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
mndTransExecute
Logs
(
pMnode
,
pTrans
->
commitActions
);
int32_t
code
=
mndTransExecute
Actions
(
pMnode
,
pTrans
,
pTrans
->
commitActions
);
if
(
code
!=
0
)
{
mError
(
"failed to execute commitActions since %s"
,
terrstr
());
}
...
...
@@ -1063,14 +1108,16 @@ static bool mndTransExecuteRedoActionsOneByOne(SMnode *pMnode, STrans *pTrans) {
int32_t
code
=
mndTransExecSingleAction
(
pMnode
,
pTrans
,
pAction
);
if
(
code
==
0
)
{
pTrans
->
redoActionPos
++
;
mDebug
(
"trans:%d, redo action:%d is executed and need sync to other mnodes"
,
pTrans
->
id
,
pAction
->
id
);
mDebug
(
"trans:%d, %s:%d is executed and need sync to other mnodes"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
);
// todo sync these infos
}
else
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mDebug
(
"trans:%d,
redo action:%d is in progress and wait it finish"
,
pTrans
->
id
,
pAction
->
id
);
mDebug
(
"trans:%d,
%s:%d is in progress and wait it finish"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
)
,
pAction
->
id
);
continueExec
=
false
;
}
else
{
mError
(
"trans:%d, redo action:%d failed to execute since %s"
,
pTrans
->
id
,
pAction
->
id
,
terrstr
());
mError
(
"trans:%d, %s:%d failed to execute since %s"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
,
terrstr
());
continueExec
=
false
;
}
...
...
@@ -1207,8 +1254,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
mError
(
"trans:%d, failed to write sdb since %s"
,
pTrans
->
id
,
terrstr
());
}
mDebug
(
"trans:%d, finished, code:0x%x, failedTimes:%d"
,
pTrans
->
id
,
pTrans
->
code
,
pTrans
->
failedTimes
);
mDebug
(
"trans:%d, execute finished, code:0x%x, failedTimes:%d"
,
pTrans
->
id
,
pTrans
->
code
,
pTrans
->
failedTimes
);
return
continueExec
;
}
...
...
@@ -1271,15 +1317,15 @@ int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
if
(
pAction
==
NULL
)
continue
;
if
(
pAction
->
msgReceived
==
0
)
{
mInfo
(
"trans:%d,
action:%d set processed for kill msg received"
,
pTrans
->
id
,
i
);
mInfo
(
"trans:%d,
%s:%d set processed for kill msg received"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
)
,
i
);
pAction
->
msgSent
=
1
;
pAction
->
msgReceived
=
1
;
pAction
->
errCode
=
0
;
}
if
(
pAction
->
errCode
!=
0
)
{
mInfo
(
"trans:%d,
action:%d set processed for kill msg received, errCode from %s to success"
,
pTrans
->
id
,
i
,
tstrerror
(
pAction
->
errCode
));
mInfo
(
"trans:%d,
%s:%d set processed for kill msg received, errCode from %s to success"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
i
,
tstrerror
(
pAction
->
errCode
));
pAction
->
msgSent
=
1
;
pAction
->
msgReceived
=
1
;
pAction
->
errCode
=
0
;
...
...
source/dnode/mnode/impl/src/mndUser.c
浏览文件 @
86f8bf6c
...
...
@@ -77,7 +77,7 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char
if
(
pRaw
==
NULL
)
return
-
1
;
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
mDebug
(
"user:%s, will be created wh
ile deploy sdb
, raw:%p"
,
userObj
.
user
,
pRaw
);
mDebug
(
"user:%s, will be created wh
en deploying
, raw:%p"
,
userObj
.
user
,
pRaw
);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
86f8bf6c
...
...
@@ -501,7 +501,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
*
ppVgroups
=
pVgroups
;
code
=
0
;
mInfo
(
"db:%s, %d vgroups is alloced, replica:%d"
,
pDb
->
name
,
pDb
->
cfg
.
numOfVgroups
,
pDb
->
cfg
.
replications
);
mInfo
(
"db:%s,
total
%d vgroups is alloced, replica:%d"
,
pDb
->
name
,
pDb
->
cfg
.
numOfVgroups
,
pDb
->
cfg
.
replications
);
_OVER:
if
(
code
!=
0
)
taosMemoryFree
(
pVgroups
);
...
...
@@ -539,7 +539,7 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
pVgid
->
role
=
TAOS_SYNC_STATE_FOLLOWER
;
pDnode
->
numOfVnodes
++
;
mInfo
(
"db:%s, vgId:%d, vn:%d dnode:%d is added"
,
pVgroup
->
dbName
,
pVgroup
->
vgId
,
maxPos
,
pVgid
->
dnodeId
);
mInfo
(
"db:%s, vgId:%d, vn
ode_index
:%d dnode:%d is added"
,
pVgroup
->
dbName
,
pVgroup
->
vgId
,
maxPos
,
pVgid
->
dnodeId
);
maxPos
++
;
if
(
maxPos
==
3
)
return
0
;
}
...
...
source/dnode/mnode/sdb/inc/sdb.h
浏览文件 @
86f8bf6c
...
...
@@ -168,6 +168,7 @@ typedef struct SSdb {
char
*
currDir
;
char
*
tmpDir
;
int64_t
lastCommitVer
;
int64_t
lastCommitTerm
;
int64_t
curVer
;
int64_t
curTerm
;
int64_t
tableVer
[
SDB_MAX
];
...
...
source/dnode/mnode/sdb/src/sdb.c
浏览文件 @
86f8bf6c
...
...
@@ -55,6 +55,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
pSdb
->
curVer
=
-
1
;
pSdb
->
curTerm
=
-
1
;
pSdb
->
lastCommitVer
=
-
1
;
pSdb
->
lastCommitTerm
=
-
1
;
pSdb
->
pMnode
=
pOption
->
pMnode
;
taosThreadMutexInit
(
&
pSdb
->
filelock
,
NULL
);
mDebug
(
"sdb init successfully"
);
...
...
source/dnode/mnode/sdb/src/sdbFile.c
浏览文件 @
86f8bf6c
...
...
@@ -70,6 +70,7 @@ static void sdbResetData(SSdb *pSdb) {
pSdb
->
curVer
=
-
1
;
pSdb
->
curTerm
=
-
1
;
pSdb
->
lastCommitVer
=
-
1
;
pSdb
->
lastCommitTerm
=
-
1
;
mDebug
(
"sdb reset successfully"
);
}
...
...
@@ -211,12 +212,12 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
char
file
[
PATH_MAX
]
=
{
0
};
snprintf
(
file
,
sizeof
(
file
),
"%s%ssdb.data"
,
pSdb
->
currDir
,
TD_DIRSEP
);
mDebug
(
"start to read file:%s"
,
file
);
mDebug
(
"start to read
sdb
file:%s"
,
file
);
SSdbRaw
*
pRaw
=
taosMemoryMalloc
(
WAL_MAX_SIZE
+
100
);
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
mError
(
"failed read file since %s"
,
terrstr
());
mError
(
"failed read
sdb
file since %s"
,
terrstr
());
return
-
1
;
}
...
...
@@ -224,12 +225,12 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
if
(
pFile
==
NULL
)
{
taosMemoryFree
(
pRaw
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to read file:%s since %s"
,
file
,
terrstr
());
mError
(
"failed to read
sdb
file:%s since %s"
,
file
,
terrstr
());
return
0
;
}
if
(
sdbReadFileHead
(
pSdb
,
pFile
)
!=
0
)
{
mError
(
"failed to read file:%s head since %s"
,
file
,
terrstr
());
mError
(
"failed to read
sdb
file:%s head since %s"
,
file
,
terrstr
());
taosMemoryFree
(
pRaw
);
taosCloseFile
(
&
pFile
);
return
-
1
;
...
...
@@ -245,13 +246,13 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
if
(
ret
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to read file:%s since %s"
,
file
,
tstrerror
(
code
));
mError
(
"failed to read
sdb
file:%s since %s"
,
file
,
tstrerror
(
code
));
break
;
}
if
(
ret
!=
readLen
)
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
mError
(
"failed to read file:%s since %s"
,
file
,
tstrerror
(
code
));
mError
(
"failed to read
sdb
file:%s since %s"
,
file
,
tstrerror
(
code
));
break
;
}
...
...
@@ -259,34 +260,36 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
ret
=
taosReadFile
(
pFile
,
pRaw
->
pData
,
readLen
);
if
(
ret
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to read file:%s since %s"
,
file
,
tstrerror
(
code
));
mError
(
"failed to read
sdb
file:%s since %s"
,
file
,
tstrerror
(
code
));
break
;
}
if
(
ret
!=
readLen
)
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
mError
(
"failed to read file:%s since %s"
,
file
,
tstrerror
(
code
));
mError
(
"failed to read
sdb
file:%s since %s"
,
file
,
tstrerror
(
code
));
break
;
}
int32_t
totalLen
=
sizeof
(
SSdbRaw
)
+
pRaw
->
dataLen
+
sizeof
(
int32_t
);
if
((
!
taosCheckChecksumWhole
((
const
uint8_t
*
)
pRaw
,
totalLen
))
!=
0
)
{
code
=
TSDB_CODE_CHECKSUM_ERROR
;
mError
(
"failed to read file:%s since %s"
,
file
,
tstrerror
(
code
));
mError
(
"failed to read
sdb
file:%s since %s"
,
file
,
tstrerror
(
code
));
break
;
}
code
=
sdbWriteWithoutFree
(
pSdb
,
pRaw
);
if
(
code
!=
0
)
{
mError
(
"failed to read file:%s since %s"
,
file
,
terrstr
());
mError
(
"failed to read
sdb
file:%s since %s"
,
file
,
terrstr
());
goto
_OVER
;
}
}
code
=
0
;
pSdb
->
lastCommitVer
=
pSdb
->
curVer
;
pSdb
->
lastCommitTerm
=
pSdb
->
curTerm
;
memcpy
(
pSdb
->
tableVer
,
tableVer
,
sizeof
(
tableVer
));
mDebug
(
"read file:%s successfully, ver:%"
PRId64
,
file
,
pSdb
->
lastCommitVer
);
mDebug
(
"read sdb file:%s successfully, ver:%"
PRId64
" term:%"
PRId64
,
file
,
pSdb
->
lastCommitVer
,
pSdb
->
lastCommitTerm
);
_OVER:
taosCloseFile
(
&
pFile
);
...
...
@@ -302,7 +305,7 @@ int32_t sdbReadFile(SSdb *pSdb) {
sdbResetData
(
pSdb
);
int32_t
code
=
sdbReadFileImp
(
pSdb
);
if
(
code
!=
0
)
{
mError
(
"failed to read sdb since %s"
,
terrstr
());
mError
(
"failed to read sdb
file
since %s"
,
terrstr
());
sdbResetData
(
pSdb
);
}
...
...
@@ -318,18 +321,19 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
char
curfile
[
PATH_MAX
]
=
{
0
};
snprintf
(
curfile
,
sizeof
(
curfile
),
"%s%ssdb.data"
,
pSdb
->
currDir
,
TD_DIRSEP
);
mDebug
(
"start to write file:%s, current ver:%"
PRId64
" term:%"
PRId64
", commit ver:%"
PRId64
,
curfile
,
pSdb
->
curVer
,
pSdb
->
curTerm
,
pSdb
->
lastCommitVer
);
mDebug
(
"start to write sdb file, current ver:%"
PRId64
" term:%"
PRId64
", commit ver:%"
PRId64
" term:%"
PRId64
" file:%s"
,
pSdb
->
curVer
,
pSdb
->
curTerm
,
pSdb
->
lastCommitVer
,
pSdb
->
lastCommitTerm
,
curfile
);
TdFilePtr
pFile
=
taosOpenFile
(
tmpfile
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
if
(
pFile
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to open file:%s for write since %s"
,
tmpfile
,
terrstr
());
mError
(
"failed to open
sdb
file:%s for write since %s"
,
tmpfile
,
terrstr
());
return
-
1
;
}
if
(
sdbWriteFileHead
(
pSdb
,
pFile
)
!=
0
)
{
mError
(
"failed to write file:%s head since %s"
,
tmpfile
,
terrstr
());
mError
(
"failed to write
sdb
file:%s head since %s"
,
tmpfile
,
terrstr
());
taosCloseFile
(
&
pFile
);
return
-
1
;
}
...
...
@@ -338,7 +342,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
SdbEncodeFp
encodeFp
=
pSdb
->
encodeFps
[
i
];
if
(
encodeFp
==
NULL
)
continue
;
mTrace
(
"write %s to file, total %d rows"
,
sdbTableName
(
i
),
sdbGetSize
(
pSdb
,
i
));
mTrace
(
"write %s to
sdb
file, total %d rows"
,
sdbTableName
(
i
),
sdbGetSize
(
pSdb
,
i
));
SHashObj
*
hash
=
pSdb
->
hashObjs
[
i
];
TdThreadRwlock
*
pLock
=
&
pSdb
->
locks
[
i
];
...
...
@@ -394,7 +398,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
code
=
taosFsyncFile
(
pFile
);
if
(
code
!=
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to sync file:%s since %s"
,
tmpfile
,
tstrerror
(
code
));
mError
(
"failed to sync
sdb
file:%s since %s"
,
tmpfile
,
tstrerror
(
code
));
}
}
...
...
@@ -404,15 +408,17 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
code
=
taosRenameFile
(
tmpfile
,
curfile
);
if
(
code
!=
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to write file:%s since %s"
,
curfile
,
tstrerror
(
code
));
mError
(
"failed to write
sdb
file:%s since %s"
,
curfile
,
tstrerror
(
code
));
}
}
if
(
code
!=
0
)
{
mError
(
"failed to write file:%s since %s"
,
curfile
,
tstrerror
(
code
));
mError
(
"failed to write
sdb
file:%s since %s"
,
curfile
,
tstrerror
(
code
));
}
else
{
pSdb
->
lastCommitVer
=
pSdb
->
curVer
;
mDebug
(
"write file:%s successfully, ver:%"
PRId64
" term:%"
PRId64
,
curfile
,
pSdb
->
lastCommitVer
,
pSdb
->
curTerm
);
pSdb
->
lastCommitTerm
=
pSdb
->
curTerm
;
mDebug
(
"write sdb file successfully, ver:%"
PRId64
" term:%"
PRId64
" file:%s"
,
pSdb
->
lastCommitVer
,
pSdb
->
lastCommitTerm
,
curfile
);
}
terrno
=
code
;
...
...
@@ -427,7 +433,7 @@ int32_t sdbWriteFile(SSdb *pSdb) {
taosThreadMutexLock
(
&
pSdb
->
filelock
);
int32_t
code
=
sdbWriteFileImp
(
pSdb
);
if
(
code
!=
0
)
{
mError
(
"failed to write sdb since %s"
,
terrstr
());
mError
(
"failed to write sdb
file
since %s"
,
terrstr
());
}
taosThreadMutexUnlock
(
&
pSdb
->
filelock
);
return
code
;
...
...
@@ -493,7 +499,7 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) {
if
(
taosCopyFile
(
datafile
,
pIter
->
name
)
<
0
)
{
taosThreadMutexUnlock
(
&
pSdb
->
filelock
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to copy file %s to %s since %s"
,
datafile
,
pIter
->
name
,
terrstr
());
mError
(
"failed to copy
sdb
file %s to %s since %s"
,
datafile
,
pIter
->
name
,
terrstr
());
sdbCloseIter
(
pIter
);
return
-
1
;
}
...
...
@@ -502,7 +508,7 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) {
pIter
->
file
=
taosOpenFile
(
pIter
->
name
,
TD_FILE_READ
);
if
(
pIter
->
file
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to open file:%s since %s"
,
pIter
->
name
,
terrstr
());
mError
(
"failed to open
sdb
file:%s since %s"
,
pIter
->
name
,
terrstr
());
sdbCloseIter
(
pIter
);
return
-
1
;
}
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
86f8bf6c
...
...
@@ -79,7 +79,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
if
(
taskHandle
)
{
code
=
qExecTask
(
taskHandle
,
&
pRes
,
&
useconds
);
if
(
code
)
{
QW_TASK_ELOG
(
"qExecTask failed, code:%x - %s"
,
code
,
tstrerror
(
code
));
if
(
code
!=
TSDB_CODE_OPS_NOT_SUPPORT
)
{
QW_TASK_ELOG
(
"qExecTask failed, code:%x - %s"
,
code
,
tstrerror
(
code
));
}
else
{
QW_TASK_DLOG
(
"qExecTask failed, code:%x - %s"
,
code
,
tstrerror
(
code
));
}
QW_ERR_RET
(
code
);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录