Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
432f8bfd
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
432f8bfd
编写于
2月 10, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
rewrite rebalance
上级
8b7ca942
变更
17
显示空白变更内容
内联
并排
Showing
17 changed file
with
578 addition
and
914 deletion
+578
-914
include/common/tmsg.h
include/common/tmsg.h
+12
-8
include/common/tmsgdef.h
include/common/tmsgdef.h
+2
-1
include/dnode/mnode/mnode.h
include/dnode/mnode/mnode.h
+2
-0
source/client/src/tmq.c
source/client/src/tmq.c
+3
-0
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+2
-2
source/client/test/tmqTest.cpp
source/client/test/tmqTest.cpp
+0
-1
source/dnode/mgmt/impl/src/dndMnode.c
source/dnode/mgmt/impl/src/dndMnode.c
+7
-0
source/dnode/mnode/impl/inc/mndConsumer.h
source/dnode/mnode/impl/inc/mndConsumer.h
+2
-0
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+103
-312
source/dnode/mnode/impl/inc/mndInt.h
source/dnode/mnode/impl/inc/mndInt.h
+1
-0
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+13
-145
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+420
-436
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+2
-1
source/dnode/vnode/src/inc/tsdbReadImpl.h
source/dnode/vnode/src/inc/tsdbReadImpl.h
+1
-1
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+2
-2
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
+4
-4
source/dnode/vnode/src/vnd/vnodeBufferPool.c
source/dnode/vnode/src/vnd/vnodeBufferPool.c
+2
-1
未找到文件。
include/common/tmsg.h
浏览文件 @
432f8bfd
...
...
@@ -1104,10 +1104,14 @@ static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq
return
buf
;
}
typedef
struct
SMqTmrMsg
{
typedef
struct
{
int32_t
reserved
;
}
SMqTmrMsg
;
typedef
struct
{
int64_t
consumerId
;
}
SMqDoRebalanceMsg
;
typedef
struct
{
int64_t
status
;
}
SMVSubscribeRsp
;
...
...
@@ -1685,13 +1689,13 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW)
return
buf
;
}
typedef
struct
SMqTbData
{
typedef
struct
{
int64_t
uid
;
int32_t
numOfRows
;
char
*
colData
;
}
SMqTbData
;
typedef
struct
SMqTopicBlk
{
typedef
struct
{
char
topicName
[
TSDB_TOPIC_FNAME_LEN
];
int64_t
committedOffset
;
int64_t
reqOffset
;
...
...
@@ -1702,7 +1706,7 @@ typedef struct SMqTopicBlk {
SMqTbData
*
tbData
;
}
SMqTopicData
;
typedef
struct
SMqConsumeRsp
{
typedef
struct
{
int64_t
consumerId
;
SSchemaWrapper
*
schemas
;
int64_t
committedOffset
;
...
...
@@ -1714,7 +1718,7 @@ typedef struct SMqConsumeRsp {
}
SMqConsumeRsp
;
// one req for one vg+topic
typedef
struct
SMqConsumeReq
{
typedef
struct
{
SMsgHead
head
;
//0: commit only, current offset
//1: consume only, poll next offset
...
...
@@ -1730,17 +1734,17 @@ typedef struct SMqConsumeReq {
char
topic
[
TSDB_TOPIC_FNAME_LEN
];
}
SMqConsumeReq
;
typedef
struct
SMqSubVgEp
{
typedef
struct
{
int32_t
vgId
;
SEpSet
epSet
;
}
SMqSubVgEp
;
typedef
struct
SMqSubTopicEp
{
typedef
struct
{
char
topic
[
TSDB_TOPIC_FNAME_LEN
];
SArray
*
vgs
;
// SArray<SMqSubVgEp>
}
SMqSubTopicEp
;
typedef
struct
SMqCMGetSubEpRsp
{
typedef
struct
{
int64_t
consumerId
;
int64_t
epoch
;
char
cgroup
[
TSDB_CONSUMER_GROUP_LEN
];
...
...
include/common/tmsgdef.h
浏览文件 @
432f8bfd
...
...
@@ -141,7 +141,8 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_TOPIC
,
"mnode-drop-topic"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_SUBSCRIBE
,
"mnode-subscribe"
,
SCMSubscribeReq
,
SCMSubscribeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_MND_GET_SUB_EP
,
"mnode-get-sub-ep"
,
SMqCMGetSubEpReq
,
SMqCMGetSubEpRsp
)
TD_DEF_MSG_TYPE
(
TDMT_MND_MQ_TIMER
,
"mnode-timer"
,
SMqTmrMsg
,
SMqTmrMsg
)
TD_DEF_MSG_TYPE
(
TDMT_MND_MQ_TIMER
,
"mnode-mq-timer"
,
SMqTmrMsg
,
SMqTmrMsg
)
TD_DEF_MSG_TYPE
(
TDMT_MND_MQ_DO_REBALANCE
,
"mnode-mq-do-rebalance"
,
SMqDoRebalanceMsg
,
SMqDoRebalanceMsg
)
// Requests handled by VNODE
TD_NEW_MSG_SEG
(
TDMT_VND_MSG
)
...
...
include/dnode/mnode/mnode.h
浏览文件 @
432f8bfd
...
...
@@ -27,6 +27,7 @@ typedef struct SMnodeMsg SMnodeMsg;
typedef
int32_t
(
*
SendReqToDnodeFp
)(
SDnode
*
pDnode
,
struct
SEpSet
*
epSet
,
struct
SRpcMsg
*
rpcMsg
);
typedef
int32_t
(
*
SendReqToMnodeFp
)(
SDnode
*
pDnode
,
struct
SRpcMsg
*
rpcMsg
);
typedef
int32_t
(
*
PutReqToMWriteQFp
)(
SDnode
*
pDnode
,
struct
SRpcMsg
*
rpcMsg
);
typedef
int32_t
(
*
PutReqToMReadQFp
)(
SDnode
*
pDnode
,
struct
SRpcMsg
*
rpcMsg
);
typedef
void
(
*
SendRedirectRspFp
)(
SDnode
*
pDnode
,
struct
SRpcMsg
*
rpcMsg
);
typedef
struct
SMnodeLoad
{
...
...
@@ -64,6 +65,7 @@ typedef struct {
SMnodeCfg
cfg
;
SDnode
*
pDnode
;
PutReqToMWriteQFp
putReqToMWriteQFp
;
PutReqToMReadQFp
putReqToMReadQFp
;
SendReqToDnodeFp
sendReqToDnodeFp
;
SendReqToMnodeFp
sendReqToMnodeFp
;
SendRedirectRspFp
sendRedirectRspFp
;
...
...
source/client/src/tmq.c
浏览文件 @
432f8bfd
...
...
@@ -209,6 +209,9 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
SName
name
=
{
0
};
char
*
dbName
=
getDbOfConnection
(
tmq
->
pTscObj
);
if
(
dbName
==
NULL
)
{
return
TMQ_RESP_ERR__FAIL
;
}
tNameSetDbName
(
&
name
,
tmq
->
pTscObj
->
acctId
,
dbName
,
strlen
(
dbName
));
tNameFromString
(
&
name
,
topicName
,
T_NAME_TABLE
);
...
...
source/client/test/clientTests.cpp
浏览文件 @
432f8bfd
...
...
@@ -565,7 +565,6 @@ TEST(testCase, insert_test) {
#endif
#if 0
TEST
(
testCase
,
projection_query_tables
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
pConn
,
nullptr
);
...
...
@@ -585,7 +584,7 @@ TEST(testCase, projection_query_tables) {
}
taos_free_result
(
pRes
);
for(int32_t i = 0; i < 100000; ++i) {
for
(
int32_t
i
=
0
;
i
<
100000
00
;
++
i
)
{
char
sql
[
512
]
=
{
0
};
sprintf
(
sql
,
"insert into tu values(now+%da, %d)"
,
i
,
i
);
TAOS_RES
*
p
=
taos_query
(
pConn
,
sql
);
...
...
@@ -616,6 +615,7 @@ TEST(testCase, projection_query_tables) {
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
#if 0
TEST(testCase, projection_query_stables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
...
...
source/client/test/tmqTest.cpp
浏览文件 @
432f8bfd
...
...
@@ -46,7 +46,6 @@ TEST(testCase, create_topic_ctb_Test) {
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
//taos_free_result(pRes);
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
ASSERT_TRUE
(
pFields
==
nullptr
);
...
...
source/dnode/mgmt/impl/src/dndMnode.c
浏览文件 @
432f8bfd
...
...
@@ -256,6 +256,12 @@ static bool dndNeedDeployMnode(SDnode *pDnode) {
static
int32_t
dndPutMsgToMWriteQ
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
dndWriteMnodeMsgToWorker
(
pDnode
,
&
pDnode
->
mmgmt
.
writeWorker
,
pRpcMsg
);
return
0
;
}
static
int32_t
dndPutMsgToMReadQ
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
dndWriteMnodeMsgToWorker
(
pDnode
,
&
pDnode
->
mmgmt
.
readWorker
,
pRpcMsg
);
return
0
;
}
static
void
dndInitMnodeOption
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
)
{
...
...
@@ -264,6 +270,7 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
pOption
->
sendReqToMnodeFp
=
dndSendReqToMnode
;
pOption
->
sendRedirectRspFp
=
dndSendRedirectRsp
;
pOption
->
putReqToMWriteQFp
=
dndPutMsgToMWriteQ
;
pOption
->
putReqToMReadQFp
=
dndPutMsgToMReadQ
;
pOption
->
dnodeId
=
dndGetDnodeId
(
pDnode
);
pOption
->
clusterId
=
dndGetClusterId
(
pDnode
);
pOption
->
cfg
.
sver
=
pDnode
->
env
.
sver
;
...
...
source/dnode/mnode/impl/inc/mndConsumer.h
浏览文件 @
432f8bfd
...
...
@@ -28,6 +28,8 @@ void mndCleanupConsumer(SMnode *pMnode);
SMqConsumerObj
*
mndAcquireConsumer
(
SMnode
*
pMnode
,
int64_t
consumerId
);
void
mndReleaseConsumer
(
SMnode
*
pMnode
,
SMqConsumerObj
*
pConsumer
);
SMqConsumerObj
*
mndCreateConsumer
(
int64_t
consumerId
,
const
char
*
cgroup
);
SSdbRaw
*
mndConsumerActionEncode
(
SMqConsumerObj
*
pConsumer
);
SSdbRow
*
mndConsumerActionDecode
(
SSdbRaw
*
pRaw
);
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
432f8bfd
...
...
@@ -341,42 +341,22 @@ typedef struct {
char
payload
[];
}
SShowObj
;
#if 0
typedef struct SConsumerObj {
uint64_t uid;
int64_t createTime;
int64_t updateTime;
//uint64_t dbUid;
int32_t version;
SRWLatch lock;
SArray* topics;
} SConsumerObj;
typedef struct SMqTopicConsumer {
int64_t consumerId;
SList* topicList;
} SMqTopicConsumer;
#endif
typedef
struct
SMqConsumerEp
{
typedef
struct
{
int32_t
vgId
;
// -1 for unassigned
int32_t
status
;
SEpSet
epSet
;
int64_t
oldConsumerId
;
int64_t
consumerId
;
// -1 for unassigned
int64_t
lastConsumerHbTs
;
int64_t
lastVgHbTs
;
char
*
qmsg
;
}
SMqConsumerEp
;
static
FORCE_INLINE
int32_t
tEncodeSMqConsumerEp
(
void
**
buf
,
SMqConsumerEp
*
pConsumerEp
)
{
static
FORCE_INLINE
int32_t
tEncodeSMqConsumerEp
(
void
**
buf
,
const
SMqConsumerEp
*
pConsumerEp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumerEp
->
vgId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumerEp
->
status
);
tlen
+=
taosEncodeSEpSet
(
buf
,
&
pConsumerEp
->
epSet
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumerEp
->
oldConsumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumerEp
->
consumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumerEp
->
lastConsumerHbTs
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumerEp
->
lastVgHbTs
);
//tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec);
tlen
+=
taosEncodeString
(
buf
,
pConsumerEp
->
qmsg
);
return
tlen
;
}
...
...
@@ -385,10 +365,8 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumerEp
->
vgId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumerEp
->
status
);
buf
=
taosDecodeSEpSet
(
buf
,
&
pConsumerEp
->
epSet
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumerEp
->
oldConsumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumerEp
->
consumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumerEp
->
lastConsumerHbTs
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumerEp
->
lastVgHbTs
);
//buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec);
buf
=
taosDecodeString
(
buf
,
&
pConsumerEp
->
qmsg
);
return
buf
;
}
...
...
@@ -399,97 +377,89 @@ static FORCE_INLINE void tDeleteSMqConsumerEp(SMqConsumerEp* pConsumerEp) {
}
}
// unit for rebalance
typedef
struct
SMqSubscribeObj
{
typedef
struct
{
int64_t
consumerId
;
SArray
*
vgInfo
;
// SArray<SMqConsumerEp>
}
SMqSubConsumer
;
static
FORCE_INLINE
int32_t
tEncodeSMqSubConsumer
(
void
**
buf
,
const
SMqSubConsumer
*
pConsumer
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
consumerId
);
int32_t
sz
=
taosArrayGetSize
(
pConsumer
->
vgInfo
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
*
pCEp
=
taosArrayGet
(
pConsumer
->
vgInfo
,
i
);
tlen
+=
tEncodeSMqConsumerEp
(
buf
,
pCEp
);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqSubConsumer
(
void
**
buf
,
SMqSubConsumer
*
pConsumer
)
{
int32_t
sz
;
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
consumerId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pConsumer
->
vgInfo
=
taosArrayInit
(
sz
,
sizeof
(
SMqConsumerEp
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
consumerEp
;
buf
=
tDecodeSMqConsumerEp
(
buf
,
&
consumerEp
);
taosArrayPush
(
pConsumer
->
vgInfo
,
&
consumerEp
);
}
return
buf
;
}
static
FORCE_INLINE
void
tDeleteSMqSubConsumer
(
SMqSubConsumer
*
pSubConsumer
)
{
if
(
pSubConsumer
->
vgInfo
)
{
taosArrayDestroyEx
(
pSubConsumer
->
vgInfo
,
(
void
(
*
)(
void
*
))
tDeleteSMqConsumerEp
);
pSubConsumer
->
vgInfo
=
NULL
;
}
}
typedef
struct
{
char
key
[
TSDB_SUBSCRIBE_KEY_LEN
];
int32_t
epoch
;
// TODO: replace with priority queue
int32_t
nextConsumerIdx
;
SArray
*
availConsumer
;
// SArray<int64_t> (consumerId)
SArray
*
assigned
;
// SArray<SMqConsumerEp>
SArray
*
idleConsumer
;
// SArray<SMqConsumerEp>
SArray
*
lostConsumer
;
// SArray<SMqConsumerEp>
int32_t
status
;
int32_t
vgNum
;
SArray
*
consumers
;
// SArray<SMqSubConsumer>
SArray
*
unassignedVg
;
// SArray<SMqConsumerEp>
}
SMqSubscribeObj
;
static
FORCE_INLINE
SMqSubscribeObj
*
tNewSubscribeObj
()
{
SMqSubscribeObj
*
pSub
=
malloc
(
sizeof
(
SMqSubscribeObj
));
SMqSubscribeObj
*
pSub
=
calloc
(
1
,
sizeof
(
SMqSubscribeObj
));
if
(
pSub
==
NULL
)
{
return
NULL
;
}
pSub
->
key
[
0
]
=
0
;
pSub
->
epoch
=
0
;
pSub
->
availConsumer
=
taosArrayInit
(
0
,
sizeof
(
int64_t
));
if
(
pSub
->
availConsumer
==
NULL
)
{
free
(
pSub
);
return
NULL
;
}
pSub
->
assigned
=
taosArrayInit
(
0
,
sizeof
(
SMqConsumerEp
));
if
(
pSub
->
assigned
==
NULL
)
{
taosArrayDestroy
(
pSub
->
availConsumer
);
free
(
pSub
);
return
NULL
;
}
pSub
->
lostConsumer
=
taosArrayInit
(
0
,
sizeof
(
SMqConsumerEp
));
if
(
pSub
->
lostConsumer
==
NULL
)
{
taosArrayDestroy
(
pSub
->
availConsumer
);
taosArrayDestroy
(
pSub
->
assigned
);
free
(
pSub
);
return
NULL
;
}
pSub
->
idleConsumer
=
taosArrayInit
(
0
,
sizeof
(
SMqConsumerEp
));
if
(
pSub
->
idleConsumer
==
NULL
)
{
taosArrayDestroy
(
pSub
->
availConsumer
);
taosArrayDestroy
(
pSub
->
assigned
);
taosArrayDestroy
(
pSub
->
lostConsumer
);
free
(
pSub
);
return
NULL
;
pSub
->
consumers
=
taosArrayInit
(
0
,
sizeof
(
SMqSubConsumer
));
if
(
pSub
->
consumers
==
NULL
)
{
goto
_err
;
}
pSub
->
unassignedVg
=
taosArrayInit
(
0
,
sizeof
(
SMqConsumerEp
));
if
(
pSub
->
unassignedVg
==
NULL
)
{
taosArrayDestroy
(
pSub
->
availConsumer
);
taosArrayDestroy
(
pSub
->
assigned
);
taosArrayDestroy
(
pSub
->
lostConsumer
);
taosArrayDestroy
(
pSub
->
idleConsumer
);
free
(
pSub
);
return
NULL
;
goto
_err
;
}
pSub
->
key
[
0
]
=
0
;
pSub
->
vgNum
=
0
;
pSub
->
status
=
0
;
return
pSub
;
_err:
tfree
(
pSub
->
unassignedVg
);
tfree
(
pSub
->
consumers
);
tfree
(
pSub
);
return
NULL
;
}
static
FORCE_INLINE
int32_t
tEncodeSubscribeObj
(
void
**
buf
,
const
SMqSubscribeObj
*
pSub
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeString
(
buf
,
pSub
->
key
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pSub
->
epoch
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pSub
->
vgNum
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pSub
->
status
);
int32_t
sz
;
sz
=
taosArrayGetSize
(
pSub
->
availConsumer
);
sz
=
taosArrayGetSize
(
pSub
->
consumers
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int64_t
*
pConsumerId
=
taosArrayGet
(
pSub
->
availConsumer
,
i
);
tlen
+=
taosEncodeFixedI64
(
buf
,
*
pConsumerId
);
}
sz
=
taosArrayGetSize
(
pSub
->
assigned
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
*
pCEp
=
taosArrayGet
(
pSub
->
assigned
,
i
);
tlen
+=
tEncodeSMqConsumerEp
(
buf
,
pCEp
);
}
sz
=
taosArrayGetSize
(
pSub
->
lostConsumer
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
*
pCEp
=
taosArrayGet
(
pSub
->
lostConsumer
,
i
);
tlen
+=
tEncodeSMqConsumerEp
(
buf
,
pCEp
);
}
sz
=
taosArrayGetSize
(
pSub
->
idleConsumer
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
*
pCEp
=
taosArrayGet
(
pSub
->
idleConsumer
,
i
);
tlen
+=
tEncodeSMqConsumerEp
(
buf
,
pCEp
);
SMqSubConsumer
*
pSubConsumer
=
taosArrayGet
(
pSub
->
consumers
,
i
);
tlen
+=
tEncodeSMqSubConsumer
(
buf
,
pSubConsumer
);
}
sz
=
taosArrayGetSize
(
pSub
->
unassignedVg
);
...
...
@@ -504,68 +474,25 @@ static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeOb
static
FORCE_INLINE
void
*
tDecodeSubscribeObj
(
void
*
buf
,
SMqSubscribeObj
*
pSub
)
{
buf
=
taosDecodeStringTo
(
buf
,
pSub
->
key
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pSub
->
epoch
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pSub
->
vgNum
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pSub
->
status
);
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pSub
->
availConsumer
=
taosArrayInit
(
sz
,
sizeof
(
int64_t
));
if
(
pSub
->
availConsumer
==
NULL
)
{
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int64_t
consumerId
;
buf
=
taosDecodeFixedI64
(
buf
,
&
consumerId
);
taosArrayPush
(
pSub
->
availConsumer
,
&
consumerId
);
}
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pSub
->
assigned
=
taosArrayInit
(
sz
,
sizeof
(
SMqConsumerEp
));
if
(
pSub
->
assigned
==
NULL
)
{
taosArrayDestroy
(
pSub
->
availConsumer
);
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
cEp
=
{
0
};
buf
=
tDecodeSMqConsumerEp
(
buf
,
&
cEp
);
taosArrayPush
(
pSub
->
assigned
,
&
cEp
);
}
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pSub
->
lostConsumer
=
taosArrayInit
(
sz
,
sizeof
(
SMqConsumerEp
));
if
(
pSub
->
lostConsumer
==
NULL
)
{
taosArrayDestroy
(
pSub
->
availConsumer
);
taosArrayDestroy
(
pSub
->
assigned
);
pSub
->
consumers
=
taosArrayInit
(
sz
,
sizeof
(
SMqSubConsumer
));
if
(
pSub
->
consumers
==
NULL
)
{
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMq
ConsumerEp
cEp
=
{
0
};
buf
=
tDecodeSMq
ConsumerEp
(
buf
,
&
cEp
);
taosArrayPush
(
pSub
->
lostConsumer
,
&
cEp
);
SMq
SubConsumer
subConsumer
=
{
0
};
buf
=
tDecodeSMq
SubConsumer
(
buf
,
&
subConsumer
);
taosArrayPush
(
pSub
->
consumers
,
&
subConsumer
);
}
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pSub
->
idleConsumer
=
taosArrayInit
(
sz
,
sizeof
(
SMqConsumerEp
));
if
(
pSub
->
idleConsumer
==
NULL
)
{
taosArrayDestroy
(
pSub
->
availConsumer
);
taosArrayDestroy
(
pSub
->
assigned
);
taosArrayDestroy
(
pSub
->
lostConsumer
);
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
cEp
=
{
0
};
buf
=
tDecodeSMqConsumerEp
(
buf
,
&
cEp
);
taosArrayPush
(
pSub
->
idleConsumer
,
&
cEp
);
}
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pSub
->
unassignedVg
=
taosArrayInit
(
sz
,
sizeof
(
SMqConsumerEp
));
if
(
pSub
->
unassignedVg
==
NULL
)
{
taosArrayDestroy
(
pSub
->
availConsumer
);
taosArrayDestroy
(
pSub
->
assigned
);
taosArrayDestroy
(
pSub
->
lostConsumer
);
taosArrayDestroy
(
pSub
->
idleConsumer
);
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
...
...
@@ -573,50 +500,29 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
buf
=
tDecodeSMqConsumerEp
(
buf
,
&
cEp
);
taosArrayPush
(
pSub
->
unassignedVg
,
&
cEp
);
}
return
buf
;
}
static
FORCE_INLINE
void
tDeleteSMqSubscribeObj
(
SMqSubscribeObj
*
pSub
)
{
if
(
pSub
->
availConsumer
)
{
taosArrayDestroy
(
pSub
->
availConsumer
);
pSub
->
availConsumer
=
NULL
;
}
if
(
pSub
->
assigned
)
{
//taosArrayDestroyEx(pSub->assigned, (void (*)(void*))tDeleteSMqConsumerEp);
taosArrayDestroy
(
pSub
->
assigned
);
pSub
->
assigned
=
NULL
;
if
(
pSub
->
consumers
)
{
taosArrayDestroyEx
(
pSub
->
consumers
,
(
void
(
*
)(
void
*
))
tDeleteSMqSubConsumer
);
//taosArrayDestroy(pSub->consumers);
pSub
->
consumers
=
NULL
;
}
if
(
pSub
->
unassignedVg
)
{
//
taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp);
taosArrayDestroy
(
pSub
->
unassignedVg
);
taosArrayDestroyEx
(
pSub
->
unassignedVg
,
(
void
(
*
)(
void
*
))
tDeleteSMqConsumerEp
);
//
taosArrayDestroy(pSub->unassignedVg);
pSub
->
unassignedVg
=
NULL
;
}
if
(
pSub
->
idleConsumer
)
{
//taosArrayDestroyEx(pSub->idleConsumer, (void (*)(void*))tDeleteSMqConsumerEp);
taosArrayDestroy
(
pSub
->
idleConsumer
);
pSub
->
idleConsumer
=
NULL
;
}
if
(
pSub
->
lostConsumer
)
{
//taosArrayDestroyEx(pSub->lostConsumer, (void (*)(void*))tDeleteSMqConsumerEp);
taosArrayDestroy
(
pSub
->
lostConsumer
);
pSub
->
lostConsumer
=
NULL
;
}
}
typedef
struct
SMqCGroup
{
char
name
[
TSDB_CONSUMER_GROUP_LEN
];
int32_t
status
;
// 0 - uninitialized, 1 - wait rebalance, 2- normal
SList
*
consumerIds
;
// SList<int64_t>
SList
*
idleVGroups
;
// SList<int32_t>
}
SMqCGroup
;
typedef
struct
SMqTopicObj
{
typedef
struct
{
char
name
[
TSDB_TOPIC_FNAME_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
int64_t
createTime
;
int64_t
updateTime
;
uint64_t
uid
;
int64_t
uid
;
int64_t
dbUid
;
int32_t
version
;
SRWLatch
lock
;
...
...
@@ -624,79 +530,23 @@ typedef struct SMqTopicObj {
char
*
sql
;
char
*
logicalPlan
;
char
*
physicalPlan
;
// SHashObj *cgroups; // SHashObj<SMqCGroup>
// SHashObj *consumers; // SHashObj<SMqConsumerObj>
}
SMqTopicObj
;
// TODO: add cache and change name to id
typedef
struct
SMqConsumerTopic
{
char
name
[
TSDB_TOPIC_FNAME_LEN
];
int32_t
epoch
;
// vg assigned to the consumer on the topic
SArray
*
pVgInfo
;
// SArray<int32_t>
}
SMqConsumerTopic
;
static
FORCE_INLINE
SMqConsumerTopic
*
tNewConsumerTopic
(
int64_t
consumerId
,
SMqTopicObj
*
pTopic
,
SMqSubscribeObj
*
pSub
,
int64_t
*
oldConsumerId
)
{
SMqConsumerTopic
*
pCTopic
=
malloc
(
sizeof
(
SMqConsumerTopic
));
if
(
pCTopic
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
strcpy
(
pCTopic
->
name
,
pTopic
->
name
);
pCTopic
->
epoch
=
0
;
pCTopic
->
pVgInfo
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
int32_t
unassignedVgSz
=
taosArrayGetSize
(
pSub
->
unassignedVg
);
if
(
unassignedVgSz
>
0
)
{
SMqConsumerEp
*
pCEp
=
taosArrayPop
(
pSub
->
unassignedVg
);
*
oldConsumerId
=
pCEp
->
consumerId
;
pCEp
->
consumerId
=
consumerId
;
taosArrayPush
(
pCTopic
->
pVgInfo
,
&
pCEp
->
vgId
);
taosArrayPush
(
pSub
->
assigned
,
pCEp
);
}
return
pCTopic
;
}
static
FORCE_INLINE
int32_t
tEncodeSMqConsumerTopic
(
void
**
buf
,
SMqConsumerTopic
*
pConsumerTopic
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeString
(
buf
,
pConsumerTopic
->
name
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumerTopic
->
epoch
);
int32_t
sz
=
0
;
if
(
pConsumerTopic
->
pVgInfo
!=
NULL
)
{
sz
=
taosArrayGetSize
(
pConsumerTopic
->
pVgInfo
);
}
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int32_t
*
pVgInfo
=
taosArrayGet
(
pConsumerTopic
->
pVgInfo
,
i
);
tlen
+=
taosEncodeFixedI32
(
buf
,
*
pVgInfo
);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqConsumerTopic
(
void
*
buf
,
SMqConsumerTopic
*
pConsumerTopic
)
{
buf
=
taosDecodeStringTo
(
buf
,
pConsumerTopic
->
name
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumerTopic
->
epoch
);
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pConsumerTopic
->
pVgInfo
=
taosArrayInit
(
sz
,
sizeof
(
SMqConsumerTopic
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int32_t
vgInfo
;
buf
=
taosDecodeFixedI32
(
buf
,
&
vgInfo
);
taosArrayPush
(
pConsumerTopic
->
pVgInfo
,
&
vgInfo
);
}
return
buf
;
}
typedef
struct
SMqConsumerObj
{
typedef
struct
{
int64_t
consumerId
;
int64_t
connId
;
SRWLatch
lock
;
char
cgroup
[
TSDB_CONSUMER_GROUP_LEN
];
SArray
*
topics
;
// SArray<
SMqConsumerTopic
>
SArray
*
topics
;
// SArray<
char*
>
int64_t
epoch
;
// stat
int64_t
pollCnt
;
// status
int32_t
status
;
// heartbeat from the consumer reset hbStatus to 0
// each checkConsumerAlive msg add hbStatus by 1
// if checkConsumerAlive > CONSUMER_REBALANCE_CNT, mask to lost
int32_t
hbStatus
;
}
SMqConsumerObj
;
static
FORCE_INLINE
int32_t
tEncodeSMqConsumerObj
(
void
**
buf
,
const
SMqConsumerObj
*
pConsumer
)
{
...
...
@@ -709,88 +559,29 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerO
int32_t
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerTopic
*
pConsumerTopic
=
taosArrayGet
(
pConsumer
->
topics
,
i
);
tlen
+=
t
EncodeSMqConsumerTopic
(
buf
,
pConsumerT
opic
);
char
*
topic
=
taosArrayGetP
(
pConsumer
->
topics
,
i
);
tlen
+=
t
aosEncodeString
(
buf
,
t
opic
);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqConsumerObj
(
void
*
buf
,
SMqConsumerObj
*
pConsumer
)
{
int32_t
sz
;
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
consumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
connId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
epoch
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
pollCnt
);
buf
=
taosDecodeStringTo
(
buf
,
pConsumer
->
cgroup
);
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pConsumer
->
topics
=
taosArrayInit
(
sz
,
sizeof
(
SMqConsumerObj
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerTopic
cT
opic
;
buf
=
t
DecodeSMqConsumerTopic
(
buf
,
&
cT
opic
);
taosArrayPush
(
pConsumer
->
topics
,
&
cT
opic
);
char
*
t
opic
;
buf
=
t
aosDecodeString
(
buf
,
&
t
opic
);
taosArrayPush
(
pConsumer
->
topics
,
&
t
opic
);
}
return
buf
;
}
typedef
struct
SMqSubConsumerObj
{
int64_t
consumerUid
;
// if -1, unassigned
SList
*
vgId
;
// SList<int32_t>
}
SMqSubConsumerObj
;
typedef
struct
SMqSubCGroupObj
{
char
name
[
TSDB_CONSUMER_GROUP_LEN
];
SList
*
consumers
;
// SList<SMqConsumerObj>
}
SMqSubCGroupObj
;
typedef
struct
SMqSubTopicObj
{
char
name
[
TSDB_TOPIC_FNAME_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
int64_t
createTime
;
int64_t
updateTime
;
int64_t
uid
;
int64_t
dbUid
;
int32_t
version
;
SRWLatch
lock
;
int32_t
sqlLen
;
char
*
sql
;
char
*
logicalPlan
;
char
*
physicalPlan
;
SList
*
cgroups
;
// SList<SMqSubCGroupObj>
}
SMqSubTopicObj
;
typedef
struct
SMqConsumerSubObj
{
int64_t
topicUid
;
SList
*
vgIds
;
// SList<int64_t>
}
SMqConsumerSubObj
;
typedef
struct
SMqConsumerHbObj
{
int64_t
consumerId
;
SList
*
consumerSubs
;
// SList<SMqConsumerSubObj>
}
SMqConsumerHbObj
;
typedef
struct
SMqVGroupSubObj
{
int64_t
topicUid
;
SList
*
consumerIds
;
// SList<int64_t>
}
SMqVGroupSubObj
;
typedef
struct
SMqVGroupHbObj
{
int64_t
vgId
;
SList
*
vgSubs
;
// SList<SMqVGroupSubObj>
}
SMqVGroupHbObj
;
#if 0
typedef struct SCGroupObj {
char name[TSDB_TOPIC_NAME_LEN];
int64_t createTime;
int64_t updateTime;
uint64_t uid;
//uint64_t dbUid;
int32_t version;
SRWLatch lock;
SList* consumerIds;
} SCGroupObj;
#endif
typedef
struct
SMnodeMsg
{
char
user
[
TSDB_USER_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
...
...
source/dnode/mnode/impl/inc/mndInt.h
浏览文件 @
432f8bfd
...
...
@@ -96,6 +96,7 @@ typedef struct SMnode {
SendReqToMnodeFp
sendReqToMnodeFp
;
SendRedirectRspFp
sendRedirectRspFp
;
PutReqToMWriteQFp
putReqToMWriteQFp
;
PutReqToMReadQFp
putReqToMReadQFp
;
}
SMnode
;
int32_t
mndSendReqToDnode
(
SMnode
*
pMnode
,
SEpSet
*
pEpSet
,
SRpcMsg
*
rpcMsg
);
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
432f8bfd
...
...
@@ -53,6 +53,19 @@ int32_t mndInitConsumer(SMnode *pMnode) {
void
mndCleanupConsumer
(
SMnode
*
pMnode
)
{}
SMqConsumerObj
*
mndCreateConsumer
(
int64_t
consumerId
,
const
char
*
cgroup
)
{
SMqConsumerObj
*
pConsumer
=
malloc
(
sizeof
(
SMqConsumerObj
));
if
(
pConsumer
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pConsumer
->
epoch
=
1
;
pConsumer
->
consumerId
=
consumerId
;
strcpy
(
pConsumer
->
cgroup
,
cgroup
);
taosInitRWLatch
(
&
pConsumer
->
lock
);
return
pConsumer
;
}
SSdbRaw
*
mndConsumerActionEncode
(
SMqConsumerObj
*
pConsumer
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
void
*
buf
=
NULL
;
...
...
@@ -164,148 +177,3 @@ void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbRelease
(
pSdb
,
pConsumer
);
}
#if 0
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
mDebug("consumer:%s, start to retrieve meta", pInfo->tableFname);
SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("consumer:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr());
return -1;
}
SConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pInfo->tableFname);
if (pConsumer == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_INVALID_CONSUMER;
mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
return -1;
}
taosRLockLatch(&pConsumer->lock);
int32_t totalCols = pConsumer->numOfColumns + pConsumer->numOfTags;
int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema);
STableMetaRsp *pMeta = rpcMallocCont(contLen);
if (pMeta == NULL) {
taosRUnLockLatch(&pConsumer->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseConsumer(pMnode, pConsumer);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
return -1;
}
memcpy(pMeta->consumerFname, pConsumer->name, TSDB_TABLE_FNAME_LEN);
pMeta->numOfTags = htonl(pConsumer->numOfTags);
pMeta->numOfColumns = htonl(pConsumer->numOfColumns);
pMeta->precision = pDb->cfg.precision;
pMeta->tableType = TSDB_SUPER_TABLE;
pMeta->update = pDb->cfg.update;
pMeta->sversion = htonl(pConsumer->version);
pMeta->tuid = htonl(pConsumer->uid);
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pMeta->pSchema[i];
SSchema *pSrcSchema = &pConsumer->pSchema[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
taosRUnLockLatch(&pConsumer->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseConsumer(pMnode, pConsumer);
pMsg->pCont = pMeta;
pMsg->contLen = contLen;
mDebug("consumer:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pConsumer->numOfColumns, pConsumer->numOfTags);
return 0;
}
static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumOfConsumers) {
SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
return -1;
}
int32_t numOfConsumers = 0;
void *pIter = NULL;
while (1) {
SMqConsumerObj *pConsumer = NULL;
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) break;
numOfConsumers++;
sdbRelease(pSdb, pConsumer);
}
*pNumOfConsumers = numOfConsumers;
return 0;
}
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
if (mndGetNumOfConsumers(pMnode, pShow->db, &pShow->numOfRows) != 0) {
return -1;
}
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "columns");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "tags");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htonl(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = sdbGetSize(pSdb, SDB_CONSUMER);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
return 0;
}
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
#endif
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
432f8bfd
...
...
@@ -31,9 +31,20 @@
#define MND_SUBSCRIBE_VER_NUMBER 1
#define MND_SUBSCRIBE_RESERVE_SIZE 64
#define MND_SUBSCRIBE_REBALANCE_
MS 5000
#define MND_SUBSCRIBE_REBALANCE_
CNT 3
static
char
*
mndMakeSubscribeKey
(
char
*
cgroup
,
char
*
topicName
);
enum
{
MQ_CONSUMER_STATUS__INIT
=
1
,
MQ_CONSUMER_STATUS__ACTIVE
,
MQ_CONSUMER_STATUS__LOST
,
};
enum
{
MQ_SUBSCRIBE_STATUS__ACTIVE
=
1
,
MQ_SUBSCRIBE_STATUS__DELETED
,
};
static
char
*
mndMakeSubscribeKey
(
const
char
*
cgroup
,
const
char
*
topicName
);
static
SSdbRaw
*
mndSubActionEncode
(
SMqSubscribeObj
*
);
static
SSdbRow
*
mndSubActionDecode
(
SSdbRaw
*
pRaw
);
...
...
@@ -48,9 +59,10 @@ static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg);
static
int32_t
mndProcessMqTimerMsg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessGetSubEpReq
(
SMnodeMsg
*
pMsg
);
static
int
mndBuildMqSetConsumerVgReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SMqConsumerObj
*
pConsumer
,
SMqConsumerTopic
*
pConsumerTopic
,
SMqTopicObj
*
pTopic
,
SMqConsumerEp
*
pSub
,
int64_t
oldConsumerId
);
static
int
mndPersistMqSetConnReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqTopicObj
*
pTopic
,
const
char
*
cgroup
,
const
SMqConsumerEp
*
pSub
);
static
int
mndInitUnassignedVg
(
SMnode
*
pMnode
,
const
SMqTopicObj
*
pTopic
,
SMqSubscribeObj
*
pSub
);
int32_t
mndInitSubscribe
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_SUBSCRIBE
,
...
...
@@ -68,12 +80,140 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
return
sdbSetTable
(
pMnode
->
pSdb
,
table
);
}
static
SMqSubscribeObj
*
mndCreateSubscription
(
SMnode
*
pMnode
,
const
SMqTopicObj
*
pTopic
,
const
char
*
consumerGroup
)
{
SMqSubscribeObj
*
pSub
=
tNewSubscribeObj
();
if
(
pSub
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
char
*
key
=
mndMakeSubscribeKey
(
consumerGroup
,
pTopic
->
name
);
if
(
key
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
tDeleteSMqSubscribeObj
(
pSub
);
free
(
pSub
);
return
NULL
;
}
strcpy
(
pSub
->
key
,
key
);
free
(
key
);
if
(
mndInitUnassignedVg
(
pMnode
,
pTopic
,
pSub
)
<
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
tDeleteSMqSubscribeObj
(
pSub
);
free
(
pSub
);
return
NULL
;
}
// TODO: disable alter subscribed table
return
pSub
;
}
static
int32_t
mndBuildRebalanceMsg
(
void
**
pBuf
,
int32_t
*
pLen
,
const
SMqTopicObj
*
pTopic
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
cgroup
)
{
SMqSetCVgReq
req
=
{
0
};
strcpy
(
req
.
cgroup
,
cgroup
);
strcpy
(
req
.
topicName
,
pTopic
->
name
);
req
.
sql
=
pTopic
->
sql
;
req
.
logicalPlan
=
pTopic
->
logicalPlan
;
req
.
physicalPlan
=
pTopic
->
physicalPlan
;
req
.
qmsg
=
pConsumerEp
->
qmsg
;
req
.
oldConsumerId
=
pConsumerEp
->
oldConsumerId
;
req
.
newConsumerId
=
pConsumerEp
->
consumerId
;
req
.
vgId
=
pConsumerEp
->
vgId
;
int32_t
tlen
=
tEncodeSMqSetCVgReq
(
NULL
,
&
req
);
void
*
buf
=
malloc
(
sizeof
(
SMsgHead
)
+
tlen
);
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
SMsgHead
*
pMsgHead
=
(
SMsgHead
*
)
buf
;
pMsgHead
->
contLen
=
htonl
(
sizeof
(
SMsgHead
)
+
tlen
);
pMsgHead
->
vgId
=
htonl
(
pConsumerEp
->
vgId
);
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tEncodeSMqSetCVgReq
(
&
abuf
,
&
req
);
*
pBuf
=
buf
;
*
pLen
=
tlen
;
return
0
;
}
static
int32_t
mndPersistRebalanceMsg
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqTopicObj
*
pTopic
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
cgroup
)
{
int32_t
vgId
=
pConsumerEp
->
vgId
;
SVgObj
*
pVgObj
=
mndAcquireVgroup
(
pMnode
,
vgId
);
void
*
buf
;
int32_t
tlen
;
if
(
mndBuildRebalanceMsg
(
&
buf
,
&
tlen
,
pTopic
,
pConsumerEp
,
cgroup
)
<
0
)
{
return
-
1
;
}
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgObj
);
action
.
pCont
=
buf
;
action
.
contLen
=
sizeof
(
SMsgHead
)
+
tlen
;
action
.
msgType
=
TDMT_VND_MQ_SET_CONN
;
mndReleaseVgroup
(
pMnode
,
pVgObj
);
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
free
(
buf
);
return
-
1
;
}
return
0
;
}
static
int32_t
mndBuildCancelConnReq
(
void
**
pBuf
,
int32_t
*
pLen
,
const
SMqConsumerEp
*
pConsumerEp
)
{
SMqSetCVgReq
req
=
{
0
};
req
.
oldConsumerId
=
pConsumerEp
->
consumerId
;
req
.
newConsumerId
=
-
1
;
int32_t
tlen
=
tEncodeSMqSetCVgReq
(
NULL
,
&
req
);
void
*
buf
=
malloc
(
sizeof
(
SMsgHead
)
+
tlen
);
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
SMsgHead
*
pMsgHead
=
(
SMsgHead
*
)
buf
;
pMsgHead
->
contLen
=
htonl
(
sizeof
(
SMsgHead
)
+
tlen
);
pMsgHead
->
vgId
=
htonl
(
pConsumerEp
->
vgId
);
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tEncodeSMqSetCVgReq
(
&
abuf
,
&
req
);
*
pBuf
=
buf
;
*
pLen
=
tlen
;
return
0
;
}
static
int32_t
mndPersistCancelConnReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
)
{
int32_t
vgId
=
pConsumerEp
->
vgId
;
SVgObj
*
pVgObj
=
mndAcquireVgroup
(
pMnode
,
vgId
);
void
*
buf
;
int32_t
tlen
;
if
(
mndBuildCancelConnReq
(
&
buf
,
&
tlen
,
pConsumerEp
)
<
0
)
{
return
-
1
;
}
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgObj
);
action
.
pCont
=
buf
;
action
.
contLen
=
sizeof
(
SMsgHead
)
+
tlen
;
action
.
msgType
=
TDMT_VND_MQ_SET_CONN
;
mndReleaseVgroup
(
pMnode
,
pVgObj
);
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
free
(
buf
);
return
-
1
;
}
return
0
;
}
static
int32_t
mndProcessGetSubEpReq
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SMqCMGetSubEpReq
*
pReq
=
(
SMqCMGetSubEpReq
*
)
pMsg
->
rpcMsg
.
pCont
;
SMqCMGetSubEpRsp
rsp
=
{
0
};
int64_t
consumerId
=
be64toh
(
pReq
->
consumerId
);
int64_t
currentTs
=
taosGetTimestampMs
();
SMqConsumerObj
*
pConsumer
=
mndAcquireConsumer
(
pMsg
->
pMnode
,
consumerId
);
if
(
pConsumer
==
NULL
)
{
...
...
@@ -85,49 +225,35 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
strcpy
(
rsp
.
cgroup
,
pReq
->
cgroup
);
rsp
.
consumerId
=
consumerId
;
rsp
.
epoch
=
pConsumer
->
epoch
;
if
(
pReq
->
epoch
!=
rsp
.
epoch
)
{
SArray
*
pTopics
=
pConsumer
->
topics
;
int32_t
sz
=
taosArrayGetSize
(
pTopics
);
int
sz
=
taosArrayGetSize
(
pTopics
);
rsp
.
topics
=
taosArrayInit
(
sz
,
sizeof
(
SMqSubTopicEp
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqSubTopicEp
topicEp
;
SMqConsumerTopic
*
pConsumerTopic
=
taosArrayGet
(
pTopics
,
i
);
strcpy
(
topicEp
.
topic
,
pConsumerTopic
->
name
);
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribe
(
pMnode
,
pConsumer
->
cgroup
,
pConsumerTopic
->
name
);
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
char
*
topicName
=
taosArrayGetP
(
pTopics
,
i
);
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribe
(
pMnode
,
pConsumer
->
cgroup
,
topicName
);
ASSERT
(
pSub
);
bool
found
=
0
;
bool
changed
=
0
;
for
(
int32_t
j
=
0
;
j
<
taosArrayGetSize
(
pSub
->
availConsumer
);
j
++
)
{
if
(
*
(
int64_t
*
)
taosArrayGet
(
pSub
->
availConsumer
,
j
)
==
consumerId
)
{
found
=
1
;
break
;
}
}
if
(
found
==
0
)
{
taosArrayPush
(
pSub
->
availConsumer
,
&
consumerId
);
}
int
csz
=
taosArrayGetSize
(
pSub
->
consumers
);
//TODO: change to bsearch
for
(
int
j
=
0
;
j
<
csz
;
j
++
)
{
SMqSubConsumer
*
pSubConsumer
=
taosArrayGet
(
pSub
->
consumers
,
j
);
if
(
consumerId
==
pSubConsumer
->
consumerId
)
{
int
vgsz
=
taosArrayGetSize
(
pSubConsumer
->
vgInfo
);
SMqSubTopicEp
topicEp
;
topicEp
.
vgs
=
taosArrayInit
(
vgsz
,
sizeof
(
SMqSubVgEp
));
for
(
int
k
=
0
;
k
<
vgsz
;
k
++
)
{
SMqConsumerEp
*
pConsumerEp
=
taosArrayGet
(
pSubConsumer
->
vgInfo
,
k
);
int32_t
assignedSz
=
taosArrayGetSize
(
pSub
->
assigned
);
topicEp
.
vgs
=
taosArrayInit
(
assignedSz
,
sizeof
(
SMqSubVgEp
));
for
(
int32_t
j
=
0
;
j
<
assignedSz
;
j
++
)
{
SMqConsumerEp
*
pCEp
=
taosArrayGet
(
pSub
->
assigned
,
j
);
if
(
pCEp
->
consumerId
==
consumerId
)
{
pCEp
->
lastConsumerHbTs
=
currentTs
;
SMqSubVgEp
vgEp
=
{.
epSet
=
pCEp
->
epSet
,
.
vgId
=
pCEp
->
vgId
};
SMqSubVgEp
vgEp
=
{.
epSet
=
pConsumerEp
->
epSet
,
.
vgId
=
pConsumerEp
->
vgId
};
taosArrayPush
(
topicEp
.
vgs
,
&
vgEp
);
changed
=
1
;
}
}
if
(
taosArrayGetSize
(
topicEp
.
vgs
)
!=
0
)
{
taosArrayPush
(
rsp
.
topics
,
&
topicEp
);
break
;
}
if
(
changed
||
found
)
{
SSdbRaw
*
pRaw
=
mndSubActionEncode
(
pSub
);
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
sdbWrite
(
pMnode
->
pSdb
,
pRaw
);
}
mndReleaseSubscribe
(
pMnode
,
pSub
);
}
}
int32_t
tlen
=
tEncodeSMqCMGetSubEpRsp
(
NULL
,
&
rsp
);
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
...
...
@@ -157,17 +283,131 @@ static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) {
static
int32_t
mndProcessMqTimerMsg
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SMqSubscribeObj
*
pSub
=
NULL
;
void
*
pIter
=
sdbFetch
(
pSdb
,
SDB_SUBSCRIBE
,
NULL
,
(
void
**
)
&
pSub
);
int64_t
currentTs
=
taosGetTimestampMs
();
int32_t
sz
;
while
(
pIter
!=
NULL
)
{
SMqConsumerObj
*
pConsumer
;
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_CONSUMER
,
pIter
,
(
void
**
)
&
pConsumer
);
if
(
pIter
==
NULL
)
break
;
int32_t
hbStatus
=
atomic_fetch_add_32
(
&
pConsumer
->
hbStatus
,
1
);
if
(
hbStatus
>
MND_SUBSCRIBE_REBALANCE_CNT
)
{
int32_t
old
=
atomic_val_compare_exchange_32
(
&
pConsumer
->
status
,
MQ_CONSUMER_STATUS__ACTIVE
,
MQ_CONSUMER_STATUS__LOST
);
if
(
old
==
MQ_CONSUMER_STATUS__ACTIVE
)
{
SMqDoRebalanceMsg
*
pRebMsg
=
rpcMallocCont
(
sizeof
(
SMqDoRebalanceMsg
));
pRebMsg
->
consumerId
=
pConsumer
->
consumerId
;
SRpcMsg
rpcMsg
=
{.
msgType
=
TDMT_MND_MQ_DO_REBALANCE
,
.
pCont
=
pRebMsg
,
.
contLen
=
sizeof
(
SMqDoRebalanceMsg
)};
pMnode
->
putReqToMWriteQFp
(
pMnode
->
pDnode
,
&
rpcMsg
);
}
}
}
return
0
;
}
static
int32_t
mndProcessDoRebalanceMsg
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SMqDoRebalanceMsg
*
pReq
=
(
SMqDoRebalanceMsg
*
)
pMsg
->
rpcMsg
.
pCont
;
SMqConsumerObj
*
pConsumer
=
mndAcquireConsumer
(
pMnode
,
pReq
->
consumerId
);
int
topicSz
=
taosArrayGetSize
(
pConsumer
->
topics
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
&
pMsg
->
rpcMsg
);
for
(
int
i
=
0
;
i
<
topicSz
;
i
++
)
{
char
*
topic
=
taosArrayGetP
(
pConsumer
->
topics
,
i
);
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribe
(
pMnode
,
pConsumer
->
cgroup
,
topic
);
int32_t
consumerNum
=
taosArrayGetSize
(
pSub
->
consumers
);
if
(
consumerNum
!=
0
)
{
int32_t
vgNum
=
pSub
->
vgNum
;
int32_t
vgEachConsumer
=
vgNum
/
consumerNum
;
int32_t
left
=
vgNum
%
consumerNum
;
int32_t
leftUsed
=
0
;
SArray
*
unassignedVgStash
=
taosArrayInit
(
0
,
sizeof
(
SMqConsumerEp
));
SArray
*
unassignedConsumer
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
for
(
int32_t
j
=
0
;
j
<
consumerNum
;
j
++
)
{
bool
changed
=
false
;
SMqSubConsumer
*
pSubConsumer
=
taosArrayGet
(
pSub
->
consumers
,
j
);
int32_t
vgOneConsumer
=
taosArrayGetSize
(
pSubConsumer
->
vgInfo
);
bool
canUseLeft
=
leftUsed
<
left
;
if
(
vgOneConsumer
>
vgEachConsumer
+
canUseLeft
)
{
changed
=
true
;
if
(
canUseLeft
)
leftUsed
++
;
// put into unassigned
while
(
taosArrayGetSize
(
pSubConsumer
->
vgInfo
)
>
vgEachConsumer
+
canUseLeft
)
{
SMqConsumerEp
*
pConsumerEp
=
taosArrayPop
(
pSubConsumer
->
vgInfo
);
ASSERT
(
pConsumerEp
!=
NULL
);
taosArrayPush
(
unassignedVgStash
,
pConsumerEp
);
// build msg and persist into trans
}
}
else
if
(
vgOneConsumer
<
vgEachConsumer
)
{
changed
=
true
;
// assign from unassigned
while
(
taosArrayGetSize
(
pSubConsumer
->
vgInfo
)
<
vgEachConsumer
)
{
// if no unassgined, save j
if
(
taosArrayGetSize
(
unassignedVgStash
)
==
0
)
{
taosArrayPush
(
unassignedConsumer
,
&
j
);
break
;
}
SMqConsumerEp
*
pConsumerEp
=
taosArrayPop
(
unassignedVgStash
);
ASSERT
(
pConsumerEp
!=
NULL
);
pConsumerEp
->
oldConsumerId
=
pConsumerEp
->
consumerId
;
pConsumerEp
->
consumerId
=
pSubConsumer
->
consumerId
;
taosArrayPush
(
pSubConsumer
->
vgInfo
,
pConsumerEp
);
// build msg and persist into trans
}
}
if
(
changed
)
{
SMqConsumerObj
*
pRebConsumer
=
mndAcquireConsumer
(
pMnode
,
pSubConsumer
->
consumerId
);
pRebConsumer
->
epoch
++
;
SSdbRaw
*
pConsumerRaw
=
mndConsumerActionEncode
(
pRebConsumer
);
sdbSetRawStatus
(
pRebConsumer
,
SDB_STATUS_READY
);
mndTransAppendRedolog
(
pTrans
,
pConsumerRaw
);
}
}
for
(
int32_t
j
=
0
;
j
<
taosArrayGetSize
(
unassignedConsumer
);
j
++
)
{
int32_t
consumerIdx
=
*
(
int32_t
*
)
taosArrayGet
(
unassignedConsumer
,
j
);
SMqSubConsumer
*
pSubConsumer
=
taosArrayGet
(
pSub
->
consumers
,
consumerIdx
);
while
(
taosArrayGetSize
(
pSubConsumer
->
vgInfo
)
<
vgEachConsumer
)
{
SMqConsumerEp
*
pConsumerEp
=
taosArrayPop
(
unassignedVgStash
);
ASSERT
(
pConsumerEp
!=
NULL
);
pConsumerEp
->
oldConsumerId
=
pConsumerEp
->
consumerId
;
pConsumerEp
->
consumerId
=
pSubConsumer
->
consumerId
;
taosArrayPush
(
pSubConsumer
->
vgInfo
,
pConsumerEp
);
// build msg and persist into trans
}
}
ASSERT
(
taosArrayGetSize
(
unassignedVgStash
)
==
0
);
// send msg to vnode
// log rebalance statistics
SSdbRaw
*
pSubRaw
=
mndSubscribeActionEncode
(
pSub
);
sdbSetRawStatus
(
pSubRaw
,
SDB_STATUS_READY
);
mndTransAppendRedolog
(
pTrans
,
pSubRaw
);
}
mndReleaseSubscribe
(
pMnode
,
pSub
);
}
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"mq-rebalance-trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
mndReleaseConsumer
(
pMnode
,
pConsumer
);
return
-
1
;
}
mndTransDrop
(
pTrans
);
mndReleaseConsumer
(
pMnode
,
pConsumer
);
return
0
;
}
#if 0
//update consumer status for the subscribption
for (int i = 0; i < taosArrayGetSize(pSub->assigned); i++) {
SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i);
int64_t consumerId = pCEp->consumerId;
if
(
pCEp
->
lastConsumerHbTs
!=
-
1
&&
currentTs
-
pCEp
->
lastConsumerHbTs
>
MND_SUBSCRIBE_REBALANCE_MS
)
{
if (pCEp->status != -1) {
int32_t consumerHbStatus = atomic_fetch_add_32(&pCEp->consumerHbStatus, 1);
if (consumerHbStatus < MND_SUBSCRIBE_REBALANCE_CNT) {
continue;
}
// put consumer into lostConsumer
taosArrayPush
(
pSub
->
lostConsumer
,
pCEp
);
SMqConsumerEp* lostConsumer = taosArrayPush(pSub->lostConsumer, pCEp);
lostConsumer->qmsg = NULL;
// put vg into unassigned
taosArrayPush(pSub->unassignedVg, pCEp);
// remove from assigned
...
...
@@ -192,91 +432,76 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
#endif
}
}
if
((
sz
=
taosArrayGetSize
(
pSub
->
unassignedVg
))
>
0
&&
taosArrayGetSize
(
pSub
->
availConsumer
)
>
0
)
{
// no available consumer, skip rebalance
if (taosArrayGetSize(pSub->availConsumer) == 0) {
continue;
}
taosArrayGet(pSub->availConsumer, 0);
// rebalance condition1 : have unassigned vg
// assign vg to a consumer, trying to find the least assigned one
if ((sz = taosArrayGetSize(pSub->unassignedVg)) > 0) {
char *topic = NULL;
char *cgroup = NULL;
mndSplitSubscribeKey(pSub->key, &topic, &cgroup);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
// create trans
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
for (int32_t i = 0; i < sz; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pSub->availConsumer, pSub->nextConsumerIdx);
pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer);
SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg);
int64_t
oldConsumerId
=
pCEp
->
consumerId
;
pCEp->
oldConsumerId = pCEp->consumerId;
pCEp->consumerId = consumerId;
taosArrayPush(pSub->assigned, pCEp);
pSub
->
nextConsumerIdx
=
(
pSub
->
nextConsumerIdx
+
1
)
%
taosArrayGetSize
(
pSub
->
availConsumer
);
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
pConsumer->epoch++;
/*SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
/*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
/*sdbWriteNotFree(pMnode->pSdb, pConsumerRaw);*/
SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
sdbWrite(pMnode->pSdb, pConsumerRaw);
mndReleaseConsumer(pMnode, pConsumer);
// build msg
SMqSetCVgReq
req
=
{
0
};
strcpy
(
req
.
cgroup
,
cgroup
);
strcpy
(
req
.
topicName
,
topic
);
req
.
sql
=
pTopic
->
sql
;
req
.
logicalPlan
=
pTopic
->
logicalPlan
;
req
.
physicalPlan
=
pTopic
->
physicalPlan
;
req
.
qmsg
=
pCEp
->
qmsg
;
req
.
oldConsumerId
=
oldConsumerId
;
req
.
newConsumerId
=
consumerId
;
req
.
vgId
=
pCEp
->
vgId
;
int32_t
tlen
=
tEncodeSMqSetCVgReq
(
NULL
,
&
req
);
void
*
buf
=
malloc
(
sizeof
(
SMsgHead
)
+
tlen
);
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
SMsgHead
*
pMsgHead
=
(
SMsgHead
*
)
buf
;
pMsgHead
->
contLen
=
htonl
(
sizeof
(
SMsgHead
)
+
tlen
);
pMsgHead
->
vgId
=
htonl
(
pCEp
->
vgId
);
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tEncodeSMqSetCVgReq
(
&
abuf
,
&
req
);
void* msg;
int32_t msgLen;
mndBuildRebalanceMsg(&msg, &msgLen, pTopic, pCEp, cgroup, topic);
// persist msg
// TODO: no need for txn
STransAction action = {0};
action.epSet = pCEp->epSet;
action
.
pCont
=
buf
;
action
.
contLen
=
sizeof
(
SMsgHead
)
+
tl
en
;
action.pCont =
msg
;
action.contLen = sizeof(SMsgHead) +
msgL
en;
action.msgType = TDMT_VND_MQ_SET_CONN;
mndTransAppendRedoAction(pTrans, &action);
// persist
raw
// persist
data
SSdbRaw *pRaw = mndSubActionEncode(pSub);
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pRaw);
tfree
(
topic
);
tfree
(
cgroup
);
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
}
mndReleaseTopic(pMnode, pTopic);
mndTransDrop(pTrans);
tfree(topic);
tfree(cgroup);
}
pIter
=
sdbFetch
(
pSdb
,
SDB_SUBSCRIBE
,
pIter
,
(
void
**
)
&
pSub
);
// rebalance condition2 : imbalance assignment
}
return 0;
}
#endif
static
int
mndInitUnassignedVg
(
SMnode
*
pMnode
,
SMqTopicObj
*
pTopic
,
SArray
*
unassignedVg
)
{
// convert phyplan to dag
static
int
mndInitUnassignedVg
(
SMnode
*
pMnode
,
const
SMqTopicObj
*
pTopic
,
SMqSubscribeObj
*
pSub
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
SQueryDag
*
pDag
=
qStringToDag
(
pTopic
->
physicalPlan
);
SArray
*
pArray
=
NULL
;
SArray
*
inner
=
taosArrayGet
(
pDag
->
pSubplans
,
0
);
SSubplan
*
plan
=
taosArrayGetP
(
inner
,
0
);
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
SArray
*
unassignedVg
=
pSub
->
unassignedVg
;
void
*
pIter
=
NULL
;
while
(
1
)
{
...
...
@@ -284,6 +509,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
if
(
pIter
==
NULL
)
break
;
if
(
pVgroup
->
dbUid
!=
pTopic
->
dbUid
)
continue
;
pSub
->
vgNum
++
;
plan
->
execNode
.
nodeId
=
pVgroup
->
vgId
;
plan
->
execNode
.
epset
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
...
...
@@ -298,47 +524,41 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
return
-
1
;
}
SMqConsumerEp
CEp
=
{
0
};
CEp
.
status
=
0
;
CEp
.
consumerId
=
-
1
;
CEp
.
lastConsumerHbTs
=
CEp
.
lastVgHbTs
=
-
1
;
SMqConsumerEp
consumerEp
=
{
0
};
consumerEp
.
status
=
0
;
consumerEp
.
consumerId
=
-
1
;
STaskInfo
*
pTaskInfo
=
taosArrayGet
(
pArray
,
0
);
C
Ep
.
epSet
=
pTaskInfo
->
addr
.
epset
;
C
Ep
.
vgId
=
pTaskInfo
->
addr
.
nodeId
;
consumer
Ep
.
epSet
=
pTaskInfo
->
addr
.
epset
;
consumer
Ep
.
vgId
=
pTaskInfo
->
addr
.
nodeId
;
ASSERT
(
C
Ep
.
vgId
==
pVgroup
->
vgId
);
C
Ep
.
qmsg
=
strdup
(
pTaskInfo
->
msg
->
msg
);
taosArrayPush
(
unassignedVg
,
&
C
Ep
);
ASSERT
(
consumer
Ep
.
vgId
==
pVgroup
->
vgId
);
consumer
Ep
.
qmsg
=
strdup
(
pTaskInfo
->
msg
->
msg
);
taosArrayPush
(
unassignedVg
,
&
consumer
Ep
);
// TODO: free taskInfo
taosArrayDestroy
(
pArray
);
/*SEpSet *pEpSet = &plan->execNode.epset;*/
/*pEpSet->inUse = 0;*/
/*addEpIntoEpSet(pEpSet, "localhost", 6030);*/
}
/*qDestroyQueryDag(pDag);*/
return
0
;
}
static
int
mndBuildMqSetConsumerVgReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SMqConsumerObj
*
pConsumer
,
SMqConsumerTopic
*
pConsumerTopic
,
SMqTopicObj
*
pTopic
,
SMqConsumerEp
*
pCEp
,
int64_t
oldConsumerId
)
{
int32_t
sz
=
taosArrayGetSize
(
pConsumerTopic
->
pVgInfo
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int32_t
vgId
=
*
(
int32_t
*
)
taosArrayGet
(
pConsumerTopic
->
pVgInfo
,
i
);
static
int
mndPersistMqSetConnReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqTopicObj
*
pTopic
,
const
char
*
cgroup
,
const
SMqConsumerEp
*
pConsumerEp
)
{
int32_t
vgId
=
pConsumerEp
->
vgId
;
SVgObj
*
pVgObj
=
mndAcquireVgroup
(
pMnode
,
vgId
);
SMqSetCVgReq
req
=
{
.
vgId
=
vgId
,
.
oldConsumerId
=
oldConsumerId
,
.
newConsumerId
=
pConsumer
->
consumerId
,
.
oldConsumerId
=
pConsumerEp
->
oldConsumerId
,
.
newConsumerId
=
pConsumerEp
->
consumerId
,
.
sql
=
pTopic
->
sql
,
.
logicalPlan
=
pTopic
->
logicalPlan
,
.
physicalPlan
=
pTopic
->
physicalPlan
,
.
qmsg
=
pConsumerEp
->
qmsg
,
};
strcpy
(
req
.
cgroup
,
pConsumer
->
cgroup
);
strcpy
(
req
.
cgroup
,
cgroup
);
strcpy
(
req
.
topicName
,
pTopic
->
name
);
req
.
sql
=
pTopic
->
sql
;
req
.
logicalPlan
=
pTopic
->
logicalPlan
;
req
.
physicalPlan
=
pTopic
->
physicalPlan
;
req
.
qmsg
=
pCEp
->
qmsg
;
int32_t
tlen
=
tEncodeSMqSetCVgReq
(
NULL
,
&
req
);
void
*
buf
=
malloc
(
sizeof
(
SMsgHead
)
+
tlen
);
if
(
buf
==
NULL
)
{
...
...
@@ -365,7 +585,6 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
free
(
buf
);
return
-
1
;
}
}
return
0
;
}
...
...
@@ -373,7 +592,7 @@ void mndCleanupSubscribe(SMnode *pMnode) {}
static
SSdbRaw
*
mndSubActionEncode
(
SMqSubscribeObj
*
pSub
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
void
*
buf
=
NULL
;
void
*
buf
=
NULL
;
int32_t
tlen
=
tEncodeSubscribeObj
(
NULL
,
pSub
);
int32_t
size
=
sizeof
(
int32_t
)
+
tlen
+
MND_SUBSCRIBE_RESERVE_SIZE
;
...
...
@@ -408,7 +627,7 @@ SUB_ENCODE_OVER:
static
SSdbRow
*
mndSubActionDecode
(
SSdbRaw
*
pRaw
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
void
*
buf
=
NULL
;
void
*
buf
=
NULL
;
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
goto
SUB_DECODE_OVER
;
...
...
@@ -443,7 +662,6 @@ SUB_DECODE_OVER:
tfree
(
buf
);
if
(
terrno
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"subscribe:%s, failed to decode from raw:%p since %s"
,
pSub
->
key
,
pRaw
,
terrstr
());
// TODO free subscribeobj
tfree
(
pRow
);
return
NULL
;
}
...
...
@@ -467,7 +685,7 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubsc
return
0
;
}
static
char
*
mndMakeSubscribeKey
(
c
har
*
cgroup
,
char
*
topicName
)
{
static
char
*
mndMakeSubscribeKey
(
c
onst
char
*
cgroup
,
const
char
*
topicName
)
{
char
*
key
=
malloc
(
TSDB_SHOW_SUBQUERY_LEN
);
if
(
key
==
NULL
)
{
return
NULL
;
...
...
@@ -501,8 +719,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
SCMSubscribeReq
subscribe
;
tDeserializeSCMSubscribeReq
(
msgStr
,
&
subscribe
);
int64_t
consumerId
=
subscribe
.
consumerId
;
char
*
consumerGroup
=
subscribe
.
consumerGroup
;
int32_t
cgroupLen
=
strlen
(
consumerGroup
);
char
*
cgroup
=
subscribe
.
consumerGroup
;
SArray
*
newSub
=
subscribe
.
topicNames
;
int
newTopicNum
=
subscribe
.
topicNum
;
...
...
@@ -511,24 +728,18 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
SArray
*
oldSub
=
NULL
;
int
oldTopicNum
=
0
;
bool
createConsumer
=
false
;
// create consumer if not exist
SMqConsumerObj
*
pConsumer
=
mndAcquireConsumer
(
pMnode
,
consumerId
);
if
(
pConsumer
==
NULL
)
{
// create consumer
pConsumer
=
malloc
(
sizeof
(
SMqConsumerObj
));
if
(
pConsumer
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pConsumer
->
epoch
=
1
;
pConsumer
->
consumerId
=
consumerId
;
strcpy
(
pConsumer
->
cgroup
,
consumerGroup
);
taosInitRWLatch
(
&
pConsumer
->
lock
);
pConsumer
=
mndCreateConsumer
(
consumerId
,
cgroup
);
createConsumer
=
true
;
}
else
{
pConsumer
->
epoch
++
;
oldSub
=
pConsumer
->
topics
;
}
pConsumer
->
topics
=
taosArrayInit
(
newTopicNum
,
sizeof
(
SMqConsumerTopic
))
;
pConsumer
->
topics
=
newSub
;
if
(
oldSub
!=
NULL
)
{
oldTopicNum
=
taosArrayGetSize
(
oldSub
);
...
...
@@ -546,14 +757,14 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
char
*
oldTopicName
=
NULL
;
if
(
i
>=
newTopicNum
)
{
// encode unset topic msg to all vnodes related to that topic
oldTopicName
=
((
SMqConsumerTopic
*
)
taosArrayGet
(
oldSub
,
j
))
->
name
;
oldTopicName
=
taosArrayGetP
(
oldSub
,
j
)
;
j
++
;
}
else
if
(
j
>=
oldTopicNum
)
{
newTopicName
=
taosArrayGetP
(
newSub
,
i
);
i
++
;
}
else
{
newTopicName
=
taosArrayGetP
(
newSub
,
i
);
oldTopicName
=
((
SMqConsumerTopic
*
)
taosArrayGet
(
oldSub
,
j
))
->
name
;
oldTopicName
=
taosArrayGetP
(
oldSub
,
j
)
;
int
comp
=
compareLenPrefixedStr
(
newTopicName
,
oldTopicName
);
if
(
comp
==
0
)
{
...
...
@@ -572,54 +783,25 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
}
if
(
oldTopicName
!=
NULL
)
{
#if 0
// cancel subscribe of that old topic
ASSERT(pNewTopic == NULL);
char *oldTopicName = pOldTopic->name;
SList *vgroups = pOldTopic->vgroups;
SListIter iter;
tdListInitIter(vgroups, &iter, TD_LIST_FORWARD);
SListNode *pn;
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, oldTopicName);
ASSERT(pTopic != NULL);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, oldTopicName);
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
while ((pn = tdListNext(&iter)) != NULL) {
int32_t vgId = *(int64_t *)pn->data;
// acquire and get epset
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
// TODO what time to release?
if (pVgObj == NULL) {
// TODO handle error
continue;
}
// build reset msg
void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup);
// TODO:serialize
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
ASSERT
(
newTopicName
==
NULL
);
// cancel subscribe of old topic
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribe
(
pMnode
,
cgroup
,
oldTopicName
);
ASSERT
(
pSub
);
int
csz
=
taosArrayGetSize
(
pSub
->
consumers
);
for
(
int
ci
=
0
;
ci
<
csz
;
ci
++
)
{
SMqSubConsumer
*
pSubConsumer
=
taosArrayGet
(
pSub
->
consumers
,
ci
);
if
(
pSubConsumer
->
consumerId
==
consumerId
)
{
int
vgsz
=
taosArrayGetSize
(
pSubConsumer
->
vgInfo
);
for
(
int
vgi
=
0
;
vgi
<
vgsz
;
vgi
++
)
{
SMqConsumerEp
*
pConsumerEp
=
taosArrayGet
(
pSubConsumer
->
vgInfo
,
vgi
);
mndPersistCancelConnReq
(
pMnode
,
pTrans
,
pConsumerEp
);
}
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = pMqVgSetReq;
action.contLen = 0; // TODO
action.msgType = TDMT_VND_MQ_SET_CONN;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMqVgSetReq);
mndTransDrop(pTrans);
// TODO free
return -1;
break
;
}
}
// delete data in mnode
taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen);
mndReleaseSubscribe(pMnode, pSub);
mndReleaseTopic(pMnode, pTopic);
#endif
pSub
->
status
=
MQ_SUBSCRIBE_STATUS__DELETED
;
}
else
if
(
newTopicName
!=
NULL
)
{
// save subscribe info to mnode
ASSERT
(
oldTopicName
==
NULL
);
SMqTopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
newTopicName
);
...
...
@@ -628,111 +810,53 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
continue
;
}
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribe
(
pMnode
,
consumerGroup
,
newTopicName
);
bool
create
=
false
;
if
(
pSub
==
NULL
)
{
mDebug
(
"create new subscription, group: %s, topic %s"
,
consumerGroup
,
newTopicName
);
pSub
=
tNewSubscribeObj
();
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribe
(
pMnode
,
cgroup
,
newTopicName
);
bool
createSub
=
false
;
if
(
pSub
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
char
*
key
=
mndMakeSubscribeKey
(
consumerGroup
,
newTopicName
);
if
(
key
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
strcpy
(
pSub
->
key
,
key
);
free
(
key
);
// set unassigned vg
if
(
mndInitUnassignedVg
(
pMnode
,
pTopic
,
pSub
->
unassignedVg
)
<
0
)
{
// TODO: free memory
return
-
1
;
}
// TODO: disable alter
create
=
true
;
mDebug
(
"create new subscription by consumer %ld, group: %s, topic %s"
,
consumerId
,
cgroup
,
newTopicName
);
pSub
=
mndCreateSubscription
(
pMnode
,
pTopic
,
cgroup
);
createSub
=
true
;
}
taosArrayPush
(
pSub
->
availConsumer
,
&
consumerId
);
int64_t
oldConsumerId
;
SMqConsumerTopic
*
pConsumerTopic
=
tNewConsumerTopic
(
consumerId
,
pTopic
,
pSub
,
&
oldConsumerId
);
taosArrayPush
(
pConsumer
->
topics
,
pConsumerTopic
);
SMqSubConsumer
mqSubConsumer
;
mqSubConsumer
.
consumerId
=
consumerId
;
mqSubConsumer
.
vgInfo
=
taosArrayInit
(
0
,
sizeof
(
SMqConsumerEp
));
taosArrayPush
(
pSub
->
consumers
,
&
mqSubConsumer
);
if
(
taosArrayGetSize
(
pConsumerTopic
->
pVgInfo
)
>
0
)
{
ASSERT
(
taosArrayGetSize
(
pConsumerTopic
->
pVgInfo
)
==
1
);
int32_t
vgId
=
*
(
int32_t
*
)
taosArrayGetLast
(
pConsumerTopic
->
pVgInfo
);
SMqConsumerEp
*
pCEp
=
taosArrayGetLast
(
pSub
->
assigned
);
if
(
pCEp
->
vgId
==
vgId
)
{
if
(
mndBuildMqSetConsumerVgReq
(
pMnode
,
pTrans
,
pConsumer
,
pConsumerTopic
,
pTopic
,
pCEp
,
oldConsumerId
)
<
0
)
{
// TODO
return
-
1
;
}
}
// send setmsg to vnode
// if have un assigned vg, assign one to the consumer
if
(
taosArrayGetSize
(
pSub
->
unassignedVg
)
>
0
)
{
SMqConsumerEp
*
pConsumerEp
=
taosArrayPop
(
pSub
->
unassignedVg
);
pConsumerEp
->
oldConsumerId
=
pConsumerEp
->
consumerId
;
pConsumerEp
->
consumerId
=
consumerId
;
taosArrayPush
(
mqSubConsumer
.
vgInfo
,
pConsumerEp
);
mndPersistMqSetConnReq
(
pMnode
,
pTrans
,
pTopic
,
cgroup
,
pConsumerEp
);
}
SSdbRaw
*
pRaw
=
mndSubActionEncode
(
pSub
);
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
mndTransAppendRedolog
(
pTrans
,
pRaw
);
if
(
!
create
)
mndReleaseSubscribe
(
pMnode
,
pSub
);
#if 0
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
if (pGroup == NULL) {
// add new group
pGroup = malloc(sizeof(SMqCGroup));
if (pGroup == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pGroup->consumerIds = tdListNew(sizeof(int64_t));
if (pGroup->consumerIds == NULL) {
free(pGroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pGroup->status = 0;
// add into cgroups
taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup));
}
/*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/
// put the consumer into list
// rebalance will be triggered by timer
tdListAppend(pGroup->consumerIds, &consumerId);
SSdbRaw *pTopicRaw = mndTopicActionEncode(pTopic);
sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY);
// TODO: error handling
mndTransAppendRedolog(pTrans, pTopicRaw);
#endif
/*mndReleaseTopic(pMnode, pTopic);*/
/*mndReleaseSubscribe(pMnode, pSub);*/
if
(
!
createSub
)
mndReleaseSubscribe
(
pMnode
,
pSub
);
mndReleaseTopic
(
pMnode
,
pTopic
);
}
}
// part3. persist consumerObj
// destroy old sub
if
(
oldSub
)
taosArrayDestroy
(
oldSub
);
// put new sub into consumerobj
if
(
oldSub
)
taosArrayDestroyEx
(
oldSub
,
free
);
// persist consumerObj
SSdbRaw
*
pConsumerRaw
=
mndConsumerActionEncode
(
pConsumer
);
sdbSetRawStatus
(
pConsumerRaw
,
SDB_STATUS_READY
);
// TODO: error handling
mndTransAppendRedolog
(
pTrans
,
pConsumerRaw
);
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
if
(
newSub
)
taosArrayDestroy
(
newSub
);
mError
(
"mq-subscribe-trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
/*mndReleaseConsumer(pMnode, pConsumer);*/
if
(
!
createConsumer
)
mndReleaseConsumer
(
pMnode
,
pConsumer
);
return
-
1
;
}
if
(
newSub
)
taosArrayDestroy
(
newSub
);
mndTransDrop
(
pTrans
);
/*mndReleaseConsumer(pMnode, pConsumer);*/
if
(
!
createConsumer
)
mndReleaseConsumer
(
pMnode
,
pConsumer
);
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
}
...
...
@@ -741,146 +865,6 @@ static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pRsp) {
return
0
;
}
static
int32_t
mndProcessConsumerMetaMsg
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
STableInfoReq
*
pInfo
=
pMsg
->
rpcMsg
.
pCont
;
mDebug
(
"subscribe:%s, start to retrieve meta"
,
pInfo
->
tableFname
);
#if 0
SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("consumer:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr());
return -1;
}
SConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pInfo->tableFname);
if (pConsumer == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_INVALID_CONSUMER;
mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
return -1;
}
taosRLockLatch(&pConsumer->lock);
int32_t totalCols = pConsumer->numOfColumns + pConsumer->numOfTags;
int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema);
STableMetaRsp *pMeta = rpcMallocCont(contLen);
if (pMeta == NULL) {
taosRUnLockLatch(&pConsumer->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseConsumer(pMnode, pConsumer);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
return -1;
}
memcpy(pMeta->consumerFname, pConsumer->name, TSDB_TABLE_FNAME_LEN);
pMeta->numOfTags = htonl(pConsumer->numOfTags);
pMeta->numOfColumns = htonl(pConsumer->numOfColumns);
pMeta->precision = pDb->cfg.precision;
pMeta->tableType = TSDB_SUPER_TABLE;
pMeta->update = pDb->cfg.update;
pMeta->sversion = htonl(pConsumer->version);
pMeta->tuid = htonl(pConsumer->uid);
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pMeta->pSchema[i];
SSchema *pSrcSchema = &pConsumer->pSchema[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
taosRUnLockLatch(&pConsumer->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseConsumer(pMnode, pConsumer);
pMsg->pCont = pMeta;
pMsg->contLen = contLen;
mDebug("consumer:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pConsumer->numOfColumns, pConsumer->numOfTags);
#endif
return
0
;
}
static
int32_t
mndGetNumOfConsumers
(
SMnode
*
pMnode
,
char
*
dbName
,
int32_t
*
pNumOfConsumers
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
dbName
);
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_SELECTED
;
return
-
1
;
}
int32_t
numOfConsumers
=
0
;
void
*
pIter
=
NULL
;
while
(
1
)
{
SMqConsumerObj
*
pConsumer
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_CONSUMER
,
pIter
,
(
void
**
)
&
pConsumer
);
if
(
pIter
==
NULL
)
break
;
numOfConsumers
++
;
sdbRelease
(
pSdb
,
pConsumer
);
}
*
pNumOfConsumers
=
numOfConsumers
;
return
0
;
}
static
int32_t
mndGetConsumerMeta
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
STableMetaRsp
*
pMeta
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
if
(
mndGetNumOfConsumers
(
pMnode
,
pShow
->
db
,
&
pShow
->
numOfRows
)
!=
0
)
{
return
-
1
;
}
int32_t
cols
=
0
;
SSchema
*
pSchema
=
pMeta
->
pSchema
;
pShow
->
bytes
[
cols
]
=
TSDB_TABLE_NAME_LEN
+
VARSTR_HEADER_SIZE
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"name"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
8
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
strcpy
(
pSchema
[
cols
].
name
,
"create_time"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
4
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_INT
;
strcpy
(
pSchema
[
cols
].
name
,
"columns"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
4
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_INT
;
strcpy
(
pSchema
[
cols
].
name
,
"tags"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pMeta
->
numOfColumns
=
htonl
(
cols
);
pShow
->
numOfColumns
=
cols
;
pShow
->
offset
[
0
]
=
0
;
for
(
int32_t
i
=
1
;
i
<
cols
;
++
i
)
{
pShow
->
offset
[
i
]
=
pShow
->
offset
[
i
-
1
]
+
pShow
->
bytes
[
i
-
1
];
}
pShow
->
numOfRows
=
sdbGetSize
(
pSdb
,
SDB_CONSUMER
);
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
strcpy
(
pMeta
->
tbFname
,
mndShowStr
(
pShow
->
type
));
return
0
;
}
static
void
mndCancelGetNextConsumer
(
SMnode
*
pMnode
,
void
*
pIter
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
...
...
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
432f8bfd
...
...
@@ -76,7 +76,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) {
if
(
mndIsMaster
(
pMnode
))
{
SMqTmrMsg
*
pMsg
=
rpcMallocCont
(
sizeof
(
SMqTmrMsg
));
SRpcMsg
rpcMsg
=
{.
msgType
=
TDMT_MND_MQ_TIMER
,
.
pCont
=
pMsg
,
.
contLen
=
sizeof
(
SMqTmrMsg
)};
pMnode
->
putReqToM
Write
QFp
(
pMnode
->
pDnode
,
&
rpcMsg
);
pMnode
->
putReqToM
Read
QFp
(
pMnode
->
pDnode
,
&
rpcMsg
);
}
taosTmrReset
(
mndCalMqRebalance
,
3000
,
pMnode
,
pMnode
->
timer
,
&
pMnode
->
mqTimer
);
...
...
@@ -249,6 +249,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
memcpy
(
&
pMnode
->
replicas
,
pOption
->
replicas
,
sizeof
(
SReplica
)
*
TSDB_MAX_REPLICA
);
pMnode
->
pDnode
=
pOption
->
pDnode
;
pMnode
->
putReqToMWriteQFp
=
pOption
->
putReqToMWriteQFp
;
pMnode
->
putReqToMReadQFp
=
pOption
->
putReqToMReadQFp
;
pMnode
->
sendReqToDnodeFp
=
pOption
->
sendReqToDnodeFp
;
pMnode
->
sendReqToMnodeFp
=
pOption
->
sendReqToMnodeFp
;
pMnode
->
sendRedirectRspFp
=
pOption
->
sendRedirectRspFp
;
...
...
source/dnode/vnode/src/inc/tsdbReadImpl.h
浏览文件 @
432f8bfd
...
...
@@ -124,7 +124,7 @@ int tsdbLoadBlockIdx(SReadH *pReadh);
int
tsdbSetReadTable
(
SReadH
*
pReadh
,
STable
*
pTable
);
int
tsdbLoadBlockInfo
(
SReadH
*
pReadh
,
void
*
pTarget
);
int
tsdbLoadBlockData
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SBlockInfo
*
pBlockInfo
);
int
tsdbLoadBlockDataCols
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SBlockInfo
*
pBlkInfo
,
int16_t
*
colIds
,
int
numOfColsIds
);
int
tsdbLoadBlockDataCols
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SBlockInfo
*
pBlkInfo
,
const
int16_t
*
colIds
,
int
numOfColsIds
);
int
tsdbLoadBlockStatis
(
SReadH
*
pReadh
,
SBlock
*
pBlock
);
int
tsdbEncodeSBlockIdx
(
void
**
buf
,
SBlockIdx
*
pIdx
);
void
*
tsdbDecodeSBlockIdx
(
void
*
buf
,
SBlockIdx
*
pIdx
);
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
432f8bfd
...
...
@@ -1327,7 +1327,7 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
int
nBlocks
=
pCommith
->
readh
.
pBlkIdx
->
numOfBlocks
;
SBlock
*
pBlock
=
pCommith
->
readh
.
pBlkInfo
->
blocks
+
bidx
;
TSKEY
keyLimit
;
int16_t
colId
=
0
;
int16_t
colId
=
PRIMARYKEY_TIMESTAMP_COL_ID
;
SMergeInfo
mInfo
;
SBlock
subBlocks
[
TSDB_MAX_SUBBLOCKS
];
SBlock
block
,
supBlock
;
...
...
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
浏览文件 @
432f8bfd
...
...
@@ -22,7 +22,7 @@ static void tsdbResetReadFile(SReadH *pReadh);
static
int
tsdbLoadBlockDataImpl
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SDataCols
*
pDataCols
);
static
int
tsdbCheckAndDecodeColumnData
(
SDataCol
*
pDataCol
,
void
*
content
,
int32_t
len
,
int8_t
comp
,
int
numOfRows
,
int
maxPoints
,
char
*
buffer
,
int
bufferSize
);
static
int
tsdbLoadBlockDataColsImpl
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SDataCols
*
pDataCols
,
int16_t
*
colIds
,
static
int
tsdbLoadBlockDataColsImpl
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SDataCols
*
pDataCols
,
const
int16_t
*
colIds
,
int
numOfColIds
);
static
int
tsdbLoadColData
(
SReadH
*
pReadh
,
SDFile
*
pDFile
,
SBlock
*
pBlock
,
SBlockCol
*
pBlockCol
,
SDataCol
*
pDataCol
);
...
...
@@ -271,7 +271,7 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
return
0
;
}
int
tsdbLoadBlockDataCols
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SBlockInfo
*
pBlkInfo
,
int16_t
*
colIds
,
int
numOfColsIds
)
{
int
tsdbLoadBlockDataCols
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SBlockInfo
*
pBlkInfo
,
const
int16_t
*
colIds
,
int
numOfColsIds
)
{
ASSERT
(
pBlock
->
numOfSubBlocks
>
0
);
int8_t
update
=
pReadh
->
pRepo
->
config
.
update
;
...
...
@@ -472,7 +472,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
continue
;
}
int16_t
tcolId
=
0
;
int16_t
tcolId
=
PRIMARYKEY_TIMESTAMP_COL_ID
;
uint32_t
toffset
=
TSDB_KEY_COL_OFFSET
;
int32_t
tlen
=
pBlock
->
keyLen
;
...
...
@@ -548,7 +548,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32
return
0
;
}
static
int
tsdbLoadBlockDataColsImpl
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SDataCols
*
pDataCols
,
int16_t
*
colIds
,
static
int
tsdbLoadBlockDataColsImpl
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SDataCols
*
pDataCols
,
const
int16_t
*
colIds
,
int
numOfColIds
)
{
ASSERT
(
pBlock
->
numOfSubBlocks
==
0
||
pBlock
->
numOfSubBlocks
==
1
);
ASSERT
(
colIds
[
0
]
==
PRIMARYKEY_TIMESTAMP_COL_ID
);
...
...
source/dnode/vnode/src/vnd/vnodeBufferPool.c
浏览文件 @
432f8bfd
...
...
@@ -185,6 +185,7 @@ static void vBufPoolDestroyMA(SMemAllocatorFactory *pMAF, SMemAllocator *pMA) {
free
(
pMA
);
if
(
--
pVMA
->
_ref
.
val
==
0
)
{
TD_DLIST_POP
(
&
(
pVnode
->
pBufPool
->
incycle
),
pVMA
);
vmaReset
(
pVMA
);
TD_DLIST_APPEND
(
&
(
pVnode
->
pBufPool
->
free
),
pVMA
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录