Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7ab261d7
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
7ab261d7
编写于
1月 19, 2022
作者:
L
Liu Jicong
提交者:
GitHub
1月 19, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #9898 from taosdata/feature/tq
add subscribe for sdb
上级
b3526ee2
00ccbb4a
变更
11
显示空白变更内容
内联
并排
Showing
11 changed file
with
934 addition
and
259 deletion
+934
-259
include/common/tmsg.h
include/common/tmsg.h
+24
-0
include/dnode/mnode/sdb/sdb.h
include/dnode/mnode/sdb/sdb.h
+2
-2
include/util/tdef.h
include/util/tdef.h
+1
-0
source/dnode/mnode/impl/inc/mndConsumer.h
source/dnode/mnode/impl/inc/mndConsumer.h
+3
-0
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+226
-12
source/dnode/mnode/impl/inc/mndSubscribe.h
source/dnode/mnode/impl/inc/mndSubscribe.h
+38
-0
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+35
-232
source/dnode/mnode/impl/src/mndProfile.c
source/dnode/mnode/impl/src/mndProfile.c
+3
-0
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+597
-0
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+3
-11
source/dnode/mnode/sdb/src/sdbHash.c
source/dnode/mnode/sdb/src/sdbHash.c
+2
-2
未找到文件。
include/common/tmsg.h
浏览文件 @
7ab261d7
...
...
@@ -324,6 +324,30 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
return
buf
;
}
typedef
struct
SMqSetCVgReq
{
int32_t
vgId
;
int64_t
consumerId
;
char
topicName
[
TSDB_TOPIC_FNAME_LEN
];
char
cGroup
[
TSDB_CONSUMER_GROUP_LEN
];
}
SMqSetCVgReq
;
static
FORCE_INLINE
int32_t
tEncodeSMqSetCVgReq
(
void
**
buf
,
const
SMqSetCVgReq
*
pReq
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
vgId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
consumerId
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
topicName
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
cGroup
);
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqSetCVgReq
(
void
*
buf
,
SMqSetCVgReq
*
pReq
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
vgId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
consumerId
);
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
topicName
);
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
cGroup
);
return
buf
;
}
typedef
struct
{
int32_t
vgId
;
char
*
dbName
;
...
...
include/dnode/mnode/sdb/sdb.h
浏览文件 @
7ab261d7
...
...
@@ -113,8 +113,8 @@ typedef enum {
SDB_USER
=
7
,
SDB_AUTH
=
8
,
SDB_ACCT
=
9
,
SDB_
CONSUMER
=
10
,
SDB_C
GROUP
=
11
,
SDB_
SUBSCRIBE
=
10
,
SDB_C
ONSUMER
=
11
,
SDB_TOPIC
=
12
,
SDB_VGROUP
=
13
,
SDB_STB
=
14
,
...
...
include/util/tdef.h
浏览文件 @
7ab261d7
...
...
@@ -178,6 +178,7 @@ do { \
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
#define TSDB_CONSUMER_GROUP_LEN 192
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
#define TSDB_COL_NAME_LEN 65
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
...
...
source/dnode/mnode/impl/inc/mndConsumer.h
浏览文件 @
7ab261d7
...
...
@@ -28,6 +28,9 @@ void mndCleanupConsumer(SMnode *pMnode);
SMqConsumerObj
*
mndAcquireConsumer
(
SMnode
*
pMnode
,
int32_t
consumerId
);
void
mndReleaseConsumer
(
SMnode
*
pMnode
,
SMqConsumerObj
*
pConsumer
);
SSdbRaw
*
mndConsumerActionEncode
(
SMqConsumerObj
*
pConsumer
);
SSdbRow
*
mndConsumerActionDecode
(
SSdbRaw
*
pRaw
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
7ab261d7
...
...
@@ -326,17 +326,156 @@ typedef struct SMqTopicConsumer {
#endif
typedef
struct
SMqConsumerEp
{
int32_t
vgId
;
int32_t
vgId
;
// -1 for unassigned
SEpSet
epset
;
int64_t
consumerId
;
int64_t
consumerId
;
// -1 for unassigned
int64_t
lastConsumerHbTs
;
int64_t
lastVgHbTs
;
}
SMqConsumerEp
;
typedef
struct
SMqCgroupTopicPair
{
char
key
[
TSDB_CONSUMER_GROUP_LEN
+
TSDB_TOPIC_FNAME_LEN
];
static
FORCE_INLINE
int32_t
tEncodeSMqConsumerEp
(
void
**
buf
,
SMqConsumerEp
*
pConsumerEp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumerEp
->
vgId
);
tlen
+=
taosEncodeSEpSet
(
buf
,
&
pConsumerEp
->
epset
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumerEp
->
consumerId
);
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqConsumerEp
(
void
**
buf
,
SMqConsumerEp
*
pConsumerEp
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumerEp
->
vgId
);
buf
=
taosDecodeSEpSet
(
buf
,
&
pConsumerEp
->
epset
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumerEp
->
consumerId
);
return
buf
;
}
//unit for rebalance
typedef
struct
SMqSubscribeObj
{
char
key
[
TSDB_SUBSCRIBE_KEY_LEN
];
int32_t
epoch
;
//TODO: replace with priority queue
SArray
*
availConsumer
;
// SArray<int64_t> (consumerId)
SArray
*
assigned
;
// SArray<SMqConsumerEp>
SArray
*
unassignedConsumer
;
SArray
*
unassignedVg
;
}
SMqCgroupTopicPair
;
SArray
*
unassignedConsumer
;
// SArray<SMqConsumerEp>
SArray
*
unassignedVg
;
// SArray<SMqConsumerEp>
}
SMqSubscribeObj
;
static
FORCE_INLINE
SMqSubscribeObj
*
tNewSubscribeObj
()
{
SMqSubscribeObj
*
pSub
=
malloc
(
sizeof
(
SMqSubscribeObj
));
pSub
->
key
[
0
]
=
0
;
pSub
->
epoch
=
0
;
if
(
pSub
==
NULL
)
{
return
NULL
;
}
pSub
->
availConsumer
=
taosArrayInit
(
0
,
sizeof
(
int64_t
));
if
(
pSub
->
availConsumer
==
NULL
)
{
free
(
pSub
);
return
NULL
;
}
pSub
->
assigned
=
taosArrayInit
(
0
,
sizeof
(
SMqConsumerEp
));
if
(
pSub
->
assigned
==
NULL
)
{
taosArrayDestroy
(
pSub
->
availConsumer
);
free
(
pSub
);
return
NULL
;
}
pSub
->
unassignedConsumer
=
taosArrayInit
(
0
,
sizeof
(
SMqConsumerEp
));
if
(
pSub
->
assigned
==
NULL
)
{
taosArrayDestroy
(
pSub
->
availConsumer
);
taosArrayDestroy
(
pSub
->
unassignedConsumer
);
free
(
pSub
);
return
NULL
;
}
pSub
->
unassignedVg
=
taosArrayInit
(
0
,
sizeof
(
SMqConsumerEp
));
if
(
pSub
->
assigned
==
NULL
)
{
taosArrayDestroy
(
pSub
->
availConsumer
);
taosArrayDestroy
(
pSub
->
unassignedConsumer
);
taosArrayDestroy
(
pSub
->
unassignedVg
);
free
(
pSub
);
return
NULL
;
}
return
NULL
;
}
static
FORCE_INLINE
int32_t
tEncodeSubscribeObj
(
void
**
buf
,
const
SMqSubscribeObj
*
pSub
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeString
(
buf
,
pSub
->
key
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pSub
->
epoch
);
int32_t
sz
;
sz
=
taosArrayGetSize
(
pSub
->
availConsumer
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int64_t
*
pConsumerId
=
taosArrayGet
(
pSub
->
availConsumer
,
i
);
tlen
+=
taosEncodeFixedI64
(
buf
,
*
pConsumerId
);
}
sz
=
taosArrayGetSize
(
pSub
->
assigned
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
*
pCEp
=
taosArrayGet
(
pSub
->
assigned
,
i
);
tlen
+=
tEncodeSMqConsumerEp
(
buf
,
pCEp
);
}
sz
=
taosArrayGetSize
(
pSub
->
unassignedConsumer
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
*
pCEp
=
taosArrayGet
(
pSub
->
unassignedConsumer
,
i
);
tlen
+=
tEncodeSMqConsumerEp
(
buf
,
pCEp
);
}
sz
=
taosArrayGetSize
(
pSub
->
unassignedVg
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
*
pCEp
=
taosArrayGet
(
pSub
->
unassignedVg
,
i
);
tlen
+=
tEncodeSMqConsumerEp
(
buf
,
pCEp
);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSubscribeObj
(
void
*
buf
,
SMqSubscribeObj
*
pSub
)
{
buf
=
taosDecodeStringTo
(
buf
,
pSub
->
key
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pSub
->
epoch
);
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pSub
->
assigned
=
taosArrayInit
(
sz
,
sizeof
(
int64_t
));
if
(
pSub
->
assigned
==
NULL
)
{
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int64_t
consumerId
;
buf
=
taosDecodeFixedI64
(
buf
,
&
consumerId
);
taosArrayPush
(
pSub
->
assigned
,
&
consumerId
);
}
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pSub
->
unassignedConsumer
=
taosArrayInit
(
sz
,
sizeof
(
SMqConsumerEp
));
if
(
pSub
->
unassignedConsumer
==
NULL
)
{
taosArrayDestroy
(
pSub
->
assigned
);
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
cEp
;
buf
=
tDecodeSMqConsumerEp
(
buf
,
&
cEp
);
taosArrayPush
(
pSub
->
unassignedConsumer
,
&
cEp
);
}
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pSub
->
unassignedVg
=
taosArrayInit
(
sz
,
sizeof
(
SMqConsumerEp
));
if
(
pSub
->
unassignedVg
==
NULL
)
{
taosArrayDestroy
(
pSub
->
assigned
);
taosArrayDestroy
(
pSub
->
unassignedConsumer
);
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
cEp
;
buf
=
tDecodeSMqConsumerEp
(
buf
,
&
cEp
);
taosArrayPush
(
pSub
->
unassignedVg
,
&
cEp
);
}
return
buf
;
}
typedef
struct
SMqCGroup
{
char
name
[
TSDB_CONSUMER_GROUP_LEN
];
...
...
@@ -358,8 +497,8 @@ typedef struct SMqTopicObj {
char
*
sql
;
char
*
logicalPlan
;
char
*
physicalPlan
;
SHashObj
*
cgroups
;
// SHashObj<SMqCGroup>
SHashObj
*
consumers
;
// SHashObj<SMqConsumerObj>
//
SHashObj *cgroups; // SHashObj<SMqCGroup>
//
SHashObj *consumers; // SHashObj<SMqConsumerObj>
}
SMqTopicObj
;
// TODO: add cache and change name to id
...
...
@@ -367,18 +506,93 @@ typedef struct SMqConsumerTopic {
char
name
[
TSDB_TOPIC_FNAME_LEN
];
int32_t
epoch
;
//TODO: replace with something with ep
SList
*
vgroups
;
// SList<int32_t>
//SList *vgroups; // SList<int32_t>
//vg assigned to the consumer on the topic
SArray
*
pVgInfo
;
// SArray<int32_t>
}
SMqConsumerTopic
;
static
FORCE_INLINE
SMqConsumerTopic
*
tNewConsumerTopic
(
int64_t
consumerId
,
SMqTopicObj
*
pTopic
,
SMqSubscribeObj
*
pSub
)
{
SMqConsumerTopic
*
pCTopic
=
malloc
(
sizeof
(
SMqConsumerTopic
));
if
(
pCTopic
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
strcpy
(
pCTopic
->
name
,
pTopic
->
name
);
pCTopic
->
epoch
=
0
;
pCTopic
->
pVgInfo
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
int32_t
unassignedVgSz
=
taosArrayGetSize
(
pSub
->
unassignedVg
);
if
(
unassignedVgSz
>
0
)
{
SMqConsumerEp
*
pCEp
=
taosArrayPop
(
pSub
->
unassignedVg
);
pCEp
->
consumerId
=
consumerId
;
taosArrayPush
(
pCTopic
->
pVgInfo
,
&
pCEp
->
vgId
);
taosArrayPush
(
pSub
->
assigned
,
pCEp
);
}
return
pCTopic
;
}
static
FORCE_INLINE
int32_t
tEncodeSMqConsumerTopic
(
void
**
buf
,
SMqConsumerTopic
*
pConsumerTopic
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeString
(
buf
,
pConsumerTopic
->
name
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumerTopic
->
epoch
);
int32_t
sz
=
taosArrayGetSize
(
pConsumerTopic
->
pVgInfo
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int32_t
*
pVgInfo
=
taosArrayGet
(
pConsumerTopic
->
pVgInfo
,
i
);
tlen
+=
taosEncodeFixedI32
(
buf
,
*
pVgInfo
);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqConsumerTopic
(
void
*
buf
,
SMqConsumerTopic
*
pConsumerTopic
)
{
buf
=
taosDecodeStringTo
(
buf
,
pConsumerTopic
->
name
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumerTopic
->
epoch
);
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pConsumerTopic
->
pVgInfo
=
taosArrayInit
(
sz
,
sizeof
(
SMqConsumerTopic
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int32_t
vgInfo
;
buf
=
taosDecodeFixedI32
(
buf
,
&
vgInfo
);
taosArrayPush
(
pConsumerTopic
->
pVgInfo
,
&
vgInfo
);
}
return
buf
;
}
typedef
struct
SMqConsumerObj
{
int64_t
consumerId
;
SRWLatch
lock
;
char
cgroup
[
TSDB_CONSUMER_GROUP_LEN
];
SArray
*
topics
;
// SArray<SMqConsumerTopic>
SHashObj
*
topicHash
;
//SHashObj<SMqConsumerTopic
>
//SHashObj *topicHash; //SHashObj<SMqTopicObj
>
}
SMqConsumerObj
;
static
FORCE_INLINE
int32_t
tEncodeSMqConsumerObj
(
void
**
buf
,
const
SMqConsumerObj
*
pConsumer
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
consumerId
);
tlen
+=
taosEncodeString
(
buf
,
pConsumer
->
cgroup
);
int32_t
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerTopic
*
pConsumerTopic
=
taosArrayGet
(
pConsumer
->
topics
,
i
);
tlen
+=
tEncodeSMqConsumerTopic
(
buf
,
pConsumerTopic
);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqConsumerObj
(
void
*
buf
,
SMqConsumerObj
*
pConsumer
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
consumerId
);
buf
=
taosDecodeStringTo
(
buf
,
pConsumer
->
cgroup
);
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pConsumer
->
topics
=
taosArrayInit
(
sz
,
sizeof
(
SMqConsumerObj
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerTopic
cTopic
;
buf
=
tDecodeSMqConsumerTopic
(
buf
,
&
cTopic
);
taosArrayPush
(
pConsumer
->
topics
,
&
cTopic
);
}
return
buf
;
}
typedef
struct
SMqSubConsumerObj
{
int64_t
consumerUid
;
// if -1, unassigned
SList
*
vgId
;
// SList<int32_t>
...
...
source/dnode/mnode/impl/inc/mndSubscribe.h
0 → 100644
浏览文件 @
7ab261d7
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MND_SUBSCRIBE_H_
#define _TD_MND_SUBSCRIBE_H_
#include "mndInt.h"
#ifdef __cplusplus
extern
"C"
{
#endif
int32_t
mndInitSubscribe
(
SMnode
*
pMnode
);
void
mndCleanupSubscribe
(
SMnode
*
pMnode
);
SMqSubscribeObj
*
mndAcquireSubscribe
(
SMnode
*
pMnode
,
char
*
CGroup
,
char
*
topicName
);
void
mndReleaseSubscribe
(
SMnode
*
pMnode
,
SMqSubscribeObj
*
pSub
);
SSdbRaw
*
mndSubscribeActionEncode
(
SMqSubscribeObj
*
pSub
);
SSdbRow
*
mndSubscribeActionDecode
(
SSdbRaw
*
pRaw
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_MND_SUBSCRIBE_H_*/
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
7ab261d7
...
...
@@ -30,24 +30,14 @@
#define MND_CONSUMER_VER_NUMBER 1
#define MND_CONSUMER_RESERVE_SIZE 64
static
SSdbRaw
*
mndConsumerActionEncode
(
SMqConsumerObj
*
pConsumer
);
static
SSdbRow
*
mndConsumerActionDecode
(
SSdbRaw
*
pRaw
);
static
int32_t
mndConsumerActionInsert
(
SSdb
*
pSdb
,
SMqConsumerObj
*
pConsumer
);
static
int32_t
mndConsumerActionDelete
(
SSdb
*
pSdb
,
SMqConsumerObj
*
pConsumer
);
static
int32_t
mndConsumerActionUpdate
(
SSdb
*
pSdb
,
SMqConsumerObj
*
pConsumer
,
SMqConsumerObj
*
pNewConsumer
);
static
int32_t
mndProcessCreateConsumerMsg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropConsumerMsg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropConsumerInRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessConsumerMetaMsg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndGetConsumerMeta
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
STableMetaRsp
*
pMeta
);
static
int32_t
mndRetrieveConsumer
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
);
static
void
mndCancelGetNextConsumer
(
SMnode
*
pMnode
,
void
*
pIter
);
static
int32_t
mndProcessSubscribeReq
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessSubscribeRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessSubscribeInternalReq
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessSubscribeInternalRsp
(
SMnodeMsg
*
pMsg
);
int32_t
mndInitConsumer
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_CONSUMER
,
.
keyType
=
SDB_KEY_BINARY
,
...
...
@@ -57,26 +47,29 @@ int32_t mndInitConsumer(SMnode *pMnode) {
.
updateFp
=
(
SdbUpdateFp
)
mndConsumerActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndConsumerActionDelete
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_SUBSCRIBE
,
mndProcessSubscribeReq
);
/*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/
/*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/
mndSetMsgHandle
(
pMnode
,
TDMT_VND_SUBSCRIBE_RSP
,
mndProcessSubscribeInternalRsp
);
return
sdbSetTable
(
pMnode
->
pSdb
,
table
);
}
void
mndCleanupConsumer
(
SMnode
*
pMnode
)
{}
static
void
*
mndBuildMqVGroupSetReq
(
SMnode
*
pMnode
,
char
*
topicName
,
int32_t
vgId
,
int64_t
consumerId
,
char
*
cgroup
)
{
return
0
;
}
static
SSdbRaw
*
mndConsumerActionEncode
(
SMqConsumerObj
*
pConsumer
)
{
int32_t
size
=
sizeof
(
SMqConsumerObj
)
+
MND_CONSUMER_RESERVE_SIZE
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_CONSUMER
,
MND_CONSUMER_VER_NUMBER
,
size
);
SSdbRaw
*
mndConsumerActionEncode
(
SMqConsumerObj
*
pConsumer
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
tlen
=
tEncodeSMqConsumerObj
(
NULL
,
pConsumer
);
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_CONSUMER
,
MND_CONSUMER_VER_NUMBER
,
tlen
);
if
(
pRaw
==
NULL
)
goto
CM_ENCODE_OVER
;
void
*
buf
=
malloc
(
tlen
);
if
(
buf
==
NULL
)
goto
CM_ENCODE_OVER
;
void
*
abuf
=
buf
;
tEncodeSMqConsumerObj
(
&
abuf
,
pConsumer
);
int32_t
dataPos
=
0
;
SDB_SET_INT32
(
pRaw
,
dataPos
,
tlen
,
CM_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
buf
,
tlen
,
CM_ENCODE_OVER
);
#if 0
int32_t topicNum = taosArrayGetSize(pConsumer->topics);
SDB_SET_INT64(pRaw, dataPos, pConsumer->consumerId, CM_ENCODE_OVER);
int32_t len = strlen(pConsumer->cgroup);
...
...
@@ -101,10 +94,13 @@ static SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
/*SDB_SET_INT64(pRaw, dataPos, 0[> change to list item <]);*/
}
}
#endif
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
MND_CONSUMER_RESERVE_SIZE
,
CM_ENCODE_OVER
);
SDB_SET_DATALEN
(
pRaw
,
dataPos
,
CM_ENCODE_OVER
);
terrno
=
TSDB_CODE_SUCCESS
;
CM_ENCODE_OVER:
if
(
terrno
!=
0
)
{
mError
(
"consumer:%ld, failed to encode to raw:%p since %s"
,
pConsumer
->
consumerId
,
pRaw
,
terrstr
());
...
...
@@ -116,7 +112,7 @@ CM_ENCODE_OVER:
return
pRaw
;
}
static
SSdbRow
*
mndConsumerActionDecode
(
SSdbRaw
*
pRaw
)
{
SSdbRow
*
mndConsumerActionDecode
(
SSdbRaw
*
pRaw
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
int8_t
sver
=
0
;
...
...
@@ -127,18 +123,27 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
goto
CONSUME_DECODE_OVER
;
}
int32_t
size
=
sizeof
(
SMqConsumerObj
);
SSdbRow
*
pRow
=
sdbAllocRow
(
size
);
SSdbRow
*
pRow
=
sdbAllocRow
(
sizeof
(
SMqConsumerObj
));
if
(
pRow
==
NULL
)
goto
CONSUME_DECODE_OVER
;
SMqConsumerObj
*
pConsumer
=
sdbGetRowObj
(
pRow
);
if
(
pConsumer
==
NULL
)
goto
CONSUME_DECODE_OVER
;
int32_t
dataPos
=
0
;
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pConsumer
->
consumerId
,
CONSUME_DECODE_OVER
);
int32_t
len
,
topicNum
;
int32_t
len
;
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
len
,
CONSUME_DECODE_OVER
);
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pConsumer
->
cgroup
,
len
,
CONSUME_DECODE_OVER
);
void
*
buf
=
malloc
(
len
);
if
(
buf
==
NULL
)
goto
CONSUME_DECODE_OVER
;
SDB_GET_BINARY
(
pRaw
,
dataPos
,
buf
,
len
,
CONSUME_DECODE_OVER
);
tDecodeSMqConsumerObj
(
buf
,
pConsumer
);
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
MND_CONSUMER_RESERVE_SIZE
,
CONSUME_DECODE_OVER
);
terrno
=
TSDB_CODE_SUCCESS
;
#if 0
SDB_GET_INT32(pRaw, dataPos, &topicNum, CONSUME_DECODE_OVER);
for (int i = 0; i < topicNum; i++) {
int32_t topicLen;
...
...
@@ -154,6 +159,7 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
int32_t vgSize;
SDB_GET_INT32(pRaw, dataPos, &vgSize, CONSUME_DECODE_OVER);
}
#endif
CONSUME_DECODE_OVER:
if
(
terrno
!=
0
)
{
...
...
@@ -162,8 +168,6 @@ CONSUME_DECODE_OVER:
return
NULL
;
}
/*SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE);*/
return
pRow
;
}
...
...
@@ -201,214 +205,13 @@ void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
sdbRelease
(
pSdb
,
pConsumer
);
}
static
int32_t
mndProcessSubscribeReq
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
char
*
msgStr
=
pMsg
->
rpcMsg
.
pCont
;
SCMSubscribeReq
subscribe
;
tDeserializeSCMSubscribeReq
(
msgStr
,
&
subscribe
);
int64_t
consumerId
=
subscribe
.
consumerId
;
char
*
consumerGroup
=
subscribe
.
consumerGroup
;
int32_t
cgroupLen
=
strlen
(
consumerGroup
);
SArray
*
newSub
=
NULL
;
int
newTopicNum
=
subscribe
.
topicNum
;
if
(
newTopicNum
)
{
newSub
=
taosArrayInit
(
newTopicNum
,
sizeof
(
SMqConsumerTopic
));
}
SMqConsumerTopic
*
pConsumerTopics
=
calloc
(
newTopicNum
,
sizeof
(
SMqConsumerTopic
));
if
(
pConsumerTopics
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
for
(
int
i
=
0
;
i
<
newTopicNum
;
i
++
)
{
char
*
newTopicName
=
taosArrayGetP
(
newSub
,
i
);
SMqConsumerTopic
*
pConsumerTopic
=
&
pConsumerTopics
[
i
];
strcpy
(
pConsumerTopic
->
name
,
newTopicName
);
pConsumerTopic
->
vgroups
=
tdListNew
(
sizeof
(
int64_t
));
}
taosArrayAddBatch
(
newSub
,
pConsumerTopics
,
newTopicNum
);
free
(
pConsumerTopics
);
taosArraySortString
(
newSub
,
taosArrayCompareString
);
SArray
*
oldSub
=
NULL
;
int
oldTopicNum
=
0
;
// create consumer if not exist
SMqConsumerObj
*
pConsumer
=
mndAcquireConsumer
(
pMnode
,
consumerId
);
if
(
pConsumer
==
NULL
)
{
// create consumer
pConsumer
=
malloc
(
sizeof
(
SMqConsumerObj
));
if
(
pConsumer
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pConsumer
->
consumerId
=
consumerId
;
strcpy
(
pConsumer
->
cgroup
,
consumerGroup
);
}
else
{
oldSub
=
pConsumer
->
topics
;
oldTopicNum
=
taosArrayGetSize
(
oldSub
);
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
//TODO: free memory
return
-
1
;
}
int
i
=
0
,
j
=
0
;
while
(
i
<
newTopicNum
||
j
<
oldTopicNum
)
{
SMqConsumerTopic
*
pOldTopic
=
NULL
;
SMqConsumerTopic
*
pNewTopic
=
NULL
;
if
(
i
>=
newTopicNum
)
{
// encode unset topic msg to all vnodes related to that topic
pOldTopic
=
taosArrayGet
(
oldSub
,
j
);
j
++
;
}
else
if
(
j
>=
oldTopicNum
)
{
pNewTopic
=
taosArrayGet
(
newSub
,
i
);
i
++
;
}
else
{
pNewTopic
=
taosArrayGet
(
newSub
,
i
);
pOldTopic
=
taosArrayGet
(
oldSub
,
j
);
char
*
newName
=
pNewTopic
->
name
;
char
*
oldName
=
pOldTopic
->
name
;
int
comp
=
compareLenPrefixedStr
(
newName
,
oldName
);
if
(
comp
==
0
)
{
// do nothing
pOldTopic
=
pNewTopic
=
NULL
;
i
++
;
j
++
;
continue
;
}
else
if
(
comp
<
0
)
{
pOldTopic
=
NULL
;
i
++
;
}
else
{
pNewTopic
=
NULL
;
j
++
;
}
}
if
(
pOldTopic
!=
NULL
)
{
//cancel subscribe of that old topic
ASSERT
(
pNewTopic
==
NULL
);
char
*
oldTopicName
=
pOldTopic
->
name
;
SList
*
vgroups
=
pOldTopic
->
vgroups
;
SListIter
iter
;
tdListInitIter
(
vgroups
,
&
iter
,
TD_LIST_FORWARD
);
SListNode
*
pn
;
SMqTopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
oldTopicName
);
ASSERT
(
pTopic
!=
NULL
);
SMqCGroup
*
pGroup
=
taosHashGet
(
pTopic
->
cgroups
,
consumerGroup
,
cgroupLen
);
while
((
pn
=
tdListNext
(
&
iter
))
!=
NULL
)
{
int32_t
vgId
=
*
(
int64_t
*
)
pn
->
data
;
// acquire and get epset
SVgObj
*
pVgObj
=
mndAcquireVgroup
(
pMnode
,
vgId
);
// TODO what time to release?
if
(
pVgObj
==
NULL
)
{
// TODO handle error
continue
;
}
//build reset msg
void
*
pMqVgSetReq
=
mndBuildMqVGroupSetReq
(
pMnode
,
oldTopicName
,
vgId
,
consumerId
,
consumerGroup
);
// TODO:serialize
if
(
pMsg
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgObj
);
action
.
pCont
=
pMqVgSetReq
;
action
.
contLen
=
0
;
// TODO
action
.
msgType
=
TDMT_VND_MQ_SET_CONN
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
free
(
pMqVgSetReq
);
mndTransDrop
(
pTrans
);
// TODO free
return
-
1
;
}
}
//delete data in mnode
taosHashRemove
(
pTopic
->
cgroups
,
consumerGroup
,
cgroupLen
);
mndReleaseTopic
(
pMnode
,
pTopic
);
}
else
if
(
pNewTopic
!=
NULL
)
{
// save subscribe info to mnode
ASSERT
(
pOldTopic
==
NULL
);
char
*
newTopicName
=
pNewTopic
->
name
;
SMqTopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
newTopicName
);
ASSERT
(
pTopic
!=
NULL
);
SMqCGroup
*
pGroup
=
taosHashGet
(
pTopic
->
cgroups
,
consumerGroup
,
cgroupLen
);
if
(
pGroup
==
NULL
)
{
// add new group
pGroup
=
malloc
(
sizeof
(
SMqCGroup
));
if
(
pGroup
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pGroup
->
consumerIds
=
tdListNew
(
sizeof
(
int64_t
));
if
(
pGroup
->
consumerIds
==
NULL
)
{
free
(
pGroup
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pGroup
->
status
=
0
;
// add into cgroups
taosHashPut
(
pTopic
->
cgroups
,
consumerGroup
,
cgroupLen
,
pGroup
,
sizeof
(
SMqCGroup
));
}
/*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/
// put the consumer into list
// rebalance will be triggered by timer
tdListAppend
(
pGroup
->
consumerIds
,
&
consumerId
);
SSdbRaw
*
pTopicRaw
=
mndTopicActionEncode
(
pTopic
);
sdbSetRawStatus
(
pTopicRaw
,
SDB_STATUS_READY
);
// TODO: error handling
mndTransAppendRedolog
(
pTrans
,
pTopicRaw
);
mndReleaseTopic
(
pMnode
,
pTopic
);
}
else
{
ASSERT
(
0
);
}
}
// destroy old sub
taosArrayDestroy
(
oldSub
);
// put new sub into consumerobj
pConsumer
->
topics
=
newSub
;
// persist consumerObj
SSdbRaw
*
pConsumerRaw
=
mndConsumerActionEncode
(
pConsumer
);
sdbSetRawStatus
(
pConsumerRaw
,
SDB_STATUS_READY
);
// TODO: error handling
mndTransAppendRedolog
(
pTrans
,
pConsumerRaw
);
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
mndReleaseConsumer
(
pMnode
,
pConsumer
);
return
-
1
;
}
// TODO: free memory
mndTransDrop
(
pTrans
);
mndReleaseConsumer
(
pMnode
,
pConsumer
);
return
0
;
}
static
int32_t
mndProcessSubscribeInternalRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
#if 0
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
mDebug("consumer:%s, start to retrieve meta", pInfo->tableFname);
#if 0
SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
...
...
@@ -463,7 +266,6 @@ static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
pMsg->contLen = contLen;
mDebug("consumer:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pConsumer->numOfColumns, pConsumer->numOfTags);
#endif
return 0;
}
...
...
@@ -546,3 +348,4 @@ static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
#endif
source/dnode/mnode/impl/src/mndProfile.c
浏览文件 @
7ab261d7
...
...
@@ -273,6 +273,7 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
}
static
SClientHbRsp
*
mndMqHbBuildRsp
(
SMnode
*
pMnode
,
SClientHbReq
*
pReq
)
{
#if 0
SClientHbRsp* pRsp = malloc(sizeof(SClientHbRsp));
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
...
...
@@ -332,6 +333,8 @@ static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) {
pRsp->body = buf;
pRsp->bodyLen = tlen;
return pRsp;
#endif
return
NULL
;
}
static
int32_t
mndProcessHeartBeatReq
(
SMnodeMsg
*
pReq
)
{
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
0 → 100644
浏览文件 @
7ab261d7
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"
#define MND_SUBSCRIBE_VER_NUMBER 1
#define MND_SUBSCRIBE_RESERVE_SIZE 64
static
SSdbRaw
*
mndSubActionEncode
(
SMqSubscribeObj
*
);
static
SSdbRow
*
mndSubActionDecode
(
SSdbRaw
*
pRaw
);
static
int32_t
mndSubActionInsert
(
SSdb
*
pSdb
,
SMqSubscribeObj
*
);
static
int32_t
mndSubActionDelete
(
SSdb
*
pSdb
,
SMqSubscribeObj
*
);
static
int32_t
mndSubActionUpdate
(
SSdb
*
pSdb
,
SMqSubscribeObj
*
pOldSub
,
SMqSubscribeObj
*
pNewSub
);
static
int32_t
mndProcessSubscribeReq
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessSubscribeRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessSubscribeInternalReq
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessSubscribeInternalRsp
(
SMnodeMsg
*
pMsg
);
int32_t
mndInitSubscribe
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_SUBSCRIBE
,
.
keyType
=
SDB_KEY_BINARY
,
.
encodeFp
=
(
SdbEncodeFp
)
mndSubActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndSubActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndSubActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndSubActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndSubActionDelete
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_SUBSCRIBE
,
mndProcessSubscribeReq
);
/*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/
/*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/
mndSetMsgHandle
(
pMnode
,
TDMT_VND_SUBSCRIBE_RSP
,
mndProcessSubscribeInternalRsp
);
return
sdbSetTable
(
pMnode
->
pSdb
,
table
);
}
static
int
mndInitUnassignedVg
(
SMnode
*
pMnode
,
SMqTopicObj
*
pTopic
,
SArray
*
unassignedVg
)
{
SMqConsumerEp
CEp
;
CEp
.
lastConsumerHbTs
=
CEp
.
lastVgHbTs
=
-
1
;
int32_t
sz
;
SVgObj
*
pVgroup
=
NULL
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
NULL
,
(
void
**
)
&
pVgroup
);
while
(
pIter
!=
NULL
)
{
if
(
pVgroup
->
dbUid
==
pTopic
->
dbUid
)
{
CEp
.
epset
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
CEp
.
vgId
=
pVgroup
->
vgId
;
taosArrayPush
(
unassignedVg
,
&
CEp
);
}
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
}
return
0
;
}
static
int
mndBuildMqSetConsumerVgReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SMqConsumerObj
*
pConsumer
,
SMqConsumerTopic
*
pConsumerTopic
)
{
int32_t
sz
=
taosArrayGetSize
(
pConsumerTopic
->
pVgInfo
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int32_t
vgId
=
*
(
int32_t
*
)
taosArrayGet
(
pConsumerTopic
->
pVgInfo
,
i
);
SVgObj
*
pVgObj
=
mndAcquireVgroup
(
pMnode
,
vgId
);
SMqSetCVgReq
req
=
{
.
vgId
=
vgId
,
.
consumerId
=
pConsumer
->
consumerId
,
};
strcpy
(
req
.
cGroup
,
pConsumer
->
cgroup
);
strcpy
(
req
.
topicName
,
pConsumerTopic
->
name
);
int32_t
tlen
=
tEncodeSMqSetCVgReq
(
NULL
,
&
req
);
void
*
reqStr
=
malloc
(
tlen
);
if
(
reqStr
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
void
*
abuf
=
reqStr
;
tEncodeSMqSetCVgReq
(
abuf
,
&
req
);
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgObj
);
action
.
pCont
=
reqStr
;
action
.
contLen
=
tlen
;
action
.
msgType
=
TDMT_VND_MQ_SET_CONN
;
mndReleaseVgroup
(
pMnode
,
pVgObj
);
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
free
(
reqStr
);
return
-
1
;
}
}
return
0
;
}
void
mndCleanupSubscribe
(
SMnode
*
pMnode
)
{}
static
SSdbRaw
*
mndSubActionEncode
(
SMqSubscribeObj
*
pSub
)
{
int32_t
tlen
=
tEncodeSubscribeObj
(
NULL
,
pSub
);
int32_t
size
=
tlen
+
MND_SUBSCRIBE_RESERVE_SIZE
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_SUBSCRIBE
,
MND_SUBSCRIBE_VER_NUMBER
,
size
);
if
(
pRaw
==
NULL
)
goto
SUB_ENCODE_OVER
;
void
*
buf
=
malloc
(
tlen
);
if
(
buf
==
NULL
)
{
goto
SUB_ENCODE_OVER
;
}
void
*
abuf
=
buf
;
tEncodeSubscribeObj
(
&
buf
,
pSub
);
int32_t
dataPos
=
0
;
SDB_SET_INT32
(
pRaw
,
dataPos
,
tlen
,
SUB_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
buf
,
tlen
,
SUB_ENCODE_OVER
);
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
MND_SUBSCRIBE_RESERVE_SIZE
,
SUB_ENCODE_OVER
);
SDB_SET_DATALEN
(
pRaw
,
dataPos
,
SUB_ENCODE_OVER
);
SUB_ENCODE_OVER:
if
(
terrno
!=
0
)
{
mError
(
"subscribe:%s, failed to encode to raw:%p since %s"
,
pSub
->
key
,
pRaw
,
terrstr
());
sdbFreeRaw
(
pRaw
);
return
NULL
;
}
mTrace
(
"subscribe:%s, encode to raw:%p, row:%p"
,
pSub
->
key
,
pRaw
,
pSub
);
return
pRaw
;
}
static
SSdbRow
*
mndSubActionDecode
(
SSdbRaw
*
pRaw
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
goto
SUB_DECODE_OVER
;
if
(
sver
!=
MND_SUBSCRIBE_VER_NUMBER
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
goto
SUB_DECODE_OVER
;
}
int32_t
size
=
sizeof
(
SMqSubscribeObj
);
SSdbRow
*
pRow
=
sdbAllocRow
(
size
);
if
(
pRow
==
NULL
)
goto
SUB_DECODE_OVER
;
SMqSubscribeObj
*
pSub
=
sdbGetRowObj
(
pRow
);
if
(
pSub
==
NULL
)
goto
SUB_DECODE_OVER
;
int32_t
dataPos
=
0
;
int32_t
tlen
;
void
*
buf
=
malloc
(
tlen
+
1
);
if
(
buf
==
NULL
)
goto
SUB_DECODE_OVER
;
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
tlen
,
SUB_DECODE_OVER
);
SDB_GET_BINARY
(
pRaw
,
dataPos
,
buf
,
tlen
,
SUB_DECODE_OVER
);
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
MND_SUBSCRIBE_RESERVE_SIZE
,
SUB_DECODE_OVER
);
if
(
tDecodeSubscribeObj
(
buf
,
pSub
)
==
NULL
)
{
goto
SUB_DECODE_OVER
;
}
SUB_DECODE_OVER:
if
(
terrno
!=
0
)
{
mError
(
"subscribe:%s, failed to decode from raw:%p since %s"
,
pSub
->
key
,
pRaw
,
terrstr
());
// TODO free subscribeobj
tfree
(
pRow
);
return
NULL
;
}
return
pRow
;
}
static
int32_t
mndSubActionInsert
(
SSdb
*
pSdb
,
SMqSubscribeObj
*
pSub
)
{
mTrace
(
"subscribe:%s, perform insert action"
,
pSub
->
key
);
return
0
;
}
static
int32_t
mndSubActionDelete
(
SSdb
*
pSdb
,
SMqSubscribeObj
*
pSub
)
{
mTrace
(
"subscribe:%s, perform delete action"
,
pSub
->
key
);
return
0
;
}
static
int32_t
mndSubActionUpdate
(
SSdb
*
pSdb
,
SMqSubscribeObj
*
pOldSub
,
SMqSubscribeObj
*
pNewSub
)
{
mTrace
(
"subscribe:%s, perform update action"
,
pOldSub
->
key
);
return
0
;
}
static
void
*
mndBuildMqVGroupSetReq
(
SMnode
*
pMnode
,
char
*
topicName
,
int32_t
vgId
,
int64_t
consumerId
,
char
*
cgroup
)
{
return
0
;
}
static
char
*
mndMakeSubscribeKey
(
char
*
cgroup
,
char
*
topicName
)
{
char
*
key
=
malloc
(
TSDB_SHOW_SUBQUERY_LEN
);
if
(
key
==
NULL
)
{
return
NULL
;
}
int
tlen
=
strlen
(
cgroup
);
memcpy
(
key
,
cgroup
,
tlen
);
key
[
tlen
]
=
':'
;
strcpy
(
key
+
tlen
+
1
,
topicName
);
return
key
;
}
SMqSubscribeObj
*
mndAcquireSubscribe
(
SMnode
*
pMnode
,
char
*
cgroup
,
char
*
topicName
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
char
*
key
=
mndMakeSubscribeKey
(
cgroup
,
topicName
);
SMqSubscribeObj
*
pSub
=
sdbAcquire
(
pSdb
,
SDB_SUBSCRIBE
,
key
);
free
(
key
);
if
(
pSub
==
NULL
)
{
/*terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;*/
}
return
pSub
;
}
void
mndReleaseSubscribe
(
SMnode
*
pMnode
,
SMqSubscribeObj
*
pSub
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbRelease
(
pSdb
,
pSub
);
}
static
int32_t
mndProcessSubscribeReq
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
char
*
msgStr
=
pMsg
->
rpcMsg
.
pCont
;
SCMSubscribeReq
subscribe
;
tDeserializeSCMSubscribeReq
(
msgStr
,
&
subscribe
);
int64_t
consumerId
=
subscribe
.
consumerId
;
char
*
consumerGroup
=
subscribe
.
consumerGroup
;
int32_t
cgroupLen
=
strlen
(
consumerGroup
);
SArray
*
newSub
=
subscribe
.
topicNames
;
int
newTopicNum
=
subscribe
.
topicNum
;
taosArraySortString
(
newSub
,
taosArrayCompareString
);
SArray
*
oldSub
=
NULL
;
int
oldTopicNum
=
0
;
// create consumer if not exist
SMqConsumerObj
*
pConsumer
=
mndAcquireConsumer
(
pMnode
,
consumerId
);
if
(
pConsumer
==
NULL
)
{
// create consumer
pConsumer
=
malloc
(
sizeof
(
SMqConsumerObj
));
if
(
pConsumer
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pConsumer
->
consumerId
=
consumerId
;
strcpy
(
pConsumer
->
cgroup
,
consumerGroup
);
taosInitRWLatch
(
&
pConsumer
->
lock
);
}
else
{
oldSub
=
pConsumer
->
topics
;
}
pConsumer
->
topics
=
taosArrayInit
(
newTopicNum
,
sizeof
(
SMqConsumerTopic
));
if
(
oldSub
!=
NULL
)
{
oldTopicNum
=
taosArrayGetSize
(
oldSub
);
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
// TODO: free memory
return
-
1
;
}
int
i
=
0
,
j
=
0
;
while
(
i
<
newTopicNum
||
j
<
oldTopicNum
)
{
char
*
newTopicName
=
NULL
;
char
*
oldTopicName
=
NULL
;
if
(
i
>=
newTopicNum
)
{
// encode unset topic msg to all vnodes related to that topic
oldTopicName
=
((
SMqConsumerTopic
*
)
taosArrayGet
(
oldSub
,
j
))
->
name
;
j
++
;
}
else
if
(
j
>=
oldTopicNum
)
{
newTopicName
=
taosArrayGet
(
newSub
,
i
);
i
++
;
}
else
{
newTopicName
=
taosArrayGet
(
newSub
,
i
);
oldTopicName
=
((
SMqConsumerTopic
*
)
taosArrayGet
(
oldSub
,
j
))
->
name
;
int
comp
=
compareLenPrefixedStr
(
newTopicName
,
oldTopicName
);
if
(
comp
==
0
)
{
// do nothing
oldTopicName
=
newTopicName
=
NULL
;
i
++
;
j
++
;
continue
;
}
else
if
(
comp
<
0
)
{
oldTopicName
=
NULL
;
i
++
;
}
else
{
newTopicName
=
NULL
;
j
++
;
}
}
if
(
oldTopicName
!=
NULL
)
{
#if 0
// cancel subscribe of that old topic
ASSERT(pNewTopic == NULL);
char *oldTopicName = pOldTopic->name;
SList *vgroups = pOldTopic->vgroups;
SListIter iter;
tdListInitIter(vgroups, &iter, TD_LIST_FORWARD);
SListNode *pn;
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, oldTopicName);
ASSERT(pTopic != NULL);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, oldTopicName);
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
while ((pn = tdListNext(&iter)) != NULL) {
int32_t vgId = *(int64_t *)pn->data;
// acquire and get epset
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
// TODO what time to release?
if (pVgObj == NULL) {
// TODO handle error
continue;
}
// build reset msg
void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup);
// TODO:serialize
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = pMqVgSetReq;
action.contLen = 0; // TODO
action.msgType = TDMT_VND_MQ_SET_CONN;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMqVgSetReq);
mndTransDrop(pTrans);
// TODO free
return -1;
}
}
// delete data in mnode
taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen);
mndReleaseSubscribe(pMnode, pSub);
mndReleaseTopic(pMnode, pTopic);
#endif
}
else
if
(
newTopicName
!=
NULL
)
{
// save subscribe info to mnode
ASSERT
(
oldTopicName
==
NULL
);
SMqTopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
newTopicName
);
if
(
pTopic
==
NULL
)
{
/*terrno = */
continue
;
}
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribe
(
pMnode
,
consumerGroup
,
newTopicName
);
if
(
pSub
==
NULL
)
{
pSub
=
tNewSubscribeObj
();
if
(
pSub
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
// set unassigned vg
mndInitUnassignedVg
(
pMnode
,
pTopic
,
pSub
->
unassignedVg
);
}
SMqConsumerTopic
*
pConsumerTopic
=
tNewConsumerTopic
(
consumerId
,
pTopic
,
pSub
);
taosArrayPush
(
pConsumer
->
topics
,
pConsumerTopic
);
if
(
taosArrayGetSize
(
pConsumerTopic
->
pVgInfo
)
>
0
)
{
int32_t
vgId
=
*
(
int32_t
*
)
taosArrayGetLast
(
pConsumerTopic
->
pVgInfo
);
// send setmsg to vnode
if
(
mndBuildMqSetConsumerVgReq
(
pMnode
,
pTrans
,
pConsumer
,
pConsumerTopic
)
<
0
)
{
// TODO
return
-
1
;
}
}
taosArrayDestroy
(
pConsumerTopic
->
pVgInfo
);
free
(
pConsumerTopic
);
#if 0
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
if (pGroup == NULL) {
// add new group
pGroup = malloc(sizeof(SMqCGroup));
if (pGroup == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pGroup->consumerIds = tdListNew(sizeof(int64_t));
if (pGroup->consumerIds == NULL) {
free(pGroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pGroup->status = 0;
// add into cgroups
taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup));
}
/*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/
// put the consumer into list
// rebalance will be triggered by timer
tdListAppend(pGroup->consumerIds, &consumerId);
SSdbRaw *pTopicRaw = mndTopicActionEncode(pTopic);
sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY);
// TODO: error handling
mndTransAppendRedolog(pTrans, pTopicRaw);
#endif
mndReleaseTopic
(
pMnode
,
pTopic
);
mndReleaseSubscribe
(
pMnode
,
pSub
);
}
}
// part3. persist consumerObj
// destroy old sub
if
(
oldSub
)
taosArrayDestroy
(
oldSub
);
// put new sub into consumerobj
// persist consumerObj
SSdbRaw
*
pConsumerRaw
=
mndConsumerActionEncode
(
pConsumer
);
sdbSetRawStatus
(
pConsumerRaw
,
SDB_STATUS_READY
);
// TODO: error handling
mndTransAppendRedolog
(
pTrans
,
pConsumerRaw
);
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
mndReleaseConsumer
(
pMnode
,
pConsumer
);
return
-
1
;
}
// TODO: free memory
if
(
newSub
)
taosArrayDestroy
(
newSub
);
mndTransDrop
(
pTrans
);
mndReleaseConsumer
(
pMnode
,
pConsumer
);
return
0
;
}
static
int32_t
mndProcessSubscribeInternalRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessConsumerMetaMsg
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
STableInfoReq
*
pInfo
=
pMsg
->
rpcMsg
.
pCont
;
mDebug
(
"subscribe:%s, start to retrieve meta"
,
pInfo
->
tableFname
);
#if 0
SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("consumer:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr());
return -1;
}
SConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pInfo->tableFname);
if (pConsumer == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_INVALID_CONSUMER;
mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
return -1;
}
taosRLockLatch(&pConsumer->lock);
int32_t totalCols = pConsumer->numOfColumns + pConsumer->numOfTags;
int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema);
STableMetaRsp *pMeta = rpcMallocCont(contLen);
if (pMeta == NULL) {
taosRUnLockLatch(&pConsumer->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseConsumer(pMnode, pConsumer);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
return -1;
}
memcpy(pMeta->consumerFname, pConsumer->name, TSDB_TABLE_FNAME_LEN);
pMeta->numOfTags = htonl(pConsumer->numOfTags);
pMeta->numOfColumns = htonl(pConsumer->numOfColumns);
pMeta->precision = pDb->cfg.precision;
pMeta->tableType = TSDB_SUPER_TABLE;
pMeta->update = pDb->cfg.update;
pMeta->sversion = htonl(pConsumer->version);
pMeta->tuid = htonl(pConsumer->uid);
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pMeta->pSchema[i];
SSchema *pSrcSchema = &pConsumer->pSchema[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
taosRUnLockLatch(&pConsumer->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseConsumer(pMnode, pConsumer);
pMsg->pCont = pMeta;
pMsg->contLen = contLen;
mDebug("consumer:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pConsumer->numOfColumns, pConsumer->numOfTags);
#endif
return
0
;
}
static
int32_t
mndGetNumOfConsumers
(
SMnode
*
pMnode
,
char
*
dbName
,
int32_t
*
pNumOfConsumers
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
dbName
);
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_SELECTED
;
return
-
1
;
}
int32_t
numOfConsumers
=
0
;
void
*
pIter
=
NULL
;
while
(
1
)
{
SMqConsumerObj
*
pConsumer
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_CONSUMER
,
pIter
,
(
void
**
)
&
pConsumer
);
if
(
pIter
==
NULL
)
break
;
numOfConsumers
++
;
sdbRelease
(
pSdb
,
pConsumer
);
}
*
pNumOfConsumers
=
numOfConsumers
;
return
0
;
}
static
int32_t
mndGetConsumerMeta
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
STableMetaRsp
*
pMeta
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
if
(
mndGetNumOfConsumers
(
pMnode
,
pShow
->
db
,
&
pShow
->
numOfRows
)
!=
0
)
{
return
-
1
;
}
int32_t
cols
=
0
;
SSchema
*
pSchema
=
pMeta
->
pSchema
;
pShow
->
bytes
[
cols
]
=
TSDB_TABLE_NAME_LEN
+
VARSTR_HEADER_SIZE
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"name"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
8
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
strcpy
(
pSchema
[
cols
].
name
,
"create_time"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
4
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_INT
;
strcpy
(
pSchema
[
cols
].
name
,
"columns"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
4
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_INT
;
strcpy
(
pSchema
[
cols
].
name
,
"tags"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pMeta
->
numOfColumns
=
htonl
(
cols
);
pShow
->
numOfColumns
=
cols
;
pShow
->
offset
[
0
]
=
0
;
for
(
int32_t
i
=
1
;
i
<
cols
;
++
i
)
{
pShow
->
offset
[
i
]
=
pShow
->
offset
[
i
-
1
]
+
pShow
->
bytes
[
i
-
1
];
}
pShow
->
numOfRows
=
sdbGetSize
(
pSdb
,
SDB_CONSUMER
);
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
strcpy
(
pMeta
->
tbFname
,
mndShowStr
(
pShow
->
type
));
return
0
;
}
static
void
mndCancelGetNextConsumer
(
SMnode
*
pMnode
,
void
*
pIter
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
}
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
7ab261d7
...
...
@@ -79,8 +79,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
logicalPlan
,
logicalPlanLen
,
TOPIC_ENCODE_OVER
);
int32_t
physicalPlanLen
=
strlen
(
pTopic
->
physicalPlan
)
+
1
;
pTopic
->
physicalPlan
=
calloc
(
physicalPlanLen
,
sizeof
(
char
));
if
(
pTopic
->
physicalPlan
==
NULL
)
goto
TOPIC_ENCODE_OVER
;
SDB_SET_INT32
(
pRaw
,
dataPos
,
strlen
(
pTopic
->
physicalPlan
)
+
1
,
TOPIC_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
physicalPlan
,
physicalPlanLen
,
TOPIC_ENCODE_OVER
);
...
...
@@ -92,12 +90,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
TOPIC_ENCODE_OVER:
if
(
terrno
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"topic:%s, failed to encode to raw:%p since %s"
,
pTopic
->
name
,
pRaw
,
terrstr
());
/*if (pTopic->logicalPlan) {*/
/*free(pTopic->logicalPlan);*/
/*}*/
/*if (pTopic->physicalPlan) {*/
/*free(pTopic->physicalPlan);*/
/*}*/
sdbFreeRaw
(
pRaw
);
return
NULL
;
}
...
...
@@ -138,7 +130,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
sql
,
pTopic
->
sqlLen
,
TOPIC_DECODE_OVER
);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
len
,
TOPIC_DECODE_OVER
);
pTopic
->
logicalPlan
=
calloc
(
len
+
1
,
sizeof
(
char
));
pTopic
->
logicalPlan
=
calloc
(
len
+
1
,
sizeof
(
char
));
if
(
pTopic
->
logicalPlan
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TOPIC_DECODE_OVER
;
...
...
@@ -146,7 +138,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
logicalPlan
,
len
+
1
,
TOPIC_DECODE_OVER
);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
len
,
TOPIC_DECODE_OVER
);
pTopic
->
log
icalPlan
=
calloc
(
len
+
1
,
sizeof
(
char
));
pTopic
->
phys
icalPlan
=
calloc
(
len
+
1
,
sizeof
(
char
));
if
(
pTopic
->
physicalPlan
==
NULL
)
{
free
(
pTopic
->
logicalPlan
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -154,7 +146,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
}
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
physicalPlan
,
len
+
1
,
TOPIC_DECODE_OVER
);
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
MND_TOPIC_RESERVE_SIZE
,
TOPIC_DECODE_OVER
)
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
MND_TOPIC_RESERVE_SIZE
,
TOPIC_DECODE_OVER
)
;
terrno
=
TSDB_CODE_SUCCESS
;
...
...
source/dnode/mnode/sdb/src/sdbHash.c
浏览文件 @
7ab261d7
...
...
@@ -38,10 +38,10 @@ const char *sdbTableName(ESdbType type) {
return
"auth"
;
case
SDB_ACCT
:
return
"acct"
;
case
SDB_SUBSCRIBE
:
return
"subscribe"
;
case
SDB_CONSUMER
:
return
"consumer"
;
case
SDB_CGROUP
:
return
"cgroup"
;
case
SDB_TOPIC
:
return
"topic"
;
case
SDB_VGROUP
:
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录