Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
64b540be
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
64b540be
编写于
7月 05, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: adjust vnode propose msg
上级
f00709f3
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
43 addition
and
59 deletion
+43
-59
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+1
-1
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+2
-2
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+1
-1
source/dnode/vnode/src/tq/tqOffset.c
source/dnode/vnode/src/tq/tqOffset.c
+1
-0
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+1
-1
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+1
-1
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+36
-53
未找到文件。
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
64b540be
...
...
@@ -107,7 +107,7 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
dGTrace
(
"vgId:%d, msg:%p get from vnode-sync queue"
,
pVnode
->
vgId
,
pMsg
);
int32_t
code
=
vnodeProcessSync
Req
(
pVnode
->
pImpl
,
pMsg
,
NULL
);
// no response here
int32_t
code
=
vnodeProcessSync
Msg
(
pVnode
->
pImpl
,
pMsg
,
NULL
);
// no response here
dGTrace
(
"vgId:%d, msg:%p is freed, code:0x%x"
,
pVnode
->
vgId
,
pMsg
,
code
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
64b540be
...
...
@@ -52,10 +52,10 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
void
vnodeDestroy
(
const
char
*
path
,
STfs
*
pTfs
);
SVnode
*
vnodeOpen
(
const
char
*
path
,
STfs
*
pTfs
,
SMsgCb
msgCb
);
void
vnodeClose
(
SVnode
*
pVnode
);
int32_t
vnodePre
p
rocessReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
int32_t
vnodePre
P
rocessReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
int32_t
vnodeProcessWriteReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
int64_t
version
,
SRpcMsg
*
pRsp
);
int32_t
vnodeProcessCMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
int32_t
vnodeProcessSync
Req
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
int32_t
vnodeProcessSync
Msg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
int32_t
vnodePreprocessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
int32_t
vnodeProcessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
int32_t
vnodeProcessFetchMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SQueueInfo
*
pInfo
);
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
64b540be
...
...
@@ -240,7 +240,7 @@ struct SVnode {
SSink
*
pSink
;
tsem_t
canCommit
;
int64_t
sync
;
int32_t
sync
Count
;
int32_t
block
Count
;
tsem_t
syncSem
;
SQHandle
*
pQuery
;
};
...
...
source/dnode/vnode/src/tq/tqOffset.c
浏览文件 @
64b540be
...
...
@@ -85,6 +85,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) {
void
tqOffsetClose
(
STqOffsetStore
*
pStore
)
{
tqOffsetSnapshot
(
pStore
);
taosHashCleanup
(
pStore
->
pHash
);
taosMemoryFree
(
pStore
);
}
STqOffset
*
tqOffsetRead
(
STqOffsetStore
*
pStore
,
const
char
*
subscribeKey
)
{
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
64b540be
...
...
@@ -81,7 +81,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
pVnode
->
state
.
applied
=
info
.
state
.
committed
;
pVnode
->
pTfs
=
pTfs
;
pVnode
->
msgCb
=
msgCb
;
pVnode
->
sync
Count
=
0
;
pVnode
->
block
Count
=
0
;
tsem_init
(
&
pVnode
->
syncSem
,
0
,
0
);
tsem_init
(
&
(
pVnode
->
canCommit
),
0
,
1
);
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
64b540be
...
...
@@ -28,7 +28,7 @@ static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, vo
static
int32_t
vnodeProcessWriteMsg
(
SVnode
*
pVnode
,
int64_t
version
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
);
static
int32_t
vnodeProcessDropTtlTbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
int32_t
vnodePre
p
rocessReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
int32_t
vnodePre
P
rocessReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
0
;
SDecoder
dc
=
{
0
};
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
64b540be
...
...
@@ -25,12 +25,12 @@ static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
static
inline
void
vnodeAccumBlockMsg
(
SVnode
*
pVnode
,
tmsg_t
type
)
{
if
(
!
vnodeIsMsgBlock
(
type
))
return
;
int32_t
count
=
atomic_add_fetch_32
(
&
pVnode
->
sync
Count
,
1
);
int32_t
count
=
atomic_add_fetch_32
(
&
pVnode
->
block
Count
,
1
);
vTrace
(
"vgId:%d, accum block, count:%d type:%s"
,
pVnode
->
config
.
vgId
,
count
,
TMSG_INFO
(
type
));
}
static
inline
void
vnodeWaitBlockMsg
(
SVnode
*
pVnode
)
{
int32_t
count
=
atomic_load_32
(
&
pVnode
->
sync
Count
);
int32_t
count
=
atomic_load_32
(
&
pVnode
->
block
Count
);
if
(
count
<=
0
)
return
;
vTrace
(
"vgId:%d, wait block finish, count:%d"
,
pVnode
->
config
.
vgId
,
count
);
...
...
@@ -40,10 +40,10 @@ static inline void vnodeWaitBlockMsg(SVnode *pVnode) {
static
inline
void
vnodePostBlockMsg
(
SVnode
*
pVnode
,
tmsg_t
type
)
{
if
(
!
vnodeIsMsgBlock
(
type
))
return
;
int32_t
count
=
atomic_load_32
(
&
pVnode
->
sync
Count
);
int32_t
count
=
atomic_load_32
(
&
pVnode
->
block
Count
);
if
(
count
<=
0
)
return
;
count
=
atomic_sub_fetch_32
(
&
pVnode
->
sync
Count
,
1
);
count
=
atomic_sub_fetch_32
(
&
pVnode
->
block
Count
,
1
);
vTrace
(
"vgId:%d, post block, count:%d type:%s"
,
pVnode
->
config
.
vgId
,
count
,
TMSG_INFO
(
type
));
if
(
count
<=
0
)
{
tsem_post
(
&
pVnode
->
syncSem
);
...
...
@@ -84,8 +84,10 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
terrno
=
TSDB_CODE_INVALID_MSG
;
return
TSDB_CODE_INVALID_MSG
;
}
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
vGTrace
(
"vgId:%d, start to alter vnode replica to %d, handle:%p"
,
TD_VID
(
pVnode
),
req
.
replica
,
pMsg
->
info
.
handle
);
SSyncCfg
cfg
=
{.
replicaNum
=
req
.
replica
,
.
myIndex
=
req
.
selfIndex
};
for
(
int32_t
r
=
0
;
r
<
req
.
replica
;
++
r
)
{
SNodeInfo
*
pNode
=
&
cfg
.
nodeInfo
[
r
];
...
...
@@ -126,68 +128,49 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
for
(
int32_t
m
=
0
;
m
<
numOfMsgs
;
m
++
)
{
if
(
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
)
==
0
)
continue
;
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
vGTrace
(
"vgId:%d, msg:%p get from vnode-write queue handle:%p"
,
vgId
,
pMsg
,
pMsg
->
info
.
handle
);
if
(
pMsg
->
msgType
==
TDMT_VND_ALTER_REPLICA
)
{
code
=
vnodeProcessAlterReplicaReq
(
pVnode
,
pMsg
);
code
=
vnodePreProcessReq
(
pVnode
,
pMsg
);
if
(
code
!=
0
)
{
vError
(
"vgId:%d, msg:%p failed to pre-process since %s"
,
vgId
,
pMsg
,
terrstr
());
}
else
{
code
=
vnodePreprocessReq
(
pVnode
,
pMsg
);
if
(
code
!=
0
)
{
vError
(
"vgId:%d, failed to pre-process msg:%p since %s"
,
vgId
,
pMsg
,
terrstr
());
if
(
pMsg
->
msgType
==
TDMT_VND_ALTER_REPLICA
)
{
code
=
vnodeProcessAlterReplicaReq
(
pVnode
,
pMsg
);
}
else
{
code
=
syncPropose
(
pVnode
->
sync
,
pMsg
,
vnodeIsMsgWeak
(
pMsg
->
msgType
));
if
(
code
==
1
)
{
do
{
static
int32_t
cnt
=
0
;
if
(
cnt
++
%
1000
==
1
)
{
vInfo
(
"vgId:%d, msg:%p apply right now, apply index:%ld, msgtype:%s,%d"
,
vgId
,
pMsg
,
pMsg
->
info
.
conn
.
applyIndex
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
msgType
);
}
}
while
(
0
);
if
(
code
>
0
)
{
SRpcMsg
rsp
=
{.
code
=
pMsg
->
code
,
.
info
=
pMsg
->
info
};
if
(
vnodeProcessWriteReq
(
pVnode
,
pMsg
,
pMsg
->
info
.
conn
.
applyIndex
,
&
rsp
)
<
0
)
{
rsp
.
code
=
terrno
;
vInfo
(
"vgId:%d, msg:%p failed to apply right now since %s"
,
vgId
,
pMsg
,
terrstr
());
}
if
(
rsp
.
info
.
handle
!=
NULL
)
{
tmsgSendRsp
(
&
rsp
);
vError
(
"vgId:%d, msg:%p failed to apply right now since %s"
,
vgId
,
pMsg
,
terrstr
());
}
tmsgSendRsp
(
&
rsp
);
}
}
}
if
(
code
==
0
)
{
vnodeAccumBlockMsg
(
pVnode
,
pMsg
->
msgType
);
}
else
if
(
code
==
-
1
&&
terrno
==
TSDB_CODE_SYN_NOT_LEADER
)
{
SEpSet
newEpSet
=
{
0
};
syncGetRetryEpSet
(
pVnode
->
sync
,
&
newEpSet
);
/*
syncGetEpSet(pVnode->sync, &newEpSet);
SEp *pEp = &newEpSet.eps[newEpSet.inUse];
if (pEp->port == tsServerPort && strcmp(pEp->fqdn, tsLocalFqdn) == 0) {
newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
}
*/
vGTrace
(
"vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d"
,
vgId
,
pMsg
,
newEpSet
.
numOfEps
,
newEpSet
.
inUse
);
for
(
int32_t
i
=
0
;
i
<
newEpSet
.
numOfEps
;
++
i
)
{
vGTrace
(
"vgId:%d, msg:%p redirect:%d ep:%s:%u"
,
vgId
,
pMsg
,
i
,
newEpSet
.
eps
[
i
].
fqdn
,
newEpSet
.
eps
[
i
].
port
);
}
pMsg
->
info
.
hasEpSet
=
1
;
SRpcMsg
rsp
=
{.
code
=
TSDB_CODE_RPC_REDIRECT
,
.
info
=
pMsg
->
info
};
tmsgSendRedirectRsp
(
&
rsp
,
&
newEpSet
);
}
else
{
if
(
code
!=
1
)
{
}
else
if
(
code
<
0
)
{
if
(
terrno
==
TSDB_CODE_SYN_NOT_LEADER
)
{
SEpSet
newEpSet
=
{
0
};
syncGetRetryEpSet
(
pVnode
->
sync
,
&
newEpSet
);
vGTrace
(
"vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d"
,
vgId
,
pMsg
,
newEpSet
.
numOfEps
,
newEpSet
.
inUse
);
for
(
int32_t
i
=
0
;
i
<
newEpSet
.
numOfEps
;
++
i
)
{
vGTrace
(
"vgId:%d, msg:%p redirect:%d ep:%s:%u"
,
vgId
,
pMsg
,
i
,
newEpSet
.
eps
[
i
].
fqdn
,
newEpSet
.
eps
[
i
].
port
);
}
pMsg
->
info
.
hasEpSet
=
1
;
SRpcMsg
rsp
=
{.
code
=
TSDB_CODE_RPC_REDIRECT
,
.
info
=
pMsg
->
info
};
tmsgSendRedirectRsp
(
&
rsp
,
&
newEpSet
);
}
else
{
if
(
terrno
!=
0
)
code
=
terrno
;
vError
(
"vgId:%d, msg:%p failed to propose since %s, code:0x%x"
,
vgId
,
pMsg
,
tstrerror
(
code
),
code
);
SRpcMsg
rsp
=
{.
code
=
code
,
.
info
=
pMsg
->
info
};
tmsgSendRsp
(
&
rsp
);
}
}
else
{
}
vGTrace
(
"vgId:%d, msg:%p is freed, code:0x%x"
,
vgId
,
pMsg
,
code
);
...
...
@@ -206,7 +189,7 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
if
(
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
)
==
0
)
continue
;
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
vGTrace
(
"vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p"
,
vgId
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
info
.
handle
);
...
...
@@ -229,7 +212,7 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
}
}
int32_t
vnodeProcessSync
Req
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
int32_t
vnodeProcessSync
Msg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
int32_t
ret
=
0
;
if
(
syncEnvIsStart
())
{
...
...
@@ -247,7 +230,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
}
if
(
gRaftDetailLog
)
{
char
logBuf
[
512
]
=
{
0
};
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==vnodeProcessSync
Req
== msgType:%d, syncNode: %s"
,
pMsg
->
msgType
,
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==vnodeProcessSync
Msg
== msgType:%d, syncNode: %s"
,
pMsg
->
msgType
,
syncNodeStr
);
syncRpcMsgLog2
(
logBuf
,
pMsg
);
}
...
...
@@ -313,7 +296,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SRpcMsg
rsp
=
{.
code
=
ret
,
.
info
=
pMsg
->
info
};
tmsgSendRsp
(
&
rsp
);
}
else
{
vError
(
"==vnodeProcessSync
Req
== error msg type:%d"
,
pRpcMsg
->
msgType
);
vError
(
"==vnodeProcessSync
Msg
== error msg type:%d"
,
pRpcMsg
->
msgType
);
ret
=
-
1
;
}
...
...
@@ -380,14 +363,14 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SRpcMsg
rsp
=
{.
code
=
ret
,
.
info
=
pMsg
->
info
};
tmsgSendRsp
(
&
rsp
);
}
else
{
vError
(
"==vnodeProcessSync
Req
== error msg type:%d"
,
pRpcMsg
->
msgType
);
vError
(
"==vnodeProcessSync
Msg
== error msg type:%d"
,
pRpcMsg
->
msgType
);
ret
=
-
1
;
}
}
syncNodeRelease
(
pSyncNode
);
}
else
{
vError
(
"==vnodeProcessSync
Req
== error syncEnv stop"
);
vError
(
"==vnodeProcessSync
Msg
== error syncEnv stop"
);
ret
=
-
1
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录