Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
51f46e40
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
51f46e40
编写于
2月 16, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
serialize drop topic req
上级
67037989
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
68 addition
and
42 deletion
+68
-42
include/common/tmsg.h
include/common/tmsg.h
+4
-10
source/common/src/tmsg.c
source/common/src/tmsg.c
+27
-0
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+37
-32
未找到文件。
include/common/tmsg.h
浏览文件 @
51f46e40
...
...
@@ -1313,19 +1313,13 @@ typedef struct {
int64_t
status
;
}
SMVSubscribeRsp
;
typedef
struct
{
char
name
[
TSDB_TOPIC_NAME_LEN
];
int8_t
igExists
;
int32_t
execLen
;
void
*
executor
;
int32_t
sqlLen
;
char
*
sql
;
}
SCreateTopicReq
;
typedef
struct
{
char
name
[
TSDB_TABLE_FNAME_LEN
];
int8_t
igNotExists
;
}
SDropTopicReq
;
}
SMDropTopicReq
;
int32_t
tSerializeSMDropTopicReqq
(
void
*
buf
,
int32_t
bufLen
,
SMDropTopicReq
*
pReq
);
int32_t
tDeserializeSMDropTopicReq
(
void
*
buf
,
int32_t
bufLen
,
SMDropTopicReq
*
pReq
);
typedef
struct
{
char
name
[
TSDB_TABLE_FNAME_LEN
];
...
...
source/common/src/tmsg.c
浏览文件 @
51f46e40
...
...
@@ -1788,3 +1788,30 @@ int32_t tDeserializeSTableInfoReq(void *buf, int32_t bufLen, STableInfoReq *pReq
tCoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSMDropTopicReqq
(
void
*
buf
,
int32_t
bufLen
,
SMDropTopicReq
*
pReq
)
{
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
igNotExists
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tCoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSMDropTopicReq
(
void
*
buf
,
int32_t
bufLen
,
SMDropTopicReq
*
pReq
)
{
SCoder
decoder
=
{
0
};
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_DECODER
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
igNotExists
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tCoderClear
(
&
decoder
);
return
0
;
}
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
51f46e40
...
...
@@ -31,12 +31,12 @@
static
int32_t
mndTopicActionInsert
(
SSdb
*
pSdb
,
SMqTopicObj
*
pTopic
);
static
int32_t
mndTopicActionDelete
(
SSdb
*
pSdb
,
SMqTopicObj
*
pTopic
);
static
int32_t
mndTopicActionUpdate
(
SSdb
*
pSdb
,
SMqTopicObj
*
pTopic
,
SMqTopicObj
*
pNewTopic
);
static
int32_t
mndProcessCreateTopic
Msg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropTopic
Msg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropTopicInRsp
(
SMnodeMsg
*
p
Msg
);
static
int32_t
mndProcessCreateTopic
Req
(
SMnodeMsg
*
pReq
);
static
int32_t
mndProcessDropTopic
Req
(
SMnodeMsg
*
pReq
);
static
int32_t
mndProcessDropTopicInRsp
(
SMnodeMsg
*
p
Rsp
);
static
int32_t
mndProcessTopicMetaMsg
(
SMnodeMsg
*
pReq
);
static
int32_t
mndGetTopicMeta
(
SMnodeMsg
*
p
Msg
,
SShowObj
*
pShow
,
STableMetaRsp
*
pMeta
);
static
int32_t
mndRetrieveTopic
(
SMnodeMsg
*
p
Msg
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
);
static
int32_t
mndGetTopicMeta
(
SMnodeMsg
*
p
Req
,
SShowObj
*
pShow
,
STableMetaRsp
*
pMeta
);
static
int32_t
mndRetrieveTopic
(
SMnodeMsg
*
p
Req
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
);
static
void
mndCancelGetNextTopic
(
SMnode
*
pMnode
,
void
*
pIter
);
int32_t
mndInitTopic
(
SMnode
*
pMnode
)
{
...
...
@@ -48,8 +48,8 @@ int32_t mndInitTopic(SMnode *pMnode) {
.
updateFp
=
(
SdbUpdateFp
)
mndTopicActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndTopicActionDelete
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_TOPIC
,
mndProcessCreateTopic
Msg
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_TOPIC
,
mndProcessDropTopic
Msg
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_TOPIC
,
mndProcessCreateTopic
Req
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_TOPIC
,
mndProcessDropTopic
Req
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_DROP_TOPIC_RSP
,
mndProcessDropTopicInRsp
);
return
sdbSetTable
(
pMnode
->
pSdb
,
table
);
...
...
@@ -230,7 +230,7 @@ static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *creattopReq) {
return
0
;
}
static
int32_t
mndCreateTopic
(
SMnode
*
pMnode
,
SMnodeMsg
*
p
Msg
,
SCMCreateTopicReq
*
pCreate
,
SDbObj
*
pDb
)
{
static
int32_t
mndCreateTopic
(
SMnode
*
pMnode
,
SMnodeMsg
*
p
Req
,
SCMCreateTopicReq
*
pCreate
,
SDbObj
*
pDb
)
{
mDebug
(
"topic:%s to create"
,
pCreate
->
name
);
SMqTopicObj
topicObj
=
{
0
};
tstrncpy
(
topicObj
.
name
,
pCreate
->
name
,
TSDB_TOPIC_FNAME_LEN
);
...
...
@@ -248,7 +248,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq
SSdbRaw
*
pTopicRaw
=
mndTopicActionEncode
(
&
topicObj
);
if
(
pTopicRaw
==
NULL
)
return
-
1
;
if
(
sdbSetRawStatus
(
pTopicRaw
,
SDB_STATUS_READY
)
!=
0
)
return
-
1
;
/*STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &p
Msg
->rpcMsg);*/
/*STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &p
Req
->rpcMsg);*/
/*mndTransAppendRedolog(pTrans, pTopicRaw);*/
/*if (mndTransPrepare(pMnode, pTrans) != 0) {*/
/*mError("mq-createTopic-trans:%d, failed to prepare since %s", pTrans->id, terrstr());*/
...
...
@@ -260,9 +260,9 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq
return
sdbWrite
(
pMnode
->
pSdb
,
pTopicRaw
);
}
static
int32_t
mndProcessCreateTopic
Msg
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
p
Msg
->
pMnode
;
char
*
msgStr
=
p
Msg
->
rpcMsg
.
pCont
;
static
int32_t
mndProcessCreateTopic
Req
(
SMnodeMsg
*
pReq
)
{
SMnode
*
pMnode
=
p
Req
->
pMnode
;
char
*
msgStr
=
p
Req
->
rpcMsg
.
pCont
;
SCMCreateTopicReq
createTopicReq
=
{
0
};
tDeserializeSCMCreateTopicReq
(
msgStr
,
&
createTopicReq
);
...
...
@@ -294,7 +294,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) {
return
-
1
;
}
int32_t
code
=
mndCreateTopic
(
pMnode
,
p
Msg
,
&
createTopicReq
,
pDb
);
int32_t
code
=
mndCreateTopic
(
pMnode
,
p
Req
,
&
createTopicReq
,
pDb
);
mndReleaseDb
(
pMnode
,
pDb
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -306,40 +306,45 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) {
return
0
;
}
static
int32_t
mndDropTopic
(
SMnode
*
pMnode
,
SMnodeMsg
*
p
Msg
,
SMqTopicObj
*
pTopic
)
{
return
0
;
}
static
int32_t
mndDropTopic
(
SMnode
*
pMnode
,
SMnodeMsg
*
p
Req
,
SMqTopicObj
*
pTopic
)
{
return
0
;
}
static
int32_t
mndProcessDropTopic
Msg
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
p
Msg
->
pMnode
;
S
DropTopicReq
*
pDrop
=
pMsg
->
rpcMsg
.
pCont
;
static
int32_t
mndProcessDropTopic
Req
(
SMnodeMsg
*
pReq
)
{
SMnode
*
pMnode
=
p
Req
->
pMnode
;
S
MDropTopicReq
dropReq
=
{
0
}
;
mDebug
(
"topic:%s, start to drop"
,
pDrop
->
name
);
if
(
tDeserializeSMDropTopicReq
(
pReq
->
rpcMsg
.
pCont
,
pReq
->
rpcMsg
.
contLen
,
&
dropReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
mDebug
(
"topic:%s, start to drop"
,
dropReq
.
name
);
SMqTopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
pDrop
->
name
);
SMqTopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
dropReq
.
name
);
if
(
pTopic
==
NULL
)
{
if
(
pDrop
->
igNotExists
)
{
mDebug
(
"topic:%s, not exist, ignore not exist is set"
,
pDrop
->
name
);
if
(
dropReq
.
igNotExists
)
{
mDebug
(
"topic:%s, not exist, ignore not exist is set"
,
dropReq
.
name
);
return
0
;
}
else
{
terrno
=
TSDB_CODE_MND_TOPIC_NOT_EXIST
;
mError
(
"topic:%s, failed to drop since %s"
,
pDrop
->
name
,
terrstr
());
mError
(
"topic:%s, failed to drop since %s"
,
dropReq
.
name
,
terrstr
());
return
-
1
;
}
}
int32_t
code
=
mndDropTopic
(
pMnode
,
p
Msg
,
pTopic
);
int32_t
code
=
mndDropTopic
(
pMnode
,
p
Req
,
pTopic
);
mndReleaseTopic
(
pMnode
,
pTopic
);
if
(
code
!=
0
)
{
terrno
=
code
;
mError
(
"topic:%s, failed to drop since %s"
,
pDrop
->
name
,
terrstr
());
mError
(
"topic:%s, failed to drop since %s"
,
dropReq
.
name
,
terrstr
());
return
-
1
;
}
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
}
static
int32_t
mndProcessDropTopicInRsp
(
SMnodeMsg
*
p
Msg
)
{
mndTransProcessRsp
(
p
Msg
);
static
int32_t
mndProcessDropTopicInRsp
(
SMnodeMsg
*
p
Rsp
)
{
mndTransProcessRsp
(
p
Rsp
);
return
0
;
}
...
...
@@ -405,8 +410,8 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pReq) {
mndReleaseDb(pMnode, pDb);
mndReleaseTopic(pMnode, pTopic);
p
Msg
->pCont = pMeta;
p
Msg
->contLen = contLen;
p
Req
->pCont = pMeta;
p
Req
->contLen = contLen;
mDebug("topic:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pTopic->numOfColumns, pTopic->numOfTags);
#endif
...
...
@@ -438,8 +443,8 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo
return
0
;
}
static
int32_t
mndGetTopicMeta
(
SMnodeMsg
*
p
Msg
,
SShowObj
*
pShow
,
STableMetaRsp
*
pMeta
)
{
SMnode
*
pMnode
=
p
Msg
->
pMnode
;
static
int32_t
mndGetTopicMeta
(
SMnodeMsg
*
p
Req
,
SShowObj
*
pShow
,
STableMetaRsp
*
pMeta
)
{
SMnode
*
pMnode
=
p
Req
->
pMnode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
if
(
mndGetNumOfTopics
(
pMnode
,
pShow
->
db
,
&
pShow
->
numOfRows
)
!=
0
)
{
...
...
@@ -501,8 +506,8 @@ static void mndExtractTableName(char *tableId, char *name) {
}
}
static
int32_t
mndRetrieveTopic
(
SMnodeMsg
*
p
Msg
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
)
{
SMnode
*
pMnode
=
p
Msg
->
pMnode
;
static
int32_t
mndRetrieveTopic
(
SMnodeMsg
*
p
Req
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
)
{
SMnode
*
pMnode
=
p
Req
->
pMnode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfRows
=
0
;
SMqTopicObj
*
pTopic
=
NULL
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录