Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ddf7dcc9
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ddf7dcc9
编写于
1月 18, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix mem leak
上级
b40ccb5f
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
73 addition
and
27 deletion
+73
-27
include/common/tmsg.h
include/common/tmsg.h
+7
-4
source/client/src/clientHb.c
source/client/src/clientHb.c
+7
-5
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+3
-1
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+16
-2
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+24
-15
source/dnode/mnode/impl/src/mndProfile.c
source/dnode/mnode/impl/src/mndProfile.c
+5
-0
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+11
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
ddf7dcc9
...
...
@@ -188,16 +188,19 @@ void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp);
static
FORCE_INLINE
void
tFreeClientHbReq
(
void
*
pReq
)
{
SClientHbReq
*
req
=
(
SClientHbReq
*
)
pReq
;
taosHashCleanup
(
req
->
info
);
free
(
pReq
);
if
(
req
->
info
)
taosHashCleanup
(
req
->
info
);
}
int
tSerializeSClientHbBatchReq
(
void
**
buf
,
const
SClientHbBatchReq
*
pReq
);
void
*
tDeserializeSClientHbBatchReq
(
void
*
buf
,
SClientHbBatchReq
*
pReq
);
static
FORCE_INLINE
void
tFreeClientHbBatchReq
(
void
*
pReq
)
{
static
FORCE_INLINE
void
tFreeClientHbBatchReq
(
void
*
pReq
,
bool
deep
)
{
SClientHbBatchReq
*
req
=
(
SClientHbBatchReq
*
)
pReq
;
//taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
if
(
deep
)
{
taosArrayDestroyEx
(
req
->
reqs
,
tFreeClientHbReq
);
}
else
{
taosArrayDestroy
(
req
->
reqs
);
}
free
(
pReq
);
}
...
...
source/client/src/clientHb.c
浏览文件 @
ddf7dcc9
...
...
@@ -60,15 +60,17 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
pIter
=
taosHashIterate
(
pAppHbMgr
->
activeInfo
,
pIter
);
}
#if 0
pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, NULL);
while (pIter != NULL) {
FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter;
SClientHbKey connKey;
taosHashCopyKey(pIter, &connKey);
getConnInfoFp
(
connKey
,
NULL
);
SArray* pArray =
getConnInfoFp(connKey, NULL);
pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, pIter);
}
#endif
return
pBatchReq
;
}
...
...
@@ -99,12 +101,12 @@ static void* hbThreadFunc(void* param) {
//TODO: error handling
break
;
}
void
*
bufCopy
=
buf
;
tSerializeSClientHbBatchReq
(
&
bufCopy
,
pReq
);
void
*
abuf
=
buf
;
tSerializeSClientHbBatchReq
(
&
abuf
,
pReq
);
SMsgSendInfo
*
pInfo
=
malloc
(
sizeof
(
SMsgSendInfo
));
if
(
pInfo
==
NULL
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tFreeClientHbBatchReq
(
pReq
);
tFreeClientHbBatchReq
(
pReq
,
false
);
free
(
buf
);
break
;
}
...
...
@@ -120,7 +122,7 @@ static void* hbThreadFunc(void* param) {
int64_t
transporterId
=
0
;
SEpSet
epSet
=
getEpSet_s
(
&
pAppInstInfo
->
mgmtEp
);
asyncSendMsgToServer
(
pAppInstInfo
->
pTransporter
,
&
epSet
,
&
transporterId
,
pInfo
);
tFreeClientHbBatchReq
(
pReq
);
tFreeClientHbBatchReq
(
pReq
,
false
);
atomic_add_fetch_32
(
&
pAppHbMgr
->
reportCnt
,
1
);
}
...
...
source/client/test/clientTests.cpp
浏览文件 @
ddf7dcc9
...
...
@@ -53,7 +53,9 @@ TEST(testCase, connect_Test) {
if
(
pConn
==
NULL
)
{
printf
(
"failed to connect to server, reason:%s
\n
"
,
taos_errstr
(
NULL
));
}
sleep
(
3
);
while
(
1
)
{
sleep
(
3
);
}
taos_close
(
pConn
);
}
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
ddf7dcc9
...
...
@@ -325,6 +325,19 @@ typedef struct SMqTopicConsumer {
} SMqTopicConsumer;
#endif
typedef
struct
SMqConsumerEp
{
int32_t
vgId
;
SEpSet
epset
;
int64_t
consumerId
;
}
SMqConsumerEp
;
typedef
struct
SMqCgroupTopicPair
{
char
key
[
TSDB_CONSUMER_GROUP_LEN
+
TSDB_TOPIC_FNAME_LEN
];
SArray
*
assigned
;
SArray
*
unassignedConsumer
;
SArray
*
unassignedVg
;
}
SMqCgroupTopicPair
;
typedef
struct
SMqCGroup
{
char
name
[
TSDB_CONSUMER_GROUP_LEN
];
int32_t
status
;
// 0 - uninitialized, 1 - wait rebalance, 2- normal
...
...
@@ -351,10 +364,11 @@ typedef struct SMqTopicObj {
// TODO: add cache and change name to id
typedef
struct
SMqConsumerTopic
{
char
name
[
TSDB_TOPIC_FNAME_LEN
];
int32_t
epoch
;
char
name
[
TSDB_TOPIC_NAME_LEN
];
//TODO: replace with something with ep
SList
*
vgroups
;
// SList<int32_t>
SArray
*
pVgInfo
;
// SArray<int32_t>
}
SMqConsumerTopic
;
typedef
struct
SMqConsumerObj
{
...
...
@@ -362,7 +376,7 @@ typedef struct SMqConsumerObj {
SRWLatch
lock
;
char
cgroup
[
TSDB_CONSUMER_GROUP_LEN
];
SArray
*
topics
;
// SArray<SMqConsumerTopic>
SHashObj
*
topicHash
;
SHashObj
*
topicHash
;
//SHashObj<SMqConsumerTopic>
}
SMqConsumerObj
;
typedef
struct
SMqSubConsumerObj
{
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
ddf7dcc9
...
...
@@ -204,34 +204,37 @@ void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
static
int32_t
mndProcessSubscribeReq
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
char
*
msgStr
=
pMsg
->
rpcMsg
.
pCont
;
SCMSubscribeReq
*
pS
ubscribe
;
tDeserializeSCMSubscribeReq
(
msgStr
,
pS
ubscribe
);
int64_t
consumerId
=
pSubscribe
->
consumerId
;
char
*
consumerGroup
=
pSubscribe
->
consumerGroup
;
SCMSubscribeReq
s
ubscribe
;
tDeserializeSCMSubscribeReq
(
msgStr
,
&
s
ubscribe
);
int64_t
consumerId
=
subscribe
.
consumerId
;
char
*
consumerGroup
=
subscribe
.
consumerGroup
;
int32_t
cgroupLen
=
strlen
(
consumerGroup
);
SArray
*
newSub
=
NULL
;
int
newTopicNum
=
pSubscribe
->
topicNum
;
int
newTopicNum
=
subscribe
.
topicNum
;
if
(
newTopicNum
)
{
newSub
=
taosArrayInit
(
newTopicNum
,
sizeof
(
SMqConsumerTopic
));
}
SMqConsumerTopic
*
pConsumerTopics
=
calloc
(
newTopicNum
,
sizeof
(
SMqConsumerTopic
));
if
(
pConsumerTopics
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
for
(
int
i
=
0
;
i
<
newTopicNum
;
i
++
)
{
char
*
newTopicName
=
taosArrayGetP
(
newSub
,
i
);
SMqConsumerTopic
*
pConsumerTopic
=
malloc
(
sizeof
(
SMqConsumerTopic
));
if
(
pConsumerTopic
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
// TODO: free
return
-
1
;
}
SMqConsumerTopic
*
pConsumerTopic
=
&
pConsumerTopics
[
i
];
strcpy
(
pConsumerTopic
->
name
,
newTopicName
);
pConsumerTopic
->
vgroups
=
tdListNew
(
sizeof
(
int64_t
));
taosArrayPush
(
newSub
,
pConsumerTopic
);
free
(
pConsumerTopic
);
}
taosArrayAddBatch
(
newSub
,
pConsumerTopics
,
newTopicNum
);
free
(
pConsumerTopics
);
taosArraySortString
(
newSub
,
taosArrayCompareString
);
SArray
*
oldSub
=
NULL
;
int
oldTopicNum
=
0
;
// create consumer if not exist
SMqConsumerObj
*
pConsumer
=
mndAcquireConsumer
(
pMnode
,
consumerId
);
if
(
pConsumer
==
NULL
)
{
// create consumer
...
...
@@ -249,6 +252,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
//TODO: free memory
return
-
1
;
}
...
...
@@ -286,6 +290,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
}
if
(
pOldTopic
!=
NULL
)
{
//cancel subscribe of that old topic
ASSERT
(
pNewTopic
==
NULL
);
char
*
oldTopicName
=
pOldTopic
->
name
;
SList
*
vgroups
=
pOldTopic
->
vgroups
;
...
...
@@ -298,13 +303,14 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
SMqCGroup
*
pGroup
=
taosHashGet
(
pTopic
->
cgroups
,
consumerGroup
,
cgroupLen
);
while
((
pn
=
tdListNext
(
&
iter
))
!=
NULL
)
{
int32_t
vgId
=
*
(
int64_t
*
)
pn
->
data
;
// acquire and get epset
SVgObj
*
pVgObj
=
mndAcquireVgroup
(
pMnode
,
vgId
);
// TODO
release
// TODO
what time to release?
if
(
pVgObj
==
NULL
)
{
// TODO handle error
continue
;
}
//
acquire and get epset
//
build reset msg
void
*
pMqVgSetReq
=
mndBuildMqVGroupSetReq
(
pMnode
,
oldTopicName
,
vgId
,
consumerId
,
consumerGroup
);
// TODO:serialize
if
(
pMsg
==
NULL
)
{
...
...
@@ -323,10 +329,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
return
-
1
;
}
}
//delete data in mnode
taosHashRemove
(
pTopic
->
cgroups
,
consumerGroup
,
cgroupLen
);
mndReleaseTopic
(
pMnode
,
pTopic
);
}
else
if
(
pNewTopic
!=
NULL
)
{
// save subscribe info to mnode
ASSERT
(
pOldTopic
==
NULL
);
char
*
newTopicName
=
pNewTopic
->
name
;
...
...
@@ -351,6 +359,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
// add into cgroups
taosHashPut
(
pTopic
->
cgroups
,
consumerGroup
,
cgroupLen
,
pGroup
,
sizeof
(
SMqCGroup
));
}
/*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/
// put the consumer into list
// rebalance will be triggered by timer
...
...
source/dnode/mnode/impl/src/mndProfile.c
浏览文件 @
ddf7dcc9
...
...
@@ -357,6 +357,11 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
}
}
}
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
SClientHbReq
*
pHbReq
=
taosArrayGet
(
pArray
,
i
);
tFreeClientHbReq
(
pHbReq
);
}
taosArrayDestroy
(
pArray
);
int32_t
tlen
=
tSerializeSClientHbBatchRsp
(
NULL
,
&
batchRsp
);
void
*
buf
=
rpcMallocCont
(
tlen
);
void
*
abuf
=
buf
;
...
...
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
ddf7dcc9
...
...
@@ -69,6 +69,17 @@ static void mndTransReExecute(void *param, void *tmrId) {
taosTmrReset
(
mndTransReExecute
,
3000
,
pMnode
,
pMnode
->
timer
,
&
pMnode
->
transTimer
);
}
static
void
mndCalMqRebalance
(
void
*
param
,
void
*
tmrId
)
{
SMnode
*
pMnode
=
param
;
if
(
mndIsMaster
(
pMnode
))
{
// iterate cgroup, cal rebalance
// sync with raft
// write sdb
}
taosTmrReset
(
mndCalMqRebalance
,
3000
,
pMnode
,
pMnode
->
timer
,
&
pMnode
->
transTimer
);
}
static
int32_t
mndInitTimer
(
SMnode
*
pMnode
)
{
if
(
pMnode
->
timer
==
NULL
)
{
pMnode
->
timer
=
taosTmrInit
(
5000
,
200
,
3600000
,
"MND"
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录