Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5ca27cd4
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
5ca27cd4
编写于
6月 01, 2022
作者:
L
Liu Jicong
提交者:
GitHub
6月 01, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13378 from taosdata/feature/tq
refactor(tmq): tq sink and push
上级
4e987a50
e1a5000e
变更
17
隐藏空白更改
内联
并排
Showing
17 changed file
with
340 addition
and
265 deletion
+340
-265
include/common/tmsg.h
include/common/tmsg.h
+1
-1
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+4
-9
source/common/src/tmsg.c
source/common/src/tmsg.c
+2
-3
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+3
-2
source/dnode/vnode/src/inc/sma.h
source/dnode/vnode/src/inc/sma.h
+2
-2
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+22
-34
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+4
-2
source/dnode/vnode/src/sma/sma.c
source/dnode/vnode/src/sma/sma.c
+1
-2
source/dnode/vnode/src/sma/smaTimeRange.c
source/dnode/vnode/src/sma/smaTimeRange.c
+1
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+9
-207
source/dnode/vnode/src/tq/tqCommit.c
source/dnode/vnode/src/tq/tqCommit.c
+5
-0
source/dnode/vnode/src/tq/tqMeta.c
source/dnode/vnode/src/tq/tqMeta.c
+26
-0
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+180
-0
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+35
-0
source/dnode/vnode/src/tq/tqSink.c
source/dnode/vnode/src/tq/tqSink.c
+34
-0
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+1
-1
source/libs/stream/src/tstream.c
source/libs/stream/src/tstream.c
+10
-1
未找到文件。
include/common/tmsg.h
浏览文件 @
5ca27cd4
...
...
@@ -244,7 +244,7 @@ typedef struct {
const
void
*
pMsg
;
}
SSubmitMsgIter
;
int32_t
tInitSubmitMsgIter
(
SSubmitReq
*
pMsg
,
SSubmitMsgIter
*
pIter
);
int32_t
tInitSubmitMsgIter
(
const
SSubmitReq
*
pMsg
,
SSubmitMsgIter
*
pIter
);
int32_t
tGetSubmitMsgNext
(
SSubmitMsgIter
*
pIter
,
SSubmitBlk
**
pPBlock
);
int32_t
tInitSubmitBlkIter
(
SSubmitMsgIter
*
pMsgIter
,
SSubmitBlk
*
pBlock
,
SSubmitBlkIter
*
pIter
);
STSRow
*
tGetSubmitBlkNext
(
SSubmitBlkIter
*
pIter
);
...
...
include/libs/stream/tstream.h
浏览文件 @
5ca27cd4
...
...
@@ -61,11 +61,8 @@ enum {
};
typedef
struct
{
int8_t
type
;
int32_t
sourceVg
;
int64_t
sourceVer
;
int8_t
type
;
int64_t
ver
;
int32_t
*
dataRef
;
SSubmitReq
*
data
;
}
SStreamDataSubmit
;
...
...
@@ -111,6 +108,8 @@ static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit)
}
}
SStreamDataSubmit
*
streamSubmitRefClone
(
SStreamDataSubmit
*
pSubmit
);
int32_t
streamDataBlockEncode
(
void
**
buf
,
const
SStreamDataBlock
*
pOutput
);
void
*
streamDataBlockDecode
(
const
void
*
buf
,
SStreamDataBlock
*
pInput
);
...
...
@@ -209,8 +208,6 @@ struct SStreamTask {
int32_t
nodeId
;
SEpSet
epSet
;
// source preprocess
// exec
STaskExec
exec
;
...
...
@@ -318,8 +315,6 @@ int32_t streamDequeueOutput(SStreamTask* pTask, void** output);
int32_t
streamTaskRun
(
SStreamTask
*
pTask
);
int32_t
streamTaskHandleInput
(
SStreamTask
*
pTask
,
void
*
data
);
int32_t
streamTaskProcessRunReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
);
int32_t
streamProcessDispatchReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pMsg
);
int32_t
streamProcessDispatchRsp
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchRsp
*
pRsp
);
...
...
source/common/src/tmsg.c
浏览文件 @
5ca27cd4
...
...
@@ -28,7 +28,7 @@
#undef TD_MSG_SEG_CODE_
#include "tmsgdef.h"
int32_t
tInitSubmitMsgIter
(
SSubmitReq
*
pMsg
,
SSubmitMsgIter
*
pIter
)
{
int32_t
tInitSubmitMsgIter
(
const
SSubmitReq
*
pMsg
,
SSubmitMsgIter
*
pIter
)
{
if
(
pMsg
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP
;
return
-
1
;
...
...
@@ -165,7 +165,6 @@ int32_t tDecodeSQueryNodeLoad(SDecoder *pDecoder, SQueryNodeLoad *pLoad) {
return
0
;
}
int32_t
taosEncodeSEpSet
(
void
**
buf
,
const
SEpSet
*
pEp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI8
(
buf
,
pEp
->
inUse
);
...
...
@@ -3053,7 +3052,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
int32_t
tFreeSCreateVnodeReq
(
SCreateVnodeReq
*
pReq
)
{
taosArrayDestroy
(
pReq
->
pRetensions
);
pReq
->
pRetensions
=
NULL
;
if
(
pReq
->
isTsma
)
{
if
(
pReq
->
isTsma
)
{
taosMemoryFreeClear
(
pReq
->
pTsma
);
}
return
0
;
...
...
source/dnode/vnode/CMakeLists.txt
浏览文件 @
5ca27cd4
...
...
@@ -53,8 +53,9 @@ target_sources(
"src/tq/tqMeta.c"
"src/tq/tqRead.c"
"src/tq/tqOffset.c"
#"src/tq/tqPush.c"
#"src/tq/tqCommit.c"
"src/tq/tqPush.c"
"src/tq/tqSink.c"
"src/tq/tqCommit.c"
)
target_include_directories
(
vnode
...
...
source/dnode/vnode/src/inc/sma.h
浏览文件 @
5ca27cd4
...
...
@@ -219,7 +219,7 @@ static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SDisk
void
*
tdFreeRSmaInfo
(
SRSmaInfo
*
pInfo
);
int32_t
tdProcessTSmaCreateImpl
(
SSma
*
pSma
,
int64_t
version
,
const
char
*
pMsg
);
int32_t
tdUpdateExpiredWindowImpl
(
SSma
*
pSma
,
SSubmitReq
*
pMsg
,
int64_t
version
);
int32_t
tdUpdateExpiredWindowImpl
(
SSma
*
pSma
,
const
SSubmitReq
*
pMsg
,
int64_t
version
);
// TODO: This is the basic params, and should wrap the params to a queryHandle.
int32_t
tdGetTSmaDataImpl
(
SSma
*
pSma
,
char
*
pData
,
int64_t
indexUid
,
TSKEY
querySKey
,
int32_t
nMaxResult
);
...
...
@@ -227,4 +227,4 @@ int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY query
}
#endif
#endif
/*_TD_VNODE_SMA_H_*/
\ No newline at end of file
#endif
/*_TD_VNODE_SMA_H_*/
source/dnode/vnode/src/inc/tq.h
浏览文件 @
5ca27cd4
...
...
@@ -66,33 +66,27 @@ struct STqReadHandle {
// tqPush
typedef
struct
{
int64_t
consumerId
;
int32_t
epoch
;
int32_t
skipLogNum
;
int64_t
reqOffset
;
SRpcHandleInfo
info
;
SRWLatch
lock
;
}
STqPushHandle
;
STaosQueue
*
queue
;
STaosQall
*
qall
;
void
*
qItem
;
}
STqInputQ
;
#if 0
typedef
struct
{
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
// msg info
int64_t
consumerId
;
int64_t
reqOffset
;
int64_t
processedVer
;
int32_t
epoch
;
int8_t subType;
// int8_t withTbName;
// int8_t withSchema;
// int8_t withTag;
char* qmsg;
SHashObj* pDropTbUid;
STqPushHandle pushHandle;
// SRWLatch lock;
SWalReadHandle* pWalReader;
// task number should be the same with fetch thread
STqReadHandle* pExecReader[5];
qTaskInfo_t task[5];
} STqExec;
#endif
int32_t
skipLogNum
;
// rpc info
int64_t
reqId
;
SRpcHandleInfo
rpcInfo
;
// exec
int8_t
inputStatus
;
int8_t
execStatus
;
STqInputQ
inputQ
;
SRWLatch
lock
;
}
STqPushHandle
;
// tqExec
...
...
@@ -154,27 +148,21 @@ typedef struct {
static
STqMgmt
tqMgmt
=
{
0
};
// init once
int
tqInit
();
void
tqCleanUp
();
// int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec);
// int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec);
int32_t
tEncodeSTqHandle
(
SEncoder
*
pEncoder
,
const
STqHandle
*
pHandle
);
int32_t
tDecodeSTqHandle
(
SDecoder
*
pDecoder
,
STqHandle
*
pHandle
);
// tqRead
int64_t
tqFetchLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
int64_t
*
fetchOffset
,
SWalHead
**
pHeadWithCkSum
);
// tqExec
int32_t
tqDataExec
(
STQ
*
pTq
,
STqExecHandle
*
pExec
,
SSubmitReq
*
pReq
,
SMqDataBlkRsp
*
pRsp
,
int32_t
workerId
);
// tqMeta
int32_t
tqMetaOpen
(
STQ
*
pTq
);
int32_t
tqMetaClose
(
STQ
*
pTq
);
int32_t
tqMetaSaveHandle
(
STQ
*
pTq
,
const
char
*
key
,
const
STqHandle
*
pHandle
);
int32_t
tqMetaDeleteHandle
(
STQ
*
pTq
,
const
char
*
key
);
// tqSink
void
tqTableSink
(
SStreamTask
*
pTask
,
void
*
vnode
,
int64_t
ver
,
void
*
data
);
// tqOffset
STqOffsetStore
*
STqOffsetOpen
(
STqOffsetCfg
*
);
void
STqOffsetClose
(
STqOffsetStore
*
);
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
5ca27cd4
...
...
@@ -125,6 +125,8 @@ int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader);
int32_t
tsdbSnapshotRead
(
STsdbSnapshotReader
*
pReader
,
void
**
ppData
,
uint32_t
*
nData
);
// tq
int
tqInit
();
void
tqCleanUp
();
STQ
*
tqOpen
(
const
char
*
path
,
SVnode
*
pVnode
,
SWal
*
pWal
);
void
tqClose
(
STQ
*
);
int
tqPushMsg
(
STQ
*
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
);
...
...
@@ -145,11 +147,11 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t
smaOpen
(
SVnode
*
pVnode
);
int32_t
smaClose
(
SSma
*
pSma
);
int32_t
tdUpdateExpireWindow
(
SSma
*
pSma
,
SSubmitReq
*
pMsg
,
int64_t
version
);
int32_t
tdUpdateExpireWindow
(
SSma
*
pSma
,
const
SSubmitReq
*
pMsg
,
int64_t
version
);
int32_t
tdProcessTSmaCreate
(
SSma
*
pSma
,
int64_t
version
,
const
char
*
msg
);
int32_t
tdProcessTSmaInsert
(
SSma
*
pSma
,
int64_t
indexUid
,
const
char
*
msg
);
int32_t
tdProcessRSmaCreate
(
SVnode
*
pVnode
,
SVCreateStbReq
*
pReq
);
int32_t
tdProcessRSmaCreate
(
SVnode
*
pVnode
,
SVCreateStbReq
*
pReq
);
int32_t
tdProcessRSmaSubmit
(
SSma
*
pSma
,
void
*
pMsg
,
int32_t
inputType
);
int32_t
tdFetchTbUidList
(
SSma
*
pSma
,
STbUidStore
**
ppStore
,
tb_uid_t
suid
,
tb_uid_t
uid
);
int32_t
tdUpdateTbUidList
(
SSma
*
pSma
,
STbUidStore
*
pUidStore
);
...
...
source/dnode/vnode/src/sma/sma.c
浏览文件 @
5ca27cd4
...
...
@@ -15,7 +15,6 @@
#include "sma.h"
// TODO: Who is responsible for resource allocate and release?
int32_t
tdProcessTSmaInsert
(
SSma
*
pSma
,
int64_t
indexUid
,
const
char
*
msg
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -37,7 +36,7 @@ int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg) {
return
code
;
}
int32_t
tdUpdateExpireWindow
(
SSma
*
pSma
,
SSubmitReq
*
pMsg
,
int64_t
version
)
{
int32_t
tdUpdateExpireWindow
(
SSma
*
pSma
,
const
SSubmitReq
*
pMsg
,
int64_t
version
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
tdUpdateExpiredWindowImpl
(
pSma
,
pMsg
,
version
))
<
0
)
{
smaWarn
(
"vgId:%d update expired sma window failed since %s"
,
SMA_VID
(
pSma
),
tstrerror
(
terrno
));
...
...
source/dnode/vnode/src/sma/smaTimeRange.c
浏览文件 @
5ca27cd4
...
...
@@ -932,7 +932,7 @@ static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t inde
* @param msg SSubmitReq
* @return int32_t
*/
int32_t
tdUpdateExpiredWindowImpl
(
SSma
*
pSma
,
SSubmitReq
*
pMsg
,
int64_t
version
)
{
int32_t
tdUpdateExpiredWindowImpl
(
SSma
*
pSma
,
const
SSubmitReq
*
pMsg
,
int64_t
version
)
{
// no time-range-sma, just return success
if
(
atomic_load_16
(
&
SMA_TSMA_NUM
(
pSma
))
<=
0
)
{
smaTrace
(
"vgId:%d not update expire window since no tSma"
,
SMA_VID
(
pSma
));
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
5ca27cd4
...
...
@@ -81,169 +81,10 @@ void tqClose(STQ* pTq) {
// TODO
}
int32_t
tEncodeSTqHandle
(
SEncoder
*
pEncoder
,
const
STqHandle
*
pHandle
)
{
if
(
tStartEncode
(
pEncoder
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pHandle
->
subKey
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pHandle
->
consumerId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pHandle
->
epoch
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pHandle
->
execHandle
.
subType
)
<
0
)
return
-
1
;
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
if
(
tEncodeCStr
(
pEncoder
,
pHandle
->
execHandle
.
exec
.
execCol
.
qmsg
)
<
0
)
return
-
1
;
}
tEndEncode
(
pEncoder
);
return
pEncoder
->
pos
;
}
int32_t
tDecodeSTqHandle
(
SDecoder
*
pDecoder
,
STqHandle
*
pHandle
)
{
if
(
tStartDecode
(
pDecoder
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pDecoder
,
pHandle
->
subKey
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pHandle
->
consumerId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pHandle
->
epoch
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pHandle
->
execHandle
.
subType
)
<
0
)
return
-
1
;
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pHandle
->
execHandle
.
exec
.
execCol
.
qmsg
)
<
0
)
return
-
1
;
}
tEndDecode
(
pDecoder
);
return
0
;
}
int32_t
tqUpdateTbUidList
(
STQ
*
pTq
,
const
SArray
*
tbUidList
,
bool
isAdd
)
{
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pTq
->
handles
,
pIter
);
if
(
pIter
==
NULL
)
break
;
STqHandle
*
pExec
=
(
STqHandle
*
)
pIter
;
if
(
pExec
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
int32_t
code
=
qUpdateQualifiedTableId
(
pExec
->
execHandle
.
exec
.
execCol
.
task
[
i
],
tbUidList
,
isAdd
);
ASSERT
(
code
==
0
);
}
}
else
if
(
pExec
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__DB
)
{
if
(
!
isAdd
)
{
int32_t
sz
=
taosArrayGetSize
(
tbUidList
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int64_t
tbUid
=
*
(
int64_t
*
)
taosArrayGet
(
tbUidList
,
i
);
taosHashPut
(
pExec
->
execHandle
.
exec
.
execDb
.
pFilterOutTbUid
,
&
tbUid
,
sizeof
(
int64_t
),
NULL
,
0
);
}
}
}
else
{
// tq update id
}
}
while
(
1
)
{
pIter
=
taosHashIterate
(
pTq
->
pStreamTasks
,
pIter
);
if
(
pIter
==
NULL
)
break
;
SStreamTask
*
pTask
=
(
SStreamTask
*
)
pIter
;
if
(
pTask
->
inputType
==
STREAM_INPUT__DATA_SUBMIT
)
{
int32_t
code
=
qUpdateQualifiedTableId
(
pTask
->
exec
.
executor
,
tbUidList
,
isAdd
);
ASSERT
(
code
==
0
);
}
}
return
0
;
}
int32_t
tqPushMsgNew
(
STQ
*
pTq
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
,
SRpcHandleInfo
handleInfo
)
{
if
(
msgType
!=
TDMT_VND_SUBMIT
)
return
0
;
void
*
pIter
=
NULL
;
STqHandle
*
pHandle
=
NULL
;
SSubmitReq
*
pReq
=
(
SSubmitReq
*
)
msg
;
int32_t
workerId
=
4
;
int64_t
fetchOffset
=
ver
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pTq
->
pushMgr
,
pIter
);
if
(
pIter
==
NULL
)
break
;
pHandle
=
*
(
STqHandle
**
)
pIter
;
taosWLockLatch
(
&
pHandle
->
pushHandle
.
lock
);
SMqDataBlkRsp
rsp
=
{
0
};
rsp
.
reqOffset
=
pHandle
->
pushHandle
.
reqOffset
;
rsp
.
blockData
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
rsp
.
blockDataLen
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
if
(
msgType
==
TDMT_VND_SUBMIT
)
{
tqDataExec
(
pTq
,
&
pHandle
->
execHandle
,
pReq
,
&
rsp
,
workerId
);
}
else
{
// TODO
ASSERT
(
0
);
}
if
(
rsp
.
blockNum
==
0
)
{
taosWUnLockLatch
(
&
pHandle
->
pushHandle
.
lock
);
continue
;
}
ASSERT
(
taosArrayGetSize
(
rsp
.
blockData
)
==
rsp
.
blockNum
);
ASSERT
(
taosArrayGetSize
(
rsp
.
blockDataLen
)
==
rsp
.
blockNum
);
rsp
.
rspOffset
=
fetchOffset
;
int32_t
tlen
=
sizeof
(
SMqRspHead
)
+
tEncodeSMqDataBlkRsp
(
NULL
,
&
rsp
);
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
// todo free
return
-
1
;
}
((
SMqRspHead
*
)
buf
)
->
mqMsgType
=
TMQ_MSG_TYPE__POLL_RSP
;
((
SMqRspHead
*
)
buf
)
->
epoch
=
pHandle
->
pushHandle
.
epoch
;
((
SMqRspHead
*
)
buf
)
->
consumerId
=
pHandle
->
pushHandle
.
consumerId
;
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMqRspHead
));
tEncodeSMqDataBlkRsp
(
&
abuf
,
&
rsp
);
SRpcMsg
resp
=
{
.
info
=
pHandle
->
pushHandle
.
info
,
.
pCont
=
buf
,
.
contLen
=
tlen
,
.
code
=
0
,
};
tmsgSendRsp
(
&
resp
);
memset
(
&
pHandle
->
pushHandle
.
info
,
0
,
sizeof
(
SRpcHandleInfo
));
taosWUnLockLatch
(
&
pHandle
->
pushHandle
.
lock
);
tqDebug
(
"vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld"
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
,
pHandle
->
pushHandle
.
consumerId
,
pHandle
->
pushHandle
.
epoch
,
rsp
.
blockNum
,
rsp
.
reqOffset
,
rsp
.
rspOffset
);
// TODO destroy
taosArrayDestroy
(
rsp
.
blockData
);
taosArrayDestroy
(
rsp
.
blockDataLen
);
}
return
0
;
}
int
tqPushMsg
(
STQ
*
pTq
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
)
{
if
(
msgType
==
TDMT_VND_SUBMIT
)
{
if
(
taosHashGetSize
(
pTq
->
pStreamTasks
)
==
0
)
return
0
;
if
(
tdUpdateExpireWindow
(
pTq
->
pVnode
->
pSma
,
msg
,
ver
)
!=
0
)
{
// TODO handle sma error
}
void
*
data
=
taosMemoryMalloc
(
msgLen
);
if
(
data
==
NULL
)
{
return
-
1
;
}
memcpy
(
data
,
msg
,
msgLen
);
tqProcessStreamTrigger
(
pTq
,
data
);
}
return
0
;
}
int
tqCommit
(
STQ
*
pTq
)
{
// do nothing
return
0
;
}
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
int32_t
workerId
)
{
SMqPollReq
*
pReq
=
pMsg
->
pCont
;
int64_t
consumerId
=
pReq
->
consumerId
;
int64_t
waitTime
=
pReq
->
timeout
;
int64_t
timeout
=
pReq
->
timeout
;
int32_t
reqEpoch
=
pReq
->
epoch
;
int64_t
fetchOffset
;
...
...
@@ -286,20 +127,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
rsp
.
withSchema
=
false
;
rsp
.
withTag
=
false
;
}
else
{
rsp
.
withSchema
=
true
;
rsp
.
blockSchema
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
rsp
.
withTag
=
false
;
rsp
.
blockSchema
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
}
while
(
1
)
{
consumerEpoch
=
atomic_load_32
(
&
pHandle
->
epoch
);
if
(
consumerEpoch
>
reqEpoch
)
{
tq
Debug
(
"tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d
discard req epoch %d"
,
consumerId
,
pReq
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
,
consumerEpoch
,
reqEpoch
);
tq
Warn
(
"tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d,
discard req epoch %d"
,
consumerId
,
pReq
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
,
consumerEpoch
,
reqEpoch
);
break
;
}
...
...
@@ -310,27 +149,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SWalReadHead
*
pHead
=
&
pHeadWithCkSum
->
head
;
#if 0
// add to pushMgr
taosWLockLatch(&pExec->pushHandle.lock);
pExec->pushHandle.consumerId = consumerId;
pExec->pushHandle.epoch = reqEpoch;
pExec->pushHandle.reqOffset = rsp.reqOffset;
pExec->pushHandle.skipLogNum = rsp.skipLogNum;
pExec->pushHandle.handle = pMsg;
taosWUnLockLatch(&pExec->pushHandle.lock);
// TODO add timer
// TODO: the pointer will always be valid?
taosHashPut(pTq->pushMgr, &consumerId, sizeof(int64_t), &pExec, sizeof(void*));
taosArrayDestroy(rsp.blockData);
taosArrayDestroy(rsp.blockDataLen);
return 0;
#endif
tqDebug
(
"tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d"
,
consumerId
,
pReq
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
,
pHead
->
msgType
);
...
...
@@ -471,24 +289,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
return
0
;
}
void
tqTableSink
(
SStreamTask
*
pTask
,
void
*
vnode
,
int64_t
ver
,
void
*
data
)
{
const
SArray
*
pRes
=
(
const
SArray
*
)
data
;
SVnode
*
pVnode
=
(
SVnode
*
)
vnode
;
ASSERT
(
pTask
->
tbSink
.
pTSchema
);
SSubmitReq
*
pReq
=
tdBlockToSubmit
(
pRes
,
pTask
->
tbSink
.
pTSchema
,
true
,
pTask
->
tbSink
.
stbUid
,
pTask
->
tbSink
.
stbFullName
,
pVnode
->
config
.
vgId
);
/*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/
// build write msg
SRpcMsg
msg
=
{
.
msgType
=
TDMT_VND_SUBMIT
,
.
pCont
=
pReq
,
.
contLen
=
ntohl
(
pReq
->
length
),
};
ASSERT
(
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
WRITE_QUEUE
,
&
msg
)
==
0
);
}
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
)
{
SStreamTask
*
pTask
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
{
...
...
@@ -579,9 +379,11 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
continue
;
}
streamDataSubmitRefInc
(
pSubmit
);
SStreamDataSubmit
*
pSubmitClone
=
taosAllocateQitem
(
sizeof
(
SStreamDataSubmit
),
DEF_QITEM
);
memcpy
(
pSubmitClone
,
pSubmit
,
sizeof
(
SStreamDataSubmit
));
SStreamDataSubmit
*
pSubmitClone
=
streamSubmitRefClone
(
pSubmit
);
if
(
pSubmitClone
==
NULL
)
{
atomic_store_8
(
&
pTask
->
inputStatus
,
TASK_INPUT_STATUS__FAILED
);
continue
;
}
taosWriteQitem
(
pTask
->
inputQ
,
pSubmitClone
);
int8_t
execStatus
=
atomic_load_8
(
&
pTask
->
status
);
...
...
source/dnode/vnode/src/tq/tqCommit.c
浏览文件 @
5ca27cd4
...
...
@@ -14,3 +14,8 @@
*/
#include "tq.h"
int
tqCommit
(
STQ
*
pTq
)
{
// do nothing
return
0
;
}
source/dnode/vnode/src/tq/tqMeta.c
浏览文件 @
5ca27cd4
...
...
@@ -15,6 +15,32 @@
#include "tdbInt.h"
#include "tq.h"
static
int32_t
tEncodeSTqHandle
(
SEncoder
*
pEncoder
,
const
STqHandle
*
pHandle
)
{
if
(
tStartEncode
(
pEncoder
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pHandle
->
subKey
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pHandle
->
consumerId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pHandle
->
epoch
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pHandle
->
execHandle
.
subType
)
<
0
)
return
-
1
;
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
if
(
tEncodeCStr
(
pEncoder
,
pHandle
->
execHandle
.
exec
.
execCol
.
qmsg
)
<
0
)
return
-
1
;
}
tEndEncode
(
pEncoder
);
return
pEncoder
->
pos
;
}
static
int32_t
tDecodeSTqHandle
(
SDecoder
*
pDecoder
,
STqHandle
*
pHandle
)
{
if
(
tStartDecode
(
pDecoder
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pDecoder
,
pHandle
->
subKey
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pHandle
->
consumerId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pHandle
->
epoch
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pHandle
->
execHandle
.
subType
)
<
0
)
return
-
1
;
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pHandle
->
execHandle
.
exec
.
execCol
.
qmsg
)
<
0
)
return
-
1
;
}
tEndDecode
(
pDecoder
);
return
0
;
}
int
tqExecKeyCompare
(
const
void
*
pKey1
,
int32_t
kLen1
,
const
void
*
pKey2
,
int32_t
kLen2
)
{
return
strcmp
(
pKey1
,
pKey2
);
}
...
...
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
5ca27cd4
...
...
@@ -14,3 +14,183 @@
*/
#include "tq.h"
int32_t
tqExecFromInputQ
(
STQ
*
pTq
,
STqHandle
*
pHandle
)
{
// 1. guard and set status executing
// 2. check processedVer
// 2.1. if not missed, get msg from queue
// 2.2. if missed, scan wal
//
// 3. exec, after each success, update processed ver
// first run
// set exec status closing
// second run
// set exec status idle
//
// 4. if get result
// 4.1 set exec input status blocked and exec status idle
// 4.2 rpc send
// 4.3 clear rpc info
return
0
;
}
int32_t
tqOpenPushHandle
(
STQ
*
pTq
,
STqHandle
*
pHandle
)
{
memset
(
&
pHandle
->
pushHandle
,
0
,
sizeof
(
STqPushHandle
));
pHandle
->
pushHandle
.
inputQ
.
queue
=
taosOpenQueue
();
pHandle
->
pushHandle
.
inputQ
.
qall
=
taosAllocateQall
();
if
(
pHandle
->
pushHandle
.
inputQ
.
queue
==
NULL
||
pHandle
->
pushHandle
.
inputQ
.
qall
==
NULL
)
{
if
(
pHandle
->
pushHandle
.
inputQ
.
queue
)
{
taosCloseQueue
(
pHandle
->
pushHandle
.
inputQ
.
queue
);
}
if
(
pHandle
->
pushHandle
.
inputQ
.
qall
)
{
taosFreeQall
(
pHandle
->
pushHandle
.
inputQ
.
qall
);
}
return
-
1
;
}
return
0
;
}
void
tqPreparePush
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
int64_t
reqId
,
const
SRpcHandleInfo
*
pInfo
,
int64_t
processedVer
)
{
memcpy
(
&
pHandle
->
pushHandle
.
rpcInfo
,
pInfo
,
sizeof
(
SRpcHandleInfo
));
atomic_store_64
(
&
pHandle
->
pushHandle
.
reqId
,
reqId
);
atomic_store_64
(
&
pHandle
->
pushHandle
.
processedVer
,
processedVer
);
atomic_store_8
(
&
pHandle
->
pushHandle
.
inputStatus
,
TASK_INPUT_STATUS__NORMAL
);
// set timeout timer
}
int32_t
tqEnqueue
(
STqHandle
*
pHandle
,
SStreamDataSubmit
*
pSubmit
)
{
int8_t
inputStatus
=
atomic_load_8
(
&
pHandle
->
pushHandle
.
inputStatus
);
if
(
inputStatus
==
TASK_INPUT_STATUS__NORMAL
)
{
SStreamDataSubmit
*
pSubmitClone
=
streamSubmitRefClone
(
pSubmit
);
if
(
pSubmitClone
==
NULL
)
{
return
-
1
;
}
taosWriteQitem
(
pHandle
->
pushHandle
.
inputQ
.
queue
,
pSubmitClone
);
return
0
;
}
return
-
1
;
}
int32_t
tqSendExecReq
(
STQ
*
pTq
,
STqHandle
*
pHandle
)
{
//
return
0
;
}
int32_t
tqEnqueueAll
(
STQ
*
pTq
,
SSubmitReq
*
pReq
)
{
void
*
pIter
=
NULL
;
SStreamDataSubmit
*
pSubmit
=
streamDataSubmitNew
(
pReq
);
if
(
pSubmit
==
NULL
)
{
return
-
1
;
}
while
(
1
)
{
pIter
=
taosHashIterate
(
pTq
->
handles
,
pIter
);
if
(
pIter
==
NULL
)
break
;
STqHandle
*
pHandle
=
(
STqHandle
*
)
pIter
;
if
(
tqEnqueue
(
pHandle
,
pSubmit
)
<
0
)
{
continue
;
}
int8_t
execStatus
=
atomic_load_8
(
&
pHandle
->
pushHandle
.
execStatus
);
if
(
execStatus
==
TASK_STATUS__IDLE
||
execStatus
==
TASK_STATUS__CLOSING
)
{
tqSendExecReq
(
pTq
,
pHandle
);
}
}
streamDataSubmitRefDec
(
pSubmit
);
return
0
;
}
int32_t
tqPushMsgNew
(
STQ
*
pTq
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
,
SRpcHandleInfo
handleInfo
)
{
if
(
msgType
!=
TDMT_VND_SUBMIT
)
return
0
;
void
*
pIter
=
NULL
;
STqHandle
*
pHandle
=
NULL
;
SSubmitReq
*
pReq
=
(
SSubmitReq
*
)
msg
;
int32_t
workerId
=
4
;
int64_t
fetchOffset
=
ver
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pTq
->
pushMgr
,
pIter
);
if
(
pIter
==
NULL
)
break
;
pHandle
=
*
(
STqHandle
**
)
pIter
;
taosWLockLatch
(
&
pHandle
->
pushHandle
.
lock
);
SMqDataBlkRsp
rsp
=
{
0
};
rsp
.
reqOffset
=
pHandle
->
pushHandle
.
reqOffset
;
rsp
.
blockData
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
rsp
.
blockDataLen
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
if
(
msgType
==
TDMT_VND_SUBMIT
)
{
tqDataExec
(
pTq
,
&
pHandle
->
execHandle
,
pReq
,
&
rsp
,
workerId
);
}
else
{
// TODO
ASSERT
(
0
);
}
if
(
rsp
.
blockNum
==
0
)
{
taosWUnLockLatch
(
&
pHandle
->
pushHandle
.
lock
);
continue
;
}
ASSERT
(
taosArrayGetSize
(
rsp
.
blockData
)
==
rsp
.
blockNum
);
ASSERT
(
taosArrayGetSize
(
rsp
.
blockDataLen
)
==
rsp
.
blockNum
);
rsp
.
rspOffset
=
fetchOffset
;
int32_t
tlen
=
sizeof
(
SMqRspHead
)
+
tEncodeSMqDataBlkRsp
(
NULL
,
&
rsp
);
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
// todo free
return
-
1
;
}
((
SMqRspHead
*
)
buf
)
->
mqMsgType
=
TMQ_MSG_TYPE__POLL_RSP
;
((
SMqRspHead
*
)
buf
)
->
epoch
=
pHandle
->
pushHandle
.
epoch
;
((
SMqRspHead
*
)
buf
)
->
consumerId
=
pHandle
->
pushHandle
.
consumerId
;
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMqRspHead
));
tEncodeSMqDataBlkRsp
(
&
abuf
,
&
rsp
);
SRpcMsg
resp
=
{
.
info
=
pHandle
->
pushHandle
.
rpcInfo
,
.
pCont
=
buf
,
.
contLen
=
tlen
,
.
code
=
0
,
};
tmsgSendRsp
(
&
resp
);
memset
(
&
pHandle
->
pushHandle
.
rpcInfo
,
0
,
sizeof
(
SRpcHandleInfo
));
taosWUnLockLatch
(
&
pHandle
->
pushHandle
.
lock
);
tqDebug
(
"vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld"
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
,
pHandle
->
pushHandle
.
consumerId
,
pHandle
->
pushHandle
.
epoch
,
rsp
.
blockNum
,
rsp
.
reqOffset
,
rsp
.
rspOffset
);
// TODO destroy
taosArrayDestroy
(
rsp
.
blockData
);
taosArrayDestroy
(
rsp
.
blockDataLen
);
}
return
0
;
}
int
tqPushMsg
(
STQ
*
pTq
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
)
{
if
(
msgType
==
TDMT_VND_SUBMIT
)
{
if
(
taosHashGetSize
(
pTq
->
pStreamTasks
)
==
0
)
return
0
;
if
(
tdUpdateExpireWindow
(
pTq
->
pVnode
->
pSma
,
msg
,
ver
)
!=
0
)
{
// TODO handle sma error
}
void
*
data
=
taosMemoryMalloc
(
msgLen
);
if
(
data
==
NULL
)
{
return
-
1
;
}
memcpy
(
data
,
msg
,
msgLen
);
tqProcessStreamTrigger
(
pTq
,
data
);
}
return
0
;
}
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
5ca27cd4
...
...
@@ -298,3 +298,38 @@ int tqReadHandleRemoveTbUidList(STqReadHandle* pHandle, const SArray* tbUidList)
return
0
;
}
int32_t
tqUpdateTbUidList
(
STQ
*
pTq
,
const
SArray
*
tbUidList
,
bool
isAdd
)
{
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pTq
->
handles
,
pIter
);
if
(
pIter
==
NULL
)
break
;
STqHandle
*
pExec
=
(
STqHandle
*
)
pIter
;
if
(
pExec
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
int32_t
code
=
qUpdateQualifiedTableId
(
pExec
->
execHandle
.
exec
.
execCol
.
task
[
i
],
tbUidList
,
isAdd
);
ASSERT
(
code
==
0
);
}
}
else
if
(
pExec
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__DB
)
{
if
(
!
isAdd
)
{
int32_t
sz
=
taosArrayGetSize
(
tbUidList
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int64_t
tbUid
=
*
(
int64_t
*
)
taosArrayGet
(
tbUidList
,
i
);
taosHashPut
(
pExec
->
execHandle
.
exec
.
execDb
.
pFilterOutTbUid
,
&
tbUid
,
sizeof
(
int64_t
),
NULL
,
0
);
}
}
}
else
{
// tq update id
}
}
while
(
1
)
{
pIter
=
taosHashIterate
(
pTq
->
pStreamTasks
,
pIter
);
if
(
pIter
==
NULL
)
break
;
SStreamTask
*
pTask
=
(
SStreamTask
*
)
pIter
;
if
(
pTask
->
inputType
==
STREAM_INPUT__DATA_SUBMIT
)
{
int32_t
code
=
qUpdateQualifiedTableId
(
pTask
->
exec
.
executor
,
tbUidList
,
isAdd
);
ASSERT
(
code
==
0
);
}
}
return
0
;
}
source/dnode/vnode/src/tq/tqSink.c
0 → 100644
浏览文件 @
5ca27cd4
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tq.h"
void
tqTableSink
(
SStreamTask
*
pTask
,
void
*
vnode
,
int64_t
ver
,
void
*
data
)
{
const
SArray
*
pRes
=
(
const
SArray
*
)
data
;
SVnode
*
pVnode
=
(
SVnode
*
)
vnode
;
ASSERT
(
pTask
->
tbSink
.
pTSchema
);
SSubmitReq
*
pReq
=
tdBlockToSubmit
(
pRes
,
pTask
->
tbSink
.
pTSchema
,
true
,
pTask
->
tbSink
.
stbUid
,
pTask
->
tbSink
.
stbFullName
,
pVnode
->
config
.
vgId
);
/*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/
// build write msg
SRpcMsg
msg
=
{
.
msgType
=
TDMT_VND_SUBMIT
,
.
pCont
=
pReq
,
.
contLen
=
ntohl
(
pReq
->
length
),
};
ASSERT
(
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
WRITE_QUEUE
,
&
msg
)
==
0
);
}
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
5ca27cd4
...
...
@@ -189,4 +189,4 @@ void vnodeStop(SVnode *pVnode) {}
int64_t
vnodeGetSyncHandle
(
SVnode
*
pVnode
)
{
return
pVnode
->
sync
;
}
void
vnodeGetSnapshot
(
SVnode
*
pVnode
,
SSnapshot
*
pSnapshot
)
{
pSnapshot
->
lastApplyIndex
=
pVnode
->
state
.
committed
;
}
\ No newline at end of file
void
vnodeGetSnapshot
(
SVnode
*
pVnode
,
SSnapshot
*
pSnapshot
)
{
pSnapshot
->
lastApplyIndex
=
pVnode
->
state
.
committed
;
}
source/libs/stream/src/tstream.c
浏览文件 @
5ca27cd4
...
...
@@ -35,6 +35,16 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) {
return
(
void
*
)
buf
;
}
SStreamDataSubmit
*
streamSubmitRefClone
(
SStreamDataSubmit
*
pSubmit
)
{
SStreamDataSubmit
*
pSubmitClone
=
taosAllocateQitem
(
sizeof
(
SStreamDataSubmit
),
DEF_QITEM
);
if
(
pSubmitClone
==
NULL
)
{
return
NULL
;
}
streamDataSubmitRefInc
(
pSubmit
);
memcpy
(
pSubmitClone
,
pSubmit
,
sizeof
(
SStreamDataSubmit
));
return
pSubmitClone
;
}
static
int32_t
streamBuildDispatchMsg
(
SStreamTask
*
pTask
,
SArray
*
data
,
SRpcMsg
*
pMsg
,
SEpSet
**
ppEpSet
)
{
SStreamDispatchReq
req
=
{
.
streamId
=
pTask
->
streamId
,
...
...
@@ -207,7 +217,6 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
if
(
pRes
==
NULL
)
return
-
1
;
while
(
1
)
{
int8_t
execStatus
=
atomic_val_compare_exchange_8
(
&
pTask
->
status
,
TASK_STATUS__IDLE
,
TASK_STATUS__EXECUTING
);
void
*
exec
=
pTask
->
exec
.
executor
;
if
(
execStatus
==
TASK_STATUS__IDLE
)
{
// first run, from qall, handle failure from last exec
pRes
=
streamExecForQall
(
pTask
,
pRes
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录