Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f78a3180
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f78a3180
编写于
4月 28, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh(tmq): add rebalance global lock
上级
d998e3e8
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
31 addition
and
10 deletion
+31
-10
include/common/tmsg.h
include/common/tmsg.h
+0
-1
source/dnode/mnode/impl/inc/mndConsumer.h
source/dnode/mnode/impl/inc/mndConsumer.h
+5
-0
source/dnode/mnode/impl/inc/mndTrans.h
source/dnode/mnode/impl/inc/mndTrans.h
+2
-2
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+14
-5
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+5
-2
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+5
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
f78a3180
...
...
@@ -1474,7 +1474,6 @@ _err:
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization or
// deserialization
typedef
struct
{
int8_t
*
mqInReb
;
SHashObj
*
rebSubHash
;
// SHashObj<key, SMqRebSubscribe>
}
SMqDoRebalanceMsg
;
...
...
source/dnode/mnode/impl/inc/mndConsumer.h
浏览文件 @
f78a3180
...
...
@@ -44,6 +44,11 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
int32_t
mndSetConsumerCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SMqConsumerObj
*
pConsumer
);
bool
mndRebTryStart
();
void
mndRebEnd
();
void
mndRebCntInc
();
void
mndRebCntDec
();
#ifdef __cplusplus
}
#endif
...
...
source/dnode/mnode/impl/inc/mndTrans.h
浏览文件 @
f78a3180
...
...
@@ -36,8 +36,8 @@ typedef struct {
typedef
enum
{
TEST_TRANS_START_FUNC
=
1
,
TEST_TRANS_STOP_FUNC
=
2
,
CONSUME
_TRANS_START_FUNC
=
3
,
CONSUME
_TRANS_STOP_FUNC
=
4
,
MQ_REB
_TRANS_START_FUNC
=
3
,
MQ_REB
_TRANS_STOP_FUNC
=
4
,
}
ETrnFuncType
;
typedef
void
(
*
TransCbFp
)(
SMnode
*
pMnode
,
void
*
param
,
int32_t
paramLen
);
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
f78a3180
...
...
@@ -35,7 +35,7 @@
#define MND_CONSUMER_LOST_HB_CNT 3
static
int8_t
mq
InRebFlag
=
0
;
static
int8_t
mq
RebLock
=
0
;
static
const
char
*
mndConsumerStatusName
(
int
status
);
...
...
@@ -75,6 +75,17 @@ int32_t mndInitConsumer(SMnode *pMnode) {
void
mndCleanupConsumer
(
SMnode
*
pMnode
)
{}
bool
mndRebTryStart
()
{
int8_t
old
=
atomic_val_compare_exchange_8
(
&
mqRebLock
,
0
,
1
);
return
old
==
0
;
}
void
mndRebEnd
()
{
atomic_sub_fetch_8
(
&
mqRebLock
,
1
);
}
void
mndRebCntInc
()
{
atomic_add_fetch_8
(
&
mqRebLock
,
1
);
}
void
mndRebCntDec
()
{
atomic_sub_fetch_8
(
&
mqRebLock
,
1
);
}
static
int32_t
mndProcessConsumerLostMsg
(
SNodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pNode
;
SMqConsumerLostMsg
*
pLostMsg
=
pMsg
->
rpcMsg
.
pCont
;
...
...
@@ -143,8 +154,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
void
*
pIter
=
NULL
;
// rebalance cannot be parallel
int8_t
old
=
atomic_val_compare_exchange_8
(
&
mqInRebFlag
,
0
,
1
);
if
(
old
!=
0
)
{
if
(
!
mndRebTryStart
())
{
mInfo
(
"mq rebalance already in progress, do nothing"
);
return
0
;
}
...
...
@@ -152,7 +162,6 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
SMqDoRebalanceMsg
*
pRebMsg
=
rpcMallocCont
(
sizeof
(
SMqDoRebalanceMsg
));
pRebMsg
->
rebSubHash
=
taosHashInit
(
64
,
MurmurHash3_32
,
true
,
HASH_NO_LOCK
);
// TODO set cleanfp
pRebMsg
->
mqInReb
=
&
mqInRebFlag
;
// iterate all consumers, find all modification
while
(
1
)
{
...
...
@@ -223,7 +232,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
taosHashCleanup
(
pRebMsg
->
rebSubHash
);
rpcFreeCont
(
pRebMsg
);
mTrace
(
"mq rebalance finished, no modification"
);
atomic_store_8
(
&
mqInRebFlag
,
0
);
mndRebEnd
(
);
}
return
0
;
}
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
f78a3180
...
...
@@ -452,7 +452,10 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO
}
// 4. TODO commit log: modification log
// 5. execution
// 5. set cb
mndTransSetCb
(
pTrans
,
MQ_REB_TRANS_START_FUNC
,
MQ_REB_TRANS_STOP_FUNC
,
NULL
,
0
);
// 6. execution
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
REB_FAIL
;
mndTransDrop
(
pTrans
);
...
...
@@ -518,9 +521,9 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
}
// reset flag
atomic_store_8
(
pReq
->
mqInReb
,
0
);
mInfo
(
"mq rebalance completed successfully"
);
taosHashCleanup
(
pReq
->
rebSubHash
);
mndRebEnd
();
return
0
;
}
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
f78a3180
...
...
@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "mndTrans.h"
#include "mndAuth.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndShow.h"
#include "mndSync.h"
...
...
@@ -442,6 +443,10 @@ static TransCbFp mndTransGetCbFp(ETrnFuncType ftype) {
return
mndTransTestStartFunc
;
case
TEST_TRANS_STOP_FUNC
:
return
mndTransTestStopFunc
;
case
MQ_REB_TRANS_START_FUNC
:
return
mndRebCntInc
;
case
MQ_REB_TRANS_STOP_FUNC
:
return
mndRebCntDec
;
default:
return
NULL
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录