Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d92a150c
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d92a150c
编写于
11月 11, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
interface for transaction
上级
ff4ad0f4
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
413 addition
and
302 deletion
+413
-302
include/dnode/mnode/sdb/sdb.h
include/dnode/mnode/sdb/sdb.h
+16
-16
include/dnode/mnode/transaction/trn.h
include/dnode/mnode/transaction/trn.h
+13
-11
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/dnode/mnode/impl/inc/mnodeSync.h
source/dnode/mnode/impl/inc/mnodeSync.h
+1
-0
source/dnode/mnode/impl/src/mnodeAcct.c
source/dnode/mnode/impl/src/mnodeAcct.c
+10
-12
source/dnode/mnode/impl/src/mnodeSync.c
source/dnode/mnode/impl/src/mnodeSync.c
+6
-0
source/dnode/mnode/impl/src/mnodeUser.c
source/dnode/mnode/impl/src/mnodeUser.c
+61
-64
source/dnode/mnode/sdb/src/sdb.c
source/dnode/mnode/sdb/src/sdb.c
+7
-7
source/dnode/mnode/transaction/CMakeLists.txt
source/dnode/mnode/transaction/CMakeLists.txt
+1
-0
source/dnode/mnode/transaction/inc/trnInt.h
source/dnode/mnode/transaction/inc/trnInt.h
+26
-5
source/dnode/mnode/transaction/src/trnInt.c
source/dnode/mnode/transaction/src/trnInt.c
+164
-0
source/dnode/mnode/transaction/src/trnMain.c
source/dnode/mnode/transaction/src/trnMain.c
+105
-141
source/dnode/mnode/transaction/src/trnThread.c
source/dnode/mnode/transaction/src/trnThread.c
+0
-46
source/util/src/terror.c
source/util/src/terror.c
+2
-0
未找到文件。
include/dnode/mnode/sdb/sdb.h
浏览文件 @
d92a150c
...
...
@@ -76,18 +76,18 @@ extern "C" {
typedef
enum
{
SDB_START
=
0
,
SDB_
VERSION
=
1
,
SDB_
CLUSTER
=
2
,
SDB_
DNODE
=
3
,
SDB_
M
NODE
=
4
,
SDB_
ACCT
=
5
,
SDB_A
UTH
=
6
,
SDB_
USER
=
7
,
SDB_
DB
=
8
,
SDB_
VGROUP
=
9
,
SDB_
STABLE
=
10
,
SDB_
FUNC
=
11
,
SDB_
TRANS
=
12
,
SDB_
TRANS
=
1
,
SDB_
VERSION
=
2
,
SDB_
CLUSTER
=
3
,
SDB_
D
NODE
=
4
,
SDB_
MNODE
=
5
,
SDB_A
CCT
=
6
,
SDB_
AUTH
=
7
,
SDB_
USER
=
8
,
SDB_
DB
=
9
,
SDB_
VGROUP
=
10
,
SDB_
STABLE
=
11
,
SDB_
FUNC
=
12
,
SDB_MAX
=
13
}
ESdbType
;
...
...
@@ -104,14 +104,14 @@ typedef struct {
int32_t
cksum
;
int32_t
dataLen
;
char
data
[];
}
SSdbRaw
Data
;
}
SSdbRaw
;
typedef
int32_t
(
*
SdbInsertFp
)(
void
*
pObj
);
typedef
int32_t
(
*
SdbUpdateFp
)(
void
*
pSrcObj
,
void
*
pDstObj
);
typedef
int32_t
(
*
SdbDeleteFp
)(
void
*
pObj
);
typedef
int32_t
(
*
SdbDeployFp
)();
typedef
void
*
(
*
SdbDecodeFp
)(
SSdbRaw
Data
*
pRaw
);
typedef
SSdbRaw
Data
*
(
*
SdbEncodeFp
)(
void
*
pObj
);
typedef
void
*
(
*
SdbDecodeFp
)(
SSdbRaw
*
pRaw
);
typedef
SSdbRaw
*
(
*
SdbEncodeFp
)(
void
*
pObj
);
typedef
struct
{
ESdbType
sdbType
;
...
...
@@ -129,7 +129,7 @@ void sdbCleanup();
void
sdbSetHandler
(
SSdbDesc
desc
);
int32_t
sdbRead
();
int32_t
sdbWrite
(
SSdbRaw
Data
*
pRawData
);
int32_t
sdbWrite
(
SSdbRaw
*
pRaw
);
int32_t
sdbCommit
();
int32_t
sdbDeploy
();
...
...
include/dnode/mnode/transaction/trn.h
浏览文件 @
d92a150c
...
...
@@ -24,21 +24,23 @@ extern "C" {
#endif
typedef
struct
STrans
STrans
;
typedef
enum
{
TRN_POLICY_ROLLBACK
=
1
,
TRN_POLICY_RETRY
=
2
}
ETrnPolicy
;
int32_t
trnInit
();
void
trnCleanup
();
STrans
*
trnCreate
();
int32_t
trnPrepare
(
STrans
*
);
int32_t
trnCommit
(
STrans
*
);
int32_t
trnExecute
(
STrans
*
);
void
trnDrop
(
STrans
*
);
int32_t
trnAppendRedoLog
(
STrans
*
,
SSdbRawData
*
);
int32_t
trnAppendUndoLog
(
STrans
*
,
SSdbRawData
*
);
int32_t
trnAppendCommitLog
(
STrans
*
,
SSdbRawData
*
);
int32_t
trnAppendRedoAction
(
STrans
*
,
SEpSet
*
,
void
*
pMsg
);
int32_t
trnAppendUndoAction
(
STrans
*
,
SEpSet
*
,
void
*
pMsg
);
STrans
*
trnCreate
(
ETrnPolicy
);
void
trnDrop
(
STrans
*
pTrans
);
void
trnSetRpcHandle
(
STrans
*
pTrans
,
void
*
rpcHandle
);
int32_t
trnAppendRedoLog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
trnAppendUndoLog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
trnAppendCommitLog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
trnAppendRedoAction
(
STrans
*
pTrans
,
SEpSet
*
,
void
*
pMsg
);
int32_t
trnAppendUndoAction
(
STrans
*
pTrans
,
SEpSet
*
,
void
*
pMsg
);
int32_t
trnPrepare
(
STrans
*
pTrans
,
int32_t
(
*
syncfp
)(
SSdbRaw
*
pRaw
,
void
*
pData
));
int32_t
trnApply
(
SSdbRaw
*
pRaw
,
void
*
pData
,
int32_t
code
);
int32_t
trnExecute
(
int32_t
tranId
);
#ifdef __cplusplus
}
...
...
include/util/taoserror.h
浏览文件 @
d92a150c
...
...
@@ -27,6 +27,7 @@ extern "C" {
#define TAOS_FAILED(err) ((err) < 0)
const
char
*
tstrerror
(
int32_t
err
);
const
char
*
terrstr
();
int32_t
*
taosGetErrno
();
#define terrno (*taosGetErrno())
...
...
source/dnode/mnode/impl/inc/mnodeSync.h
浏览文件 @
d92a150c
...
...
@@ -24,6 +24,7 @@ extern "C" {
int32_t
mnodeInitSync
();
void
mnodeCleanUpSync
();
int32_t
mnodeSyncPropose
(
SSdbRaw
*
pRaw
,
void
*
pData
);
bool
mnodeIsMaster
();
...
...
source/dnode/mnode/impl/src/mnodeAcct.c
浏览文件 @
d92a150c
...
...
@@ -18,8 +18,8 @@
#define ACCT_VER 1
static
SSdbRaw
Data
*
mnodeAcctActionEncode
(
SAcctObj
*
pAcct
)
{
SSdbRaw
Data
*
pRaw
=
calloc
(
1
,
sizeof
(
SAcctObj
)
+
sizeof
(
SSdbRawData
));
static
SSdbRaw
*
mnodeAcctActionEncode
(
SAcctObj
*
pAcct
)
{
SSdbRaw
*
pRaw
=
calloc
(
1
,
sizeof
(
SAcctObj
)
+
sizeof
(
SSdbRaw
));
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_MND_OUT_OF_MEMORY
;
return
NULL
;
...
...
@@ -45,7 +45,7 @@ static SSdbRawData *mnodeAcctActionEncode(SAcctObj *pAcct) {
return
pRaw
;
}
static
SAcctObj
*
mnodeAcctActionDecode
(
SSdbRaw
Data
*
pRaw
)
{
static
SAcctObj
*
mnodeAcctActionDecode
(
SSdbRaw
*
pRaw
)
{
if
(
pRaw
->
sver
!=
ACCT_VER
)
{
terrno
=
TSDB_CODE_SDB_INVAID_RAW_DATA_VER
;
return
NULL
;
...
...
@@ -96,23 +96,21 @@ static int32_t mnodeCreateDefaultAcct() {
SAcctObj
acctObj
=
{
0
};
tstrncpy
(
acctObj
.
acct
,
TSDB_DEFAULT_USER
,
TSDB_USER_LEN
);
acctObj
.
createdTime
=
taosGetTimestampMs
();
acctObj
.
updateTime
=
taosGetTimestampMs
()
;
acctObj
.
updateTime
=
acctObj
.
createdTime
;
acctObj
.
acctId
=
1
;
acctObj
.
cfg
=
(
SAcctCfg
){.
maxUsers
=
1
28
,
.
maxDbs
=
1
28
,
acctObj
.
cfg
=
(
SAcctCfg
){.
maxUsers
=
1
024
,
.
maxDbs
=
1
024
,
.
maxTimeSeries
=
INT32_MAX
,
.
maxStreams
=
1000
,
.
maxStreams
=
8092
,
.
maxStorage
=
INT64_MAX
,
.
accessState
=
TSDB_VN_ALL_ACCCESS
};
SSdbRaw
Data
*
pRaw
=
mnodeAcctActionEncode
(
&
acctObj
);
SSdbRaw
*
pRaw
=
mnodeAcctActionEncode
(
&
acctObj
);
if
(
pRaw
!=
NULL
)
{
code
=
sdbWrite
(
pRaw
);
}
else
{
code
=
terrno
;
return
-
1
;
}
return
code
;
return
sdbWrite
(
pRaw
)
;
}
int32_t
mnodeInitAcct
()
{
...
...
source/dnode/mnode/impl/src/mnodeSync.c
浏览文件 @
d92a150c
...
...
@@ -20,4 +20,10 @@
int32_t
mnodeInitSync
()
{
return
0
;
}
void
mnodeCleanUpSync
()
{}
int32_t
mnodeSyncPropose
(
SSdbRaw
*
pRaw
,
void
*
pData
)
{
trnApply
(
pRaw
,
pData
,
0
);
free
(
pRaw
);
return
0
;
}
bool
mnodeIsMaster
()
{
return
true
;
}
\ No newline at end of file
source/dnode/mnode/impl/src/mnodeUser.c
浏览文件 @
d92a150c
...
...
@@ -14,15 +14,15 @@
*/
#define _DEFAULT_SOURCE
#include "mnodeSync.h"
#include "os.h"
#include "tkey.h"
#include "tglobal.h"
#include "
mnodeInt
.h"
#include "
tkey
.h"
#define USER_VER 1
static
SSdbRaw
Data
*
mnodeUserActionEncode
(
SUserObj
*
pUser
)
{
SSdbRaw
Data
*
pRaw
=
calloc
(
1
,
sizeof
(
SUserObj
)
+
sizeof
(
SSdbRawData
));
static
SSdbRaw
*
mnodeUserActionEncode
(
SUserObj
*
pUser
)
{
SSdbRaw
*
pRaw
=
calloc
(
1
,
sizeof
(
SUserObj
)
+
sizeof
(
SSdbRaw
));
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_MND_OUT_OF_MEMORY
;
return
NULL
;
...
...
@@ -43,7 +43,7 @@ static SSdbRawData *mnodeUserActionEncode(SUserObj *pUser) {
return
pRaw
;
}
static
SUserObj
*
mnodeUserActionDecode
(
SSdbRaw
Data
*
pRaw
)
{
static
SUserObj
*
mnodeUserActionDecode
(
SSdbRaw
*
pRaw
)
{
if
(
pRaw
->
sver
!=
USER_VER
)
{
terrno
=
TSDB_CODE_SDB_INVAID_RAW_DATA_VER
;
return
NULL
;
...
...
@@ -77,12 +77,14 @@ static SUserObj *mnodeUserActionDecode(SSdbRawData *pRaw) {
static
int32_t
mnodeUserActionInsert
(
SUserObj
*
pUser
)
{
pUser
->
prohibitDbHash
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
pUser
->
prohibitDbHash
==
NULL
)
{
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_MND_OUT_OF_MEMORY
;
return
-
1
;
}
pUser
->
pAcct
=
sdbAcquire
(
SDB_ACCT
,
pUser
->
acct
);
if
(
pUser
->
pAcct
==
NULL
)
{
return
TSDB_CODE_MND_ACCT_NOT_EXIST
;
terrno
=
TSDB_CODE_MND_ACCT_NOT_EXIST
;
return
-
1
;
}
return
0
;
...
...
@@ -108,8 +110,6 @@ static int32_t mnodeUserActionUpdate(SUserObj *pSrcUser, SUserObj *pDstUser) {
}
static
int32_t
mnodeCreateDefaultUser
(
char
*
acct
,
char
*
user
,
char
*
pass
)
{
int32_t
code
=
0
;
SUserObj
userObj
=
{
0
};
tstrncpy
(
userObj
.
user
,
user
,
TSDB_USER_LEN
);
tstrncpy
(
userObj
.
acct
,
acct
,
TSDB_USER_LEN
);
...
...
@@ -121,35 +121,31 @@ static int32_t mnodeCreateDefaultUser(char *acct, char *user, char *pass) {
userObj
.
rootAuth
=
1
;
}
SSdbRawData
*
pRaw
=
mnodeUserActionEncode
(
&
userObj
);
if
(
pRaw
!=
NULL
)
{
code
=
sdbWrite
(
pRaw
);
}
else
{
code
=
terrno
;
SSdbRaw
*
pRaw
=
mnodeUserActionEncode
(
&
userObj
);
if
(
pRaw
==
NULL
)
{
return
-
1
;
}
return
code
;
return
sdbWrite
(
pRaw
)
;
}
static
int32_t
mnodeCreateDefaultUsers
()
{
int32_t
code
=
mnodeCreateDefaultUser
(
TSDB_DEFAULT_USER
,
TSDB_DEFAULT_USER
,
TSDB_DEFAULT_PASS
);
if
(
code
!=
0
)
return
code
;
if
(
mnodeCreateDefaultUser
(
TSDB_DEFAULT_USER
,
TSDB_DEFAULT_USER
,
TSDB_DEFAULT_PASS
)
!=
0
)
{
return
-
1
;
}
code
=
mnodeCreateDefaultUser
(
TSDB_DEFAULT_USER
,
"monitor"
,
tsInternalPass
);
if
(
code
!=
0
)
return
code
;
if
(
mnodeCreateDefaultUser
(
TSDB_DEFAULT_USER
,
"monitor"
,
tsInternalPass
)
!=
0
)
{
return
-
1
;
}
code
=
mnodeCreateDefaultUser
(
TSDB_DEFAULT_USER
,
"_"
TSDB_DEFAULT_USER
,
tsInternalPass
);
if
(
code
!=
0
)
return
code
;
if
(
mnodeCreateDefaultUser
(
TSDB_DEFAULT_USER
,
"_"
TSDB_DEFAULT_USER
,
tsInternalPass
)
!=
0
)
{
return
-
1
;
}
return
code
;
return
0
;
}
static
int32_t
mnodeCreateUser
(
char
*
acct
,
char
*
user
,
char
*
pass
,
SMnMsg
*
pMsg
)
{
int32_t
code
=
0
;
STrans
*
pTrans
=
NULL
;
SSdbRawData
*
pUndoRaw
=
NULL
;
SSdbRawData
*
pRedoRaw
=
NULL
;
SUserObj
userObj
=
{
0
};
tstrncpy
(
userObj
.
user
,
user
,
TSDB_USER_LEN
);
tstrncpy
(
userObj
.
acct
,
acct
,
TSDB_USER_LEN
);
...
...
@@ -158,79 +154,80 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnMsg *pMsg)
userObj
.
updateTime
=
userObj
.
createdTime
;
userObj
.
rootAuth
=
0
;
pRedoRaw
=
mnodeUserActionEncode
(
&
userObj
);
if
(
pRedoRaw
==
NULL
)
{
code
=
terrno
;
goto
CREATE_USER_OVER
;
STrans
*
pTrans
=
trnCreate
(
TRN_POLICY_ROLLBACK
);
if
(
pTrans
==
NULL
)
return
-
1
;
SSdbRaw
*
pRedoRaw
=
mnodeUserActionEncode
(
&
userObj
);
if
(
pRedoRaw
==
NULL
||
trnAppendRedoLog
(
pTrans
,
pRedoRaw
)
!=
0
)
{
trnDrop
(
pTrans
);
return
-
1
;
}
pRedoRaw
->
status
=
SDB_STATUS_
READY
;
pRedoRaw
->
status
=
SDB_STATUS_
CREATING
;
pRedoRaw
->
action
=
SDB_ACTION_INSERT
;
pUndoRaw
=
mnodeUserActionEncode
(
&
userObj
);
if
(
pUndoRaw
==
NULL
)
{
code
=
terrno
;
goto
CREATE_USER_OVER
;
SSdbRaw
*
pUndoRaw
=
mnodeUserActionEncode
(
&
userObj
);
if
(
pUndoRaw
==
NULL
||
trnAppendUndoLog
(
pTrans
,
pUndoRaw
)
!=
0
)
{
trnDrop
(
pTrans
)
;
return
-
1
;
}
pUndoRaw
->
status
=
SDB_STATUS_DROPPING
;
pUndoRaw
->
action
=
SDB_ACTION_DELETE
;
pTrans
=
trnCreate
(
);
if
(
p
Trans
==
NULL
)
{
code
=
TSDB_CODE_MND_OUT_OF_MEMORY
;
goto
CREATE_USER_OVER
;
SSdbRaw
*
pCommitRaw
=
mnodeUserActionEncode
(
&
userObj
);
if
(
p
CommitRaw
==
NULL
||
trnAppendCommitLog
(
pTrans
,
pCommitRaw
)
!=
0
)
{
trnDrop
(
pTrans
)
;
return
-
1
;
}
trnAppendRedoLog
(
pTrans
,
pRedoRaw
)
;
trnAppendUndoLog
(
pTrans
,
pUndoRaw
)
;
pCommitRaw
->
status
=
SDB_STATUS_READY
;
pCommitRaw
->
action
=
SDB_ACTION_UPDATE
;
code
=
trnCommit
(
pTrans
);
trnSetRpcHandle
(
pTrans
,
pMsg
->
rpcMsg
.
handle
);
CREATE_USER_OVER:
if
(
code
!=
0
)
{
if
(
trnPrepare
(
pTrans
,
mnodeSyncPropose
)
!=
0
)
{
trnDrop
(
pTrans
);
free
(
pRedoRaw
);
free
(
pUndoRaw
);
return
-
1
;
}
return
code
;
trnDrop
(
pTrans
);
return
0
;
}
static
int32_t
mnodeProcessCreateUserMsg
(
SMnMsg
*
pMsg
)
{
SCreateUserMsg
*
pCreate
=
pMsg
->
rpcMsg
.
pCont
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
pCreate
->
user
[
0
]
==
0
)
{
code
=
TSDB_CODE_MND_INVALID_USER_FORMAT
;
mError
(
"user:%s, failed to create since %s"
,
pCreate
->
user
,
t
strerror
(
code
));
return
code
;
terrno
=
TSDB_CODE_MND_INVALID_USER_FORMAT
;
mError
(
"user:%s, failed to create since %s"
,
pCreate
->
user
,
t
errstr
(
));
return
-
1
;
}
if
(
pCreate
->
pass
[
0
]
==
0
)
{
code
=
TSDB_CODE_MND_INVALID_PASS_FORMAT
;
mError
(
"user:%s, failed to create since %s"
,
pCreate
->
user
,
t
strerror
(
code
));
return
code
;
terrno
=
TSDB_CODE_MND_INVALID_PASS_FORMAT
;
mError
(
"user:%s, failed to create since %s"
,
pCreate
->
user
,
t
errstr
(
));
return
-
1
;
}
SUserObj
*
pUser
=
sdbAcquire
(
SDB_USER
,
pCreate
->
user
);
if
(
pUser
!=
NULL
)
{
sdbRelease
(
pUser
);
code
=
TSDB_CODE_MND_USER_ALREADY_EXIST
;
mError
(
"user:%s, failed to create since %s"
,
pCreate
->
user
,
t
strerror
(
code
));
return
code
;
terrno
=
TSDB_CODE_MND_USER_ALREADY_EXIST
;
mError
(
"user:%s, failed to create since %s"
,
pCreate
->
user
,
t
errstr
(
));
return
-
1
;
}
SUserObj
*
pOperUser
=
sdbAcquire
(
SDB_USER
,
pMsg
->
user
);
if
(
pOperUser
==
NULL
)
{
code
=
TSDB_CODE_MND_NO_USER_FROM_CONN
;
mError
(
"user:%s, failed to create since %s"
,
pCreate
->
user
,
t
strerror
(
code
));
return
code
;
terrno
=
TSDB_CODE_MND_NO_USER_FROM_CONN
;
mError
(
"user:%s, failed to create since %s"
,
pCreate
->
user
,
t
errstr
(
));
return
-
1
;
}
code
=
mnodeCreateUser
(
pOperUser
->
acct
,
pCreate
->
user
,
pCreate
->
pass
,
pMsg
);
int32_t
code
=
mnodeCreateUser
(
pOperUser
->
acct
,
pCreate
->
user
,
pCreate
->
pass
,
pMsg
);
sdbRelease
(
pOperUser
);
if
(
code
!=
0
)
{
mError
(
"user:%s, failed to create since %s"
,
pCreate
->
user
,
t
strerror
(
code
));
return
code
;
mError
(
"user:%s, failed to create since %s"
,
pCreate
->
user
,
t
errstr
(
));
return
-
1
;
}
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
...
...
source/dnode/mnode/sdb/src/sdb.c
浏览文件 @
d92a150c
...
...
@@ -61,7 +61,7 @@ static SHashObj *sdbGetHash(int32_t sdb) {
return
hash
;
}
int32_t
sdbWrite
(
SSdbRaw
Data
*
pRaw
)
{
int32_t
sdbWrite
(
SSdbRaw
*
pRaw
)
{
SHashObj
*
hash
=
sdbGetHash
(
pRaw
->
type
);
switch
(
pRaw
->
action
)
{
case
SDB_ACTION_INSERT
:
...
...
@@ -85,7 +85,7 @@ static int32_t sdbReadVersion(FileFd fd) { return 0; }
static
int32_t
sdbReadDataFile
()
{
int32_t
code
=
0
;
SSdbRaw
Data
*
pRaw
=
malloc
(
SDB_MAX_SIZE
);
SSdbRaw
*
pRaw
=
malloc
(
SDB_MAX_SIZE
);
if
(
pRaw
==
NULL
)
{
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
}
...
...
@@ -101,7 +101,7 @@ static int32_t sdbReadDataFile() {
int64_t
offset
=
0
;
while
(
1
)
{
int32_t
ret
=
(
int32_t
)
taosReadFile
(
fd
,
pRaw
,
sizeof
(
SSdbRaw
Data
));
int32_t
ret
=
(
int32_t
)
taosReadFile
(
fd
,
pRaw
,
sizeof
(
SSdbRaw
));
if
(
ret
==
0
)
break
;
if
(
ret
<
0
)
{
...
...
@@ -110,7 +110,7 @@ static int32_t sdbReadDataFile() {
break
;
}
if
(
ret
<
sizeof
(
SSdbRaw
Data
))
{
if
(
ret
<
sizeof
(
SSdbRaw
))
{
code
=
TSDB_CODE_SDB_INTERNAL_ERROR
;
mError
(
"failed to read file:%s since %s"
,
file
,
tstrerror
(
code
));
break
;
...
...
@@ -143,7 +143,7 @@ static int32_t sdbWriteDataFile() {
return
code
;
}
for
(
int32_t
i
=
SDB_
START
;
i
<
SDB_MAX
;
++
i
)
{
for
(
int32_t
i
=
SDB_
MAX
-
1
;
i
>
SDB_START
;
--
i
)
{
SHashObj
*
hash
=
tsSdb
.
hashObjs
[
i
];
if
(
!
hash
)
continue
;
...
...
@@ -153,9 +153,9 @@ static int32_t sdbWriteDataFile() {
SSdbRow
*
pRow
=
taosHashIterate
(
hash
,
NULL
);
while
(
pRow
!=
NULL
)
{
if
(
pRow
->
status
==
SDB_STATUS_READY
)
continue
;
SSdbRaw
Data
*
pRaw
=
(
*
encodeFp
)(
pRow
->
data
);
SSdbRaw
*
pRaw
=
(
*
encodeFp
)(
pRow
->
data
);
if
(
pRaw
!=
NULL
)
{
taosWriteFile
(
fd
,
pRaw
,
sizeof
(
SSdbRaw
Data
)
+
pRaw
->
dataLen
);
taosWriteFile
(
fd
,
pRaw
,
sizeof
(
SSdbRaw
)
+
pRaw
->
dataLen
);
}
else
{
taosHashCancelIterate
(
hash
,
pRow
);
code
=
TSDB_CODE_SDB_INTERNAL_ERROR
;
...
...
source/dnode/mnode/transaction/CMakeLists.txt
浏览文件 @
d92a150c
...
...
@@ -11,4 +11,5 @@ target_link_libraries(
PRIVATE common
PRIVATE util
PRIVATE sdb
PRIVATE transport
)
source/dnode/mnode/transaction/inc/trnInt.h
浏览文件 @
d92a150c
...
...
@@ -33,14 +33,35 @@ extern "C" {
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }}
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }}
#define TRN_VER 1
#define TRN_DEFAULT_ARRAY_SIZE 8
typedef
enum
{
TRN_STAGE_PREPARE
=
1
,
TRN_STAGE_EXECUTE
=
2
,
TRN_STAGE_COMMIT
=
3
,
TRN_STAGE_ROLLBACK
=
4
,
TRN_STAGE_RETRY
=
5
}
ETrnStage
;
typedef
struct
STrans
{
SArray
*
redoLogs
;
SArray
*
undoLogs
;
SArray
*
commitLogs
;
SArray
*
redoActions
;
SArray
*
undoActions
;
int32_t
id
;
ETrnStage
stage
;
ETrnPolicy
policy
;
void
*
rpcHandle
;
SArray
*
redoLogs
;
SArray
*
undoLogs
;
SArray
*
commitLogs
;
SArray
*
redoActions
;
SArray
*
undoActions
;
}
STrans
;
SSdbRaw
*
trnActionEncode
(
STrans
*
pTrans
);
STrans
*
trnActionDecode
(
SSdbRaw
*
pRaw
);
int32_t
trnActionInsert
(
STrans
*
pTrans
);
int32_t
trnActionDelete
(
STrans
*
pTrans
);
int32_t
trnActionUpdate
(
STrans
*
pSrcTrans
,
STrans
*
pDstTrans
);
int32_t
trnGenerateTransId
();
#ifdef __cplusplus
}
...
...
source/dnode/mnode/transaction/src/trnInt.c
0 → 100644
浏览文件 @
d92a150c
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "trnInt.h"
SSdbRaw
*
trnActionEncode
(
STrans
*
pTrans
)
{
int32_t
rawDataLen
=
5
*
sizeof
(
int32_t
);
int32_t
redoLogNum
=
taosArrayGetSize
(
pTrans
->
redoLogs
);
int32_t
undoLogNum
=
taosArrayGetSize
(
pTrans
->
undoLogs
);
int32_t
commitLogNum
=
taosArrayGetSize
(
pTrans
->
commitLogs
);
int32_t
redoActionNum
=
taosArrayGetSize
(
pTrans
->
redoActions
);
int32_t
undoActionNum
=
taosArrayGetSize
(
pTrans
->
undoActions
);
for
(
int32_t
index
=
0
;
index
<
redoLogNum
;
++
index
)
{
SSdbRaw
*
pRawData
=
taosArrayGet
(
pTrans
->
redoLogs
,
index
);
rawDataLen
+=
(
sizeof
(
SSdbRaw
)
+
pRawData
->
dataLen
);
}
for
(
int32_t
index
=
0
;
index
<
undoLogNum
;
++
index
)
{
SSdbRaw
*
pRawData
=
taosArrayGet
(
pTrans
->
undoLogs
,
index
);
rawDataLen
+=
(
sizeof
(
SSdbRaw
)
+
pRawData
->
dataLen
);
}
for
(
int32_t
index
=
0
;
index
<
commitLogNum
;
++
index
)
{
SSdbRaw
*
pRawData
=
taosArrayGet
(
pTrans
->
commitLogs
,
index
);
rawDataLen
+=
(
sizeof
(
SSdbRaw
)
+
pRawData
->
dataLen
);
}
SSdbRaw
*
pRaw
=
calloc
(
1
,
rawDataLen
+
sizeof
(
SSdbRaw
));
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_MND_OUT_OF_MEMORY
;
return
NULL
;
}
int32_t
dataLen
=
0
;
char
*
pData
=
pRaw
->
data
;
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
redoLogNum
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
undoLogNum
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
commitLogNum
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
redoActionNum
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
undoActionNum
)
pRaw
->
dataLen
=
dataLen
;
pRaw
->
type
=
SDB_TRANS
;
pRaw
->
sver
=
TRN_VER
;
return
pRaw
;
}
STrans
*
trnActionDecode
(
SSdbRaw
*
pRaw
)
{
if
(
pRaw
->
sver
!=
TRN_VER
)
{
terrno
=
TSDB_CODE_SDB_INVAID_RAW_DATA_VER
;
return
NULL
;
}
STrans
*
pTrans
=
NULL
;
if
(
pTrans
==
NULL
)
{
terrno
=
TSDB_CODE_MND_OUT_OF_MEMORY
;
return
NULL
;
}
int32_t
redoLogNum
=
0
;
int32_t
undoLogNum
=
0
;
int32_t
commitLogNum
=
0
;
int32_t
redoActionNum
=
0
;
int32_t
undoActionNum
=
0
;
SSdbRaw
*
pTmp
=
malloc
(
sizeof
(
SSdbRaw
));
int32_t
code
=
0
;
int32_t
dataLen
=
pRaw
->
dataLen
;
char
*
pData
=
pRaw
->
data
;
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
redoLogNum
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
undoLogNum
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
commitLogNum
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
redoActionNum
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
undoActionNum
,
code
)
for
(
int32_t
index
=
0
;
index
<
redoLogNum
;
++
index
)
{
SDB_GET_BINARY_VAL
(
pData
,
dataLen
,
pTmp
,
sizeof
(
SSdbRaw
),
code
);
if
(
code
==
0
&&
pTmp
->
dataLen
>
0
)
{
SSdbRaw
*
pRead
=
malloc
(
sizeof
(
SSdbRaw
)
+
pTmp
->
dataLen
);
if
(
pRead
==
NULL
)
{
code
=
TSDB_CODE_MND_OUT_OF_MEMORY
;
break
;
}
memcpy
(
pRead
,
pTmp
,
sizeof
(
SSdbRaw
));
SDB_GET_BINARY_VAL
(
pData
,
dataLen
,
pRead
->
data
,
pRead
->
dataLen
,
code
);
void
*
ret
=
taosArrayPush
(
pTrans
->
redoLogs
,
&
pRead
);
if
(
ret
==
NULL
)
{
code
=
TSDB_CODE_MND_OUT_OF_MEMORY
;
break
;
}
}
}
if
(
code
!=
0
)
{
trnDrop
(
pTrans
);
terrno
=
code
;
return
NULL
;
}
return
pTrans
;
}
int32_t
trnActionInsert
(
STrans
*
pTrans
)
{
SArray
*
pArray
=
pTrans
->
redoLogs
;
int32_t
arraySize
=
taosArrayGetSize
(
pArray
);
for
(
int32_t
index
=
0
;
index
<
arraySize
;
++
index
)
{
SSdbRaw
*
pRaw
=
taosArrayGetP
(
pArray
,
index
);
int32_t
code
=
sdbWrite
(
pRaw
);
if
(
code
!=
0
)
{
return
code
;
}
}
return
0
;
}
int32_t
trnActionDelete
(
STrans
*
pTrans
)
{
SArray
*
pArray
=
pTrans
->
redoLogs
;
int32_t
arraySize
=
taosArrayGetSize
(
pArray
);
for
(
int32_t
index
=
0
;
index
<
arraySize
;
++
index
)
{
SSdbRaw
*
pRaw
=
taosArrayGetP
(
pArray
,
index
);
int32_t
code
=
sdbWrite
(
pRaw
);
if
(
code
!=
0
)
{
return
code
;
}
}
return
0
;
}
int32_t
trnActionUpdate
(
STrans
*
pSrcTrans
,
STrans
*
pDstTrans
)
{
return
0
;
}
int32_t
trnGenerateTransId
()
{
return
1
;
}
int32_t
trnInit
()
{
SSdbDesc
desc
=
{.
sdbType
=
SDB_TRANS
,
.
keyType
=
SDB_KEY_INT32
,
.
encodeFp
=
(
SdbEncodeFp
)
trnActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
trnActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
trnActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
trnActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
trnActionDelete
};
sdbSetHandler
(
desc
);
return
0
;
}
void
trnCleanup
()
{}
source/dnode/mnode/transaction/src/trnMain.c
浏览文件 @
d92a150c
...
...
@@ -15,17 +15,18 @@
#define _DEFAULT_SOURCE
#include "trnInt.h"
#include "trpc.h"
#define TRN_VER 1
#define TRN_DEFAULT_ARRAY_SIZE 8
STrans
*
trnCreate
()
{
STrans
*
trnCreate
(
ETrnPolicy
policy
)
{
STrans
*
pTrans
=
calloc
(
1
,
sizeof
(
STrans
));
if
(
pTrans
==
NULL
)
{
terrno
=
TSDB_CODE_MND_OUT_OF_MEMORY
;
return
NULL
;
}
pTrans
->
id
=
trnGenerateTransId
();
pTrans
->
stage
=
TRN_STAGE_PREPARE
;
pTrans
->
policy
=
policy
;
pTrans
->
redoLogs
=
taosArrayInit
(
TRN_DEFAULT_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoLogs
=
taosArrayInit
(
TRN_DEFAULT_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
commitLogs
=
taosArrayInit
(
TRN_DEFAULT_ARRAY_SIZE
,
sizeof
(
void
*
));
...
...
@@ -34,7 +35,6 @@ STrans *trnCreate() {
if
(
pTrans
->
redoLogs
==
NULL
||
pTrans
->
undoLogs
==
NULL
||
pTrans
->
commitLogs
==
NULL
||
pTrans
->
redoActions
==
NULL
||
pTrans
->
undoActions
==
NULL
)
{
trnDrop
(
pTrans
);
terrno
=
TSDB_CODE_MND_OUT_OF_MEMORY
;
return
NULL
;
}
...
...
@@ -42,192 +42,156 @@ STrans *trnCreate() {
return
pTrans
;
}
int32_t
trnCommit
(
STrans
*
pTrans
)
{
return
0
;
}
static
void
trnDropLogs
(
SArray
*
pArray
)
{
static
void
trnDropArray
(
SArray
*
pArray
)
{
for
(
int32_t
index
=
0
;
index
<
pArray
->
size
;
++
index
)
{
SSdbRaw
Data
*
pRaw
=
taosArrayGetP
(
pArray
,
index
);
free
(
pRaw
);
SSdbRaw
*
pRaw
=
taosArrayGetP
(
pArray
,
index
);
t
free
(
pRaw
);
}
taosArrayDestroy
(
pArray
);
}
void
trnDrop
(
STrans
*
pTrans
)
{
trnDropLogs
(
pTrans
->
redoLogs
);
trnDropLogs
(
pTrans
->
undoLogs
);
trnDropLogs
(
pTrans
->
commitLogs
);
free
(
pTrans
);
trnDropArray
(
pTrans
->
redoLogs
);
trnDropArray
(
pTrans
->
undoLogs
);
trnDropArray
(
pTrans
->
commitLogs
);
trnDropArray
(
pTrans
->
redoActions
);
trnDropArray
(
pTrans
->
undoActions
);
tfree
(
pTrans
);
}
int32_t
trnAppendRedoLog
(
STrans
*
pTrans
,
SSdbRawData
*
pRaw
)
{
void
*
ptr
=
taosArrayPush
(
pTrans
->
redoLogs
,
&
pRaw
);
if
(
ptr
==
NULL
)
{
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
void
trnSetRpcHandle
(
STrans
*
pTrans
,
void
*
rpcHandle
)
{
if
(
pTrans
!=
NULL
)
{
pTrans
->
rpcHandle
=
rpcHandle
;
}
return
0
;
}
int32_t
trnAppendUndoLog
(
STrans
*
pTrans
,
SSdbRawData
*
pRaw
)
{
void
*
ptr
=
taosArrayPush
(
pTrans
->
undoLogs
,
&
pRaw
);
if
(
ptr
==
NULL
)
{
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
static
int32_t
trnAppendArray
(
SArray
*
pArray
,
SSdbRaw
*
pRaw
)
{
if
(
pArray
==
NULL
||
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_MND_OUT_OF_MEMORY
;
return
-
1
;
}
return
0
;
}
int32_t
trnAppendCommitLog
(
STrans
*
pTrans
,
SSdbRawData
*
pRaw
)
{
void
*
ptr
=
taosArrayPush
(
pTrans
->
commitLogs
,
&
pRaw
);
void
*
ptr
=
taosArrayPush
(
pArray
,
&
pRaw
);
if
(
ptr
==
NULL
)
{
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_MND_OUT_OF_MEMORY
;
return
-
1
;
}
return
0
;
}
int32_t
trnAppendRedoLog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
return
trnAppendArray
(
pTrans
->
redoLogs
,
pRaw
);
}
int32_t
trnAppendUndoLog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
return
trnAppendArray
(
pTrans
->
undoLogs
,
pRaw
);
}
int32_t
trnAppendCommitLog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
return
trnAppendArray
(
pTrans
->
commitLogs
,
pRaw
);
}
int32_t
trnAppendRedoAction
(
STrans
*
pTrans
,
SEpSet
*
pEpSet
,
void
*
pMsg
)
{
void
*
ptr
=
taosArrayPush
(
pTrans
->
redoActions
,
&
pMsg
);
if
(
ptr
==
NULL
)
{
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
}
return
0
;
return
trnAppendArray
(
pTrans
->
redoActions
,
pMsg
);
}
int32_t
trnAppendUndoAction
(
STrans
*
pTrans
,
SEpSet
*
pEpSet
,
void
*
pMsg
)
{
void
*
ptr
=
taosArrayPush
(
pTrans
->
undoActions
,
&
pMsg
);
if
(
ptr
==
NULL
)
{
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
}
return
0
;
return
trnAppendArray
(
pTrans
->
undoActions
,
pMsg
);
}
static
SSdbRawData
*
trnActionEncode
(
STrans
*
pTrans
)
{
int32_t
rawDataLen
=
5
*
sizeof
(
int32_t
);
int32_t
redoLogNum
=
taosArrayGetSize
(
pTrans
->
redoLogs
);
int32_t
undoLogNum
=
taosArrayGetSize
(
pTrans
->
undoLogs
);
int32_t
commitLogNum
=
taosArrayGetSize
(
pTrans
->
commitLogs
);
int32_t
redoActionNum
=
taosArrayGetSize
(
pTrans
->
redoActions
);
int32_t
undoActionNum
=
taosArrayGetSize
(
pTrans
->
undoActions
);
int32_t
trnPrepare
(
STrans
*
pTrans
,
int32_t
(
*
syncfp
)(
SSdbRaw
*
pRaw
,
void
*
pData
))
{
if
(
syncfp
==
NULL
)
return
-
1
;
for
(
int32_t
index
=
0
;
index
<
redoLogNum
;
++
index
)
{
SSdbRawData
*
pRawData
=
taosArrayGet
(
pTrans
->
redoLogs
,
index
);
rawDataLen
+=
(
sizeof
(
SSdbRawData
)
+
pRawData
->
dataLen
);
SSdbRaw
*
pRaw
=
trnActionEncode
(
pTrans
);
if
(
pRaw
==
NULL
)
{
mError
(
"tranId:%d, failed to decode trans since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
}
for
(
int32_t
index
=
0
;
index
<
undoLogNum
;
++
index
)
{
SSdbRawData
*
pRawData
=
taosArrayGet
(
pTrans
->
undoLogs
,
index
);
r
awDataLen
+=
(
sizeof
(
SSdbRawData
)
+
pRawData
->
dataLen
)
;
if
(
sdbWrite
(
pRaw
)
!=
0
)
{
mError
(
"tranId:%d, failed to write trans since %s"
,
pTrans
->
id
,
terrstr
()
);
r
eturn
-
1
;
}
for
(
int32_t
index
=
0
;
index
<
commitLogNum
;
++
index
)
{
SSdbRawData
*
pRawData
=
taosArrayGet
(
pTrans
->
commitLogs
,
index
);
r
awDataLen
+=
(
sizeof
(
SSdbRawData
)
+
pRawData
->
dataLen
)
;
if
((
*
syncfp
)(
pRaw
,
pTrans
->
rpcHandle
)
!=
0
)
{
mError
(
"tranId:%d, failed to sync trans since %s"
,
pTrans
->
id
,
terrstr
()
);
r
eturn
-
1
;
}
SSdbRawData
*
pRaw
=
calloc
(
1
,
rawDataLen
+
sizeof
(
SSdbRawData
));
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_MND_OUT_OF_MEMORY
;
return
NULL
;
}
int32_t
dataLen
=
0
;
char
*
pData
=
pRaw
->
data
;
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
redoLogNum
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
undoLogNum
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
commitLogNum
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
redoActionNum
)
SDB_SET_INT32_VAL
(
pData
,
dataLen
,
undoActionNum
)
pRaw
->
dataLen
=
dataLen
;
pRaw
->
type
=
SDB_TRANS
;
pRaw
->
sver
=
TRN_VER
;
return
pRaw
;
return
0
;
}
static
STrans
*
trnActionDecode
(
SSdbRawData
*
pRaw
)
{
if
(
pRaw
->
sver
!=
TRN_VER
)
{
terrno
=
TSDB_CODE_SDB_INVAID_RAW_DATA_VER
;
r
eturn
NULL
;
static
void
trnSendRpcRsp
(
void
*
rpcHandle
,
int32_t
code
)
{
if
(
rpcHandle
!=
NULL
)
{
SRpcMsg
rspMsg
=
{.
handle
=
rpcHandle
,
.
code
=
terrno
}
;
r
pcSendResponse
(
&
rspMsg
)
;
}
}
STrans
*
pTrans
=
trnCreate
();
if
(
pTrans
==
NULL
)
{
terrno
=
TSDB_CODE_MND_OUT_OF_MEMORY
;
return
NULL
;
}
int32_t
redoLogNum
=
0
;
int32_t
undoLogNum
=
0
;
int32_t
commitLogNum
=
0
;
int32_t
redoActionNum
=
0
;
int32_t
undoActionNum
=
0
;
SSdbRawData
*
pTmp
=
malloc
(
sizeof
(
SSdbRawData
));
int32_t
code
=
0
;
int32_t
dataLen
=
pRaw
->
dataLen
;
char
*
pData
=
pRaw
->
data
;
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
redoLogNum
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
undoLogNum
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
commitLogNum
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
redoActionNum
,
code
)
SDB_GET_INT32_VAL
(
pData
,
dataLen
,
undoActionNum
,
code
)
for
(
int32_t
index
=
0
;
index
<
redoLogNum
;
++
index
)
{
SDB_GET_BINARY_VAL
(
pData
,
dataLen
,
pTmp
,
sizeof
(
SSdbRawData
),
code
);
if
(
code
==
0
&&
pTmp
->
dataLen
>
0
)
{
SSdbRawData
*
pRead
=
malloc
(
sizeof
(
SSdbRawData
)
+
pTmp
->
dataLen
);
if
(
pRead
==
NULL
)
{
code
=
TSDB_CODE_MND_OUT_OF_MEMORY
;
break
;
}
memcpy
(
pRead
,
pTmp
,
sizeof
(
SSdbRawData
));
SDB_GET_BINARY_VAL
(
pData
,
dataLen
,
pRead
->
data
,
pRead
->
dataLen
,
code
);
void
*
ret
=
taosArrayPush
(
pTrans
->
redoLogs
,
&
pRead
);
if
(
ret
==
NULL
)
{
code
=
TSDB_CODE_MND_OUT_OF_MEMORY
;
break
;
}
}
int32_t
trnApply
(
SSdbRaw
*
pRaw
,
void
*
pData
,
int32_t
code
)
{
if
(
code
!=
0
)
{
trnSendRpcRsp
(
pData
,
terrno
);
return
0
;
}
if
(
code
!=
0
)
{
trnDrop
(
pTrans
);
if
(
sdbWrite
(
pRaw
)
!=
0
)
{
code
=
terrno
;
trnSendRpcRsp
(
pData
,
code
);
terrno
=
code
;
return
NULL
;
return
-
1
;
}
return
pTrans
;
return
0
;
}
static
int32_t
trnActionInsert
(
STrans
*
pTrans
)
{
SArray
*
pArray
=
pTrans
->
redoLogs
;
int32_t
arraySize
=
taosArrayGetSize
(
pArray
);
int32_t
trnExecuteRedoLogs
(
STrans
*
pTrans
)
{
return
0
;}
int32_t
trnExecuteUndoLogs
(
STrans
*
pTrans
)
{
return
0
;}
int32_t
trnExecuteCommitLogs
(
STrans
*
pTrans
)
{
return
0
;}
int32_t
trnExecuteRedoActions
(
STrans
*
pTrans
)
{
return
0
;}
int32_t
trnExecuteUndoActions
(
STrans
*
pTrans
)
{
return
0
;}
static
int32_t
trnPerfomRollbackStage
(
STrans
*
pTrans
)
{
return
0
;
}
for
(
int32_t
index
=
0
;
index
<
arraySize
;
++
index
)
{
SSdbRawData
*
pRaw
=
taosArrayGetP
(
pArray
,
index
);
int32_t
code
=
sdbWrite
(
pRaw
);
if
(
code
!=
0
)
{
return
code
;
}
int32_t
trnExecute
(
int32_t
tranId
)
{
int32_t
code
=
0
;
STrans
*
pTrans
=
sdbAcquire
(
SDB_TRANS
,
&
tranId
);
if
(
pTrans
==
NULL
)
{
code
=
terrno
;
return
code
;
}
return
0
;
}
if
(
pTrans
->
stage
==
TRN_STAGE_PREPARE
)
{
code
=
trnExecuteRedoLogs
(
pTrans
);
if
(
code
==
0
)
{
pTrans
->
stage
=
TRN_STAGE_EXECUTE
;
}
else
{
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
}
}
static
int32_t
trnActionDelete
(
STrans
*
pTrans
)
{
SArray
*
pArray
=
pTrans
->
redoLogs
;
int32_t
arraySize
=
taosArrayGetSize
(
pArray
);
if
(
pTrans
->
stage
==
TRN_STAGE_EXECUTE
)
{
code
=
trnExecuteRedoActions
(
pTrans
);
if
(
code
==
0
)
{
pTrans
->
stage
=
TRN_STAGE_COMMIT
;
}
else
if
(
code
==
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
// do nothing
}
else
{
if
(
pTrans
->
policy
==
TRN_POLICY_RETRY
)
{
pTrans
->
stage
=
TRN_STAGE_RETRY
;
}
else
{
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
}
}
}
for
(
int32_t
index
=
0
;
index
<
arraySize
;
++
index
)
{
SSdbRawData
*
pRaw
=
taosArrayGetP
(
pArray
,
index
);
int32_t
code
=
sdbWrite
(
pRaw
);
if
(
code
!=
0
)
{
return
code
;
if
(
pTrans
->
stage
==
TRN_STAGE_COMMIT
)
{
code
=
trnExecuteCommitLogs
(
pTrans
);
if
(
code
==
0
)
{
trnDrop
(
pTrans
);
}
}
return
0
;
}
if
(
pTrans
->
stage
==
TRN_STAGE_ROLLBACK
)
{
}
if
(
pTrans
->
stage
==
TRN_STAGE_RETRY
)
{
}
static
int32_t
trnActionUpdate
(
STrans
*
pSrcUser
,
STrans
*
pDstUser
)
{
return
0
;
}
}
\ No newline at end of file
source/dnode/mnode/transaction/src/trnThread.c
已删除
100644 → 0
浏览文件 @
ff4ad0f4
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "trnInt.h"
#include "tthread.h"
static
struct
{
pthread_t
*
threadId
;
bool
threadRunning
;
}
tsTrn
;
static
void
*
trnThreadFunc
(
void
*
param
)
{
while
(
1
)
{
pthread_testcancel
();
}
return
NULL
;
}
int32_t
trnInit
()
{
tsTrn
.
threadId
=
taosCreateThread
(
trnThreadFunc
,
NULL
);
if
(
tsTrn
.
threadId
==
NULL
)
{
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
}
return
0
;
}
void
trnCleanup
()
{
if
(
tsTrn
.
threadId
)
{
taosDestoryThread
(
tsTrn
.
threadId
);
tsTrn
.
threadId
=
NULL
;
}
}
source/util/src/terror.c
浏览文件 @
d92a150c
...
...
@@ -524,3 +524,5 @@ const char* tstrerror(int32_t err) {
return
""
;
}
const
char
*
terrstr
()
{
return
tstrerror
(
terrno
);
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录