Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2e640c38
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2e640c38
编写于
10月 31, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: stream and tmq message
上级
d058475f
变更
23
隐藏空白更改
内联
并排
Showing
23 changed file
with
234 addition
and
250 deletion
+234
-250
include/common/tmsg.h
include/common/tmsg.h
+2
-1
include/common/tmsgdef.h
include/common/tmsgdef.h
+25
-27
include/common/tname.h
include/common/tname.h
+5
-5
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+3
-2
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+5
-5
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+25
-11
source/common/src/tname.c
source/common/src/tname.c
+4
-4
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+11
-13
source/dnode/mgmt/mgmt_snode/src/smHandle.c
source/dnode/mgmt/mgmt_snode/src/smHandle.c
+0
-2
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+7
-8
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+9
-9
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+1
-0
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+0
-72
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+7
-7
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+6
-7
source/dnode/qnode/src/qnode.c
source/dnode/qnode/src/qnode.c
+2
-2
source/dnode/snode/src/snode.c
source/dnode/snode/src/snode.c
+36
-2
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+8
-4
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+40
-43
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+23
-19
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+3
-3
source/libs/stream/src/streamDispatch.c
source/libs/stream/src/streamDispatch.c
+10
-4
source/libs/stream/src/streamRecover.c
source/libs/stream/src/streamRecover.c
+2
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
2e640c38
...
...
@@ -57,7 +57,8 @@ extern int32_t tMsgDict[];
#define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff)
#define TMSG_INFO(TYPE) \
((TYPE) < TDMT_DND_MAX_MSG || (TYPE) < TDMT_MND_MAX_MSG || (TYPE) < TDMT_VND_MAX_MSG || (TYPE) < TDMT_SCH_MAX_MSG || \
(TYPE) < TDMT_STREAM_MAX_MSG || (TYPE) < TDMT_MON_MAX_MSG || (TYPE) < TDMT_SYNC_MAX_MSG) \
(TYPE) < TDMT_VND_TMQ_MAX_MSG || (TYPE) < TDMT_STREAM_MAX_MSG || (TYPE) < TDMT_VND_STREAM_MAX_MSG || \
(TYPE) < TDMT_MON_MAX_MSG || (TYPE) < TDMT_SYNC_MAX_MSG) \
? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \
: 0
#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE))
...
...
include/common/tmsgdef.h
浏览文件 @
2e640c38
...
...
@@ -138,17 +138,15 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_MND_GET_TABLE_INDEX
,
"get-table-index"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_BATCH_META
,
"batch-meta"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_TABLE_CFG
,
"table-cfg"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_TOPIC
,
"create-topic"
,
SMCreateTopicReq
,
SMCreateTopicRsp
)
TD_DEF_MSG_TYPE
(
TDMT_MND_ALTER_TOPIC
,
"alter-topic"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_TOPIC
,
"drop-topic"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_SUBSCRIBE
,
"subscribe"
,
SCMSubscribeReq
,
SCMSubscribeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_MND_MQ_ASK_EP
,
"ask-ep"
,
SMqAskEpReq
,
SMqAskEpRsp
)
TD_DEF_MSG_TYPE
(
TDMT_MND_MQ_CONSUMER_LOST
,
"consumer-lost"
,
SMqConsumerLostMsg
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_MQ_CONSUMER_RECOVER
,
"consumer-recover"
,
SMqConsumerRecoverMsg
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_MQ_HB
,
"consumer-hb"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_MQ_DO_REBALANCE
,
"do-rebalance"
,
SMqDoRebalanceMsg
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_MQ_DROP_CGROUP
,
"drop-cgroup"
,
SMqDropCGroupReq
,
SMqDropCGroupRsp
)
TD_DEF_MSG_TYPE
(
TDMT_MND_MQ_COMMIT_OFFSET
,
"mnode-commit-offset"
,
SMqCMCommitOffsetReq
,
SMqCMCommitOffsetRsp
)
TD_DEF_MSG_TYPE
(
TDMT_MND_TMQ_CREATE_TOPIC
,
"create-topic"
,
SMCreateTopicReq
,
SMCreateTopicRsp
)
TD_DEF_MSG_TYPE
(
TDMT_MND_TMQ_DROP_TOPIC
,
"drop-topic"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_TMQ_SUBSCRIBE
,
"subscribe"
,
SCMSubscribeReq
,
SCMSubscribeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_MND_TMQ_ASK_EP
,
"ask-ep"
,
SMqAskEpReq
,
SMqAskEpRsp
)
TD_DEF_MSG_TYPE
(
TDMT_MND_TMQ_CONSUMER_LOST
,
"consumer-lost"
,
SMqConsumerLostMsg
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_TMQ_CONSUMER_RECOVER
,
"consumer-recover"
,
SMqConsumerRecoverMsg
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_TMQ_HB
,
"consumer-hb"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_TMQ_DO_REBALANCE
,
"do-rebalance"
,
SMqDoRebalanceMsg
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_TMQ_DROP_CGROUP
,
"drop-cgroup"
,
SMqDropCGroupReq
,
SMqDropCGroupRsp
)
TD_DEF_MSG_TYPE
(
TDMT_MND_MQ_TIMER
,
"mq-tmr"
,
SMTimerReq
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_TELEM_TIMER
,
"telem-tmr"
,
SMTimerReq
,
SMTimerReq
)
TD_DEF_MSG_TYPE
(
TDMT_MND_TRANS_TIMER
,
"trans-tmr"
,
NULL
,
NULL
)
...
...
@@ -186,21 +184,6 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_CREATE_STB
,
"vnode-create-stb"
,
SVCreateStbReq
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_ALTER_STB
,
"vnode-alter-stb"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DROP_STB
,
"vnode-drop-stb"
,
SVDropStbReq
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_VG_CHANGE
,
"vnode-mq-vg-change"
,
SMqRebVgReq
,
SMqRebVgRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_VG_DELETE
,
"vnode-mq-vg-delete"
,
SMqVDeleteReq
,
SMqVDeleteRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_COMMIT_OFFSET
,
"vnode-commit-offset"
,
STqOffset
,
STqOffset
)
TD_DEF_MSG_TYPE
(
TDMT_VND_ADD_CHECK_INFO
,
"vnode-add-check-info"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DELETE_CHECK_INFO
,
"vnode-delete-check-info"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CREATE_TOPIC
,
"vnode-create-topic"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_ALTER_TOPIC
,
"vnode-alter-topic"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DROP_TOPIC
,
"vnode-drop-topic"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_SUBSCRIBE
,
"vnode-subscribe"
,
SMVSubscribeReq
,
SMVSubscribeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CONSUME
,
"vnode-consume"
,
SMqPollReq
,
SMqDataBlkRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_STREAM_TRIGGER
,
"vnode-stream-trigger"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_STREAM_DISPATCH_WRITE
,
"vnode-stream-task-dispatch-write"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_STREAM_RECOVER_STEP1
,
"vnode-stream-recover1"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_STREAM_RECOVER_STEP2
,
"vnode-stream-recover2"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_STREAM_RECOVER_FINISH
,
"vnode-stream-finish"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CREATE_SMA
,
"vnode-create-sma"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CANCEL_SMA
,
"vnode-cancel-sma"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DROP_SMA
,
"vnode-drop-sma"
,
NULL
,
NULL
)
...
...
@@ -232,15 +215,30 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_SCH_LINK_BROKEN
,
"link-broken"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SCH_MAX_MSG
,
"sch-max"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_VND_TMQ_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TMQ_SUBSCRIBE
,
"vnode-tmq-subscribe"
,
SMqRebVgReq
,
SMqRebVgRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TMQ_DELETE_SUB
,
"vnode-tmq-delete-sub"
,
SMqVDeleteReq
,
SMqVDeleteRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TMQ_COMMIT_OFFSET
,
"vnode-tmq-commit-offset"
,
STqOffset
,
STqOffset
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TMQ_ADD_CHECKINFO
,
"vnode-tmq-add-checkinfo"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TMQ_DEL_CHECKINFO
,
"vnode-del-checkinfo"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TMQ_CONSUME
,
"vnode-tmq-consume"
,
SMqPollReq
,
SMqDataBlkRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TMQ_MAX_MSG
,
"vnd-tmq-max"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_STREAM_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_STREAM_TASK_DEPLOY
,
"stream-task-deploy"
,
SStreamTaskDeployReq
,
SStreamTaskDeployRsp
)
TD_DEF_MSG_TYPE
(
TDMT_STREAM_TASK_DROP
,
"stream-task-drop"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_STREAM_TASK_RUN
,
"stream-task-run"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_STREAM_TASK_DISPATCH
,
"stream-task-dispatch"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_STREAM_TASK_RECOVER
,
"stream-task-recover"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_STREAM_RETRIEVE
,
"stream-retrieve"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_STREAM_RECOVER_FINISH
,
"vnode-stream-finish"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_STREAM_MAX_MSG
,
"stream-max"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_VND_STREAM_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_VND_STREAM_TRIGGER
,
"vnode-stream-trigger"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_STREAM_RECOVER_STEP1
,
"vnode-stream-recover1"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_STREAM_RECOVER_STEP2
,
"vnode-stream-recover2"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_STREAM_MAX_MSG
,
"vnd-stream-max"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_MON_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_MON_MAX_MSG
,
"monitor-max"
,
NULL
,
NULL
)
...
...
include/common/tname.h
浏览文件 @
2e640c38
...
...
@@ -72,13 +72,13 @@ bool tNameTbNameEqual(SName* left, SName* right);
typedef
struct
{
// input
SArray
*
tags
;
// element is SSmlKv
const
char
*
s
Table
Name
;
// super table name
uint8_t
s
Table
NameLen
;
// the length of super table name
SArray
*
tags
;
// element is SSmlKv
const
char
*
s
tbFull
Name
;
// super table name
uint8_t
s
tbFull
NameLen
;
// the length of super table name
// output
char
*
c
hildTable
Name
;
// must have size of TSDB_TABLE_NAME_LEN;
uint64_t
uid
;
// child table uid, may be useful
char
*
c
tbShort
Name
;
// must have size of TSDB_TABLE_NAME_LEN;
uint64_t
uid
;
// child table uid, may be useful
}
RandTableName
;
void
buildChildTableName
(
RandTableName
*
rName
);
...
...
include/libs/stream/tstream.h
浏览文件 @
2e640c38
...
...
@@ -441,8 +441,9 @@ typedef struct {
}
SStreamRetrieveRsp
;
typedef
struct
{
int64_t
streamId
;
int32_t
taskId
;
SMsgHead
msgHead
;
int64_t
streamId
;
int32_t
taskId
;
}
SStreamRecoverStep1Req
,
SStreamRecoverStep2Req
;
typedef
struct
{
...
...
source/client/src/clientTmq.c
浏览文件 @
2e640c38
...
...
@@ -508,7 +508,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
pMsgSendInfo
->
param
=
pParam
;
pMsgSendInfo
->
paramFreeFp
=
taosMemoryFree
;
pMsgSendInfo
->
fp
=
tmqCommitCb
;
pMsgSendInfo
->
msgType
=
TDMT_VND_MQ_COMMIT_OFFSET
;
pMsgSendInfo
->
msgType
=
TDMT_VND_
T
MQ_COMMIT_OFFSET
;
// send msg
atomic_add_fetch_32
(
&
pParamSet
->
waitingRspNum
,
1
);
...
...
@@ -750,7 +750,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
sendInfo
->
requestObjRefId
=
0
;
sendInfo
->
param
=
NULL
;
sendInfo
->
fp
=
tmqHbCb
;
sendInfo
->
msgType
=
TDMT_MND_MQ_HB
;
sendInfo
->
msgType
=
TDMT_MND_
T
MQ_HB
;
SEpSet
epSet
=
getEpSet_s
(
&
tmq
->
pTscObj
->
pAppInfo
->
mgmtEp
);
...
...
@@ -1038,7 +1038,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
sendInfo
->
requestObjRefId
=
0
;
sendInfo
->
param
=
&
param
;
sendInfo
->
fp
=
tmqSubscribeCb
;
sendInfo
->
msgType
=
TDMT_MND_SUBSCRIBE
;
sendInfo
->
msgType
=
TDMT_MND_
TMQ_
SUBSCRIBE
;
SEpSet
epSet
=
getEpSet_s
(
&
tmq
->
pTscObj
->
pAppInfo
->
mgmtEp
);
...
...
@@ -1420,7 +1420,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
sendInfo
->
requestObjRefId
=
0
;
sendInfo
->
param
=
pParam
;
sendInfo
->
fp
=
tmqAskEpCb
;
sendInfo
->
msgType
=
TDMT_MND_MQ_ASK_EP
;
sendInfo
->
msgType
=
TDMT_MND_
T
MQ_ASK_EP
;
SEpSet
epSet
=
getEpSet_s
(
&
tmq
->
pTscObj
->
pAppInfo
->
mgmtEp
);
...
...
@@ -1573,7 +1573,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
sendInfo
->
requestObjRefId
=
0
;
sendInfo
->
param
=
pParam
;
sendInfo
->
fp
=
tmqPollCb
;
sendInfo
->
msgType
=
TDMT_VND_CONSUME
;
sendInfo
->
msgType
=
TDMT_VND_
TMQ_
CONSUME
;
int64_t
transporterId
=
0
;
/*printf("send poll\n");*/
...
...
source/common/src/tdatablock.c
浏览文件 @
2e640c38
...
...
@@ -2063,8 +2063,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB
default:
if
(
pColInfoData
->
info
.
type
<
TSDB_DATA_TYPE_MAX
&&
pColInfoData
->
info
.
type
>
TSDB_DATA_TYPE_NULL
)
{
if
(
colDataIsNull_s
(
pColInfoData
,
j
))
{
tdAppendColValToRow
(
&
rb
,
PRIMARYKEY_TIMESTAMP_COL_ID
+
k
,
pCol
->
type
,
TD_VTYPE_NULL
,
NULL
,
false
,
offset
,
k
);
tdAppendColValToRow
(
&
rb
,
PRIMARYKEY_TIMESTAMP_COL_ID
+
k
,
pCol
->
type
,
TD_VTYPE_NULL
,
NULL
,
false
,
offset
,
k
);
}
else
if
(
pCol
->
type
==
pColInfoData
->
info
.
type
)
{
tdAppendColValToRow
(
&
rb
,
PRIMARYKEY_TIMESTAMP_COL_ID
+
k
,
pCol
->
type
,
TD_VTYPE_NORM
,
var
,
true
,
offset
,
k
);
...
...
@@ -2137,10 +2137,26 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB
return
TSDB_CODE_SUCCESS
;
}
char
*
buildCtbNameByGroupId
(
const
char
*
stbName
,
uint64_t
groupId
)
{
ASSERT
(
stbName
[
0
]
!=
0
);
char
*
buildCtbNameByGroupId
(
const
char
*
stb
Full
Name
,
uint64_t
groupId
)
{
ASSERT
(
stb
Full
Name
[
0
]
!=
0
);
SArray
*
tags
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
if
(
tags
==
NULL
)
{
return
NULL
;
}
SSmlKv
*
pTag
=
taosMemoryCalloc
(
1
,
sizeof
(
SSmlKv
));
if
(
pTag
==
NULL
)
{
taosArrayDestroy
(
tags
);
return
NULL
;
}
void
*
cname
=
taosMemoryCalloc
(
1
,
TSDB_TABLE_NAME_LEN
+
1
);
if
(
cname
==
NULL
)
{
taosArrayDestroy
(
tags
);
taosMemoryFree
(
pTag
);
return
NULL
;
}
pTag
->
key
=
"group_id"
;
pTag
->
keyLen
=
strlen
(
pTag
->
key
);
pTag
->
type
=
TSDB_DATA_TYPE_UBIGINT
;
...
...
@@ -2148,13 +2164,11 @@ char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId) {
pTag
->
length
=
sizeof
(
uint64_t
);
taosArrayPush
(
tags
,
&
pTag
);
void
*
cname
=
taosMemoryCalloc
(
1
,
TSDB_TABLE_NAME_LEN
+
1
);
RandTableName
rname
=
{
.
tags
=
tags
,
.
s
TableName
=
stb
Name
,
.
s
TableNameLen
=
strlen
(
stb
Name
),
.
c
hildTable
Name
=
cname
,
.
s
tbFullName
=
stbFull
Name
,
.
s
tbFullNameLen
=
strlen
(
stbFull
Name
),
.
c
tbShort
Name
=
cname
,
};
buildChildTableName
(
&
rname
);
...
...
@@ -2162,8 +2176,8 @@ char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId) {
taosMemoryFree
(
pTag
);
taosArrayDestroy
(
tags
);
ASSERT
(
rname
.
c
hildTableName
&&
rname
.
childTable
Name
[
0
]);
return
rname
.
c
hildTable
Name
;
ASSERT
(
rname
.
c
tbShortName
&&
rname
.
ctbShort
Name
[
0
]);
return
rname
.
c
tbShort
Name
;
}
void
blockEncode
(
const
SSDataBlock
*
pBlock
,
char
*
data
,
int32_t
*
dataLen
,
int32_t
numOfCols
,
int8_t
needCompress
)
{
...
...
source/common/src/tname.c
浏览文件 @
2e640c38
...
...
@@ -315,7 +315,7 @@ static int compareKv(const void* p1, const void* p2) {
*/
void
buildChildTableName
(
RandTableName
*
rName
)
{
SStringBuilder
sb
=
{
0
};
taosStringBuilderAppendStringLen
(
&
sb
,
rName
->
s
TableName
,
rName
->
sTable
NameLen
);
taosStringBuilderAppendStringLen
(
&
sb
,
rName
->
s
tbFullName
,
rName
->
stbFull
NameLen
);
taosArraySort
(
rName
->
tags
,
compareKv
);
for
(
int
j
=
0
;
j
<
taosArrayGetSize
(
rName
->
tags
);
++
j
)
{
taosStringBuilderAppendChar
(
&
sb
,
','
);
...
...
@@ -336,11 +336,11 @@ void buildChildTableName(RandTableName* rName) {
tMD5Final
(
&
context
);
char
temp
[
8
]
=
{
0
};
rName
->
c
hildTable
Name
[
0
]
=
't'
;
rName
->
c
hildTable
Name
[
1
]
=
'_'
;
rName
->
c
tbShort
Name
[
0
]
=
't'
;
rName
->
c
tbShort
Name
[
1
]
=
'_'
;
for
(
int
i
=
0
;
i
<
16
;
i
++
)
{
sprintf
(
temp
,
"%02x"
,
context
.
digest
[
i
]);
strcat
(
rName
->
c
hildTable
Name
,
temp
);
strcat
(
rName
->
c
tbShort
Name
,
temp
);
}
taosStringBuilderDestroy
(
&
sb
);
rName
->
uid
=
*
(
uint64_t
*
)(
context
.
digest
);
...
...
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
浏览文件 @
2e640c38
...
...
@@ -141,15 +141,13 @@ SArray *mmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_STREAM
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_GET_INDEX
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_GET_TABLE_INDEX
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_TOPIC
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_TOPIC
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_TOPIC
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_SUBSCRIBE
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_MQ_ASK_EP
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_MQ_HB
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_MQ_DROP_CGROUP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_MQ_DROP_CGROUP_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_MQ_COMMIT_OFFSET
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_TMQ_CREATE_TOPIC
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_TMQ_DROP_TOPIC
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_TMQ_SUBSCRIBE
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_TMQ_ASK_EP
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_TMQ_HB
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_TMQ_DROP_CGROUP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_TMQ_DROP_CGROUP_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_KILL_TRANS
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_KILL_QUERY
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_KILL_CONN
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
@@ -171,10 +169,10 @@ SArray *mmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_DROP_STB_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_CREATE_SMA_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_DROP_SMA_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_
MQ_VG_CHANG
E_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_
MQ_VG_DELETE
_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_
ADD_CHECK_
INFO_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_
DELETE_CHECK_
INFO_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_
TMQ_SUBSCRIB
E_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_
TMQ_DELETE_SUB
_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_
TMQ_ADD_CHECK
INFO_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_
TMQ_DEL_CHECK
INFO_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SCH_DROP_TASK
,
mmPutMsgToFetchQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_DEPLOY_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_DROP_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/mgmt/mgmt_snode/src/smHandle.c
浏览文件 @
2e640c38
...
...
@@ -72,8 +72,6 @@ SArray *smGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_RUN
,
smPutNodeMsgToStreamQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_DISPATCH
,
smPutNodeMsgToStreamQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_DISPATCH_RSP
,
smPutNodeMsgToStreamQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_RECOVER
,
smPutNodeMsgToStreamQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_RECOVER_RSP
,
smPutNodeMsgToStreamQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_RETRIEVE
,
smPutNodeMsgToStreamQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_RETRIEVE_RSP
,
smPutNodeMsgToStreamQueue
,
1
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
2e640c38
...
...
@@ -392,12 +392,12 @@ SArray *vmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_CANCEL_SMA
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_DROP_SMA
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_SUBMIT_RSMA
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_
MQ_VG_CHANG
E
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_
MQ_VG_DELETE
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_MQ_COMMIT_OFFSET
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_
ADD_CHECK_
INFO
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_
DELETE_CHECK_
INFO
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_CONSUME
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_
TMQ_SUBSCRIB
E
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_
TMQ_DELETE_SUB
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_
T
MQ_COMMIT_OFFSET
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_
TMQ_ADD_CHECK
INFO
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_
TMQ_DEL_CHECK
INFO
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_
TMQ_
CONSUME
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_DELETE
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_BATCH_DEL
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_COMMIT
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
@@ -409,10 +409,9 @@ SArray *vmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_RUN
,
vmPutMsgToStreamQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_DISPATCH
,
vmPutMsgToStreamQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_DISPATCH_RSP
,
vmPutMsgToStreamQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_RECOVER
,
vmPutMsgToStreamQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_RECOVER_RSP
,
vmPutMsgToStreamQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_RETRIEVE
,
vmPutMsgToStreamQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_RETRIEVE_RSP
,
vmPutMsgToStreamQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_RECOVER_FINISH
,
vmPutMsgToStreamQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_REPLICA
,
vmPutMsgToMgmtQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_CONFIG
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
2e640c38
...
...
@@ -63,12 +63,12 @@ int32_t mndInitConsumer(SMnode *pMnode) {
.
deleteFp
=
(
SdbDeleteFp
)
mndConsumerActionDelete
,
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_SUBSCRIBE
,
mndProcessSubscribeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_HB
,
mndProcessMqHbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_ASK_EP
,
mndProcessAskEpReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_
TMQ_
SUBSCRIBE
,
mndProcessSubscribeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_
T
MQ_HB
,
mndProcessMqHbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_
T
MQ_ASK_EP
,
mndProcessAskEpReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_TIMER
,
mndProcessMqTimerMsg
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_CONSUMER_LOST
,
mndProcessConsumerLostMsg
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_CONSUMER_RECOVER
,
mndProcessConsumerRecoverMsg
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_
T
MQ_CONSUMER_LOST
,
mndProcessConsumerLostMsg
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_
T
MQ_CONSUMER_RECOVER
,
mndProcessConsumerRecoverMsg
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_CONSUMERS
,
mndRetrieveConsumer
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_CONSUMERS
,
mndCancelGetNextConsumer
);
...
...
@@ -207,7 +207,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
pLostMsg
->
consumerId
=
pConsumer
->
consumerId
;
SRpcMsg
pRpcMsg
=
{
.
msgType
=
TDMT_MND_MQ_CONSUMER_LOST
,
.
msgType
=
TDMT_MND_
T
MQ_CONSUMER_LOST
,
.
pCont
=
pLostMsg
,
.
contLen
=
sizeof
(
SMqConsumerLostMsg
),
};
...
...
@@ -256,7 +256,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
if
(
taosHashGetSize
(
pRebMsg
->
rebSubHash
)
!=
0
)
{
mInfo
(
"mq rebalance will be triggered"
);
SRpcMsg
rpcMsg
=
{
.
msgType
=
TDMT_MND_MQ_DO_REBALANCE
,
.
msgType
=
TDMT_MND_
T
MQ_DO_REBALANCE
,
.
pCont
=
pRebMsg
,
.
contLen
=
sizeof
(
SMqDoRebalanceMsg
),
};
...
...
@@ -292,7 +292,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
pRecoverMsg
->
consumerId
=
consumerId
;
SRpcMsg
pRpcMsg
=
{
.
msgType
=
TDMT_MND_MQ_CONSUMER_RECOVER
,
.
msgType
=
TDMT_MND_
T
MQ_CONSUMER_RECOVER
,
.
pCont
=
pRecoverMsg
,
.
contLen
=
sizeof
(
SMqConsumerRecoverMsg
),
};
...
...
@@ -331,7 +331,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
pRecoverMsg
->
consumerId
=
consumerId
;
SRpcMsg
pRpcMsg
=
{
.
msgType
=
TDMT_MND_MQ_CONSUMER_RECOVER
,
.
msgType
=
TDMT_MND_
T
MQ_CONSUMER_RECOVER
,
.
pCont
=
pRecoverMsg
,
.
contLen
=
sizeof
(
SMqConsumerRecoverMsg
),
};
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
2e640c38
...
...
@@ -487,6 +487,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
qDestroyQueryPlan
(
pPlan
);
return
-
1
;
}
pTask
->
fillHistory
=
pStream
->
fillHistory
;
mndAddTaskToTaskSet
(
taskOneLevel
,
pTask
);
// source
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
2e640c38
...
...
@@ -557,78 +557,6 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
return
0
;
}
#if 0
static int32_t mndPersistTaskRecoverReq(STrans *pTrans, SStreamTask *pTask) {
SMStreamTaskRecoverReq *pReq = taosMemoryCalloc(1, sizeof(SMStreamTaskRecoverReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->streamId = pTask->streamId;
pReq->taskId = pTask->taskId;
int32_t len;
int32_t code;
tEncodeSize(tEncodeSMStreamTaskRecoverReq, pReq, len, code);
if (code != 0) {
return -1;
}
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
if (buf == NULL) {
return -1;
}
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, len);
tEncodeSMStreamTaskRecoverReq(&encoder, pReq);
((SMsgHead *)buf)->vgId = pTask->nodeId;
STransAction action = {0};
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
action.pCont = buf;
action.contLen = sizeof(SMsgHead) + len;
action.msgType = TDMT_STREAM_TASK_RECOVER;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
return -1;
}
return 0;
}
int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
if (pStream->isDistributed) {
int32_t lv = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < lv; i++) {
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
int32_t sz = taosArrayGetSize(pTasks);
SStreamTask *pTask = taosArrayGetP(pTasks, 0);
if (pTask->taskLevel == TASK_LEVEL__AGG) {
ASSERT(sz == 1);
if (mndPersistTaskRecoverReq(pTrans, pTask) < 0) {
return -1;
}
} else {
continue;
}
}
} else {
int32_t lv = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < lv; i++) {
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (pTask->taskLevel != TASK_LEVEL__SOURCE) break;
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
if (mndPersistTaskRecoverReq(pTrans, pTask) < 0) {
return -1;
}
}
}
}
return 0;
}
#endif
int32_t
mndDropStreamTasks
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SStreamObj
*
pStream
)
{
int32_t
lv
=
taosArrayGetSize
(
pStream
->
tasks
);
for
(
int32_t
i
=
0
;
i
<
lv
;
i
++
)
{
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
2e640c38
...
...
@@ -73,11 +73,11 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
.
deleteFp
=
(
SdbDeleteFp
)
mndSubActionDelete
,
};
mndSetMsgHandle
(
pMnode
,
TDMT_VND_
MQ_VG_CHANG
E_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_
MQ_VG_DELETE
_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_DO_REBALANCE
,
mndProcessRebalanceReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_DROP_CGROUP
,
mndProcessDropCgroupReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_DROP_CGROUP_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_
TMQ_SUBSCRIB
E_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_
TMQ_DELETE_SUB
_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_
T
MQ_DO_REBALANCE
,
mndProcessRebalanceReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_
T
MQ_DROP_CGROUP
,
mndProcessDropCgroupReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_
T
MQ_DROP_CGROUP_RSP
,
mndTransProcessRsp
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_SUBSCRIPTIONS
,
mndRetrieveSubscribe
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_TOPICS
,
mndCancelGetNextSubscribe
);
...
...
@@ -164,7 +164,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgObj
);
action
.
pCont
=
buf
;
action
.
contLen
=
tlen
;
action
.
msgType
=
TDMT_VND_
MQ_VG_CHANG
E
;
action
.
msgType
=
TDMT_VND_
TMQ_SUBSCRIB
E
;
mndReleaseVgroup
(
pMnode
,
pVgObj
);
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
...
...
@@ -920,7 +920,7 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
action
.
epSet
=
pVgEp
->
epSet
;
action
.
pCont
=
pReq
;
action
.
contLen
=
sizeof
(
SMqVDeleteReq
);
action
.
msgType
=
TDMT_VND_
MQ_VG_DELETE
;
action
.
msgType
=
TDMT_VND_
TMQ_DELETE_SUB
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
2e640c38
...
...
@@ -53,11 +53,10 @@ int32_t mndInitTopic(SMnode *pMnode) {
.
deleteFp
=
(
SdbDeleteFp
)
mndTopicActionDelete
,
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_TOPIC
,
mndProcessCreateTopicReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_TOPIC
,
mndProcessDropTopicReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_DROP_TOPIC_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_ADD_CHECK_INFO_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_DELETE_CHECK_INFO_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_TMQ_CREATE_TOPIC
,
mndProcessCreateTopicReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_TMQ_DROP_TOPIC
,
mndProcessDropTopicReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_TMQ_ADD_CHECKINFO_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_TMQ_DEL_CHECKINFO_RSP
,
mndTransProcessRsp
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_TOPICS
,
mndRetrieveTopic
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_TOPICS
,
mndCancelGetNextTopic
);
...
...
@@ -506,7 +505,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
action
.
pCont
=
buf
;
action
.
contLen
=
sizeof
(
SMsgHead
)
+
len
;
action
.
msgType
=
TDMT_VND_
ADD_CHECK_
INFO
;
action
.
msgType
=
TDMT_VND_
TMQ_ADD_CHECK
INFO
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
buf
);
sdbRelease
(
pSdb
,
pVgroup
);
...
...
@@ -715,7 +714,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
action
.
pCont
=
buf
;
action
.
contLen
=
sizeof
(
SMsgHead
)
+
TSDB_TOPIC_FNAME_LEN
;
action
.
msgType
=
TDMT_VND_
DELETE_CHECK_
INFO
;
action
.
msgType
=
TDMT_VND_
TMQ_DEL_CHECK
INFO
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
buf
);
sdbRelease
(
pSdb
,
pVgroup
);
...
...
source/dnode/qnode/src/qnode.c
浏览文件 @
2e640c38
...
...
@@ -90,12 +90,12 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) {
code
=
qWorkerProcessFetchMsg
(
pQnode
,
pQnode
->
pQuery
,
pMsg
,
ts
);
break
;
case
TDMT_SCH_CANCEL_TASK
:
//code = qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg, ts);
//
code = qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg, ts);
break
;
case
TDMT_SCH_DROP_TASK
:
code
=
qWorkerProcessDropMsg
(
pQnode
,
pQnode
->
pQuery
,
pMsg
,
ts
);
break
;
case
TDMT_VND_CONSUME
:
case
TDMT_VND_
TMQ_
CONSUME
:
// code = tqProcessConsumeReq(pQnode->pTq, pMsg);
// break;
case
TDMT_SCH_QUERY_HEARTBEAT
:
...
...
source/dnode/snode/src/snode.c
浏览文件 @
2e640c38
...
...
@@ -261,18 +261,52 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
return
0
;
}
int32_t
sndProcessTaskRecoverFinishReq
(
SSnode
*
pSnode
,
SRpcMsg
*
pMsg
)
{
char
*
msg
=
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
));
int32_t
msgLen
=
pMsg
->
contLen
-
sizeof
(
SMsgHead
);
// deserialize
SStreamRecoverFinishReq
req
;
SDecoder
decoder
;
tDecoderInit
(
&
decoder
,
msg
,
msgLen
);
tDecodeSStreamRecoverFinishReq
(
&
decoder
,
&
req
);
tDecoderClear
(
&
decoder
);
// find task
SStreamTask
*
pTask
=
streamMetaGetTask
(
pSnode
->
pMeta
,
req
.
taskId
);
if
(
pTask
==
NULL
)
{
return
-
1
;
}
// do process request
if
(
streamProcessRecoverFinishReq
(
pTask
,
req
.
childId
)
<
0
)
{
return
-
1
;
}
return
0
;
}
int32_t
sndProcessTaskRecoverFinishRsp
(
SSnode
*
pSnode
,
SRpcMsg
*
pMsg
)
{
//
return
0
;
}
int32_t
sndProcessStreamMsg
(
SSnode
*
pSnode
,
SRpcMsg
*
pMsg
)
{
switch
(
pMsg
->
msgType
)
{
case
TDMT_STREAM_TASK_RUN
:
return
sndProcessTaskRunReq
(
pSnode
,
pMsg
);
case
TDMT_STREAM_TASK_DISPATCH
:
return
sndProcessTaskDispatchReq
(
pSnode
,
pMsg
,
true
);
case
TDMT_STREAM_RETRIEVE
:
return
sndProcessTaskRetrieveReq
(
pSnode
,
pMsg
);
case
TDMT_STREAM_TASK_DISPATCH_RSP
:
return
sndProcessTaskDispatchRsp
(
pSnode
,
pMsg
);
case
TDMT_STREAM_RETRIEVE
:
return
sndProcessTaskRetrieveReq
(
pSnode
,
pMsg
);
case
TDMT_STREAM_RETRIEVE_RSP
:
return
sndProcessTaskRetrieveRsp
(
pSnode
,
pMsg
);
case
TDMT_STREAM_RECOVER_FINISH
:
return
sndProcessTaskRecoverFinishReq
(
pSnode
,
pMsg
);
case
TDMT_STREAM_RECOVER_FINISH_RSP
:
return
sndProcessTaskRecoverFinishRsp
(
pSnode
,
pMsg
);
default:
ASSERT
(
0
);
}
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
2e640c38
...
...
@@ -179,8 +179,8 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
// tq-mq
int32_t
tqProcessAddCheckInfoReq
(
STQ
*
pTq
,
int64_t
version
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessDelCheckInfoReq
(
STQ
*
pTq
,
int64_t
version
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcess
VgChang
eReq
(
STQ
*
pTq
,
int64_t
version
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcess
VgDelete
Req
(
STQ
*
pTq
,
int64_t
version
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcess
Subscrib
eReq
(
STQ
*
pTq
,
int64_t
version
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcess
DeleteSub
Req
(
STQ
*
pTq
,
int64_t
version
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessOffsetCommitReq
(
STQ
*
pTq
,
int64_t
version
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
// tq-stream
...
...
@@ -190,11 +190,15 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* data, int64_t ver);
int32_t
tqProcessDelReq
(
STQ
*
pTq
,
void
*
pReq
,
int32_t
len
,
int64_t
ver
);
int32_t
tqProcessTaskRunReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessTaskDispatchReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
bool
exec
);
int32_t
tqProcessTaskRecoverReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessTaskDispatchRsp
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessTaskRecoverRsp
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
// int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg);
// int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t
tqProcessTaskRetrieveReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessTaskRetrieveRsp
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessTaskRecover1Req
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessTaskRecover2Req
(
STQ
*
pTq
,
int64_t
version
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessTaskRecoverFinishReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessTaskRecoverFinishRsp
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
SSubmitReq
*
tqBlockToSubmit
(
SVnode
*
pVnode
,
const
SArray
*
pBlocks
,
const
STSchema
*
pSchema
,
SSchemaWrapper
*
pTagSchemaWrapper
,
bool
createTb
,
int64_t
suid
,
const
char
*
stbFullName
,
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
2e640c38
...
...
@@ -709,7 +709,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
return
0
;
}
int32_t
tqProcess
VgDelete
Req
(
STQ
*
pTq
,
int64_t
version
,
char
*
msg
,
int32_t
msgLen
)
{
int32_t
tqProcess
DeleteSub
Req
(
STQ
*
pTq
,
int64_t
version
,
char
*
msg
,
int32_t
msgLen
)
{
SMqVDeleteReq
*
pReq
=
(
SMqVDeleteReq
*
)
msg
;
taosWLockLatch
(
&
pTq
->
pushLock
);
...
...
@@ -767,7 +767,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t m
return
0
;
}
int32_t
tqProcess
VgChang
eReq
(
STQ
*
pTq
,
int64_t
version
,
char
*
msg
,
int32_t
msgLen
)
{
int32_t
tqProcess
Subscrib
eReq
(
STQ
*
pTq
,
int64_t
version
,
char
*
msg
,
int32_t
msgLen
)
{
SMqRebVgReq
req
=
{
0
};
tDecodeSMqRebVgReq
(
msg
,
&
req
);
// todo lock
...
...
@@ -982,25 +982,33 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg
// 3.go through recover steps to fill history
if
(
pTask
->
fillHistory
)
{
streamSetParamForRecover
(
pTask
);
if
(
pTask
->
taskLevel
==
TASK_LEVEL__SOURCE
)
{
streamSetParamForRecover
(
pTask
);
streamSourceRecoverPrepareStep1
(
pTask
,
version
);
SStreamRecoverStep1Req
req
;
streamBuildSourceRecover1Req
(
pTask
,
&
req
);
void
*
serialziedReq
=
(
void
*
)
&
req
;
int32_t
len
=
sizeof
(
SStreamRecoverStep1Req
);
void
*
serializedReq
=
rpcMallocCont
(
len
);
if
(
serializedReq
==
NULL
)
{
return
-
1
;
}
memcpy
(
serializedReq
,
&
req
,
len
);
SRpcMsg
rpcMsg
=
{
.
contLen
=
len
,
.
pCont
=
serial
zi
edReq
,
.
pCont
=
serial
iz
edReq
,
.
msgType
=
TDMT_VND_STREAM_RECOVER_STEP1
,
};
tmsgPutToQueue
(
&
pTq
->
pVnode
->
msgCb
,
STREAM_QUEUE
,
&
rpcMsg
);
if
(
tmsgPutToQueue
(
&
pTq
->
pVnode
->
msgCb
,
STREAM_QUEUE
,
&
rpcMsg
)
<
0
)
{
/*ASSERT(0);*/
}
}
else
if
(
pTask
->
taskLevel
==
TASK_LEVEL__AGG
)
{
streamSetParamForRecover
(
pTask
);
streamAggRecoverPrepare
(
pTask
);
}
else
if
(
pTask
->
taskLevel
==
TASK_LEVEL__SINK
)
{
// do nothing
...
...
@@ -1010,8 +1018,11 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg
return
0
;
}
int32_t
tqProcessTaskRecover1Req
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
)
{
int32_t
code
;
int32_t
tqProcessTaskRecover1Req
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
)
{
int32_t
code
;
char
*
msg
=
pMsg
->
pCont
;
int32_t
msgLen
=
pMsg
->
contLen
;
SStreamRecoverStep1Req
*
pReq
=
(
SStreamRecoverStep1Req
*
)
msg
;
SStreamTask
*
pTask
=
streamMetaGetTask
(
pTq
->
pStreamMeta
,
pReq
->
taskId
);
if
(
pTask
==
NULL
)
{
...
...
@@ -1035,16 +1046,24 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, char* msg, int32_t msgLen) {
return
-
1
;
}
ASSERT
(
pReq
->
taskId
==
pTask
->
taskId
);
// serialize msg
int32_t
len
=
sizeof
(
SStreamRecoverStep2Req
);
void
*
serializedReq
=
(
void
*
)
&
req
;
int32_t
len
=
sizeof
(
SStreamRecoverStep1Req
);
void
*
serializedReq
=
rpcMallocCont
(
len
);
if
(
serializedReq
==
NULL
)
{
return
-
1
;
}
memcpy
(
serializedReq
,
&
req
,
len
);
// dispatch msg
SRpcMsg
rpcMsg
=
{
.
code
=
0
,
.
contLen
=
len
,
.
msgType
=
TDMT_VND_STREAM_RECOVER_STEP2
,
.
pCont
=
(
void
*
)
serializedReq
,
.
pCont
=
serializedReq
,
};
tmsgPutToQueue
(
&
pTq
->
pVnode
->
msgCb
,
WRITE_QUEUE
,
&
rpcMsg
);
...
...
@@ -1087,15 +1106,15 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t m
return
0
;
}
int32_t
tqProcessTaskRecoverFinishReq
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
)
{
int32_t
code
;
int32_t
tqProcessTaskRecoverFinishReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
)
{
char
*
msg
=
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
));
int32_t
msgLen
=
pMsg
->
contLen
-
sizeof
(
SMsgHead
);
// deserialize
int32_t
len
;
SStreamRecoverFinishReq
req
;
SDecoder
decoder
;
tDecoderInit
(
&
decoder
,
msg
,
sizeof
(
SStreamRecoverFinishReq
)
);
tDecoderInit
(
&
decoder
,
msg
,
msgLen
);
tDecodeSStreamRecoverFinishReq
(
&
decoder
,
&
req
);
tDecoderClear
(
&
decoder
);
...
...
@@ -1112,6 +1131,11 @@ int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, char* msg, int32_t msgLen) {
return
0
;
}
int32_t
tqProcessTaskRecoverFinishRsp
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
)
{
//
return
0
;
}
int32_t
tqProcessDelReq
(
STQ
*
pTq
,
void
*
pReq
,
int32_t
len
,
int64_t
ver
)
{
bool
failed
=
false
;
SDecoder
*
pCoder
=
&
(
SDecoder
){
0
};
...
...
@@ -1306,33 +1330,6 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
}
}
#if 0
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (pTask) {
streamProcessRecoverReq(pTask, pReq, pMsg);
return 0;
} else {
return -1;
}
}
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
int32_t taskId = pRsp->rspTaskId;
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (pTask) {
streamProcessRecoverRsp(pTask, pRsp);
return 0;
} else {
return -1;
}
}
#endif
int32_t
tqProcessTaskDispatchRsp
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
)
{
SStreamDispatchRsp
*
pRsp
=
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
));
int32_t
taskId
=
pRsp
->
taskId
;
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
2e640c38
...
...
@@ -228,30 +228,30 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
if
(
vnodeProcessBatchDeleteReq
(
pVnode
,
version
,
pReq
,
len
,
pRsp
)
<
0
)
goto
_err
;
break
;
/* TQ */
case
TDMT_VND_
MQ_VG_CHANG
E
:
if
(
tqProcess
VgChang
eReq
(
pVnode
->
pTq
,
version
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
pMsg
->
contLen
-
sizeof
(
SMsgHead
))
<
0
)
{
case
TDMT_VND_
TMQ_SUBSCRIB
E
:
if
(
tqProcess
Subscrib
eReq
(
pVnode
->
pTq
,
version
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
pMsg
->
contLen
-
sizeof
(
SMsgHead
))
<
0
)
{
goto
_err
;
}
break
;
case
TDMT_VND_
MQ_VG_DELETE
:
if
(
tqProcess
VgDelete
Req
(
pVnode
->
pTq
,
version
,
pMsg
->
pCont
,
pMsg
->
contLen
)
<
0
)
{
case
TDMT_VND_
TMQ_DELETE_SUB
:
if
(
tqProcess
DeleteSub
Req
(
pVnode
->
pTq
,
version
,
pMsg
->
pCont
,
pMsg
->
contLen
)
<
0
)
{
goto
_err
;
}
break
;
case
TDMT_VND_MQ_COMMIT_OFFSET
:
case
TDMT_VND_
T
MQ_COMMIT_OFFSET
:
if
(
tqProcessOffsetCommitReq
(
pVnode
->
pTq
,
version
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
pMsg
->
contLen
-
sizeof
(
SMsgHead
))
<
0
)
{
goto
_err
;
}
break
;
case
TDMT_VND_
ADD_CHECK_
INFO
:
case
TDMT_VND_
TMQ_ADD_CHECK
INFO
:
if
(
tqProcessAddCheckInfoReq
(
pVnode
->
pTq
,
version
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
pMsg
->
contLen
-
sizeof
(
SMsgHead
))
<
0
)
{
goto
_err
;
}
break
;
case
TDMT_VND_
DELETE_CHECK_
INFO
:
case
TDMT_VND_
TMQ_DEL_CHECK
INFO
:
if
(
tqProcessDelCheckInfoReq
(
pVnode
->
pTq
,
version
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
pMsg
->
contLen
-
sizeof
(
SMsgHead
))
<
0
)
{
goto
_err
;
...
...
@@ -268,6 +268,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
goto
_err
;
}
}
break
;
case
TDMT_VND_STREAM_RECOVER_STEP2
:
{
if
(
tqProcessTaskRecover2Req
(
pVnode
->
pTq
,
version
,
pMsg
->
pCont
,
pMsg
->
contLen
)
<
0
)
{
goto
_err
;
}
}
break
;
case
TDMT_VND_ALTER_CONFIRM
:
vnodeProcessAlterConfirmReq
(
pVnode
,
version
,
pReq
,
len
,
pRsp
);
break
;
...
...
@@ -355,14 +360,11 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return
0
;
}
if
(
pMsg
->
msgType
==
TDMT_VND_CONSUME
&&
!
pVnode
->
restored
)
{
if
(
pMsg
->
msgType
==
TDMT_VND_
TMQ_
CONSUME
&&
!
pVnode
->
restored
)
{
vnodeRedirectRpcMsg
(
pVnode
,
pMsg
);
return
0
;
}
char
*
msgstr
=
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
));
int32_t
msgLen
=
pMsg
->
contLen
-
sizeof
(
SMsgHead
);
switch
(
pMsg
->
msgType
)
{
case
TDMT_SCH_FETCH
:
case
TDMT_SCH_MERGE_FETCH
:
...
...
@@ -381,7 +383,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return
vnodeGetTableCfg
(
pVnode
,
pMsg
,
true
);
case
TDMT_VND_BATCH_META
:
return
vnodeGetBatchMeta
(
pVnode
,
pMsg
);
case
TDMT_VND_CONSUME
:
case
TDMT_VND_
TMQ_
CONSUME
:
return
tqProcessPollReq
(
pVnode
->
pTq
,
pMsg
);
case
TDMT_STREAM_TASK_RUN
:
return
tqProcessTaskRunReq
(
pVnode
->
pTq
,
pMsg
);
...
...
@@ -389,16 +391,18 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case TDMT_STREAM_TASK_DISPATCH:
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
#endif
/*case TDMT_STREAM_TASK_RECOVER:*/
/*return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);*/
case
TDMT_STREAM_RETRIEVE
:
return
tqProcessTaskRetrieveReq
(
pVnode
->
pTq
,
pMsg
);
case
TDMT_STREAM_TASK_DISPATCH_RSP
:
return
tqProcessTaskDispatchRsp
(
pVnode
->
pTq
,
pMsg
);
/*case TDMT_STREAM_TASK_RECOVER_RSP:*/
/*return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);*/
case
TDMT_STREAM_RETRIEVE
:
return
tqProcessTaskRetrieveReq
(
pVnode
->
pTq
,
pMsg
);
case
TDMT_STREAM_RETRIEVE_RSP
:
return
tqProcessTaskRetrieveRsp
(
pVnode
->
pTq
,
pMsg
);
case
TDMT_VND_STREAM_RECOVER_STEP1
:
return
tqProcessTaskRecover1Req
(
pVnode
->
pTq
,
pMsg
);
case
TDMT_STREAM_RECOVER_FINISH
:
return
tqProcessTaskRecoverFinishReq
(
pVnode
->
pTq
,
pMsg
);
case
TDMT_STREAM_RECOVER_FINISH_RSP
:
return
tqProcessTaskRecoverFinishRsp
(
pVnode
->
pTq
,
pMsg
);
default:
vError
(
"unknown msg type:%d in fetch queue"
,
pMsg
->
msgType
);
return
TSDB_CODE_VND_APP_ERROR
;
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
2e640c38
...
...
@@ -5279,7 +5279,7 @@ static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* p
code
=
buildCreateTopicReq
(
pCxt
,
pStmt
,
&
createReq
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
buildCmdMsg
(
pCxt
,
TDMT_MND_CREATE_TOPIC
,
(
FSerializeFunc
)
tSerializeSCMCreateTopicReq
,
&
createReq
);
code
=
buildCmdMsg
(
pCxt
,
TDMT_MND_
TMQ_
CREATE_TOPIC
,
(
FSerializeFunc
)
tSerializeSCMCreateTopicReq
,
&
createReq
);
}
tFreeSCMCreateTopicReq
(
&
createReq
);
return
code
;
...
...
@@ -5293,7 +5293,7 @@ static int32_t translateDropTopic(STranslateContext* pCxt, SDropTopicStmt* pStmt
tNameGetFullDbName
(
&
name
,
dropReq
.
name
);
dropReq
.
igNotExists
=
pStmt
->
ignoreNotExists
;
return
buildCmdMsg
(
pCxt
,
TDMT_MND_DROP_TOPIC
,
(
FSerializeFunc
)
tSerializeSMDropTopicReq
,
&
dropReq
);
return
buildCmdMsg
(
pCxt
,
TDMT_MND_
TMQ_
DROP_TOPIC
,
(
FSerializeFunc
)
tSerializeSMDropTopicReq
,
&
dropReq
);
}
static
int32_t
translateDropCGroup
(
STranslateContext
*
pCxt
,
SDropCGroupStmt
*
pStmt
)
{
...
...
@@ -5305,7 +5305,7 @@ static int32_t translateDropCGroup(STranslateContext* pCxt, SDropCGroupStmt* pSt
dropReq
.
igNotExists
=
pStmt
->
ignoreNotExists
;
strcpy
(
dropReq
.
cgroup
,
pStmt
->
cgroup
);
return
buildCmdMsg
(
pCxt
,
TDMT_MND_MQ_DROP_CGROUP
,
(
FSerializeFunc
)
tSerializeSMDropCgroupReq
,
&
dropReq
);
return
buildCmdMsg
(
pCxt
,
TDMT_MND_
T
MQ_DROP_CGROUP
,
(
FSerializeFunc
)
tSerializeSMDropCgroupReq
,
&
dropReq
);
}
static
int32_t
translateAlterLocal
(
STranslateContext
*
pCxt
,
SAlterLocalStmt
*
pStmt
)
{
...
...
source/libs/stream/src/streamDispatch.c
浏览文件 @
2e640c38
...
...
@@ -239,7 +239,7 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
msg
.
contLen
=
tlen
+
sizeof
(
SMsgHead
);
msg
.
pCont
=
buf
;
msg
.
msgType
=
TDMT_
VND_
STREAM_RECOVER_FINISH
;
msg
.
msgType
=
TDMT_STREAM_RECOVER_FINISH
;
tmsgSendReq
(
pEpSet
,
&
msg
);
...
...
@@ -292,13 +292,19 @@ FAIL:
int32_t
streamSearchAndAddBlock
(
SStreamTask
*
pTask
,
SStreamDispatchReq
*
pReqs
,
SSDataBlock
*
pDataBlock
,
int32_t
vgSz
,
int64_t
groupId
)
{
char
*
ctbName
;
char
*
ctbName
=
taosMemoryCalloc
(
1
,
TSDB_TABLE_FNAME_LEN
);
if
(
ctbName
==
NULL
)
{
return
-
1
;
}
if
(
pDataBlock
->
info
.
parTbName
[
0
])
{
ctbName
=
taosMemoryCalloc
(
1
,
TSDB_TABLE_NAME_LEN
);
snprintf
(
ctbName
,
TSDB_TABLE_NAME_LEN
,
"%s.%s"
,
pTask
->
shuffleDispatcher
.
dbInfo
.
db
,
pDataBlock
->
info
.
parTbName
);
}
else
{
ctbName
=
buildCtbNameByGroupId
(
pTask
->
shuffleDispatcher
.
stbFullName
,
groupId
);
char
*
ctbShortName
=
buildCtbNameByGroupId
(
pTask
->
shuffleDispatcher
.
stbFullName
,
groupId
);
snprintf
(
ctbName
,
TSDB_TABLE_NAME_LEN
,
"%s.%s"
,
pTask
->
shuffleDispatcher
.
dbInfo
.
db
,
ctbShortName
);
taosMemoryFree
(
ctbShortName
);
}
SArray
*
vgInfo
=
pTask
->
shuffleDispatcher
.
dbInfo
.
pVgroupInfos
;
/*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
...
...
source/libs/stream/src/streamRecover.c
浏览文件 @
2e640c38
...
...
@@ -36,6 +36,7 @@ int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver) {
}
int32_t
streamBuildSourceRecover1Req
(
SStreamTask
*
pTask
,
SStreamRecoverStep1Req
*
pReq
)
{
pReq
->
msgHead
.
vgId
=
pTask
->
nodeId
;
pReq
->
streamId
=
pTask
->
streamId
;
pReq
->
taskId
=
pTask
->
taskId
;
return
0
;
...
...
@@ -48,6 +49,7 @@ int32_t streamSourceRecoverScanStep1(SStreamTask* pTask) {
}
int32_t
streamBuildSourceRecover2Req
(
SStreamTask
*
pTask
,
SStreamRecoverStep2Req
*
pReq
)
{
pReq
->
msgHead
.
vgId
=
pTask
->
nodeId
;
pReq
->
streamId
=
pTask
->
streamId
;
pReq
->
taskId
=
pTask
->
taskId
;
return
0
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录