Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2dcb0145
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
2dcb0145
编写于
4月 21, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat(tmq): add db subscribe
上级
3dd3ad1e
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
154 addition
and
1116 deletion
+154
-1116
include/common/tmsg.h
include/common/tmsg.h
+36
-7
source/common/src/tmsg.c
source/common/src/tmsg.c
+9
-3
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+28
-313
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+33
-64
source/dnode/mnode/impl/src/mndOffset.c
source/dnode/mnode/impl/src/mndOffset.c
+3
-3
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+1
-563
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+44
-42
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+0
-121
未找到文件。
include/common/tmsg.h
浏览文件 @
2dcb0145
...
...
@@ -1273,11 +1273,16 @@ typedef struct {
}
SMVCreateStreamRsp
,
SMSCreateStreamRsp
;
typedef
struct
{
char
name
[
TSDB_TOPIC_FNAME_LEN
];
int8_t
igExists
;
char
*
sql
;
char
*
ast
;
char
subscribeDbName
[
TSDB_DB_NAME_LEN
];
char
name
[
TSDB_TOPIC_FNAME_LEN
];
int8_t
igExists
;
int8_t
withTbName
;
int8_t
withSchema
;
int8_t
withTag
;
int8_t
withTagSchema
;
char
*
sql
;
char
*
ast
;
int64_t
subDbUid
;
char
subscribeDbName
[
TSDB_DB_NAME_LEN
];
}
SCMCreateTopicReq
;
int32_t
tSerializeSCMCreateTopicReq
(
void
*
buf
,
int32_t
bufLen
,
const
SCMCreateTopicReq
*
pReq
);
...
...
@@ -1932,12 +1937,22 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
return
buf
;
}
enum
{
TOPIC_SUB_TYPE__DB
=
1
,
TOPIC_SUB_TYPE__TABLE
,
};
typedef
struct
{
int64_t
leftForVer
;
int32_t
vgId
;
int64_t
oldConsumerId
;
int64_t
newConsumerId
;
char
subKey
[
TSDB_SUBSCRIBE_KEY_LEN
];
int8_t
subType
;
int8_t
withTbName
;
int8_t
withSchema
;
int8_t
withTag
;
int8_t
withTagSchema
;
char
*
qmsg
;
}
SMqRebVgReq
;
...
...
@@ -1948,7 +1963,14 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
oldConsumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
newConsumerId
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
subKey
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
qmsg
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
subType
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
withTbName
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
withSchema
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
withTag
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
withTagSchema
);
if
(
pReq
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
tlen
+=
taosEncodeString
(
buf
,
pReq
->
qmsg
);
}
return
tlen
;
}
...
...
@@ -1958,7 +1980,14 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
oldConsumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
newConsumerId
);
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
subKey
);
buf
=
taosDecodeString
(
buf
,
&
pReq
->
qmsg
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pReq
->
subType
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pReq
->
withTbName
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pReq
->
withSchema
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pReq
->
withTag
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pReq
->
withTagSchema
);
if
(
pReq
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
buf
=
taosDecodeString
(
buf
,
&
pReq
->
qmsg
);
}
return
(
void
*
)
buf
;
}
...
...
source/common/src/tmsg.c
浏览文件 @
2dcb0145
...
...
@@ -2674,6 +2674,10 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
igExists
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
withTbName
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
withSchema
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
withTag
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
withTagSchema
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
sqlLen
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
astLen
)
<
0
)
return
-
1
;
if
(
sqlLen
>
0
&&
tEncodeCStr
(
&
encoder
,
pReq
->
sql
)
<
0
)
return
-
1
;
...
...
@@ -2696,6 +2700,10 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
igExists
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
withTbName
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
withSchema
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
withTag
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
withTagSchema
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
sqlLen
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
astLen
)
<
0
)
return
-
1
;
...
...
@@ -3032,7 +3040,6 @@ int32_t tDeserializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq
return
0
;
}
int32_t
tSerializeSAlterVnodeReq
(
void
*
buf
,
int32_t
bufLen
,
SAlterVnodeReq
*
pReq
)
{
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
...
...
@@ -3052,7 +3059,7 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq
SReplica
*
pReplica
=
&
pReq
->
replicas
[
i
];
if
(
tEncodeSReplica
(
&
encoder
,
pReplica
)
<
0
)
return
-
1
;
}
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
...
...
@@ -3085,7 +3092,6 @@ int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pR
return
0
;
}
int32_t
tSerializeSKillQueryReq
(
void
*
buf
,
int32_t
bufLen
,
SKillQueryReq
*
pReq
)
{
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
2dcb0145
...
...
@@ -418,82 +418,6 @@ typedef struct {
char
payload
[];
}
SSysTableRetrieveObj
;
typedef
struct
{
int32_t
vgId
;
// -1 for unassigned
int32_t
status
;
int32_t
epoch
;
SEpSet
epSet
;
int64_t
oldConsumerId
;
int64_t
consumerId
;
// -1 for unassigned
char
*
qmsg
;
}
SMqConsumerEp
;
static
FORCE_INLINE
int32_t
tEncodeSMqConsumerEp
(
void
**
buf
,
const
SMqConsumerEp
*
pConsumerEp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumerEp
->
vgId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumerEp
->
status
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumerEp
->
epoch
);
tlen
+=
taosEncodeSEpSet
(
buf
,
&
pConsumerEp
->
epSet
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumerEp
->
oldConsumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumerEp
->
consumerId
);
tlen
+=
taosEncodeString
(
buf
,
pConsumerEp
->
qmsg
);
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqConsumerEp
(
void
**
buf
,
SMqConsumerEp
*
pConsumerEp
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumerEp
->
vgId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumerEp
->
status
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumerEp
->
epoch
);
buf
=
taosDecodeSEpSet
(
buf
,
&
pConsumerEp
->
epSet
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumerEp
->
oldConsumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumerEp
->
consumerId
);
buf
=
taosDecodeString
(
buf
,
&
pConsumerEp
->
qmsg
);
return
buf
;
}
static
FORCE_INLINE
void
tDeleteSMqConsumerEp
(
SMqConsumerEp
*
pConsumerEp
)
{
if
(
pConsumerEp
)
{
taosMemoryFreeClear
(
pConsumerEp
->
qmsg
);
}
}
typedef
struct
{
int64_t
consumerId
;
SArray
*
vgInfo
;
// SArray<SMqConsumerEp>
}
SMqSubConsumer
;
static
FORCE_INLINE
int32_t
tEncodeSMqSubConsumer
(
void
**
buf
,
const
SMqSubConsumer
*
pConsumer
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
consumerId
);
int32_t
sz
=
taosArrayGetSize
(
pConsumer
->
vgInfo
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
*
pCEp
=
taosArrayGet
(
pConsumer
->
vgInfo
,
i
);
tlen
+=
tEncodeSMqConsumerEp
(
buf
,
pCEp
);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqSubConsumer
(
void
**
buf
,
SMqSubConsumer
*
pConsumer
)
{
int32_t
sz
;
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
consumerId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pConsumer
->
vgInfo
=
taosArrayInit
(
sz
,
sizeof
(
SMqConsumerEp
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
consumerEp
;
buf
=
tDecodeSMqConsumerEp
(
buf
,
&
consumerEp
);
taosArrayPush
(
pConsumer
->
vgInfo
,
&
consumerEp
);
}
return
buf
;
}
static
FORCE_INLINE
void
tDeleteSMqSubConsumer
(
SMqSubConsumer
*
pSubConsumer
)
{
if
(
pSubConsumer
->
vgInfo
)
{
taosArrayDestroyEx
(
pSubConsumer
->
vgInfo
,
(
void
(
*
)(
void
*
))
tDeleteSMqConsumerEp
);
pSubConsumer
->
vgInfo
=
NULL
;
}
}
typedef
struct
{
char
key
[
TSDB_PARTITION_KEY_LEN
];
int64_t
offset
;
...
...
@@ -512,147 +436,21 @@ static FORCE_INLINE void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset)
return
buf
;
}
#if 0
typedef struct {
char key[TSDB_SUBSCRIBE_KEY_LEN];
int32_t status;
int32_t vgNum;
SArray* consumers; // SArray<SMqSubConsumer>
SArray* lostConsumers; // SArray<SMqSubConsumer>
SArray* unassignedVg; // SArray<SMqConsumerEp>
} SMqSubscribeObj;
static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
SMqSubscribeObj* pSub = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
if (pSub == NULL) {
return NULL;
}
pSub->consumers = taosArrayInit(0, sizeof(SMqSubConsumer));
if (pSub->consumers == NULL) {
goto _err;
}
pSub->lostConsumers = taosArrayInit(0, sizeof(SMqSubConsumer));
if (pSub->lostConsumers == NULL) {
goto _err;
}
pSub->unassignedVg = taosArrayInit(0, sizeof(SMqConsumerEp));
if (pSub->unassignedVg == NULL) {
goto _err;
}
pSub->key[0] = 0;
pSub->vgNum = 0;
pSub->status = 0;
return pSub;
_err:
taosMemoryFreeClear(pSub->consumers);
taosMemoryFreeClear(pSub->lostConsumers);
taosMemoryFreeClear(pSub->unassignedVg);
taosMemoryFreeClear(pSub);
return NULL;
}
static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pSub->key);
tlen += taosEncodeFixedI32(buf, pSub->vgNum);
tlen += taosEncodeFixedI32(buf, pSub->status);
int32_t sz;
sz = taosArrayGetSize(pSub->consumers);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqSubConsumer* pSubConsumer = taosArrayGet(pSub->consumers, i);
tlen += tEncodeSMqSubConsumer(buf, pSubConsumer);
}
sz = taosArrayGetSize(pSub->lostConsumers);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqSubConsumer* pSubConsumer = taosArrayGet(pSub->lostConsumers, i);
tlen += tEncodeSMqSubConsumer(buf, pSubConsumer);
}
sz = taosArrayGetSize(pSub->unassignedVg);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp* pCEp = taosArrayGet(pSub->unassignedVg, i);
tlen += tEncodeSMqConsumerEp(buf, pCEp);
}
return tlen;
}
static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) {
buf = taosDecodeStringTo(buf, pSub->key);
buf = taosDecodeFixedI32(buf, &pSub->vgNum);
buf = taosDecodeFixedI32(buf, &pSub->status);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pSub->consumers = taosArrayInit(sz, sizeof(SMqSubConsumer));
if (pSub->consumers == NULL) {
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqSubConsumer subConsumer = {0};
buf = tDecodeSMqSubConsumer(buf, &subConsumer);
taosArrayPush(pSub->consumers, &subConsumer);
}
buf = taosDecodeFixedI32(buf, &sz);
pSub->lostConsumers = taosArrayInit(sz, sizeof(SMqSubConsumer));
if (pSub->lostConsumers == NULL) {
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqSubConsumer subConsumer = {0};
buf = tDecodeSMqSubConsumer(buf, &subConsumer);
taosArrayPush(pSub->lostConsumers, &subConsumer);
}
buf = taosDecodeFixedI32(buf, &sz);
pSub->unassignedVg = taosArrayInit(sz, sizeof(SMqConsumerEp));
if (pSub->unassignedVg == NULL) {
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp consumerEp = {0};
buf = tDecodeSMqConsumerEp(buf, &consumerEp);
taosArrayPush(pSub->unassignedVg, &consumerEp);
}
return buf;
}
static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) {
if (pSub->consumers) {
// taosArrayDestroyEx(pSub->consumers, (void (*)(void*))tDeleteSMqSubConsumer);
// taosArrayDestroy(pSub->consumers);
pSub->consumers = NULL;
}
if (pSub->unassignedVg) {
// taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp);
// taosArrayDestroy(pSub->unassignedVg);
pSub->unassignedVg = NULL;
}
}
#endif
typedef
struct
{
char
name
[
TSDB_TOPIC_FNAME_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
int64_t
createTime
;
int64_t
updateTime
;
int64_t
uid
;
char
name
[
TSDB_TOPIC_FNAME_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
int64_t
createTime
;
int64_t
updateTime
;
int64_t
uid
;
// TODO: use subDbUid
int64_t
dbUid
;
int64_t
subDbUid
;
int32_t
version
;
int8_t
subType
;
// db or table
int8_t
withTbName
;
int8_t
withSchema
;
int8_t
withTag
;
int8_t
withTagSchema
;
SRWLatch
lock
;
int32_t
sqlLen
;
int32_t
astLen
;
...
...
@@ -662,79 +460,6 @@ typedef struct {
SSchemaWrapper
schema
;
}
SMqTopicObj
;
#if 0
typedef struct {
int64_t consumerId;
int64_t connId;
SRWLatch lock;
char cgroup[TSDB_CGROUP_LEN];
SArray* currentTopics; // SArray<char*>
SArray* recentRemovedTopics; // SArray<char*>
int32_t epoch;
// stat
int64_t pollCnt;
// status
int32_t status;
// heartbeat from the consumer reset hbStatus to 0
// each checkConsumerAlive msg add hbStatus by 1
// if checkConsumerAlive > CONSUMER_REBALANCE_CNT, mask to lost
int32_t hbStatus;
} SMqConsumerObj;
static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer) {
int32_t sz;
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
tlen += taosEncodeFixedI64(buf, pConsumer->connId);
tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
tlen += taosEncodeFixedI64(buf, pConsumer->pollCnt);
tlen += taosEncodeFixedI32(buf, pConsumer->status);
tlen += taosEncodeString(buf, pConsumer->cgroup);
sz = taosArrayGetSize(pConsumer->currentTopics);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
char* topic = taosArrayGetP(pConsumer->currentTopics, i);
tlen += taosEncodeString(buf, topic);
}
sz = taosArrayGetSize(pConsumer->recentRemovedTopics);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
char* topic = taosArrayGetP(pConsumer->recentRemovedTopics, i);
tlen += taosEncodeString(buf, topic);
}
return tlen;
}
static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pConsumer) {
int32_t sz;
buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
buf = taosDecodeFixedI64(buf, &pConsumer->connId);
buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
buf = taosDecodeFixedI64(buf, &pConsumer->pollCnt);
buf = taosDecodeFixedI32(buf, &pConsumer->status);
buf = taosDecodeStringTo(buf, pConsumer->cgroup);
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->currentTopics = taosArrayInit(sz, sizeof(void*));
for (int32_t i = 0; i < sz; i++) {
char* topic;
buf = taosDecodeString(buf, &topic);
taosArrayPush(pConsumer->currentTopics, &topic);
}
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->recentRemovedTopics = taosArrayInit(sz, sizeof(void*));
for (int32_t i = 0; i < sz; i++) {
char* topic;
buf = taosDecodeString(buf, &topic);
taosArrayPush(pConsumer->recentRemovedTopics, &topic);
}
return buf;
}
#endif
enum
{
CONSUMER_UPDATE__TOUCH
=
1
,
CONSUMER_UPDATE__ADD
,
...
...
@@ -753,12 +478,9 @@ typedef struct {
int32_t
hbStatus
;
// lock is used for topics update
SRWLatch
lock
;
SArray
*
currentTopics
;
// SArray<char*>
#if 0
SArray* waitingRebTopics; // SArray<char*>
#endif
SArray
*
rebNewTopics
;
// SArray<char*>
SArray
*
rebRemovedTopics
;
// SArray<char*>
SArray
*
currentTopics
;
// SArray<char*>
SArray
*
rebNewTopics
;
// SArray<char*>
SArray
*
rebRemovedTopics
;
// SArray<char*>
}
SMqConsumerObj
;
SMqConsumerObj
*
tNewSMqConsumerObj
(
int64_t
consumerId
,
char
cgroup
[
TSDB_CGROUP_LEN
]);
...
...
@@ -768,9 +490,13 @@ void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer
typedef
struct
{
int32_t
vgId
;
int8_t
subType
;
int8_t
withTbName
;
int8_t
withSchema
;
int8_t
withTag
;
int8_t
withTagSchema
;
char
*
qmsg
;
// char topic[TSDB_TOPIC_FNAME_LEN];
SEpSet
epSet
;
SEpSet
epSet
;
}
SMqVgEp
;
SMqVgEp
*
tCloneSMqVgEp
(
const
SMqVgEp
*
pVgEp
);
...
...
@@ -792,7 +518,14 @@ typedef struct {
char
key
[
TSDB_SUBSCRIBE_KEY_LEN
];
SRWLatch
lock
;
int32_t
vgNum
;
int8_t
subType
;
int8_t
withTbName
;
int8_t
withSchema
;
int8_t
withTag
;
int8_t
withTagSchema
;
SHashObj
*
consumerHash
;
// consumerId -> SMqConsumerEpInSub
// TODO put -1 into unassignVgs
// SArray* unassignedVgs;
}
SMqSubscribeObj
;
SMqSubscribeObj
*
tNewSubscribeObj
(
const
char
key
[
TSDB_SUBSCRIBE_KEY_LEN
]);
...
...
@@ -821,18 +554,6 @@ void tDeleteSMqSubActionLogObj(SMqSubActionLogObj* pLog);
int32_t
tEncodeSMqSubActionLogObj
(
void
**
buf
,
const
SMqSubActionLogObj
*
pLog
);
void
*
tDecodeSMqSubActionLogObj
(
const
void
*
buf
,
SMqSubActionLogObj
*
pLog
);
typedef
struct
{
int64_t
consumerId
;
char
cgroup
[
TSDB_CGROUP_LEN
];
SRWLatch
lock
;
SArray
*
vgs
;
// SArray<SMqVgEp*>
}
SMqConsumerEpObj
;
SMqConsumerEpObj
*
tCloneSMqConsumerEpObj
(
const
SMqConsumerEpObj
*
pConsumerEp
);
void
tDeleteSMqConsumerEpObj
(
SMqConsumerEpObj
*
pConsumerEp
);
int32_t
tEncodeSMqConsumerEpObj
(
void
**
buf
,
const
SMqConsumerEpObj
*
pConsumerEp
);
void
*
tDecodeSMqConsumerEpObj
(
const
void
*
buf
,
SMqConsumerEpObj
*
pConsumerEp
);
typedef
struct
{
const
SMqSubscribeObj
*
pOldSub
;
const
SMqTopicObj
*
pTopic
;
...
...
@@ -845,12 +566,6 @@ typedef struct {
SMqVgEp
*
pVgEp
;
}
SMqRebOutputVg
;
#if 0
typedef struct {
int64_t consumerId;
} SMqRebOutputConsumer;
#endif
typedef
struct
{
SArray
*
rebVgs
;
// SArray<SMqRebOutputVg>
SArray
*
newConsumers
;
// SArray<int64_t>
...
...
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
2dcb0145
...
...
@@ -32,9 +32,6 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_L
taosInitRWLatch
(
&
pConsumer
->
lock
);
pConsumer
->
currentTopics
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
#if 0
pConsumer->waitingRebTopics = NULL;
#endif
pConsumer
->
rebNewTopics
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
pConsumer
->
rebRemovedTopics
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
...
...
@@ -53,11 +50,6 @@ void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
if
(
pConsumer
->
currentTopics
)
{
taosArrayDestroyP
(
pConsumer
->
currentTopics
,
(
FDelete
)
taosMemoryFree
);
}
#if 0
if (pConsumer->waitingRebTopics) {
taosArrayDestroyP(pConsumer->waitingRebTopics, taosMemoryFree);
}
#endif
if
(
pConsumer
->
rebNewTopics
)
{
taosArrayDestroyP
(
pConsumer
->
rebNewTopics
,
(
FDelete
)
taosMemoryFree
);
}
...
...
@@ -87,20 +79,6 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
tlen
+=
taosEncodeFixedI32
(
buf
,
0
);
}
#if 0
// waiting reb topics
if (pConsumer->waitingRebTopics) {
sz = taosArrayGetSize(pConsumer->waitingRebTopics);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pConsumer->waitingRebTopics, i);
tlen += taosEncodeString(buf, topic);
}
} else {
tlen += taosEncodeFixedI32(buf, 0);
}
#endif
// reb new topics
if
(
pConsumer
->
rebNewTopics
)
{
sz
=
taosArrayGetSize
(
pConsumer
->
rebNewTopics
);
...
...
@@ -145,17 +123,6 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) {
taosArrayPush
(
pConsumer
->
currentTopics
,
&
topic
);
}
#if 0
// waiting reb topics
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->waitingRebTopics = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) {
char *topic;
buf = taosDecodeString(buf, &topic);
taosArrayPush(pConsumer->waitingRebTopics, &topic);
}
#endif
// reb new topics
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pConsumer
->
rebNewTopics
=
taosArrayInit
(
sz
,
sizeof
(
void
*
));
...
...
@@ -181,6 +148,11 @@ SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
SMqVgEp
*
pVgEpNew
=
taosMemoryMalloc
(
sizeof
(
SMqVgEp
));
if
(
pVgEpNew
==
NULL
)
return
NULL
;
pVgEpNew
->
vgId
=
pVgEp
->
vgId
;
pVgEpNew
->
subType
=
pVgEp
->
subType
;
pVgEpNew
->
withTbName
=
pVgEp
->
withTbName
;
pVgEpNew
->
withSchema
=
pVgEp
->
withSchema
;
pVgEpNew
->
withTag
=
pVgEp
->
withTag
;
pVgEpNew
->
withTagSchema
=
pVgEp
->
withTagSchema
;
pVgEpNew
->
qmsg
=
strdup
(
pVgEp
->
qmsg
);
/*memcpy(pVgEpNew->topic, pVgEp->topic, TSDB_TOPIC_FNAME_LEN);*/
pVgEpNew
->
epSet
=
pVgEp
->
epSet
;
...
...
@@ -192,6 +164,11 @@ void tDeleteSMqVgEp(SMqVgEp *pVgEp) { taosMemoryFree(pVgEp->qmsg); }
int32_t
tEncodeSMqVgEp
(
void
**
buf
,
const
SMqVgEp
*
pVgEp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pVgEp
->
vgId
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pVgEp
->
subType
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pVgEp
->
withTbName
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pVgEp
->
withSchema
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pVgEp
->
withTag
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pVgEp
->
withTagSchema
);
tlen
+=
taosEncodeString
(
buf
,
pVgEp
->
qmsg
);
/*tlen += taosEncodeString(buf, pVgEp->topic);*/
tlen
+=
taosEncodeSEpSet
(
buf
,
&
pVgEp
->
epSet
);
...
...
@@ -200,41 +177,17 @@ int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
void
*
tDecodeSMqVgEp
(
const
void
*
buf
,
SMqVgEp
*
pVgEp
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pVgEp
->
vgId
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pVgEp
->
subType
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pVgEp
->
withTbName
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pVgEp
->
withSchema
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pVgEp
->
withTag
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pVgEp
->
withTagSchema
);
buf
=
taosDecodeString
(
buf
,
&
pVgEp
->
qmsg
);
/*buf = taosDecodeStringTo(buf, pVgEp->topic);*/
buf
=
taosDecodeSEpSet
(
buf
,
&
pVgEp
->
epSet
);
return
(
void
*
)
buf
;
}
SMqConsumerEpObj
*
tCloneSMqConsumerEpObj
(
const
SMqConsumerEpObj
*
pConsumerEp
)
{
SMqConsumerEpObj
*
pConsumerEpNew
=
taosMemoryMalloc
(
sizeof
(
SMqConsumerEpObj
));
if
(
pConsumerEpNew
==
NULL
)
return
NULL
;
pConsumerEpNew
->
consumerId
=
pConsumerEp
->
consumerId
;
memcpy
(
pConsumerEpNew
->
cgroup
,
pConsumerEp
->
cgroup
,
TSDB_CGROUP_LEN
);
taosInitRWLatch
(
&
pConsumerEpNew
->
lock
);
pConsumerEpNew
->
vgs
=
taosArrayDeepCopy
(
pConsumerEpNew
->
vgs
,
(
FCopy
)
tCloneSMqVgEp
);
return
pConsumerEpNew
;
}
void
tDeleteSMqConsumerEpObj
(
SMqConsumerEpObj
*
pConsumerEp
)
{
taosArrayDestroyEx
(
pConsumerEp
->
vgs
,
(
FDelete
)
tDeleteSMqVgEp
);
}
int32_t
tEncodeSMqConsumerEpObj
(
void
**
buf
,
const
SMqConsumerEpObj
*
pConsumerEp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumerEp
->
consumerId
);
tlen
+=
taosEncodeString
(
buf
,
pConsumerEp
->
cgroup
);
tlen
+=
taosEncodeArray
(
buf
,
pConsumerEp
->
vgs
,
(
FEncode
)
tEncodeSMqVgEp
);
return
tlen
;
}
void
*
tDecodeSMqConsumerEpObj
(
const
void
*
buf
,
SMqConsumerEpObj
*
pConsumerEp
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumerEp
->
consumerId
);
buf
=
taosDecodeStringTo
(
buf
,
pConsumerEp
->
cgroup
);
buf
=
taosDecodeArray
(
buf
,
&
pConsumerEp
->
vgs
,
(
FDecode
)
tDecodeSMqVgEp
,
sizeof
(
SMqSubVgEp
));
return
(
void
*
)
buf
;
}
SMqConsumerEpInSub
*
tCloneSMqConsumerEpInSub
(
const
SMqConsumerEpInSub
*
pEpInSub
)
{
SMqConsumerEpInSub
*
pEpInSubNew
=
taosMemoryMalloc
(
sizeof
(
SMqConsumerEpInSub
));
if
(
pEpInSubNew
==
NULL
)
return
NULL
;
...
...
@@ -276,7 +229,7 @@ void *tDecodeSMqConsumerEpInSub(const void *buf, SMqConsumerEpInSub *pEpInSub) {
}
SMqSubscribeObj
*
tNewSubscribeObj
(
const
char
key
[
TSDB_SUBSCRIBE_KEY_LEN
])
{
SMqSubscribeObj
*
pSubNew
=
taosMemory
Malloc
(
sizeof
(
SMqSubscribeObj
));
SMqSubscribeObj
*
pSubNew
=
taosMemory
Calloc
(
1
,
sizeof
(
SMqSubscribeObj
));
if
(
pSubNew
==
NULL
)
return
NULL
;
memcpy
(
pSubNew
->
key
,
key
,
TSDB_SUBSCRIBE_KEY_LEN
);
taosInitRWLatch
(
&
pSubNew
->
lock
);
...
...
@@ -297,8 +250,14 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
if
(
pSubNew
==
NULL
)
return
NULL
;
memcpy
(
pSubNew
->
key
,
pSub
->
key
,
TSDB_SUBSCRIBE_KEY_LEN
);
taosInitRWLatch
(
&
pSubNew
->
lock
);
pSubNew
->
subType
=
pSub
->
subType
;
pSubNew
->
withTbName
=
pSub
->
withTbName
;
pSubNew
->
withSchema
=
pSub
->
withSchema
;
pSubNew
->
withTag
=
pSub
->
withTag
;
pSubNew
->
withTagSchema
=
pSub
->
withTagSchema
;
pSubNew
->
vgNum
=
pSub
->
vgNum
;
/*pSubNew->consumerEps = taosArrayDeepCopy(pSub->consumerEps, (FCopy)tCloneSMqConsumerEpInSub);*/
pSubNew
->
consumerHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
/*taosHashSetFreeFp(pSubNew->consumerHash, taosArrayDestroy);*/
void
*
pIter
=
NULL
;
...
...
@@ -325,6 +284,11 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
int32_t
tlen
=
0
;
tlen
+=
taosEncodeString
(
buf
,
pSub
->
key
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pSub
->
vgNum
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pSub
->
subType
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pSub
->
withTbName
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pSub
->
withSchema
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pSub
->
withTag
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pSub
->
withTagSchema
);
void
*
pIter
=
NULL
;
int32_t
sz
=
taosHashGetSize
(
pSub
->
consumerHash
);
...
...
@@ -347,6 +311,11 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
//
buf
=
taosDecodeStringTo
(
buf
,
pSub
->
key
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pSub
->
vgNum
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pSub
->
subType
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pSub
->
withTbName
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pSub
->
withSchema
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pSub
->
withTag
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pSub
->
withTagSchema
);
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
...
...
source/dnode/mnode/impl/src/mndOffset.c
浏览文件 @
2dcb0145
...
...
@@ -133,9 +133,9 @@ OFFSET_DECODE_OVER:
int32_t
mndCreateOffsets
(
STrans
*
pTrans
,
const
char
*
cgroup
,
const
char
*
topicName
,
const
SArray
*
vgs
)
{
int32_t
sz
=
taosArrayGetSize
(
vgs
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
*
pConsumerEp
=
taosArrayGet
(
vgs
,
i
);
SMqOffsetObj
offsetObj
;
if
(
mndMakePartitionKey
(
offsetObj
.
key
,
cgroup
,
topicName
,
pConsumerEp
->
vgId
)
<
0
)
{
int32_t
vgId
=
*
(
int32_t
*
)
taosArrayGet
(
vgs
,
i
);
SMqOffsetObj
offsetObj
;
if
(
mndMakePartitionKey
(
offsetObj
.
key
,
cgroup
,
topicName
,
vgId
)
<
0
)
{
return
-
1
;
}
offsetObj
.
offset
=
-
1
;
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
2dcb0145
...
...
@@ -46,16 +46,8 @@ static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
static
int32_t
mndSubActionDelete
(
SSdb
*
pSdb
,
SMqSubscribeObj
*
);
static
int32_t
mndSubActionUpdate
(
SSdb
*
pSdb
,
SMqSubscribeObj
*
pOldSub
,
SMqSubscribeObj
*
pNewSub
);
/*static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg);*/
/*static int32_t mndProcessSubscribeRsp(SNodeMsg *pMsg);*/
static
int32_t
mndProcessSubscribeInternalReq
(
SNodeMsg
*
pMsg
);
static
int32_t
mndProcessSubscribeInternalRsp
(
SNodeMsg
*
pMsg
);
/*static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg);*/
/*static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg);*/
/*static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg);*/
/*static int32_t mndProcessResetOffsetReq(SNodeMsg *pMsg);*/
static
int32_t
mndProcessRebalanceReq
(
SNodeMsg
*
pMsg
);
static
int32_t
mndProcessSubscribeInternalRsp
(
SNodeMsg
*
pMsg
);
static
int32_t
mndSetSubRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SMqSubscribeObj
*
pSub
)
{
SSdbRaw
*
pRedoRaw
=
mndSubActionEncode
(
pSub
);
...
...
@@ -73,15 +65,6 @@ static int32_t mndSetSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeO
return
0
;
}
/*static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,*/
/*const SMqConsumerEp *pConsumerEp);*/
/*static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp,*/
/*const char *topicName);*/
/*static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp,*/
/*const char *oldTopicName);*/
int32_t
mndInitSubscribe
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_SUBSCRIBE
,
.
keyType
=
SDB_KEY_BINARY
,
...
...
@@ -91,13 +74,6 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
.
updateFp
=
(
SdbUpdateFp
)
mndSubActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndSubActionDelete
};
/*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);*/
/*mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp);*/
/*mndSetMsgHandle(pMnode, TDMT_VND_MQ_REB_RSP, mndProcessSubscribeInternalRsp);*/
/*mndSetMsgHandle(pMnode, TDMT_VND_MQ_CANCEL_CONN_RSP, mndProcessSubscribeInternalRsp);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessDoRebalanceMsg);*/
mndSetMsgHandle
(
pMnode
,
TDMT_VND_MQ_VG_CHANGE_RSP
,
mndProcessSubscribeInternalRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_DO_REBALANCE
,
mndProcessRebalanceReq
);
return
sdbSetTable
(
pMnode
->
pSdb
,
table
);
...
...
@@ -122,137 +98,6 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
return
pSub
;
}
#if 0
static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *cgroup) {
SMqSubscribeObj *pSub = tNewSubscribeObj();
if (pSub == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
char key[TSDB_SUBSCRIBE_KEY_LEN];
mndMakeSubscribeKey(key, cgroup, pTopic->name);
strcpy(pSub->key, key);
if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) {
tDeleteSMqSubscribeObj(pSub);
taosMemoryFree(pSub);
return NULL;
}
// TODO: disable alter subscribed table
return pSub;
}
static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp,
const char *topicName) {
SMqMVRebReq req = {
.vgId = pConsumerEp->vgId,
.oldConsumerId = pConsumerEp->oldConsumerId,
.newConsumerId = pConsumerEp->consumerId,
};
req.topic = strdup(topicName);
int32_t tlen = tEncodeSMqMVRebReq(NULL, &req);
void *buf = taosMemoryMalloc(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
pMsgHead->vgId = htonl(pConsumerEp->vgId);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncodeSMqMVRebReq(&abuf, &req);
taosMemoryFree(req.topic);
*pBuf = buf;
*pLen = tlen;
return 0;
}
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp,
const char *topicName) {
ASSERT(pConsumerEp->oldConsumerId != -1);
void *buf;
int32_t tlen;
if (mndBuildRebalanceMsg(&buf, &tlen, pConsumerEp, topicName) < 0) {
return -1;
}
int32_t vgId = pConsumerEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = buf;
action.contLen = sizeof(SMsgHead) + tlen;
action.msgType = TDMT_VND_MQ_REB;
mndReleaseVgroup(pMnode, pVgObj);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
return -1;
}
return 0;
}
static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp,
const char *oldTopicName) {
SMqCancelConnReq req = {0};
req.consumerId = pConsumerEp->consumerId;
req.vgId = pConsumerEp->vgId;
req.epoch = pConsumerEp->epoch;
strcpy(req.topicName, oldTopicName);
int32_t tlen = tEncodeSMqCancelConnReq(NULL, &req);
void *buf = taosMemoryMalloc(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
pMsgHead->vgId = htonl(pConsumerEp->vgId);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncodeSMqCancelConnReq(&abuf, &req);
*pBuf = buf;
*pLen = tlen;
return 0;
}
static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp,
const char *oldTopicName) {
void *buf;
int32_t tlen;
if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp, oldTopicName) < 0) {
return -1;
}
int32_t vgId = pConsumerEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = buf;
action.contLen = sizeof(SMsgHead) + tlen;
action.msgType = TDMT_VND_MQ_CANCEL_CONN;
mndReleaseVgroup(pMnode, pVgObj);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
return -1;
}
return 0;
}
#endif
static
int32_t
mndBuildSubChangeReq
(
void
**
pBuf
,
int32_t
*
pLen
,
const
char
*
subKey
,
const
SMqRebOutputVg
*
pRebVg
)
{
SMqRebVgReq
req
=
{
0
};
req
.
oldConsumerId
=
pRebVg
->
oldConsumerId
;
...
...
@@ -307,108 +152,6 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const ch
return
0
;
}
#if 0
static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode;
SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont;
SMqCMGetSubEpRsp rsp = {0};
int64_t consumerId = be64toh(pReq->consumerId);
int32_t epoch = ntohl(pReq->epoch);
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pNode, consumerId);
if (pConsumer == NULL) {
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
return -1;
}
// TODO add lock
ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);
int32_t serverEpoch = pConsumer->epoch;
// TODO
int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus);
mDebug("consumer %ld epoch(%d) try to get sub ep, server epoch %d, old val: %d", consumerId, epoch, serverEpoch,
hbStatus);
atomic_store_32(&pConsumer->hbStatus, 0);
/*SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
/*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
/*sdbWrite(pMnode->pSdb, pConsumerRaw);*/
strcpy(rsp.cgroup, pReq->cgroup);
if (epoch != serverEpoch) {
mInfo("send new assignment to consumer %ld, consumer epoch %d, server epoch %d", pConsumer->consumerId, epoch,
serverEpoch);
mDebug("consumer %ld try r lock", consumerId);
taosRLockLatch(&pConsumer->lock);
mDebug("consumer %ld r locked", consumerId);
SArray *pTopics = pConsumer->currentTopics;
int32_t sz = taosArrayGetSize(pTopics);
rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
for (int32_t i = 0; i < sz; i++) {
char *topicName = taosArrayGetP(pTopics, i);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topicName);
ASSERT(pSub);
int32_t csz = taosArrayGetSize(pSub->consumers);
// TODO: change to bsearch
for (int32_t j = 0; j < csz; j++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
if (consumerId == pSubConsumer->consumerId) {
int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
mInfo("topic %s has %d vg", topicName, serverEpoch);
SMqSubTopicEp topicEp;
strcpy(topicEp.topic, topicName);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topicName);
ASSERT(pTopic != NULL);
topicEp.schema = pTopic->schema;
mndReleaseTopic(pMnode, pTopic);
topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp));
for (int32_t k = 0; k < vgsz; k++) {
char offsetKey[TSDB_PARTITION_KEY_LEN];
SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k);
SMqSubVgEp vgEp = {
.epSet = pConsumerEp->epSet,
.vgId = pConsumerEp->vgId,
.offset = -1,
};
mndMakePartitionKey(offsetKey, pConsumer->cgroup, topicName, pConsumerEp->vgId);
SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, offsetKey);
if (pOffsetObj != NULL) {
vgEp.offset = pOffsetObj->offset;
mndReleaseOffset(pMnode, pOffsetObj);
}
taosArrayPush(topicEp.vgs, &vgEp);
}
taosArrayPush(rsp.topics, &topicEp);
break;
}
}
mndReleaseSubscribe(pMnode, pSub);
}
taosRUnLockLatch(&pConsumer->lock);
mDebug("consumer %ld r unlock", consumerId);
}
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqCMGetSubEpRsp(NULL, &rsp);
void *buf = rpcMallocCont(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
((SMqRspHead *)buf)->epoch = serverEpoch;
((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;
void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqCMGetSubEpRsp(&abuf, &rsp);
tDeleteSMqCMGetSubEpRsp(&rsp);
mndReleaseConsumer(pMnode, pConsumer);
pMsg->pRsp = buf;
pMsg->rspLen = tlen;
return 0;
}
#endif
static
int32_t
mndSplitSubscribeKey
(
const
char
*
key
,
char
*
topic
,
char
*
cgroup
)
{
int32_t
i
=
0
;
while
(
key
[
i
]
!=
TMQ_SEPARATOR
)
{
...
...
@@ -433,235 +176,6 @@ static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
return
pRebSub
;
}
#if 0
static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode;
SSdb *pSdb = pMnode->pSdb;
SMqConsumerObj *pConsumer;
void *pIter = NULL;
SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
while (1) {
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) break;
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
if (hbStatus > MND_SUBSCRIBE_REBALANCE_CNT) {
int32_t old =
atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST);
if (old == MQ_CONSUMER_STATUS__ACTIVE) {
// get all topics of that topic
int32_t sz = taosArrayGetSize(pConsumer->currentTopics);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
char key[TSDB_SUBSCRIBE_KEY_LEN];
mndMakeSubscribeKey(key, pConsumer->cgroup, topic);
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
taosArrayPush(pRebSub->lostConsumers, &pConsumer->consumerId);
}
}
}
int32_t status = atomic_load_32(&pConsumer->status);
if (status == MQ_CONSUMER_STATUS__INIT || status == MQ_CONSUMER_STATUS__MODIFY) {
SArray *rebSubs;
if (status == MQ_CONSUMER_STATUS__INIT) {
rebSubs = pConsumer->currentTopics;
} else {
rebSubs = pConsumer->recentRemovedTopics;
}
int32_t sz = taosArrayGetSize(rebSubs);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(rebSubs, i);
char key[TSDB_SUBSCRIBE_KEY_LEN];
mndMakeSubscribeKey(key, pConsumer->cgroup, topic);
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
if (status == MQ_CONSUMER_STATUS__INIT) {
taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
} else if (status == MQ_CONSUMER_STATUS__MODIFY) {
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
}
}
if (status == MQ_CONSUMER_STATUS__MODIFY) {
int32_t removeSz = taosArrayGetSize(pConsumer->recentRemovedTopics);
for (int32_t i = 0; i < removeSz; i++) {
char *topicName = taosArrayGetP(pConsumer->recentRemovedTopics, i);
taosMemoryFree(topicName);
}
taosArrayClear(pConsumer->recentRemovedTopics);
}
}
}
if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
mInfo("mq rebalance will be triggered");
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_MQ_DO_REBALANCE,
.pCont = pRebMsg,
.contLen = sizeof(SMqDoRebalanceMsg),
};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
} else {
taosHashCleanup(pRebMsg->rebSubHash);
rpcFreeCont(pRebMsg);
}
return 0;
}
#endif
#if 0
static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode;
SMqDoRebalanceMsg *pReq = pMsg->rpcMsg.pCont;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_REBALANCE, &pMsg->rpcMsg);
void *pIter = NULL;
mInfo("mq rebalance start");
while (1) {
pIter = taosHashIterate(pReq->rebSubHash, pIter);
if (pIter == NULL) break;
SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter;
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key);
mInfo("mq rebalance subscription: %s, vgNum: %d, unassignedVg: %d", pSub->key, pSub->vgNum,
(int32_t)taosArrayGetSize(pSub->unassignedVg));
// remove lost consumer
for (int32_t i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) {
int64_t lostConsumerId = *(int64_t *)taosArrayGet(pRebSub->lostConsumers, i);
mInfo("mq remove lost consumer %" PRId64 "", lostConsumerId);
for (int32_t j = 0; j < taosArrayGetSize(pSub->consumers); j++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
if (pSubConsumer->consumerId == lostConsumerId) {
taosArrayAddAll(pSub->unassignedVg, pSubConsumer->vgInfo);
taosArrayPush(pSub->lostConsumers, pSubConsumer);
taosArrayRemove(pSub->consumers, j);
break;
}
}
}
// calculate rebalance
int32_t consumerNum = taosArrayGetSize(pSub->consumers);
if (consumerNum != 0) {
int32_t vgNum = pSub->vgNum;
int32_t vgEachConsumer = vgNum / consumerNum;
int32_t imbalanceVg = vgNum % consumerNum;
// iterate all consumers, set unassignedVgStash
for (int32_t i = 0; i < consumerNum; i++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
int32_t vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
int32_t vgThisConsumerAfterRb;
if (i < imbalanceVg)
vgThisConsumerAfterRb = vgEachConsumer + 1;
else
vgThisConsumerAfterRb = vgEachConsumer;
mInfo("mq consumer:%" PRId64 ", connectted vgroup number change from %d to %d", pSubConsumer->consumerId,
vgThisConsumerBeforeRb, vgThisConsumerAfterRb);
while (taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) {
SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo);
ASSERT(pConsumerEp != NULL);
ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId);
taosArrayPush(pSub->unassignedVg, pConsumerEp);
mDebug("mq rebalance: vg %d push to unassignedVg", pConsumerEp->vgId);
}
SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
mDebug("consumer %ld try w lock", pRebConsumer->consumerId);
taosWLockLatch(&pRebConsumer->lock);
mDebug("consumer %ld w locked", pRebConsumer->consumerId);
int32_t status = atomic_load_32(&pRebConsumer->status);
if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb ||
(vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) ||
(vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) {
/*if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) {*/
/*pRebConsumer->epoch++;*/
/*}*/
if (vgThisConsumerAfterRb != 0) {
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
} else {
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
}
mInfo("mq consumer:%" PRId64 ", status change from %d to %d", pRebConsumer->consumerId, status,
pRebConsumer->status);
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
mndTransAppendCommitlog(pTrans, pConsumerRaw);
}
taosWUnLockLatch(&pRebConsumer->lock);
mDebug("consumer %ld w unlock", pRebConsumer->consumerId);
mndReleaseConsumer(pMnode, pRebConsumer);
}
// assign to vgroup
if (taosArrayGetSize(pSub->unassignedVg) != 0) {
for (int32_t i = 0; i < consumerNum; i++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
int32_t vgThisConsumerAfterRb;
if (i < imbalanceVg)
vgThisConsumerAfterRb = vgEachConsumer + 1;
else
vgThisConsumerAfterRb = vgEachConsumer;
while (taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) {
SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg);
mDebug("mq rebalance: vg %d pop from unassignedVg", pConsumerEp->vgId);
ASSERT(pConsumerEp != NULL);
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
pConsumerEp->consumerId = pSubConsumer->consumerId;
// TODO
pConsumerEp->epoch = 0;
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
char topic[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pSub->key, topic, cgroup);
if (pConsumerEp->oldConsumerId == -1) {
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 " cgroup: %s", pConsumerEp->vgId,
topic, pConsumerEp->consumerId, cgroup);
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
mndReleaseTopic(pMnode, pTopic);
} else {
mInfo("mq rebalance: assign vgroup %d, from consumer %" PRId64 " to consumer %" PRId64 "",
pConsumerEp->vgId, pConsumerEp->oldConsumerId, pConsumerEp->consumerId);
mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp, topic);
}
}
}
}
ASSERT(taosArrayGetSize(pSub->unassignedVg) == 0);
// TODO: log rebalance statistics
SSdbRaw *pSubRaw = mndSubActionEncode(pSub);
sdbSetRawStatus(pSubRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pSubRaw);
}
mndReleaseSubscribe(pMnode, pSub);
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("mq-rebalance-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
taosHashCleanup(pReq->rebSubHash);
mndTransDrop(pTrans);
return -1;
}
taosHashCleanup(pReq->rebSubHash);
mndTransDrop(pTrans);
return 0;
}
#endif
static
int32_t
mndDoRebalance
(
SMnode
*
pMnode
,
const
SMqRebInputObj
*
pInput
,
SMqRebOutputObj
*
pOutput
)
{
if
(
pInput
->
pTopic
!=
NULL
)
{
// create subscribe
...
...
@@ -825,36 +339,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
pRebVg
->
newConsumerId
=
pEpInSub
->
consumerId
;
taosArrayPush
(
pOutput
->
rebVgs
,
pRebVg
);
}
#if 0
/*int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs);*/
if (imbCnt < imbConsumerNum) {
imbCnt++;
// push until equal minVg + 1
while (taosArrayGetSize(pEpInSub->vgs) < minVgCnt + 1) {
// iter hash and find one vg
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
ASSERT(pRemovedIter);
pRebVg = (SMqRebOutputVg *)pRemovedIter;
// push
taosArrayPush(pEpInSub->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pEpInSub->consumerId;
taosArrayPush(pOutput->rebVgs, pRebVg);
}
} else {
// push until equal minVg
while (taosArrayGetSize(pEpInSub->vgs) < minVgCnt) {
// iter hash and find one vg
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
ASSERT(pRemovedIter);
pRebVg = (SMqRebOutputVg *)pRemovedIter;
// push
taosArrayPush(pEpInSub->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pEpInSub->consumerId;
taosArrayPush(pOutput->rebVgs, pRebVg);
}
}
#endif
}
// 7. handle unassigned vg
...
...
@@ -1040,52 +524,6 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
return
0
;
}
static
int32_t
mndPersistMqSetConnReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqTopicObj
*
pTopic
,
const
char
*
cgroup
,
const
SMqConsumerEp
*
pConsumerEp
)
{
ASSERT
(
pConsumerEp
->
oldConsumerId
==
-
1
);
int32_t
vgId
=
pConsumerEp
->
vgId
;
SMqSetCVgReq
req
=
{
.
vgId
=
vgId
,
.
consumerId
=
pConsumerEp
->
consumerId
,
.
sql
=
pTopic
->
sql
,
.
physicalPlan
=
pTopic
->
physicalPlan
,
.
qmsg
=
pConsumerEp
->
qmsg
,
};
strcpy
(
req
.
cgroup
,
cgroup
);
strcpy
(
req
.
topicName
,
pTopic
->
name
);
int32_t
tlen
=
tEncodeSMqSetCVgReq
(
NULL
,
&
req
);
void
*
buf
=
taosMemoryMalloc
(
sizeof
(
SMsgHead
)
+
tlen
);
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
SMsgHead
*
pMsgHead
=
(
SMsgHead
*
)
buf
;
pMsgHead
->
contLen
=
htonl
(
sizeof
(
SMsgHead
)
+
tlen
);
pMsgHead
->
vgId
=
htonl
(
vgId
);
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tEncodeSMqSetCVgReq
(
&
abuf
,
&
req
);
SVgObj
*
pVgObj
=
mndAcquireVgroup
(
pMnode
,
vgId
);
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgObj
);
action
.
pCont
=
buf
;
action
.
contLen
=
sizeof
(
SMsgHead
)
+
tlen
;
action
.
msgType
=
TDMT_VND_MQ_SET_CONN
;
mndReleaseVgroup
(
pMnode
,
pVgObj
);
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
buf
);
return
-
1
;
}
return
0
;
}
void
mndCleanupSubscribe
(
SMnode
*
pMnode
)
{}
static
SSdbRaw
*
mndSubActionEncode
(
SMqSubscribeObj
*
pSub
)
{
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
2dcb0145
...
...
@@ -76,7 +76,13 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT64
(
pRaw
,
dataPos
,
pTopic
->
updateTime
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT64
(
pRaw
,
dataPos
,
pTopic
->
uid
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT64
(
pRaw
,
dataPos
,
pTopic
->
dbUid
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT64
(
pRaw
,
dataPos
,
pTopic
->
subDbUid
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTopic
->
version
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT8
(
pRaw
,
dataPos
,
pTopic
->
subType
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT8
(
pRaw
,
dataPos
,
pTopic
->
withTbName
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT8
(
pRaw
,
dataPos
,
pTopic
->
withSchema
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT8
(
pRaw
,
dataPos
,
pTopic
->
withTag
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT8
(
pRaw
,
dataPos
,
pTopic
->
withTagSchema
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTopic
->
sqlLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
sql
,
pTopic
->
sqlLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTopic
->
astLen
,
TOPIC_ENCODE_OVER
);
...
...
@@ -134,7 +140,13 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pTopic
->
updateTime
,
TOPIC_DECODE_OVER
);
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pTopic
->
uid
,
TOPIC_DECODE_OVER
);
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pTopic
->
dbUid
,
TOPIC_DECODE_OVER
);
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pTopic
->
subDbUid
,
TOPIC_DECODE_OVER
);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pTopic
->
version
,
TOPIC_DECODE_OVER
);
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pTopic
->
subType
,
TOPIC_DECODE_OVER
);
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pTopic
->
withTbName
,
TOPIC_DECODE_OVER
);
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pTopic
->
withSchema
,
TOPIC_DECODE_OVER
);
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pTopic
->
withTag
,
TOPIC_DECODE_OVER
);
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pTopic
->
withTagSchema
,
TOPIC_DECODE_OVER
);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pTopic
->
sqlLen
,
TOPIC_DECODE_OVER
);
pTopic
->
sql
=
taosMemoryCalloc
(
pTopic
->
sqlLen
,
sizeof
(
char
));
...
...
@@ -254,33 +266,13 @@ static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
terrno
=
TSDB_CODE_MND_INVALID_TOPIC_OPTION
;
return
-
1
;
}
return
0
;
}
#if 0
static int32_t mndGetPlanString(const SCMCreateTopicReq *pCreate, char **pStr) {
if (NULL == pCreate->ast) {
return TSDB_CODE_SUCCESS;
}
SNode *pAst = NULL;
int32_t code = nodesStringToNode(pCreate->ast, &pAst);
SQueryPlan *pPlan = NULL;
if (TSDB_CODE_SUCCESS == code) {
SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString(pPlan, false, pStr, NULL);
if
((
pCreate
->
ast
==
NULL
||
pCreate
->
ast
[
0
]
==
0
)
&&
pCreate
->
subscribeDbName
[
0
]
==
0
)
{
terrno
=
TSDB_CODE_MND_INVALID_TOPIC_OPTION
;
return
-
1
;
}
nodesDestroyNode(pAst);
nodesDestroyNode(pPlan);
terrno = code;
return code;
return
0
;
}
#endif
static
int32_t
mndCreateTopic
(
SMnode
*
pMnode
,
SNodeMsg
*
pReq
,
SCMCreateTopicReq
*
pCreate
,
SDbObj
*
pDb
)
{
mDebug
(
"topic:%s to create"
,
pCreate
->
name
);
...
...
@@ -297,28 +289,38 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
topicObj
.
ast
=
strdup
(
pCreate
->
ast
);
topicObj
.
astLen
=
strlen
(
pCreate
->
ast
)
+
1
;
SNode
*
pAst
=
NULL
;
if
(
nodesStringToNode
(
pCreate
->
ast
,
&
pAst
)
!=
0
)
{
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
if
(
pCreate
->
ast
&&
pCreate
->
ast
[
0
])
{
topicObj
.
subType
=
TOPIC_SUB_TYPE__TABLE
;
topicObj
.
withTbName
=
0
;
topicObj
.
withSchema
=
0
;
SQueryPlan
*
pPlan
=
NULL
;
SNode
*
pAst
=
NULL
;
if
(
nodesStringToNode
(
pCreate
->
ast
,
&
pAst
)
!=
0
)
{
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
SPlanContext
cxt
=
{.
pAstRoot
=
pAst
,
.
topicQuery
=
true
};
if
(
qCreateQueryPlan
(
&
cxt
,
&
pPlan
,
NULL
)
!=
0
)
{
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
SQueryPlan
*
pPlan
=
NULL
;
if
(
qExtractResultSchema
(
pAst
,
&
topicObj
.
schema
.
nCols
,
&
topicObj
.
schema
.
pSchema
)
!=
0
)
{
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
SPlanContext
cxt
=
{.
pAstRoot
=
pAst
,
.
topicQuery
=
true
};
if
(
qCreateQueryPlan
(
&
cxt
,
&
pPlan
,
NULL
)
!=
0
)
{
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
if
(
nodesNodeToString
(
pPlan
,
false
,
&
topicObj
.
physicalPlan
,
NULL
)
!=
0
)
{
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
if
(
qExtractResultSchema
(
pAst
,
&
topicObj
.
schema
.
nCols
,
&
topicObj
.
schema
.
pSchema
)
!=
0
)
{
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
if
(
nodesNodeToString
(
pPlan
,
false
,
&
topicObj
.
physicalPlan
,
NULL
)
!=
0
)
{
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
}
else
{
topicObj
.
subType
=
TOPIC_SUB_TYPE__DB
;
topicObj
.
withTbName
=
1
;
topicObj
.
withSchema
=
1
;
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_TYPE_CREATE_TOPIC
,
&
pReq
->
rpcMsg
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
2dcb0145
...
...
@@ -618,127 +618,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
}
}
#if 0
int32_t tqProcessRebReq(STQ* pTq, char* msg) {
SMqMVRebReq req = {0};
terrno = TSDB_CODE_SUCCESS;
tDecodeSMqMVRebReq(msg, &req);
vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId, req.newConsumerId);
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId);
ASSERT(pConsumer);
ASSERT(pConsumer->consumerId == req.oldConsumerId);
int32_t numOfTopics = taosArrayGetSize(pConsumer->topics);
if (numOfTopics == 1) {
STqTopic* pTopic = taosArrayGet(pConsumer->topics, 0);
ASSERT(strcmp(pTopic->topicName, req.topic) == 0);
STqConsumer* pNewConsumer = tqHandleGet(pTq->tqMeta, req.newConsumerId);
if (pNewConsumer == NULL) {
pConsumer->consumerId = req.newConsumerId;
tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer);
tqHandleCommit(pTq->tqMeta, req.newConsumerId);
tqHandlePurge(pTq->tqMeta, req.oldConsumerId);
return 0;
} else {
taosArrayPush(pNewConsumer->topics, pTopic);
}
} else {
for (int32_t i = 0; i < numOfTopics; i++) {
STqTopic* pTopic = taosArrayGet(pConsumer->topics, i);
if (strcmp(pTopic->topicName, req.topic) == 0) {
STqConsumer* pNewConsumer = tqHandleGet(pTq->tqMeta, req.newConsumerId);
if (pNewConsumer == NULL) {
pNewConsumer = taosMemoryCalloc(1, sizeof(STqConsumer));
if (pNewConsumer == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1;
}
strcpy(pNewConsumer->cgroup, pConsumer->cgroup);
pNewConsumer->topics = taosArrayInit(0, sizeof(STqTopic));
pNewConsumer->consumerId = req.newConsumerId;
pNewConsumer->epoch = 0;
taosArrayPush(pNewConsumer->topics, pTopic);
tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer);
tqHandleCommit(pTq->tqMeta, req.newConsumerId);
return 0;
}
ASSERT(pNewConsumer->consumerId == req.newConsumerId);
taosArrayPush(pNewConsumer->topics, pTopic);
break;
}
}
//
}
return 0;
}
int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
SMqSetCVgReq req = {0};
tDecodeSMqSetCVgReq(msg, &req);
bool create = false;
vDebug("vg %d set to consumer %ld", req.vgId, req.consumerId);
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.consumerId);
if (pConsumer == NULL) {
pConsumer = taosMemoryCalloc(1, sizeof(STqConsumer));
if (pConsumer == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1;
}
strcpy(pConsumer->cgroup, req.cgroup);
pConsumer->topics = taosArrayInit(0, sizeof(STqTopic));
pConsumer->consumerId = req.consumerId;
pConsumer->epoch = 0;
create = true;
}
STqTopic* pTopic = taosMemoryCalloc(1, sizeof(STqTopic));
if (pTopic == NULL) {
taosArrayDestroy(pConsumer->topics);
taosMemoryFree(pConsumer);
return -1;
}
strcpy(pTopic->topicName, req.topicName);
pTopic->sql = req.sql;
pTopic->physicalPlan = req.physicalPlan;
pTopic->qmsg = req.qmsg;
/*pTopic->committedOffset = -1;*/
/*pTopic->currentOffset = -1;*/
pTopic->buffer.firstOffset = -1;
pTopic->buffer.lastOffset = -1;
pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
if (pTopic->pReadhandle == NULL) {
ASSERT(false);
}
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
pTopic->buffer.output[i].status = 0;
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
SReadHandle handle = {
.reader = pReadHandle,
.meta = pTq->pVnodeMeta,
};
pTopic->buffer.output[i].pReadHandle = pReadHandle;
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
ASSERT(pTopic->buffer.output[i].task);
}
vDebug("set topic %s to consumer %ld on vg %d", pTopic->topicName, req.consumerId, TD_VID(pTq->pVnode));
taosArrayPush(pConsumer->topics, pTopic);
if (create) {
tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer);
tqHandleCommit(pTq->tqMeta, req.consumerId);
}
terrno = TSDB_CODE_SUCCESS;
return 0;
}
int32_t tqProcessCancelConnReq(STQ* pTq, char* msg) {
terrno = TSDB_CODE_SUCCESS;
return 0;
}
#endif
int32_t
tqExpandTask
(
STQ
*
pTq
,
SStreamTask
*
pTask
,
int32_t
parallel
)
{
if
(
pTask
->
execType
==
TASK_EXEC__NONE
)
return
0
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录