Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
85bf677e
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
85bf677e
编写于
12月 20, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-10431 process create vnode msg in dnode module
上级
acacf74b
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
154 addition
and
105 deletion
+154
-105
include/util/tthread.h
include/util/tthread.h
+2
-2
source/dnode/mgmt/impl/src/dndVnodes.c
source/dnode/mgmt/impl/src/dndVnodes.c
+12
-6
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+5
-6
source/dnode/mnode/impl/inc/mndInt.h
source/dnode/mnode/impl/inc/mndInt.h
+1
-0
source/dnode/mnode/impl/inc/mndTrans.h
source/dnode/mnode/impl/inc/mndTrans.h
+8
-2
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+1
-1
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+106
-71
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+5
-1
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+1
-1
source/dnode/mnode/sdb/src/sdbRaw.c
source/dnode/mnode/sdb/src/sdbRaw.c
+2
-2
source/util/src/tthread.c
source/util/src/tthread.c
+11
-13
未找到文件。
include/util/tthread.h
浏览文件 @
85bf677e
...
@@ -24,8 +24,8 @@ extern "C" {
...
@@ -24,8 +24,8 @@ extern "C" {
#include "tdef.h"
#include "tdef.h"
// create new thread
// create new thread
pthread_t
*
taosCreateThread
(
void
*
(
*
__start_routine
)
(
void
*
),
void
*
param
);
pthread_t
*
taosCreateThread
(
void
*
(
*
__start_routine
)(
void
*
),
void
*
param
);
// destory thread
// destory thread
bool
taosDestoryThread
(
pthread_t
*
pthread
);
bool
taosDestoryThread
(
pthread_t
*
pthread
);
// thread running return true
// thread running return true
bool
taosThreadRunning
(
pthread_t
*
pthread
);
bool
taosThreadRunning
(
pthread_t
*
pthread
);
...
...
source/dnode/mgmt/impl/src/dndVnodes.c
浏览文件 @
85bf677e
...
@@ -48,7 +48,7 @@ typedef struct {
...
@@ -48,7 +48,7 @@ typedef struct {
int32_t
opened
;
int32_t
opened
;
int32_t
failed
;
int32_t
failed
;
int32_t
threadIndex
;
int32_t
threadIndex
;
pthread_t
*
pThreadI
d
;
pthread_t
threa
d
;
SDnode
*
pDnode
;
SDnode
*
pDnode
;
SWrapperCfg
*
pCfgs
;
SWrapperCfg
*
pCfgs
;
}
SVnodeThread
;
}
SVnodeThread
;
...
@@ -463,6 +463,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
...
@@ -463,6 +463,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
SVnodeThread
*
threads
=
calloc
(
threadNum
,
sizeof
(
SVnodeThread
));
SVnodeThread
*
threads
=
calloc
(
threadNum
,
sizeof
(
SVnodeThread
));
for
(
int32_t
t
=
0
;
t
<
threadNum
;
++
t
)
{
for
(
int32_t
t
=
0
;
t
<
threadNum
;
++
t
)
{
threads
[
t
].
threadIndex
=
t
;
threads
[
t
].
threadIndex
=
t
;
threads
[
t
].
pDnode
=
pDnode
;
threads
[
t
].
pCfgs
=
calloc
(
vnodesPerThread
,
sizeof
(
SWrapperCfg
));
threads
[
t
].
pCfgs
=
calloc
(
vnodesPerThread
,
sizeof
(
SWrapperCfg
));
}
}
...
@@ -478,16 +479,21 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
...
@@ -478,16 +479,21 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
SVnodeThread
*
pThread
=
&
threads
[
t
];
SVnodeThread
*
pThread
=
&
threads
[
t
];
if
(
pThread
->
vnodeNum
==
0
)
continue
;
if
(
pThread
->
vnodeNum
==
0
)
continue
;
pThread
->
pThreadId
=
taosCreateThread
(
dnodeOpenVnodeFunc
,
pThread
);
pthread_attr_t
thAttr
;
if
(
pThread
->
pThreadId
==
NULL
)
{
pthread_attr_init
(
&
thAttr
);
pthread_attr_setdetachstate
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
pThread
->
thread
,
&
thAttr
,
dnodeOpenVnodeFunc
,
pThread
)
!=
0
)
{
dError
(
"thread:%d, failed to create thread to open vnode, reason:%s"
,
pThread
->
threadIndex
,
strerror
(
errno
));
dError
(
"thread:%d, failed to create thread to open vnode, reason:%s"
,
pThread
->
threadIndex
,
strerror
(
errno
));
}
}
pthread_attr_destroy
(
&
thAttr
);
}
}
for
(
int32_t
t
=
0
;
t
<
threadNum
;
++
t
)
{
for
(
int32_t
t
=
0
;
t
<
threadNum
;
++
t
)
{
SVnodeThread
*
pThread
=
&
threads
[
t
];
SVnodeThread
*
pThread
=
&
threads
[
t
];
taosDestoryThread
(
pThread
->
pThreadId
);
if
(
pThread
->
vnodeNum
>
0
&&
taosCheckPthreadValid
(
pThread
->
thread
))
{
pThread
->
pThreadId
=
NULL
;
pthread_join
(
pThread
->
thread
,
NULL
);
}
free
(
pThread
->
pCfgs
);
free
(
pThread
->
pCfgs
);
}
}
free
(
threads
);
free
(
threads
);
...
@@ -790,7 +796,7 @@ static void dndProcessVnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
...
@@ -790,7 +796,7 @@ static void dndProcessVnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
break
;
break
;
}
}
SRpcMsg
rsp
=
{.
code
=
code
,
.
handle
=
pMsg
->
handle
};
SRpcMsg
rsp
=
{.
code
=
code
,
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
};
rpcSendResponse
(
&
rsp
);
rpcSendResponse
(
&
rsp
);
rpcFreeCont
(
pMsg
->
pCont
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
taosFreeQitem
(
pMsg
);
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
85bf677e
...
@@ -61,15 +61,14 @@ typedef enum {
...
@@ -61,15 +61,14 @@ typedef enum {
}
EAuthOp
;
}
EAuthOp
;
typedef
enum
{
typedef
enum
{
TRN_STAGE_PREPARE
=
1
,
TRN_STAGE_PREPARE
=
0
,
TRN_STAGE_EXECUTE
=
2
,
TRN_STAGE_EXECUTE
=
1
,
TRN_STAGE_ROLLBACK
=
2
,
TRN_STAGE_COMMIT
=
3
,
TRN_STAGE_COMMIT
=
3
,
TRN_STAGE_ROLLBACK
=
4
,
TRN_STAGE_OVER
=
4
,
TRN_STAGE_RETRY
=
5
,
TRN_STAGE_OVER
=
6
,
}
ETrnStage
;
}
ETrnStage
;
typedef
enum
{
TRN_POLICY_ROLLBACK
=
1
,
TRN_POLICY_RETRY
=
2
}
ETrnPolicy
;
typedef
enum
{
TRN_POLICY_ROLLBACK
=
0
,
TRN_POLICY_RETRY
=
1
}
ETrnPolicy
;
typedef
enum
{
typedef
enum
{
DND_STATUS_OFFLINE
=
0
,
DND_STATUS_OFFLINE
=
0
,
...
...
source/dnode/mnode/impl/inc/mndInt.h
浏览文件 @
85bf677e
...
@@ -70,6 +70,7 @@ typedef struct SMnode {
...
@@ -70,6 +70,7 @@ typedef struct SMnode {
tmr_h
timer
;
tmr_h
timer
;
char
*
path
;
char
*
path
;
SMnodeCfg
cfg
;
SMnodeCfg
cfg
;
int64_t
checkTime
;
SSdb
*
pSdb
;
SSdb
*
pSdb
;
SDnode
*
pDnode
;
SDnode
*
pDnode
;
SArray
*
pSteps
;
SArray
*
pSteps
;
...
...
source/dnode/mnode/impl/inc/mndTrans.h
浏览文件 @
85bf677e
...
@@ -25,6 +25,9 @@ extern "C" {
...
@@ -25,6 +25,9 @@ extern "C" {
typedef
struct
{
typedef
struct
{
SEpSet
epSet
;
SEpSet
epSet
;
int8_t
msgType
;
int8_t
msgType
;
int8_t
msgSent
;
int8_t
msgReceived
;
int32_t
errCode
;
int32_t
contLen
;
int32_t
contLen
;
void
*
pCont
;
void
*
pCont
;
}
STransAction
;
}
STransAction
;
...
@@ -39,10 +42,13 @@ int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
...
@@ -39,10 +42,13 @@ int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t
mndTransAppendCommitlog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
mndTransAppendCommitlog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
mndTransAppendRedoAction
(
STrans
*
pTrans
,
STransAction
*
pAction
);
int32_t
mndTransAppendRedoAction
(
STrans
*
pTrans
,
STransAction
*
pAction
);
int32_t
mndTransAppendUndoAction
(
STrans
*
pTrans
,
STransAction
*
pAction
);
int32_t
mndTransAppendUndoAction
(
STrans
*
pTrans
,
STransAction
*
pAction
);
int32_t
mndTransPrepare
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
int32_t
mndTransPrepare
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
void
mndTransApply
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
,
STransMsg
*
pMsg
,
int32_t
code
);
void
mndTransApply
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
,
STransMsg
*
pMsg
,
int32_t
code
);
char
*
mndTransStageStr
(
ETrnStage
stage
);
void
mndTransHandleActionRsp
(
SMnodeMsg
*
pMsg
);
char
*
mndTransPolicyStr
(
ETrnPolicy
policy
);
char
*
mndTransStageStr
(
ETrnStage
stage
);
char
*
mndTransPolicyStr
(
ETrnPolicy
policy
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
85bf677e
...
@@ -311,7 +311,7 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
...
@@ -311,7 +311,7 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
action
.
pCont
=
pMsg
;
action
.
pCont
=
pMsg
;
action
.
contLen
=
sizeof
(
SCreateVnodeMsg
);
action
.
contLen
=
sizeof
(
SCreateVnodeMsg
);
action
.
msgType
=
TSDB_MSG_TYPE_
ALTER
_VNODE_IN
;
action
.
msgType
=
TSDB_MSG_TYPE_
CREATE
_VNODE_IN
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
free
(
pMsg
);
free
(
pMsg
);
return
-
1
;
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
85bf677e
...
@@ -34,7 +34,7 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
...
@@ -34,7 +34,7 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
static
void
mndTransDropLogs
(
SArray
*
pArray
);
static
void
mndTransDropLogs
(
SArray
*
pArray
);
static
void
mndTransDropActions
(
SArray
*
pArray
);
static
void
mndTransDropActions
(
SArray
*
pArray
);
static
int32_t
mndTransExecuteLogs
(
SMnode
*
pMnode
,
SArray
*
pArray
);
static
int32_t
mndTransExecuteLogs
(
SMnode
*
pMnode
,
SArray
*
pArray
);
static
int32_t
mndTransExecuteActions
(
SMnode
*
pMnode
,
SArray
*
pArray
);
static
int32_t
mndTransExecuteActions
(
SMnode
*
pMnode
,
S
Trans
*
pTrans
,
S
Array
*
pArray
);
static
int32_t
mndTransExecuteRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransExecuteRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransExecuteUndoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransExecuteUndoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransExecuteCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransExecuteCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
...
@@ -336,10 +336,8 @@ char *mndTransStageStr(ETrnStage stage) {
...
@@ -336,10 +336,8 @@ char *mndTransStageStr(ETrnStage stage) {
return
"commit"
;
return
"commit"
;
case
TRN_STAGE_ROLLBACK
:
case
TRN_STAGE_ROLLBACK
:
return
"rollback"
;
return
"rollback"
;
case
TRN_STAGE_RETRY
:
return
"retry"
;
case
TRN_STAGE_OVER
:
case
TRN_STAGE_OVER
:
return
"
stop
"
;
return
"
over
"
;
default:
default:
return
"undefined"
;
return
"undefined"
;
}
}
...
@@ -381,7 +379,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
...
@@ -381,7 +379,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
return
NULL
;
return
NULL
;
}
}
mDebug
(
"trans:%d,
data:%p is created"
,
pTrans
->
id
,
pTrans
);
mDebug
(
"trans:%d,
is created"
,
pTrans
->
id
);
return
pTrans
;
return
pTrans
;
}
}
...
@@ -410,7 +408,7 @@ void mndTransDrop(STrans *pTrans) {
...
@@ -410,7 +408,7 @@ void mndTransDrop(STrans *pTrans) {
mndTransDropActions
(
pTrans
->
redoActions
);
mndTransDropActions
(
pTrans
->
redoActions
);
mndTransDropActions
(
pTrans
->
undoActions
);
mndTransDropActions
(
pTrans
->
undoActions
);
mDebug
(
"trans:%d, data:%p is dropped
"
,
pTrans
->
id
,
pTrans
);
// mDebug("trans:%d, is dropped, data:%p
", pTrans->id, pTrans);
tfree
(
pTrans
);
tfree
(
pTrans
);
}
}
...
@@ -453,7 +451,7 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
...
@@ -453,7 +451,7 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
}
}
static
int32_t
mndTransAppendAction
(
SArray
*
pArray
,
STransAction
*
pAction
)
{
static
int32_t
mndTransAppendAction
(
SArray
*
pArray
,
STransAction
*
pAction
)
{
void
*
ptr
=
taosArrayPush
(
pArray
,
&
pAction
);
void
*
ptr
=
taosArrayPush
(
pArray
,
pAction
);
if
(
ptr
==
NULL
)
{
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
return
-
1
;
...
@@ -480,7 +478,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
...
@@ -480,7 +478,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
}
}
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
mTrace
(
"trans:%d, s
tart sync
"
,
pTrans
->
id
);
mTrace
(
"trans:%d, s
ync to other nodes
"
,
pTrans
->
id
);
int32_t
code
=
mndSyncPropose
(
pMnode
,
pRaw
);
int32_t
code
=
mndSyncPropose
(
pMnode
,
pRaw
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to sync since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to sync since %s"
,
pTrans
->
id
,
terrstr
());
...
@@ -520,7 +518,7 @@ int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
...
@@ -520,7 +518,7 @@ int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_DROPPED
);
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_DROPPED
);
if
(
taosArrayGetSize
(
pTrans
->
commitLogs
)
!=
0
)
{
if
(
taosArrayGetSize
(
pTrans
->
commitLogs
)
!=
0
)
{
mTrace
(
"trans:%d, s
tart sync
"
,
pTrans
->
id
);
mTrace
(
"trans:%d, s
ync to other nodes
"
,
pTrans
->
id
);
int32_t
code
=
mndSyncPropose
(
pMnode
,
pRaw
);
int32_t
code
=
mndSyncPropose
(
pMnode
,
pRaw
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to sync since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to sync since %s"
,
pTrans
->
id
,
terrstr
());
...
@@ -550,7 +548,7 @@ int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
...
@@ -550,7 +548,7 @@ int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
}
}
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_DROPPED
);
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_DROPPED
);
mTrace
(
"trans:%d, s
tart sync
"
,
pTrans
->
id
);
mTrace
(
"trans:%d, s
ync to other nodes
"
,
pTrans
->
id
);
int32_t
code
=
mndSyncPropose
(
pMnode
,
pRaw
);
int32_t
code
=
mndSyncPropose
(
pMnode
,
pRaw
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to sync since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to sync since %s"
,
pTrans
->
id
,
terrstr
());
...
@@ -583,6 +581,50 @@ void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code)
...
@@ -583,6 +581,50 @@ void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code)
// todo
// todo
}
}
void
mndTransHandleActionRsp
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
int64_t
sig
=
(
int64_t
)(
pMsg
->
rpcMsg
.
ahandle
);
int32_t
transId
=
(
int32_t
)(
sig
>>
32
);
int32_t
action
=
(
int32_t
)((
sig
<<
32
)
>>
32
);
STrans
*
pTrans
=
mndAcquireTrans
(
pMnode
,
transId
);
if
(
pTrans
==
NULL
)
{
mError
(
"trans:%d, failed to get transId from vnode rsp since %s"
,
transId
,
terrstr
());
goto
HANDLE_ACTION_RSP_OVER
;
}
SArray
*
pArray
=
NULL
;
if
(
pTrans
->
stage
==
TRN_STAGE_EXECUTE
)
{
pArray
=
pTrans
->
redoActions
;
}
else
if
(
pTrans
->
stage
==
TRN_STAGE_ROLLBACK
)
{
pArray
=
pTrans
->
undoActions
;
}
else
{
}
if
(
pArray
==
NULL
)
{
mError
(
"trans:%d, invalid trans stage:%s"
,
transId
,
mndTransStageStr
(
pTrans
->
stage
));
goto
HANDLE_ACTION_RSP_OVER
;
}
int32_t
actionNum
=
taosArrayGetSize
(
pTrans
->
redoActions
);
if
(
action
<
0
||
action
>
actionNum
)
{
mError
(
"trans:%d, invalid action:%d"
,
transId
,
action
);
goto
HANDLE_ACTION_RSP_OVER
;
}
STransAction
*
pAction
=
taosArrayGet
(
pArray
,
action
);
if
(
pAction
!=
NULL
)
{
pAction
->
msgReceived
=
1
;
pAction
->
errCode
=
pMsg
->
code
;
}
mDebug
(
"trans:%d, action:%d response is received, code:0x%x"
,
transId
,
action
,
pMsg
->
code
);
mndTransExecute
(
pMnode
,
pTrans
);
HANDLE_ACTION_RSP_OVER:
mndReleaseTrans
(
pMnode
,
pTrans
);
}
static
int32_t
mndTransExecuteLogs
(
SMnode
*
pMnode
,
SArray
*
pArray
)
{
static
int32_t
mndTransExecuteLogs
(
SMnode
*
pMnode
,
SArray
*
pArray
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
arraySize
=
taosArrayGetSize
(
pArray
);
int32_t
arraySize
=
taosArrayGetSize
(
pArray
);
...
@@ -605,7 +647,7 @@ static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) {
...
@@ -605,7 +647,7 @@ static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) {
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to execute redo logs since %s"
,
pTrans
->
id
,
terrstr
())
mError
(
"trans:%d, failed to execute redo logs since %s"
,
pTrans
->
id
,
terrstr
())
}
else
{
}
else
{
m
Trace
(
"trans:%d, execute redo logs finished"
,
pTrans
->
id
)
m
Debug
(
"trans:%d, execute redo logs finished"
,
pTrans
->
id
)
}
}
}
}
...
@@ -619,7 +661,7 @@ static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) {
...
@@ -619,7 +661,7 @@ static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) {
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to execute undo logs since %s"
,
pTrans
->
id
,
terrstr
())
mError
(
"trans:%d, failed to execute undo logs since %s"
,
pTrans
->
id
,
terrstr
())
}
else
{
}
else
{
m
Trace
(
"trans:%d, execute undo logs finished"
,
pTrans
->
id
)
m
Debug
(
"trans:%d, execute undo logs finished"
,
pTrans
->
id
)
}
}
}
}
...
@@ -633,47 +675,70 @@ static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
...
@@ -633,47 +675,70 @@ static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to execute commit logs since %s"
,
pTrans
->
id
,
terrstr
())
mError
(
"trans:%d, failed to execute commit logs since %s"
,
pTrans
->
id
,
terrstr
())
}
else
{
}
else
{
m
Trace
(
"trans:%d, execute commit logs finished"
,
pTrans
->
id
)
m
Debug
(
"trans:%d, execute commit logs finished"
,
pTrans
->
id
)
}
}
}
}
return
code
;
return
code
;
}
}
static
int32_t
mndTransExecuteActions
(
SMnode
*
pMnode
,
SArray
*
pArray
)
{
static
int32_t
mndTransExecuteActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SArray
*
pArray
)
{
#if 0
int32_t
numOfActions
=
taosArrayGetSize
(
pArray
);
int32_t arraySize = taosArrayGetSize(pArray);
if
(
numOfActions
==
0
)
return
0
;
for (int32_t i = 0; i < arraySize; ++i) {
STransAction *pAction = taosArrayGet(pArray, i);
for
(
int32_t
action
=
0
;
action
<
numOfActions
;
++
action
)
{
STransAction
*
pAction
=
taosArrayGet
(
pArray
,
action
);
if
(
pAction
==
NULL
)
continue
;
if
(
pAction
->
msgSent
)
continue
;
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen};
int64_t
signature
=
pTrans
->
id
;
signature
=
(
signature
<<
32
);
signature
+=
action
;
SRpcMsg
rpcMsg
=
{.
msgType
=
pAction
->
msgType
,
.
contLen
=
pAction
->
contLen
,
.
ahandle
=
(
void
*
)
signature
};
rpcMsg
.
pCont
=
rpcMallocCont
(
pAction
->
contLen
);
rpcMsg
.
pCont
=
rpcMallocCont
(
pAction
->
contLen
);
if
(
rpcMsg
.
pCont
==
NULL
)
{
if
(
rpcMsg
.
pCont
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
return
-
1
;
}
}
memcpy
(
rpcMsg
.
pCont
,
pAction
->
pCont
,
pAction
->
contLen
);
memcpy
(
rpcMsg
.
pCont
,
pAction
->
pCont
,
pAction
->
contLen
);
pAction
->
msgSent
=
1
;
pAction
->
msgReceived
=
0
;
pAction
->
errCode
=
0
;
mDebug
(
"trans:%d, action:%d is sent"
,
pTrans
->
id
,
action
);
mndSendMsgToDnode
(
pMnode
,
&
pAction
->
epSet
,
&
rpcMsg
);
mndSendMsgToDnode
(
pMnode
,
&
pAction
->
epSet
,
&
rpcMsg
);
}
}
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
int32_t
numOfReceivedMsgs
=
0
;
#else
int32_t
errorCode
=
0
;
return
0
;
for
(
int32_t
action
=
0
;
action
<
numOfActions
;
++
action
)
{
#endif
STransAction
*
pAction
=
taosArrayGet
(
pArray
,
action
);
if
(
pAction
==
NULL
)
continue
;
if
(
pAction
->
msgSent
&&
pAction
->
msgReceived
)
{
numOfReceivedMsgs
++
;
if
(
pAction
->
errCode
!=
0
)
{
errorCode
=
pAction
->
errCode
;
}
}
}
if
(
numOfReceivedMsgs
==
numOfActions
)
{
mDebug
(
"trans:%d, all %d actions executed, code:0x%x"
,
pTrans
->
id
,
numOfActions
,
errorCode
);
terrno
=
errorCode
;
return
errorCode
;
}
else
{
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
}
}
}
static
int32_t
mndTransExecuteRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
static
int32_t
mndTransExecuteRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
if
(
taosArrayGetSize
(
pTrans
->
redoActions
)
<=
0
)
return
0
;
return
mndTransExecuteActions
(
pMnode
,
pTrans
,
pTrans
->
redoActions
);
mTrace
(
"trans:%d, start to execute redo actions"
,
pTrans
->
id
);
return
mndTransExecuteActions
(
pMnode
,
pTrans
->
redoActions
);
}
}
static
int32_t
mndTransExecuteUndoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
static
int32_t
mndTransExecuteUndoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
if
(
taosArrayGetSize
(
pTrans
->
undoActions
)
<=
0
)
return
0
;
return
mndTransExecuteActions
(
pMnode
,
pTrans
,
pTrans
->
undoActions
);
mTrace
(
"trans:%d, start to execute undo actions"
,
pTrans
->
id
);
return
mndTransExecuteActions
(
pMnode
,
pTrans
->
undoActions
);
}
}
static
int32_t
mndTransPerformPrepareStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
static
int32_t
mndTransPerformPrepareStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
...
@@ -681,7 +746,7 @@ static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
...
@@ -681,7 +746,7 @@ static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
if
(
code
==
0
)
{
if
(
code
==
0
)
{
pTrans
->
stage
=
TRN_STAGE_EXECUTE
;
pTrans
->
stage
=
TRN_STAGE_EXECUTE
;
m
Trace
(
"trans:%d, stage from prepare to execute"
,
pTrans
->
id
);
m
Debug
(
"trans:%d, stage from prepare to execute"
,
pTrans
->
id
);
}
else
{
}
else
{
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
mError
(
"trans:%d, stage from prepare to rollback since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, stage from prepare to rollback since %s"
,
pTrans
->
id
,
terrstr
());
...
@@ -695,17 +760,17 @@ static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) {
...
@@ -695,17 +760,17 @@ static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) {
if
(
code
==
0
)
{
if
(
code
==
0
)
{
pTrans
->
stage
=
TRN_STAGE_COMMIT
;
pTrans
->
stage
=
TRN_STAGE_COMMIT
;
m
Trace
(
"trans:%d, stage from execute to commit"
,
pTrans
->
id
);
m
Debug
(
"trans:%d, stage from execute to commit"
,
pTrans
->
id
);
}
else
if
(
code
==
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
}
else
if
(
code
==
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
m
Trace
(
"trans:%d, stage keep on execute since %s"
,
pTrans
->
id
,
terrst
r
(
code
));
m
Debug
(
"trans:%d, stage keep on execute since %s"
,
pTrans
->
id
,
tstrerro
r
(
code
));
return
code
;
return
code
;
}
else
{
}
else
{
if
(
pTrans
->
policy
==
TRN_POLICY_ROLLBACK
)
{
if
(
pTrans
->
policy
==
TRN_POLICY_ROLLBACK
)
{
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
mError
(
"trans:%d, stage from execute to rollback since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, stage from execute to rollback since %s"
,
pTrans
->
id
,
terrstr
());
}
else
{
}
else
{
pTrans
->
stage
=
TRN_STAGE_
RETRY
;
pTrans
->
stage
=
TRN_STAGE_
EXECUTE
;
mError
(
"trans:%d, stage
from execute to retry
since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, stage
keep on execute
since %s"
,
pTrans
->
id
,
terrstr
());
}
}
}
}
...
@@ -713,29 +778,16 @@ static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) {
...
@@ -713,29 +778,16 @@ static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) {
}
}
static
int32_t
mndTransPerformCommitStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
static
int32_t
mndTransPerformCommitStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
mndTransExecuteCommitLogs
(
pMnode
,
pTrans
);
mndTransExecuteCommitLogs
(
pMnode
,
pTrans
);
pTrans
->
stage
=
TRN_STAGE_OVER
;
if
(
code
==
0
)
{
return
0
;
pTrans
->
stage
=
TRN_STAGE_OVER
;
mTrace
(
"trans:%d, commit stage finished"
,
pTrans
->
id
);
}
else
{
if
(
pTrans
->
policy
==
TRN_POLICY_ROLLBACK
)
{
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
mError
(
"trans:%d, stage from commit to rollback since %s"
,
pTrans
->
id
,
terrstr
());
}
else
{
pTrans
->
stage
=
TRN_STAGE_RETRY
;
mError
(
"trans:%d, stage from commit to retry since %s"
,
pTrans
->
id
,
terrstr
());
}
}
return
code
;
}
}
static
int32_t
mndTransPerformRollbackStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
static
int32_t
mndTransPerformRollbackStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
mndTransExecuteUndoActions
(
pMnode
,
pTrans
);
int32_t
code
=
mndTransExecuteUndoActions
(
pMnode
,
pTrans
);
if
(
code
==
0
)
{
if
(
code
==
0
)
{
m
Trace
(
"trans:%d, rollbacked"
,
pTrans
->
id
);
m
Debug
(
"trans:%d, rollbacked"
,
pTrans
->
id
);
}
else
{
}
else
{
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
mError
(
"trans:%d, stage keep on rollback since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, stage keep on rollback since %s"
,
pTrans
->
id
,
terrstr
());
...
@@ -744,20 +796,6 @@ static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
...
@@ -744,20 +796,6 @@ static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
return
code
;
return
code
;
}
}
static
int32_t
mndTransPerformRetryStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
mndTransExecuteRedoActions
(
pMnode
,
pTrans
);
if
(
code
==
0
)
{
pTrans
->
stage
=
TRN_STAGE_COMMIT
;
mTrace
(
"trans:%d, stage from retry to commit"
,
pTrans
->
id
);
}
else
{
pTrans
->
stage
=
TRN_STAGE_RETRY
;
mError
(
"trans:%d, stage keep on retry since %s"
,
pTrans
->
id
,
terrstr
());
}
return
code
;
}
static
void
mndTransExecute
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
static
void
mndTransExecute
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
...
@@ -772,7 +810,7 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
...
@@ -772,7 +810,7 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
case
TRN_STAGE_COMMIT
:
case
TRN_STAGE_COMMIT
:
code
=
mndTransCommit
(
pMnode
,
pTrans
);
code
=
mndTransCommit
(
pMnode
,
pTrans
);
if
(
code
==
0
)
{
if
(
code
==
0
)
{
code
=
mndTransPerformCommitStage
(
pMnode
,
pTrans
);
mndTransPerformCommitStage
(
pMnode
,
pTrans
);
}
}
break
;
break
;
case
TRN_STAGE_ROLLBACK
:
case
TRN_STAGE_ROLLBACK
:
...
@@ -781,9 +819,6 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
...
@@ -781,9 +819,6 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
code
=
mndTransRollback
(
pMnode
,
pTrans
);
code
=
mndTransRollback
(
pMnode
,
pTrans
);
}
}
break
;
break
;
case
TRN_STAGE_RETRY
:
code
=
mndTransPerformRetryStage
(
pMnode
,
pTrans
);
break
;
default:
default:
mndTransSendRpcRsp
(
pTrans
,
0
);
mndTransSendRpcRsp
(
pTrans
,
0
);
return
;
return
;
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
85bf677e
...
@@ -311,7 +311,11 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
...
@@ -311,7 +311,11 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
return
0
;
return
0
;
}
}
static
int32_t
mndProcessCreateVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessCreateVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
mndTransHandleActionRsp
(
pMsg
);
return
0
;
}
static
int32_t
mndProcessAlterVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessAlterVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessDropVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessDropVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessSyncVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessSyncVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
...
...
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
85bf677e
...
@@ -225,7 +225,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
...
@@ -225,7 +225,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
}
}
return
0
;
return
0
;
}
}
SMnode
*
mndOpen
(
const
char
*
path
,
const
SMnodeOpt
*
pOption
)
{
SMnode
*
mndOpen
(
const
char
*
path
,
const
SMnodeOpt
*
pOption
)
{
mDebug
(
"start to open mnode in %s"
,
path
);
mDebug
(
"start to open mnode in %s"
,
path
);
...
...
source/dnode/mnode/sdb/src/sdbRaw.c
浏览文件 @
85bf677e
...
@@ -27,12 +27,12 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) {
...
@@ -27,12 +27,12 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) {
pRaw
->
sver
=
sver
;
pRaw
->
sver
=
sver
;
pRaw
->
dataLen
=
dataLen
;
pRaw
->
dataLen
=
dataLen
;
mTrace
(
"raw:%p, is created, len:%d"
,
pRaw
,
dataLen
);
//
mTrace("raw:%p, is created, len:%d", pRaw, dataLen);
return
pRaw
;
return
pRaw
;
}
}
void
sdbFreeRaw
(
SSdbRaw
*
pRaw
)
{
void
sdbFreeRaw
(
SSdbRaw
*
pRaw
)
{
mTrace
(
"raw:%p, is freed"
,
pRaw
);
//
mTrace("raw:%p, is freed", pRaw);
free
(
pRaw
);
free
(
pRaw
);
}
}
...
...
source/util/src/tthread.c
浏览文件 @
85bf677e
...
@@ -13,16 +13,16 @@
...
@@ -13,16 +13,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "os.h"
#include "tthread.h"
#include "tthread.h"
#include "os.h"
#include "taoserror.h"
#include "tdef.h"
#include "tdef.h"
#include "tutil.h"
#include "tutil.h"
#include "ulog.h"
#include "ulog.h"
#include "taoserror.h"
// create new thread
// create new thread
pthread_t
*
taosCreateThread
(
void
*
(
*
__start_routine
)
(
void
*
),
void
*
param
)
{
pthread_t
*
taosCreateThread
(
void
*
(
*
__start_routine
)(
void
*
),
void
*
param
)
{
pthread_t
*
pthread
=
(
pthread_t
*
)
malloc
(
sizeof
(
pthread_t
));
pthread_t
*
pthread
=
(
pthread_t
*
)
malloc
(
sizeof
(
pthread_t
));
pthread_attr_t
thattr
;
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
...
@@ -36,26 +36,24 @@ pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param) {
...
@@ -36,26 +36,24 @@ pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param) {
return
pthread
;
return
pthread
;
}
}
// destory thread
// destory thread
bool
taosDestoryThread
(
pthread_t
*
pthread
)
{
bool
taosDestoryThread
(
pthread_t
*
pthread
)
{
if
(
pthread
==
NULL
)
return
false
;
if
(
pthread
==
NULL
)
return
false
;
if
(
taosThreadRunning
(
pthread
))
{
if
(
taosThreadRunning
(
pthread
))
{
pthread_cancel
(
*
pthread
);
pthread_cancel
(
*
pthread
);
pthread_join
(
*
pthread
,
NULL
);
pthread_join
(
*
pthread
,
NULL
);
}
}
free
(
pthread
);
free
(
pthread
);
return
true
;
return
true
;
}
}
// thread running return true
// thread running return true
bool
taosThreadRunning
(
pthread_t
*
pthread
)
{
bool
taosThreadRunning
(
pthread_t
*
pthread
)
{
if
(
pthread
==
NULL
)
return
false
;
if
(
pthread
==
NULL
)
return
false
;
int
ret
=
pthread_kill
(
*
pthread
,
0
);
int
ret
=
pthread_kill
(
*
pthread
,
0
);
if
(
ret
==
ESRCH
)
if
(
ret
==
ESRCH
)
return
false
;
return
false
;
if
(
ret
==
EINVAL
)
return
false
;
if
(
ret
==
EINVAL
)
return
false
;
// alive
// alive
return
true
;
return
true
;
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录