Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e5b41267
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
e5b41267
编写于
12月 25, 2021
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/vnode
上级
05baf88d
5b827b52
变更
18
隐藏空白更改
内联
并排
Showing
18 changed file
with
153 addition
and
225 deletion
+153
-225
include/dnode/mnode/mnode.h
include/dnode/mnode/mnode.h
+0
-10
source/dnode/mgmt/impl/inc/dndInt.h
source/dnode/mgmt/impl/inc/dndInt.h
+0
-1
source/dnode/mgmt/impl/src/dndDnode.c
source/dnode/mgmt/impl/src/dndDnode.c
+1
-1
source/dnode/mgmt/impl/src/dndMnode.c
source/dnode/mgmt/impl/src/dndMnode.c
+0
-56
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+1
-0
source/dnode/mnode/impl/inc/mndInt.h
source/dnode/mnode/impl/inc/mndInt.h
+8
-1
source/dnode/mnode/impl/inc/mndTopic.h
source/dnode/mnode/impl/inc/mndTopic.h
+1
-1
source/dnode/mnode/impl/inc/mndTrans.h
source/dnode/mnode/impl/inc/mndTrans.h
+1
-1
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+3
-3
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+2
-2
source/dnode/mnode/impl/src/mndFunc.c
source/dnode/mnode/impl/src/mndFunc.c
+3
-3
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+2
-2
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+4
-4
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+42
-14
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+30
-30
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+51
-88
source/dnode/mnode/impl/src/mndUser.c
source/dnode/mnode/impl/src/mndUser.c
+3
-3
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+1
-5
未找到文件。
include/dnode/mnode/mnode.h
浏览文件 @
e5b41267
...
...
@@ -27,7 +27,6 @@ typedef struct SMnodeMsg SMnodeMsg;
typedef
void
(
*
SendMsgToDnodeFp
)(
SDnode
*
pDnode
,
struct
SEpSet
*
epSet
,
struct
SRpcMsg
*
rpcMsg
);
typedef
void
(
*
SendMsgToMnodeFp
)(
SDnode
*
pDnode
,
struct
SRpcMsg
*
rpcMsg
);
typedef
void
(
*
SendRedirectMsgFp
)(
SDnode
*
pDnode
,
struct
SRpcMsg
*
rpcMsg
);
typedef
int32_t
(
*
PutMsgToMnodeQFp
)(
SDnode
*
pDnode
,
SMnodeMsg
*
pMsg
);
typedef
struct
SMnodeLoad
{
int64_t
numOfDnode
;
...
...
@@ -63,7 +62,6 @@ typedef struct {
SReplica
replicas
[
TSDB_MAX_REPLICA
];
SMnodeCfg
cfg
;
SDnode
*
pDnode
;
PutMsgToMnodeQFp
putMsgToApplyMsgFp
;
SendMsgToDnodeFp
sendMsgToDnodeFp
;
SendMsgToMnodeFp
sendMsgToMnodeFp
;
SendRedirectMsgFp
sendRedirectMsgFp
;
...
...
@@ -172,14 +170,6 @@ void mndProcessWriteMsg(SMnodeMsg *pMsg);
*/
void
mndProcessSyncMsg
(
SMnodeMsg
*
pMsg
);
/**
* @brief Process the apply request.
*
* @param pMsg The request msg.
* @return int32_t 0 for success, -1 for failure.
*/
void
mndProcessApplyMsg
(
SMnodeMsg
*
pMsg
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/mgmt/impl/inc/dndInt.h
浏览文件 @
e5b41267
...
...
@@ -80,7 +80,6 @@ typedef struct {
SRWLatch
latch
;
taos_queue
pReadQ
;
taos_queue
pWriteQ
;
taos_queue
pApplyQ
;
taos_queue
pSyncQ
;
taos_queue
pMgmtQ
;
SWorkerPool
mgmtPool
;
...
...
source/dnode/mgmt/impl/src/dndDnode.c
浏览文件 @
e5b41267
...
...
@@ -369,7 +369,7 @@ void dndSendStatusMsg(SDnode *pDnode) {
dndGetVnodeLoads
(
pDnode
,
&
pStatus
->
vnodeLoads
);
contLen
=
sizeof
(
SStatusMsg
)
+
pStatus
->
vnodeLoads
.
num
*
sizeof
(
SVnodeLoad
);
SRpcMsg
rpcMsg
=
{.
pCont
=
pStatus
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_MND_STATUS
,
.
ahandle
=
9527
};
SRpcMsg
rpcMsg
=
{.
pCont
=
pStatus
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_MND_STATUS
,
.
ahandle
=
(
void
*
)
9527
};
pMgmt
->
statusSent
=
1
;
dTrace
(
"pDnode:%p, send status msg to mnode"
,
pDnode
);
...
...
source/dnode/mgmt/impl/src/dndMnode.c
浏览文件 @
e5b41267
...
...
@@ -28,18 +28,15 @@ static void dndCleanupMnodeSyncWorker(SDnode *pDnode);
static
void
dndCleanupMnodeMgmtWorker
(
SDnode
*
pDnode
);
static
int32_t
dndAllocMnodeReadQueue
(
SDnode
*
pDnode
);
static
int32_t
dndAllocMnodeWriteQueue
(
SDnode
*
pDnode
);
static
int32_t
dndAllocMnodeApplyQueue
(
SDnode
*
pDnode
);
static
int32_t
dndAllocMnodeSyncQueue
(
SDnode
*
pDnode
);
static
int32_t
dndAllocMnodeMgmtQueue
(
SDnode
*
pDnode
);
static
void
dndFreeMnodeReadQueue
(
SDnode
*
pDnode
);
static
void
dndFreeMnodeWriteQueue
(
SDnode
*
pDnode
);
static
void
dndFreeMnodeApplyQueue
(
SDnode
*
pDnode
);
static
void
dndFreeMnodeSyncQueue
(
SDnode
*
pDnode
);
static
void
dndFreeMnodeMgmtQueue
(
SDnode
*
pDnode
);
static
void
dndProcessMnodeReadQueue
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMsg
);
static
void
dndProcessMnodeWriteQueue
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMsg
);
static
void
dndProcessMnodeApplyQueue
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMsg
);
static
void
dndProcessMnodeSyncQueue
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMsg
);
static
void
dndProcessMnodeMgmtQueue
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
);
static
int32_t
dndWriteMnodeMsgToQueue
(
SMnode
*
pMnode
,
taos_queue
pQueue
,
SRpcMsg
*
pRpcMsg
);
...
...
@@ -47,7 +44,6 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEp
void
dndProcessMnodeWriteMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
void
dndProcessMnodeSyncMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
void
dndProcessMnodeMgmtMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
int32_t
dndPutMsgIntoMnodeApplyQueue
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMsg
);
static
int32_t
dndStartMnodeWorker
(
SDnode
*
pDnode
);
static
void
dndStopMnodeWorker
(
SDnode
*
pDnode
);
...
...
@@ -271,11 +267,6 @@ static int32_t dndStartMnodeWorker(SDnode *pDnode) {
return
-
1
;
}
if
(
dndAllocMnodeApplyQueue
(
pDnode
)
!=
0
)
{
dError
(
"failed to alloc mnode apply queue since %s"
,
terrstr
());
return
-
1
;
}
if
(
dndAllocMnodeSyncQueue
(
pDnode
)
!=
0
)
{
dError
(
"failed to alloc mnode sync queue since %s"
,
terrstr
());
return
-
1
;
...
...
@@ -293,7 +284,6 @@ static void dndStopMnodeWorker(SDnode *pDnode) {
while
(
pMgmt
->
refCount
>
1
)
taosMsleep
(
10
);
while
(
!
taosQueueEmpty
(
pMgmt
->
pReadQ
))
taosMsleep
(
10
);
while
(
!
taosQueueEmpty
(
pMgmt
->
pApplyQ
))
taosMsleep
(
10
);
while
(
!
taosQueueEmpty
(
pMgmt
->
pWriteQ
))
taosMsleep
(
10
);
while
(
!
taosQueueEmpty
(
pMgmt
->
pSyncQ
))
taosMsleep
(
10
);
...
...
@@ -303,7 +293,6 @@ static void dndStopMnodeWorker(SDnode *pDnode) {
dndFreeMnodeReadQueue
(
pDnode
);
dndFreeMnodeWriteQueue
(
pDnode
);
dndFreeMnodeApplyQueue
(
pDnode
);
dndFreeMnodeSyncQueue
(
pDnode
);
}
...
...
@@ -328,7 +317,6 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
pOption
->
sendMsgToDnodeFp
=
dndSendMsgToDnode
;
pOption
->
sendMsgToMnodeFp
=
dndSendMsgToMnode
;
pOption
->
sendRedirectMsgFp
=
dndSendRedirectMsg
;
pOption
->
putMsgToApplyMsgFp
=
dndPutMsgIntoMnodeApplyQueue
;
pOption
->
dnodeId
=
dndGetDnodeId
(
pDnode
);
pOption
->
clusterId
=
dndGetClusterId
(
pDnode
);
pOption
->
cfg
.
sver
=
pDnode
->
opt
.
sver
;
...
...
@@ -604,20 +592,6 @@ static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
mndCleanupMsg
(
pMsg
);
}
static
void
dndProcessMnodeApplyQueue
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
SMnode
*
pMnode
=
dndAcquireMnode
(
pDnode
);
if
(
pMnode
!=
NULL
)
{
mndProcessApplyMsg
(
pMsg
);
dndReleaseMnode
(
pDnode
,
pMnode
);
}
else
{
mndSendRsp
(
pMsg
,
terrno
);
}
mndCleanupMsg
(
pMsg
);
}
static
void
dndProcessMnodeSyncQueue
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
...
...
@@ -712,19 +686,6 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dndReleaseMnode
(
pDnode
,
pMnode
);
}
static
int32_t
dndPutMsgIntoMnodeApplyQueue
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
SMnode
*
pMnode
=
dndAcquireMnode
(
pDnode
);
if
(
pMnode
==
NULL
)
{
return
-
1
;
}
int32_t
code
=
taosWriteQitem
(
pMgmt
->
pApplyQ
,
pMsg
);
dndReleaseMnode
(
pDnode
,
pMnode
);
return
code
;
}
static
int32_t
dndAllocMnodeMgmtQueue
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
pMgmt
->
pMgmtQ
=
tWorkerAllocQueue
(
&
pMgmt
->
mgmtPool
,
pDnode
,
(
FProcessItem
)
dndProcessMnodeMgmtQueue
);
...
...
@@ -817,23 +778,6 @@ static void dndFreeMnodeWriteQueue(SDnode *pDnode) {
pMgmt
->
pWriteQ
=
NULL
;
}
static
int32_t
dndAllocMnodeApplyQueue
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
pMgmt
->
pApplyQ
=
tWorkerAllocQueue
(
&
pMgmt
->
writePool
,
pDnode
,
(
FProcessItem
)
dndProcessMnodeApplyQueue
);
if
(
pMgmt
->
pApplyQ
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
return
0
;
}
static
void
dndFreeMnodeApplyQueue
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
tWorkerFreeQueue
(
&
pMgmt
->
writePool
,
pMgmt
->
pApplyQ
);
pMgmt
->
pApplyQ
=
NULL
;
}
static
int32_t
dndInitMnodeWriteWorker
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
SWorkerPool
*
pPool
=
&
pMgmt
->
writePool
;
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
e5b41267
...
...
@@ -96,6 +96,7 @@ typedef struct {
ETrnStage
stage
;
ETrnPolicy
policy
;
void
*
rpcHandle
;
void
*
rpcAHandle
;
SArray
*
redoLogs
;
SArray
*
undoLogs
;
SArray
*
commitLogs
;
...
...
source/dnode/mnode/impl/inc/mndInt.h
浏览文件 @
e5b41267
...
...
@@ -61,6 +61,13 @@ typedef struct {
char
email
[
TSDB_FQDN_LEN
];
}
STelemMgmt
;
typedef
struct
{
int32_t
errCode
;
sem_t
syncSem
;
SSyncNode
*
pSyncNode
;
ESyncState
state
;
}
SSyncMgmt
;
typedef
struct
SMnode
{
int32_t
dnodeId
;
int32_t
clusterId
;
...
...
@@ -77,11 +84,11 @@ typedef struct SMnode {
SShowMgmt
showMgmt
;
SProfileMgmt
profileMgmt
;
STelemMgmt
telemMgmt
;
SSyncMgmt
syncMgmt
;
MndMsgFp
msgFp
[
TDMT_MAX
];
SendMsgToDnodeFp
sendMsgToDnodeFp
;
SendMsgToMnodeFp
sendMsgToMnodeFp
;
SendRedirectMsgFp
sendRedirectMsgFp
;
PutMsgToMnodeQFp
putMsgToApplyMsgFp
;
}
SMnode
;
void
mndSendMsgToDnode
(
SMnode
*
pMnode
,
SEpSet
*
pEpSet
,
SRpcMsg
*
rpcMsg
);
...
...
source/dnode/mnode/impl/inc/mndTopic.h
浏览文件 @
e5b41267
...
...
@@ -26,7 +26,7 @@ int32_t mndInitTopic(SMnode *pMnode);
void
mndCleanupTopic
(
SMnode
*
pMnode
);
STopicObj
*
mndAcquireTopic
(
SMnode
*
pMnode
,
char
*
topicName
);
void
mndReleaseTopic
(
SMnode
*
pMnode
,
STopicObj
*
pTopic
);
void
mndReleaseTopic
(
SMnode
*
pMnode
,
STopicObj
*
pTopic
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/inc/mndTrans.h
浏览文件 @
e5b41267
...
...
@@ -35,7 +35,7 @@ typedef struct {
int32_t
mndInitTrans
(
SMnode
*
pMnode
);
void
mndCleanupTrans
(
SMnode
*
pMnode
);
STrans
*
mndTransCreate
(
SMnode
*
pMnode
,
ETrnPolicy
policy
,
void
*
rpcHandle
);
STrans
*
mndTransCreate
(
SMnode
*
pMnode
,
ETrnPolicy
policy
,
SRpcMsg
*
pMsg
);
void
mndTransDrop
(
STrans
*
pTrans
);
int32_t
mndTransAppendRedolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
mndTransAppendUndolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
e5b41267
...
...
@@ -400,7 +400,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat
}
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
pMsg
->
rpcMsg
.
handle
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"db:%s, failed to create since %s"
,
pCreate
->
db
,
terrstr
());
goto
CREATE_DB_OVER
;
...
...
@@ -608,7 +608,7 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
static
int32_t
mndUpdateDb
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SDbObj
*
pOldDb
,
SDbObj
*
pNewDb
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
pMsg
->
rpcMsg
.
handle
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"db:%s, failed to update since %s"
,
pOldDb
->
name
,
terrstr
());
return
terrno
;
...
...
@@ -772,7 +772,7 @@ static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *p
static
int32_t
mndDropDb
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SDbObj
*
pDb
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
pMsg
->
rpcMsg
.
handle
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"db:%s, failed to drop since %s"
,
pDb
->
name
,
terrstr
());
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
e5b41267
...
...
@@ -396,7 +396,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *
return
terrno
;
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
pMsg
->
rpcMsg
.
handle
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"dnode:%s, failed to create since %s"
,
pCreate
->
ep
,
terrstr
());
return
-
1
;
...
...
@@ -452,7 +452,7 @@ static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) {
}
static
int32_t
mndDropDnode
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SDnodeObj
*
pDnode
)
{
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
pMsg
->
rpcMsg
.
handle
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"dnode:%d, failed to drop since %s"
,
pDnode
->
id
,
terrstr
());
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndFunc.c
浏览文件 @
e5b41267
...
...
@@ -147,7 +147,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncMsg *pC
pFunc
->
pCode
=
pFunc
->
pData
+
pCreate
->
commentSize
;
memcpy
(
pFunc
->
pCode
,
pCreate
->
pCont
+
pCreate
->
commentSize
,
pFunc
->
codeSize
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
pMsg
->
rpcMsg
.
handle
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
free
(
pFunc
);
mError
(
"func:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
...
...
@@ -195,7 +195,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncMsg *pC
}
static
int32_t
mndDropFunc
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SFuncObj
*
pFunc
)
{
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
pMsg
->
rpcMsg
.
handle
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"func:%s, failed to drop since %s"
,
pFunc
->
name
,
terrstr
());
return
-
1
;
...
...
@@ -441,7 +441,7 @@ static void *mnodeGenTypeStr(char *buf, int32_t buflen, uint8_t type, int16_t le
}
if
(
type
==
TSDB_DATA_TYPE_NCHAR
||
type
==
TSDB_DATA_TYPE_BINARY
)
{
int32_t
bytes
=
len
>
0
?
(
int
)(
len
-
VARSTR_HEADER_SIZE
)
:
len
;
int32_t
bytes
=
len
>
0
?
(
int
32_t
)(
len
-
VARSTR_HEADER_SIZE
)
:
len
;
snprintf
(
buf
,
buflen
-
1
,
"%s(%d)"
,
tDataTypes
[
type
].
name
,
type
==
TSDB_DATA_TYPE_NCHAR
?
bytes
/
4
:
bytes
);
buf
[
buflen
-
1
]
=
0
;
...
...
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
e5b41267
...
...
@@ -334,7 +334,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode
mnodeObj
.
updateTime
=
mnodeObj
.
createdTime
;
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
pMsg
->
rpcMsg
.
handle
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"mnode:%d, failed to create since %s"
,
pCreate
->
dnodeId
,
terrstr
());
goto
CREATE_MNODE_OVER
;
...
...
@@ -500,7 +500,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
static
int32_t
mndDropMnode
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SMnodeObj
*
pObj
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
pMsg
->
rpcMsg
.
handle
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"mnode:%d, failed to drop since %s"
,
pObj
->
id
,
terrstr
());
goto
DROP_MNODE_OVER
;
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
e5b41267
...
...
@@ -454,7 +454,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre
memcpy
(
stbObj
.
pSchema
,
pCreate
->
pSchema
,
totalSize
);
int32_t
code
=
0
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
pMsg
->
rpcMsg
.
handle
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"stb:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
...
...
@@ -653,7 +653,7 @@ static int32_t mndSetDropStbUndoActions(SMnode *pMnode, STrans *pTrans, SStbObj
static
int32_t
mndDropStb
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SStbObj
*
pStb
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
pMsg
->
rpcMsg
.
handle
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"stb:%s, failed to drop since %s"
,
pStb
->
name
,
terrstr
());
return
-
1
;
...
...
@@ -873,8 +873,8 @@ static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pM
}
static
void
mndExtractTableName
(
char
*
tableId
,
char
*
name
)
{
int
pos
=
-
1
;
int
num
=
0
;
int
32_t
pos
=
-
1
;
int
32_t
num
=
0
;
for
(
pos
=
0
;
tableId
[
pos
]
!=
0
;
++
pos
)
{
if
(
tableId
[
pos
]
==
'.'
)
num
++
;
if
(
num
==
2
)
break
;
...
...
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
e5b41267
...
...
@@ -14,26 +14,54 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "mndInt.h"
#include "mndTrans.h"
#include "mndSync.h"
int32_t
mndInitSync
(
SMnode
*
pMnode
)
{
return
0
;
}
void
mndCleanupSync
(
SMnode
*
pMnode
)
{}
int32_t
mndInitSync
(
SMnode
*
pMnode
)
{
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
tsem_init
(
&
pMgmt
->
syncSem
,
0
,
0
);
pMgmt
->
state
=
TAOS_SYNC_STATE_LEADER
;
pMgmt
->
pSyncNode
=
NULL
;
return
0
;
}
void
mndCleanupSync
(
SMnode
*
pMnode
)
{
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
tsem_destroy
(
&
pMgmt
->
syncSem
);
}
static
int32_t
mndSyncApplyCb
(
struct
SSyncFSM
*
fsm
,
SyncIndex
index
,
const
SSyncBuffer
*
buf
,
void
*
pData
)
{
SMnode
*
pMnode
=
pData
;
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
pMgmt
->
errCode
=
0
;
tsem_post
(
&
pMgmt
->
syncSem
);
return
0
;
}
int32_t
mndSyncPropose
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
)
{
int32_t
code
=
0
;
#if 1
return
0
;
#else
if
(
pMnode
->
replica
==
1
)
return
0
;
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
pMgmt
->
errCode
=
0
;
SSyncBuffer
buf
=
{.
data
=
pRaw
,
.
len
=
sdbGetRawTotalSize
(
pRaw
)};
bool
isWeak
=
false
;
int32_t
code
=
syncPropose
(
pMgmt
->
pSyncNode
,
&
buf
,
pMnode
,
isWeak
);
// int32_t len = sdbGetRawTotalSize(pRaw);
// SSdbRaw *pReceived = calloc(1, len);
// memcpy(pReceived, pRaw, len);
// mDebug("trans:%d, data:%p recv from sync, code:0x%x pMsg:%p", pMsg->id, pReceived, code & 0xFFFF, pMsg);
if
(
code
!=
0
)
return
code
;
// mndTransApply(pMnode, pReceived, code);
return
code
;
tsem_wait
(
&
pMgmt
->
syncSem
);
return
pMgmt
->
errCode
;
#endif
}
bool
mndIsMaster
(
SMnode
*
pMnode
)
{
// pMnode->role = TAOS_SYNC_STATE_LEADER
;
return
true
;
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
return
pMgmt
->
state
==
TAOS_SYNC_STATE_LEADER
;
}
\ No newline at end of file
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
e5b41267
...
...
@@ -14,18 +14,18 @@
*/
#define _DEFAULT_SOURCE
#include "mndStb.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tname.h"
#define
TSDB
_TOPIC_VER_NUMBER 1
#define
TSDB
_TOPIC_RESERVE_SIZE 64
#define
MND
_TOPIC_VER_NUMBER 1
#define
MND
_TOPIC_RESERVE_SIZE 64
static
SSdbRaw
*
mndTopicActionEncode
(
STopicObj
*
pTopic
);
static
SSdbRow
*
mndTopicActionDecode
(
SSdbRaw
*
pRaw
);
...
...
@@ -73,8 +73,8 @@ int32_t mndInitTopic(SMnode *pMnode) {
void
mndCleanupTopic
(
SMnode
*
pMnode
)
{}
static
SSdbRaw
*
mndTopicActionEncode
(
STopicObj
*
pTopic
)
{
int32_t
size
=
sizeof
(
STopicObj
)
+
TSDB
_TOPIC_RESERVE_SIZE
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_TOPIC
,
TSDB
_TOPIC_VER_NUMBER
,
size
);
int32_t
size
=
sizeof
(
STopicObj
)
+
MND
_TOPIC_RESERVE_SIZE
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_TOPIC
,
MND
_TOPIC_VER_NUMBER
,
size
);
if
(
pRaw
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
...
...
@@ -90,7 +90,7 @@ static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic) {
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTopic
->
sqlLen
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
sql
,
pTopic
->
sqlLen
);
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
TSDB
_TOPIC_RESERVE_SIZE
);
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
MND
_TOPIC_RESERVE_SIZE
);
SDB_SET_DATALEN
(
pRaw
,
dataPos
);
return
pRaw
;
...
...
@@ -100,14 +100,14 @@ static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
return
NULL
;
if
(
sver
!=
TSDB_TOPIC_VER_NUMBER
)
{
mError
(
"failed to decode stable since %s"
,
terrstr
());
if
(
sver
!=
MND_TOPIC_VER_NUMBER
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
mError
(
"failed to decode topic since %s"
,
terrstr
());
return
NULL
;
}
int32_t
size
=
sizeof
(
STopicObj
)
+
TSDB_MAX_COLUMNS
*
sizeof
(
SSchema
);
SSdbRow
*
pRow
=
sdbAllocRow
(
size
);
int32_t
size
=
sizeof
(
STopicObj
)
+
TSDB_MAX_COLUMNS
*
sizeof
(
SSchema
);
SSdbRow
*
pRow
=
sdbAllocRow
(
size
);
STopicObj
*
pTopic
=
sdbGetRowObj
(
pRow
);
if
(
pTopic
==
NULL
)
return
NULL
;
...
...
@@ -124,7 +124,7 @@ static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pTopic
->
sqlLen
);
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pTopic
->
sql
,
pTopic
->
sqlLen
);
SDB_GET_RESERVE
(
pRaw
,
pRow
,
dataPos
,
TSDB
_TOPIC_RESERVE_SIZE
);
SDB_GET_RESERVE
(
pRaw
,
pRow
,
dataPos
,
MND
_TOPIC_RESERVE_SIZE
);
return
pRow
;
}
...
...
@@ -168,7 +168,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, STopicObj *pOldTopic, STopicObj
}
STopicObj
*
mndAcquireTopic
(
SMnode
*
pMnode
,
char
*
topicName
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
STopicObj
*
pTopic
=
sdbAcquire
(
pSdb
,
SDB_TOPIC
,
topicName
);
if
(
pTopic
==
NULL
)
{
terrno
=
TSDB_CODE_MND_TOPIC_NOT_EXIST
;
...
...
@@ -208,7 +208,7 @@ static SCreateTopicInternalMsg *mndBuildCreateTopicMsg(SMnode *pMnode, SVgObj *p
pCreate
->
sverson
=
htonl
(
pTopic
->
version
);
pCreate
->
sql
=
malloc
(
pTopic
->
sqlLen
);
if
(
pCreate
->
sql
==
NULL
)
{
if
(
pCreate
->
sql
==
NULL
)
{
free
(
pCreate
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
...
...
@@ -216,7 +216,7 @@ static SCreateTopicInternalMsg *mndBuildCreateTopicMsg(SMnode *pMnode, SVgObj *p
memcpy
(
pCreate
->
sql
,
pTopic
->
sql
,
pTopic
->
sqlLen
);
pCreate
->
executor
=
malloc
(
pTopic
->
execLen
);
if
(
pCreate
->
executor
==
NULL
)
{
if
(
pCreate
->
executor
==
NULL
)
{
free
(
pCreate
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
...
...
@@ -244,7 +244,7 @@ static SDropTopicInternalMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgro
}
static
int32_t
mndCheckCreateTopicMsg
(
SCreateTopicMsg
*
pCreate
)
{
//deserialize and other stuff
//
deserialize and other stuff
return
0
;
}
...
...
@@ -367,7 +367,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCreateTopicMsg *
#endif
int32_t
code
=
0
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
pMsg
->
rpcMsg
.
handle
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
...
...
@@ -413,7 +413,7 @@ CREATE_TOPIC_OVER:
}
static
int32_t
mndProcessCreateTopicMsg
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SCreateTopicMsg
*
pCreate
=
pMsg
->
rpcMsg
.
pCont
;
mDebug
(
"topic:%s, start to create"
,
pCreate
->
name
);
...
...
@@ -436,7 +436,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) {
}
}
//topic should have different name with stb
//
topic should have different name with stb
SStbObj
*
pStb
=
mndAcquireStb
(
pMnode
,
pCreate
->
name
);
if
(
pStb
!=
NULL
)
{
sdbRelease
(
pMnode
->
pSdb
,
pStb
);
...
...
@@ -492,7 +492,7 @@ static int32_t mndCheckAlterTopicMsg(SAlterTopicMsg *pAlter) {
static
int32_t
mndUpdateTopic
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
STopicObj
*
pOldTopic
,
STopicObj
*
pNewTopic
)
{
return
0
;
}
static
int32_t
mndProcessAlterTopicMsg
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SAlterTopicMsg
*
pAlter
=
pMsg
->
rpcMsg
.
pCont
;
mDebug
(
"topic:%s, start to alter"
,
pAlter
->
name
);
...
...
@@ -561,7 +561,7 @@ static int32_t mndSetDropTopicUndoActions(SMnode *pMnode, STrans *pTrans, STopic
static
int32_t
mndDropTopic
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
STopicObj
*
pTopic
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
pMsg
->
rpcMsg
.
handle
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"topic:%s, failed to drop since %s"
,
pTopic
->
name
,
terrstr
());
return
-
1
;
...
...
@@ -606,7 +606,7 @@ DROP_TOPIC_OVER:
}
static
int32_t
mndProcessDropTopicMsg
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SDropTopicMsg
*
pDrop
=
pMsg
->
rpcMsg
.
pCont
;
mDebug
(
"topic:%s, start to drop"
,
pDrop
->
name
);
...
...
@@ -705,7 +705,7 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) {
return
0
;
}
static
int32_t
mndProcessCreateTopicInRsp
(
SMnodeMsg
*
pMsg
)
{
static
int32_t
mndProcessCreateTopicInRsp
(
SMnodeMsg
*
pMsg
)
{
mndTransHandleActionRsp
(
pMsg
);
return
0
;
}
...
...
@@ -788,8 +788,8 @@ static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
}
static
void
mndExtractTableName
(
char
*
tableId
,
char
*
name
)
{
int
pos
=
-
1
;
int
num
=
0
;
int
32_t
pos
=
-
1
;
int
32_t
num
=
0
;
for
(
pos
=
0
;
tableId
[
pos
]
!=
0
;
++
pos
)
{
if
(
tableId
[
pos
]
==
'.'
)
num
++
;
if
(
num
==
2
)
break
;
...
...
@@ -801,13 +801,13 @@ static void mndExtractTableName(char *tableId, char *name) {
}
static
int32_t
mndRetrieveTopic
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfRows
=
0
;
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfRows
=
0
;
STopicObj
*
pTopic
=
NULL
;
int32_t
cols
=
0
;
char
*
pWrite
;
char
prefix
[
64
]
=
{
0
};
int32_t
cols
=
0
;
char
*
pWrite
;
char
prefix
[
64
]
=
{
0
};
tstrncpy
(
prefix
,
pShow
->
db
,
64
);
strcat
(
prefix
,
TS_PATH_DELIMITER
);
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
e5b41267
...
...
@@ -17,9 +17,9 @@
#include "mndTrans.h"
#include "mndSync.h"
#define
TSDB
_TRANS_VER_NUMBER 1
#define
TSDB_TRN
_ARRAY_SIZE 8
#define
TSDB_TRN
_RESERVE_SIZE 64
#define
MND
_TRANS_VER_NUMBER 1
#define
MND_TRANS
_ARRAY_SIZE 8
#define
MND_TRANS
_RESERVE_SIZE 64
static
SSdbRaw
*
mndTransActionEncode
(
STrans
*
pTrans
);
static
SSdbRow
*
mndTransActionDecode
(
SSdbRaw
*
pRaw
);
...
...
@@ -27,7 +27,6 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
static
int32_t
mndTransActionUpdate
(
SSdb
*
pSdb
,
STrans
*
OldTrans
,
STrans
*
pOldTrans
);
static
int32_t
mndTransActionDelete
(
SSdb
*
pSdb
,
STrans
*
pTrans
);
static
void
mndTransSetRpcHandle
(
STrans
*
pTrans
,
void
*
rpcHandle
);
static
void
mndTransSendRpcRsp
(
STrans
*
pTrans
,
int32_t
code
);
static
int32_t
mndTransAppendLog
(
SArray
*
pArray
,
SSdbRaw
*
pRaw
);
static
int32_t
mndTransAppendAction
(
SArray
*
pArray
,
STransAction
*
pAction
);
...
...
@@ -61,7 +60,7 @@ int32_t mndInitTrans(SMnode *pMnode) {
void
mndCleanupTrans
(
SMnode
*
pMnode
)
{}
static
SSdbRaw
*
mndTransActionEncode
(
STrans
*
pTrans
)
{
int32_t
rawDataLen
=
sizeof
(
STrans
)
+
TSDB_TRN
_RESERVE_SIZE
;
int32_t
rawDataLen
=
sizeof
(
STrans
)
+
MND_TRANS
_RESERVE_SIZE
;
int32_t
redoLogNum
=
taosArrayGetSize
(
pTrans
->
redoLogs
);
int32_t
undoLogNum
=
taosArrayGetSize
(
pTrans
->
undoLogs
);
int32_t
commitLogNum
=
taosArrayGetSize
(
pTrans
->
commitLogs
);
...
...
@@ -93,7 +92,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
rawDataLen
+=
(
sizeof
(
STransAction
)
+
pAction
->
contLen
);
}
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_TRANS
,
TSDB
_TRANS_VER_NUMBER
,
rawDataLen
);
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_TRANS
,
MND
_TRANS_VER_NUMBER
,
rawDataLen
);
if
(
pRaw
==
NULL
)
{
mError
(
"trans:%d, failed to alloc raw since %s"
,
pTrans
->
id
,
terrstr
());
return
NULL
;
...
...
@@ -145,7 +144,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
pAction
->
pCont
,
pAction
->
contLen
);
}
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
TSDB_TRN
_RESERVE_SIZE
)
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
MND_TRANS
_RESERVE_SIZE
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
);
mTrace
(
"trans:%d, encode to raw:%p, len:%d"
,
pTrans
->
id
,
pRaw
,
dataPos
);
return
pRaw
;
...
...
@@ -155,12 +154,9 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
int32_t
code
=
0
;
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
{
mError
(
"failed to get soft ver from raw:%p since %s"
,
pRaw
,
terrstr
());
return
NULL
;
}
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
return
NULL
;
if
(
sver
!=
TSDB
_TRANS_VER_NUMBER
)
{
if
(
sver
!=
MND
_TRANS_VER_NUMBER
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
mError
(
"failed to get check soft ver from raw:%p since %s"
,
pRaw
,
terrstr
());
return
NULL
;
...
...
@@ -173,11 +169,11 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
return
NULL
;
}
pTrans
->
redoLogs
=
taosArrayInit
(
TSDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoLogs
=
taosArrayInit
(
TSDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
commitLogs
=
taosArrayInit
(
TSDB_TRN
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoActions
=
taosArrayInit
(
TSDB_TRN
_ARRAY_SIZE
,
sizeof
(
STransAction
));
pTrans
->
undoActions
=
taosArrayInit
(
TSDB_TRN
_ARRAY_SIZE
,
sizeof
(
STransAction
));
pTrans
->
redoLogs
=
taosArrayInit
(
MND_TRANS
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoLogs
=
taosArrayInit
(
MND_TRANS
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
commitLogs
=
taosArrayInit
(
MND_TRANS
_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoActions
=
taosArrayInit
(
MND_TRANS
_ARRAY_SIZE
,
sizeof
(
STransAction
));
pTrans
->
undoActions
=
taosArrayInit
(
MND_TRANS
_ARRAY_SIZE
,
sizeof
(
STransAction
));
if
(
pTrans
->
redoLogs
==
NULL
||
pTrans
->
undoLogs
==
NULL
||
pTrans
->
commitLogs
==
NULL
||
pTrans
->
redoActions
==
NULL
||
pTrans
->
undoActions
==
NULL
)
{
...
...
@@ -278,7 +274,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
}
}
SDB_GET_RESERVE
(
pRaw
,
pRow
,
dataPos
,
TSDB_TRN
_RESERVE_SIZE
)
SDB_GET_RESERVE
(
pRaw
,
pRow
,
dataPos
,
MND_TRANS
_RESERVE_SIZE
)
TRANS_DECODE_OVER:
if
(
code
!=
0
)
{
...
...
@@ -358,7 +354,7 @@ char *mndTransPolicyStr(ETrnPolicy policy) {
}
}
STrans
*
mndTransCreate
(
SMnode
*
pMnode
,
ETrnPolicy
policy
,
void
*
rpcHandle
)
{
STrans
*
mndTransCreate
(
SMnode
*
pMnode
,
ETrnPolicy
policy
,
SRpcMsg
*
pMsg
)
{
STrans
*
pTrans
=
calloc
(
1
,
sizeof
(
STrans
));
if
(
pTrans
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -369,12 +365,13 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
pTrans
->
id
=
sdbGetMaxId
(
pMnode
->
pSdb
,
SDB_TRANS
);
pTrans
->
stage
=
TRN_STAGE_PREPARE
;
pTrans
->
policy
=
policy
;
pTrans
->
rpcHandle
=
rpcHandle
;
pTrans
->
redoLogs
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoLogs
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
commitLogs
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoActions
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
STransAction
));
pTrans
->
undoActions
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
STransAction
));
pTrans
->
rpcHandle
=
pMsg
->
handle
;
pTrans
->
rpcAHandle
=
pMsg
->
ahandle
;
pTrans
->
redoLogs
=
taosArrayInit
(
MND_TRANS_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoLogs
=
taosArrayInit
(
MND_TRANS_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
commitLogs
=
taosArrayInit
(
MND_TRANS_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoActions
=
taosArrayInit
(
MND_TRANS_ARRAY_SIZE
,
sizeof
(
STransAction
));
pTrans
->
undoActions
=
taosArrayInit
(
MND_TRANS_ARRAY_SIZE
,
sizeof
(
STransAction
));
if
(
pTrans
->
redoLogs
==
NULL
||
pTrans
->
undoLogs
==
NULL
||
pTrans
->
commitLogs
==
NULL
||
pTrans
->
redoActions
==
NULL
||
pTrans
->
undoActions
==
NULL
)
{
...
...
@@ -416,11 +413,6 @@ void mndTransDrop(STrans *pTrans) {
tfree
(
pTrans
);
}
static
void
mndTransSetRpcHandle
(
STrans
*
pTrans
,
void
*
rpcHandle
)
{
pTrans
->
rpcHandle
=
rpcHandle
;
mTrace
(
"trans:%d, set rpc handle:%p"
,
pTrans
->
id
,
rpcHandle
);
}
static
int32_t
mndTransAppendLog
(
SArray
*
pArray
,
SSdbRaw
*
pRaw
)
{
if
(
pArray
==
NULL
||
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -504,11 +496,12 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
STrans
*
pNewTrans
=
mndAcquireTrans
(
pMnode
,
pTrans
->
id
);
if
(
pNewTrans
==
NULL
)
{
mError
(
"trans:%d, failed to read
y
from sdb since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to read from sdb since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
}
pNewTrans
->
rpcHandle
=
pTrans
->
rpcHandle
;
pNewTrans
->
rpcAHandle
=
pTrans
->
rpcAHandle
;
mndTransExecute
(
pMnode
,
pNewTrans
);
mndReleaseTrans
(
pMnode
,
pNewTrans
);
return
0
;
...
...
@@ -526,19 +519,17 @@ int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
if
(
taosArrayGetSize
(
pTrans
->
commitLogs
)
!=
0
)
{
mTrace
(
"trans:%d, sync to other nodes"
,
pTrans
->
id
);
int32_t
code
=
mndSyncPropose
(
pMnode
,
pRaw
);
if
(
code
!=
0
)
{
if
(
mndSyncPropose
(
pMnode
,
pRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to sync since %s"
,
pTrans
->
id
,
terrstr
());
sdbFreeRaw
(
pRaw
);
return
-
1
;
}
mTrace
(
"trans:%d, sync finished"
,
pTrans
->
id
);
code
=
sdbWrite
(
pMnode
->
pSdb
,
pRaw
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to write sdb since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
}
}
if
(
sdbWrite
(
pMnode
->
pSdb
,
pRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to write sdb since %s"
,
pTrans
->
id
,
terrstr
())
;
return
-
1
;
}
mDebug
(
"trans:%d, commit finished"
,
pTrans
->
id
);
...
...
@@ -576,10 +567,11 @@ int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
static
void
mndTransSendRpcRsp
(
STrans
*
pTrans
,
int32_t
code
)
{
if
(
code
==
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
return
;
mDebug
(
"trans:%d, send rpc rsp, RPC:%p code:0x%x"
,
pTrans
->
id
,
pTrans
->
rpcHandle
,
code
&
0xFFFF
);
mDebug
(
"trans:%d, send rpc rsp, RPC:%p ahandle:%p code:0x%x"
,
pTrans
->
id
,
pTrans
->
rpcHandle
,
pTrans
->
rpcAHandle
,
code
&
0xFFFF
);
if
(
pTrans
->
rpcHandle
!=
NULL
)
{
SRpcMsg
rspMsg
=
{.
handle
=
pTrans
->
rpcHandle
,
.
code
=
code
};
SRpcMsg
rspMsg
=
{.
handle
=
pTrans
->
rpcHandle
,
.
code
=
code
,
.
ahandle
=
pTrans
->
rpcAHandle
};
rpcSendResponse
(
&
rspMsg
);
}
}
...
...
@@ -614,7 +606,7 @@ void mndTransHandleActionRsp(SMnodeMsg *pMsg) {
}
int32_t
actionNum
=
taosArrayGetSize
(
pTrans
->
redoActions
);
if
(
action
<
0
||
action
>
actionNum
)
{
if
(
action
<
0
||
action
>
=
actionNum
)
{
mError
(
"trans:%d, invalid action:%d"
,
transId
,
action
);
goto
HANDLE_ACTION_RSP_OVER
;
}
...
...
@@ -636,6 +628,8 @@ static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) {
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
arraySize
=
taosArrayGetSize
(
pArray
);
if
(
arraySize
==
0
)
return
0
;
for
(
int32_t
i
=
0
;
i
<
arraySize
;
++
i
)
{
SSdbRaw
*
pRaw
=
taosArrayGetP
(
pArray
,
i
);
int32_t
code
=
sdbWriteNotFree
(
pSdb
,
pRaw
);
...
...
@@ -648,45 +642,15 @@ static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) {
}
static
int32_t
mndTransExecuteRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
0
;
if
(
taosArrayGetSize
(
pTrans
->
redoLogs
)
!=
0
)
{
code
=
mndTransExecuteLogs
(
pMnode
,
pTrans
->
redoLogs
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to execute redo logs since %s"
,
pTrans
->
id
,
terrstr
())
}
else
{
mDebug
(
"trans:%d, execute redo logs finished"
,
pTrans
->
id
)
}
}
return
code
;
return
mndTransExecuteLogs
(
pMnode
,
pTrans
->
redoLogs
);
}
static
int32_t
mndTransExecuteUndoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
0
;
if
(
taosArrayGetSize
(
pTrans
->
undoLogs
)
!=
0
)
{
code
=
mndTransExecuteLogs
(
pMnode
,
pTrans
->
undoLogs
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to execute undo logs since %s"
,
pTrans
->
id
,
terrstr
())
}
else
{
mDebug
(
"trans:%d, execute undo logs finished"
,
pTrans
->
id
)
}
}
return
code
;
return
mndTransExecuteLogs
(
pMnode
,
pTrans
->
undoLogs
);
}
static
int32_t
mndTransExecuteCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
0
;
if
(
taosArrayGetSize
(
pTrans
->
commitLogs
)
!=
0
)
{
code
=
mndTransExecuteLogs
(
pMnode
,
pTrans
->
commitLogs
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to execute commit logs since %s"
,
pTrans
->
id
,
terrstr
())
}
else
{
mDebug
(
"trans:%d, execute commit logs finished"
,
pTrans
->
id
)
}
}
return
code
;
return
mndTransExecuteLogs
(
pMnode
,
pTrans
->
commitLogs
);
}
static
int32_t
mndTransExecuteActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SArray
*
pArray
)
{
...
...
@@ -719,25 +683,25 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
mndSendMsgToDnode
(
pMnode
,
&
pAction
->
epSet
,
&
rpcMsg
);
}
int32_t
numOfReceived
Msgs
=
0
;
int32_t
err
or
Code
=
0
;
int32_t
numOfReceived
=
0
;
int32_t
errCode
=
0
;
for
(
int32_t
action
=
0
;
action
<
numOfActions
;
++
action
)
{
STransAction
*
pAction
=
taosArrayGet
(
pArray
,
action
);
if
(
pAction
==
NULL
)
continue
;
if
(
pAction
->
msgSent
&&
pAction
->
msgReceived
)
{
numOfReceived
Msgs
++
;
numOfReceived
++
;
if
(
pAction
->
errCode
!=
0
)
{
err
or
Code
=
pAction
->
errCode
;
errCode
=
pAction
->
errCode
;
}
}
}
if
(
numOfReceived
Msgs
==
numOfActions
)
{
mDebug
(
"trans:%d, all %d actions executed, code:0x%x"
,
pTrans
->
id
,
numOfActions
,
err
or
Code
);
terrno
=
err
or
Code
;
return
err
or
Code
;
if
(
numOfReceived
==
numOfActions
)
{
mDebug
(
"trans:%d, all %d actions executed, code:0x%x"
,
pTrans
->
id
,
numOfActions
,
errCode
);
terrno
=
errCode
;
return
errCode
;
}
else
{
mDebug
(
"trans:%d, %d of %d actions executed, code:0x%x"
,
pTrans
->
id
,
numOfReceived
Msgs
,
numOfActions
,
erro
rCode
);
mDebug
(
"trans:%d, %d of %d actions executed, code:0x%x"
,
pTrans
->
id
,
numOfReceived
,
numOfActions
,
er
rCode
);
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
}
}
...
...
@@ -772,7 +736,6 @@ static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) {
mDebug
(
"trans:%d, stage from execute to commit"
,
pTrans
->
id
);
}
else
if
(
code
==
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mDebug
(
"trans:%d, stage keep on execute since %s"
,
pTrans
->
id
,
tstrerror
(
code
));
return
code
;
}
else
{
if
(
pTrans
->
policy
==
TRN_POLICY_ROLLBACK
)
{
pTrans
->
stage
=
TRN_STAGE_ROLLBACK
;
...
...
@@ -783,7 +746,7 @@ static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) {
}
}
return
0
;
return
code
;
}
static
int32_t
mndTransPerformCommitStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
...
...
@@ -823,9 +786,9 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
}
break
;
case
TRN_STAGE_ROLLBACK
:
code
=
mndTrans
PerformRollbackStage
(
pMnode
,
pTrans
);
code
=
mndTrans
Rollback
(
pMnode
,
pTrans
);
if
(
code
==
0
)
{
code
=
mndTransRollback
(
pMnode
,
pTrans
);
mndTransPerformRollbackStage
(
pMnode
,
pTrans
);
}
break
;
default:
...
...
source/dnode/mnode/impl/src/mndUser.c
浏览文件 @
e5b41267
...
...
@@ -197,7 +197,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass,
userObj
.
updateTime
=
userObj
.
createdTime
;
userObj
.
superUser
=
0
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
pMsg
->
rpcMsg
.
handle
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"user:%s, failed to create since %s"
,
user
,
terrstr
());
return
-
1
;
...
...
@@ -267,7 +267,7 @@ static int32_t mndProcessCreateUserMsg(SMnodeMsg *pMsg) {
}
static
int32_t
mndUpdateUser
(
SMnode
*
pMnode
,
SUserObj
*
pOldUser
,
SUserObj
*
pNewUser
,
SMnodeMsg
*
pMsg
)
{
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
pMsg
->
rpcMsg
.
handle
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"user:%s, failed to update since %s"
,
pOldUser
->
user
,
terrstr
());
return
-
1
;
...
...
@@ -342,7 +342,7 @@ static int32_t mndProcessAlterUserMsg(SMnodeMsg *pMsg) {
}
static
int32_t
mndDropUser
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SUserObj
*
pUser
)
{
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
pMsg
->
rpcMsg
.
handle
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"user:%s, failed to drop since %s"
,
pUser
->
user
,
terrstr
());
return
-
1
;
...
...
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
e5b41267
...
...
@@ -202,7 +202,6 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode
->
selfIndex
=
pOption
->
selfIndex
;
memcpy
(
&
pMnode
->
replicas
,
pOption
->
replicas
,
sizeof
(
SReplica
)
*
TSDB_MAX_REPLICA
);
pMnode
->
pDnode
=
pOption
->
pDnode
;
pMnode
->
putMsgToApplyMsgFp
=
pOption
->
putMsgToApplyMsgFp
;
pMnode
->
sendMsgToDnodeFp
=
pOption
->
sendMsgToDnodeFp
;
pMnode
->
sendMsgToMnodeFp
=
pOption
->
sendMsgToMnodeFp
;
pMnode
->
sendRedirectMsgFp
=
pOption
->
sendRedirectMsgFp
;
...
...
@@ -217,8 +216,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode
->
cfg
.
buildinfo
=
strdup
(
pOption
->
cfg
.
buildinfo
);
if
(
pMnode
->
sendMsgToDnodeFp
==
NULL
||
pMnode
->
sendMsgToMnodeFp
==
NULL
||
pMnode
->
sendRedirectMsgFp
==
NULL
||
pMnode
->
putMsgToApplyMsgFp
==
NULL
||
pMnode
->
dnodeId
<
0
||
pMnode
->
clusterId
<
0
||
pMnode
->
cfg
.
statusInterval
<
1
)
{
pMnode
->
dnodeId
<
0
||
pMnode
->
clusterId
<
0
||
pMnode
->
cfg
.
statusInterval
<
1
)
{
terrno
=
TSDB_CODE_MND_INVALID_OPTIONS
;
return
-
1
;
}
...
...
@@ -438,8 +436,6 @@ void mndProcessWriteMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); }
void
mndProcessSyncMsg
(
SMnodeMsg
*
pMsg
)
{
mndProcessRpcMsg
(
pMsg
);
}
void
mndProcessApplyMsg
(
SMnodeMsg
*
pMsg
)
{}
uint64_t
mndGenerateUid
(
char
*
name
,
int32_t
len
)
{
int64_t
us
=
taosGetTimestampUs
();
int32_t
hashval
=
MurmurHash3_32
(
name
,
len
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录