Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4d26b416
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
4d26b416
编写于
4月 07, 2022
作者:
L
Liu Jicong
提交者:
GitHub
4月 07, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11294 from taosdata/feature/tq
add unsubscribe
上级
9ef7db39
74eee69a
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
79 addition
and
25 deletion
+79
-25
include/common/tmsg.h
include/common/tmsg.h
+47
-15
include/common/tmsgdef.h
include/common/tmsgdef.h
+1
-0
source/dnode/mgmt/mm/mmHandle.c
source/dnode/mgmt/mm/mmHandle.c
+1
-0
source/dnode/mgmt/vm/vmHandle.c
source/dnode/mgmt/vm/vmHandle.c
+1
-0
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+15
-10
source/dnode/vnode/src/inc/vnd.h
source/dnode/vnode/src/inc/vnd.h
+1
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+5
-0
source/dnode/vnode/src/vnd/vnodeWrite.c
source/dnode/vnode/src/vnd/vnodeWrite.c
+4
-0
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+4
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
4d26b416
...
...
@@ -273,11 +273,11 @@ typedef struct {
char
name
[
TSDB_COL_NAME_LEN
];
}
SSchemaEx
;
#define SSCHMEA_TYPE(s) ((s)->type)
#define SSCHMEA_SMA(s) ((s)->sma)
#define SSCHMEA_TYPE(s)
((s)->type)
#define SSCHMEA_SMA(s)
((s)->sma)
#define SSCHMEA_COLID(s) ((s)->colId)
#define SSCHMEA_BYTES(s) ((s)->bytes)
#define SSCHMEA_NAME(s) ((s)->name)
#define SSCHMEA_NAME(s)
((s)->name)
typedef
struct
{
char
name
[
TSDB_TABLE_FNAME_LEN
];
...
...
@@ -483,7 +483,8 @@ typedef struct {
int32_t
tz
;
// query client timezone
char
intervalUnit
;
char
slidingUnit
;
char
offsetUnit
;
// TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
char
offsetUnit
;
// TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
int8_t
precision
;
int64_t
interval
;
int64_t
sliding
;
...
...
@@ -934,12 +935,12 @@ typedef struct SExplainExecInfo {
uint64_t
startupCost
;
uint64_t
totalCost
;
uint64_t
numOfRows
;
void
*
verboseInfo
;
void
*
verboseInfo
;
}
SExplainExecInfo
;
typedef
struct
{
int32_t
numOfPlans
;
SExplainExecInfo
*
subplanInfo
;
SExplainExecInfo
*
subplanInfo
;
}
SExplainRsp
;
int32_t
tSerializeSExplainRsp
(
void
*
buf
,
int32_t
bufLen
,
SExplainRsp
*
pRsp
);
...
...
@@ -1432,12 +1433,12 @@ typedef struct SVCreateTbReq {
};
union
{
struct
{
tb_uid_t
suid
;
col_id_t
nCols
;
col_id_t
nBSmaCols
;
SSchemaEx
*
pSchema
;
col_id_t
nTagCols
;
SSchema
*
pTagSchema
;
tb_uid_t
suid
;
col_id_t
nCols
;
col_id_t
nBSmaCols
;
SSchemaEx
*
pSchema
;
col_id_t
nTagCols
;
SSchema
*
pTagSchema
;
SRSmaParam
*
pRSmaParam
;
}
stbCfg
;
struct
{
...
...
@@ -1445,9 +1446,9 @@ typedef struct SVCreateTbReq {
SKVRow
pTag
;
}
ctbCfg
;
struct
{
col_id_t
nCols
;
col_id_t
nBSmaCols
;
SSchemaEx
*
pSchema
;
col_id_t
nCols
;
col_id_t
nBSmaCols
;
SSchemaEx
*
pSchema
;
SRSmaParam
*
pRSmaParam
;
}
ntbCfg
;
};
...
...
@@ -1866,6 +1867,37 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
return
buf
;
}
typedef
struct
{
int64_t
leftForVer
;
int32_t
vgId
;
int32_t
epoch
;
int64_t
consumerId
;
char
topicName
[
TSDB_TOPIC_FNAME_LEN
];
}
SMqCancelConnReq
;
static
FORCE_INLINE
int32_t
tEncodeSMqCancelConnReq
(
void
**
buf
,
const
SMqCancelConnReq
*
pReq
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
leftForVer
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
vgId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
epoch
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
consumerId
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
topicName
);
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqCancelConnReq
(
void
*
buf
,
SMqCancelConnReq
*
pReq
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
leftForVer
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
vgId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
epoch
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
consumerId
);
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
topicName
);
return
buf
;
}
typedef
struct
{
int8_t
reserved
;
}
SMqCancelConnRsp
;
typedef
struct
{
int64_t
leftForVer
;
int32_t
vgId
;
...
...
include/common/tmsgdef.h
浏览文件 @
4d26b416
...
...
@@ -176,6 +176,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_DISCONNECT
,
"vnode-mq-disconnect"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_SET_CONN
,
"vnode-mq-set-conn"
,
SMqSetCVgReq
,
SMqSetCVgRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_REB
,
"vnode-mq-mv-rebalance"
,
SMqMVRebReq
,
SMqMVRebRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_CANCEL_CONN
,
"vnode-mq-mv-cancel-conn"
,
SMqCancelConnReq
,
SMqCancelConnRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_SET_CUR
,
"vnode-mq-set-cur"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_RES_READY
,
"vnode-res-ready"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TASKS_STATUS
,
"vnode-tasks-status"
,
NULL
,
NULL
)
...
...
source/dnode/mgmt/mm/mmHandle.c
浏览文件 @
4d26b416
...
...
@@ -150,6 +150,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
// Requests handled by VNODE
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_SET_CONN_RSP
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_REB_RSP
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_CANCEL_CONN_RSP
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_CREATE_STB_RSP
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_ALTER_STB_RSP
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_DROP_STB_RSP
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
...
...
source/dnode/mgmt/vm/vmHandle.c
浏览文件 @
4d26b416
...
...
@@ -271,6 +271,7 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_SHOW_TABLES_FETCH
,
(
NodeMsgFp
)
vmProcessFetchMsg
,
DEFAULT_HANDLE
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_SET_CONN
,
(
NodeMsgFp
)
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_REB
,
(
NodeMsgFp
)
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_CANCEL_CONN
,
(
NodeMsgFp
)
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_SET_CUR
,
(
NodeMsgFp
)
vmProcessFetchMsg
,
DEFAULT_HANDLE
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_CONSUME
,
(
NodeMsgFp
)
vmProcessFetchMsg
,
DEFAULT_HANDLE
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_TASK_DEPLOY
,
(
NodeMsgFp
)
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
4d26b416
...
...
@@ -61,6 +61,7 @@ static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqT
const
SMqConsumerEp
*
pConsumerEp
);
static
int32_t
mndPersistRebalanceMsg
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
);
static
int32_t
mndPersistCancelConnReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
oldTopicName
);
int32_t
mndInitSubscribe
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_SUBSCRIBE
,
...
...
@@ -74,6 +75,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
mndSetMsgHandle
(
pMnode
,
TDMT_MND_SUBSCRIBE
,
mndProcessSubscribeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_MQ_SET_CONN_RSP
,
mndProcessSubscribeInternalRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_MQ_REB_RSP
,
mndProcessSubscribeInternalRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_MQ_CANCEL_CONN_RSP
,
mndProcessSubscribeInternalRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_TIMER
,
mndProcessMqTimerMsg
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_GET_SUB_EP
,
mndProcessGetSubEpReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_DO_REBALANCE
,
mndProcessDoRebalanceMsg
);
...
...
@@ -154,11 +156,14 @@ static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqC
return
0
;
}
static
int32_t
mndBuildCancelConnReq
(
void
**
pBuf
,
int32_t
*
pLen
,
const
SMqConsumerEp
*
pConsumerEp
)
{
SMq
SetCVg
Req
req
=
{
0
};
static
int32_t
mndBuildCancelConnReq
(
void
**
pBuf
,
int32_t
*
pLen
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
oldTopicName
)
{
SMq
CancelConn
Req
req
=
{
0
};
req
.
consumerId
=
pConsumerEp
->
consumerId
;
req
.
vgId
=
pConsumerEp
->
vgId
;
req
.
epoch
=
pConsumerEp
->
epoch
;
strcpy
(
req
.
topicName
,
oldTopicName
);
int32_t
tlen
=
tEncodeSMq
SetCVg
Req
(
NULL
,
&
req
);
int32_t
tlen
=
tEncodeSMq
CancelConn
Req
(
NULL
,
&
req
);
void
*
buf
=
taosMemoryMalloc
(
sizeof
(
SMsgHead
)
+
tlen
);
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -169,16 +174,16 @@ static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsum
pMsgHead
->
contLen
=
htonl
(
sizeof
(
SMsgHead
)
+
tlen
);
pMsgHead
->
vgId
=
htonl
(
pConsumerEp
->
vgId
);
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tEncodeSMq
SetCVg
Req
(
&
abuf
,
&
req
);
tEncodeSMq
CancelConn
Req
(
&
abuf
,
&
req
);
*
pBuf
=
buf
;
*
pLen
=
tlen
;
return
0
;
}
static
int32_t
mndPersistCancelConnReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
)
{
static
int32_t
mndPersistCancelConnReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
oldTopicName
)
{
void
*
buf
;
int32_t
tlen
;
if
(
mndBuildCancelConnReq
(
&
buf
,
&
tlen
,
pConsumerEp
)
<
0
)
{
if
(
mndBuildCancelConnReq
(
&
buf
,
&
tlen
,
pConsumerEp
,
oldTopicName
)
<
0
)
{
return
-
1
;
}
...
...
@@ -189,7 +194,7 @@ static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMq
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgObj
);
action
.
pCont
=
buf
;
action
.
contLen
=
sizeof
(
SMsgHead
)
+
tlen
;
action
.
msgType
=
TDMT_VND_MQ_
SET
_CONN
;
action
.
msgType
=
TDMT_VND_MQ_
CANCEL
_CONN
;
mndReleaseVgroup
(
pMnode
,
pVgObj
);
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
...
...
@@ -365,7 +370,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
if
(
status
==
MQ_CONSUMER_STATUS__MODIFY
)
{
int32_t
removeSz
=
taosArrayGetSize
(
pConsumer
->
recentRemovedTopics
);
for
(
int32_t
i
=
0
;
i
<
removeSz
;
i
++
)
{
char
*
topicName
=
taosArrayGet
(
pConsumer
->
recentRemovedTopics
,
i
);
char
*
topicName
=
taosArrayGet
P
(
pConsumer
->
recentRemovedTopics
,
i
);
taosMemoryFree
(
topicName
);
}
taosArrayClear
(
pConsumer
->
recentRemovedTopics
);
...
...
@@ -797,7 +802,7 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
int32_t
vgsz
=
taosArrayGetSize
(
pSubConsumer
->
vgInfo
);
for
(
int32_t
vgi
=
0
;
vgi
<
vgsz
;
vgi
++
)
{
SMqConsumerEp
*
pConsumerEp
=
taosArrayGet
(
pSubConsumer
->
vgInfo
,
vgi
);
mndPersistCancelConnReq
(
pMnode
,
pTrans
,
pConsumerEp
);
mndPersistCancelConnReq
(
pMnode
,
pTrans
,
pConsumerEp
,
oldTopicName
);
taosArrayPush
(
pSub
->
unassignedVg
,
pConsumerEp
);
}
taosArrayRemove
(
pSub
->
consumers
,
ci
);
...
...
@@ -859,7 +864,7 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
}
}
if
(
oldSub
)
taosArrayDestroyEx
(
oldSub
,
(
void
(
*
)(
void
*
))
taosMemoryFree
);
/*if (oldSub) taosArrayDestroyEx(oldSub, (void (*)(void *))taosMemoryFree);*/
// persist consumerObj
SSdbRaw
*
pConsumerRaw
=
mndConsumerActionEncode
(
pConsumer
);
...
...
source/dnode/vnode/src/inc/vnd.h
浏览文件 @
4d26b416
...
...
@@ -197,6 +197,7 @@ int tqCommit(STQ*);
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
int32_t
workerId
);
int32_t
tqProcessSetConnReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessRebReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessCancelConnReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
,
int32_t
workerId
);
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessStreamTrigger
(
STQ
*
pTq
,
void
*
data
,
int32_t
dataLen
,
int32_t
workerId
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
4d26b416
...
...
@@ -541,6 +541,11 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
return
0
;
}
int32_t
tqProcessCancelConnReq
(
STQ
*
pTq
,
char
*
msg
)
{
terrno
=
TSDB_CODE_SUCCESS
;
return
0
;
}
int32_t
tqExpandTask
(
STQ
*
pTq
,
SStreamTask
*
pTask
,
int32_t
parallel
)
{
if
(
pTask
->
execType
==
TASK_EXEC__NONE
)
return
0
;
...
...
source/dnode/vnode/src/vnd/vnodeWrite.c
浏览文件 @
4d26b416
...
...
@@ -191,6 +191,10 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
if
(
tqProcessRebReq
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)))
<
0
)
{
}
}
break
;
case
TDMT_VND_MQ_CANCEL_CONN
:
{
if
(
tqProcessCancelConnReq
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)))
<
0
)
{
}
}
break
;
case
TDMT_VND_TASK_DEPLOY
:
{
if
(
tqProcessTaskDeploy
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
pMsg
->
contLen
-
sizeof
(
SMsgHead
))
<
0
)
{
...
...
tests/test/c/tmqSim.c
浏览文件 @
4d26b416
...
...
@@ -274,6 +274,10 @@ int main(int32_t argc, char *argv[]) {
loop_consume
(
tmq
);
err
=
tmq_unsubscribe
(
tmq
);
ASSERT
(
err
==
TMQ_RESP_ERR__SUCCESS
);
#if 0
err = tmq_unsubscribe(tmq);
if (err) {
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录