Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
964db517
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
964db517
编写于
4月 14, 2023
作者:
dengyihao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'enh/rocksRevert' of
https://github.com/taosdata/TDengine
into enh/rocksRevert
上级
80f348de
d4207349
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
50 addition
and
139 deletion
+50
-139
include/libs/executor/executor.h
include/libs/executor/executor.h
+1
-1
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+0
-5
source/dnode/mgmt/mgmt_snode/src/smInt.c
source/dnode/mgmt/mgmt_snode/src/smInt.c
+1
-0
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+1
-0
source/dnode/snode/src/snode.c
source/dnode/snode/src/snode.c
+3
-0
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+1
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+18
-30
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+2
-9
source/dnode/vnode/src/tq/tqUtil.c
source/dnode/vnode/src/tq/tqUtil.c
+2
-29
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+1
-6
source/libs/stream/src/stream.c
source/libs/stream/src/stream.c
+1
-1
source/libs/stream/src/streamMeta.c
source/libs/stream/src/streamMeta.c
+2
-43
source/libs/stream/src/streamRecover.c
source/libs/stream/src/streamRecover.c
+17
-9
source/libs/stream/src/streamTask.c
source/libs/stream/src/streamTask.c
+0
-5
未找到文件。
include/libs/executor/executor.h
浏览文件 @
964db517
...
...
@@ -123,7 +123,7 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks,
* @param isAdd
* @return
*/
int32_t
qUpdateTableListForStreamScanner
(
qTaskInfo_t
tinfo
,
const
SArray
*
tableIdList
,
bool
isAdd
,
SArray
*
pList
);
int32_t
qUpdateTableListForStreamScanner
(
qTaskInfo_t
tinfo
,
const
SArray
*
tableIdList
,
bool
isAdd
);
/**
* Create the exec task object according to task json
...
...
include/libs/stream/tstream.h
浏览文件 @
964db517
...
...
@@ -31,7 +31,6 @@ extern "C" {
#ifndef _STREAM_H_
#define _STREAM_H_
typedef
void
(
*
_free_reader_fn_t
)(
void
*
);
typedef
struct
SStreamTask
SStreamTask
;
enum
{
...
...
@@ -221,7 +220,6 @@ SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit);
typedef
struct
{
char
*
qmsg
;
void
*
pExecutor
;
// not applicable to encoder and decoder
struct
STqReader
*
pTqReader
;
// not applicable to encoder and decoder
struct
SWalReader
*
pWalReader
;
// not applicable to encoder and decoder
}
STaskExec
;
...
...
@@ -333,7 +331,6 @@ struct SStreamTask {
int64_t
checkpointingId
;
int32_t
checkpointAlignCnt
;
struct
SStreamMeta
*
pMeta
;
_free_reader_fn_t
freeFp
;
};
// meta
...
...
@@ -343,7 +340,6 @@ typedef struct SStreamMeta {
TTB
*
pTaskDb
;
TTB
*
pCheckpointDb
;
SHashObj
*
pTasks
;
SHashObj
*
pWalReadTasks
;
void
*
ahandle
;
TXN
*
txn
;
FTaskExpand
*
expandFunc
;
...
...
@@ -577,7 +573,6 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamT
int32_t
streamMetaAddSerializedTask
(
SStreamMeta
*
pMeta
,
int64_t
checkpointVer
,
char
*
msg
,
int32_t
msgLen
);
int32_t
streamMetaGetNumOfTasks
(
const
SStreamMeta
*
pMeta
);
SStreamTask
*
streamMetaAcquireTaskEx
(
SStreamMeta
*
pMeta
,
int32_t
taskId
);
SStreamTask
*
streamMetaAcquireTask
(
SStreamMeta
*
pMeta
,
int32_t
taskId
);
void
streamMetaReleaseTask
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
);
void
streamMetaRemoveTask
(
SStreamMeta
*
pMeta
,
int32_t
taskId
);
...
...
source/dnode/mgmt/mgmt_snode/src/smInt.c
浏览文件 @
964db517
...
...
@@ -55,6 +55,7 @@ int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
smClose
(
pMgmt
);
return
-
1
;
}
tmsgReportStartup
(
"snode-impl"
,
"initialized"
);
if
(
smStartWorker
(
pMgmt
)
!=
0
)
{
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
964db517
...
...
@@ -356,6 +356,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
qDestroyQueryPlan
(
pPlan
);
return
-
1
;
}
pInnerTask
->
fillHistory
=
pStream
->
fillHistory
;
mndAddTaskToTaskSet
(
taskInnerLevel
,
pInnerTask
);
...
...
source/dnode/snode/src/snode.c
浏览文件 @
964db517
...
...
@@ -32,6 +32,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
tDecoderClear
(
&
decoder
);
goto
FAIL
;
}
tDecoderClear
(
&
decoder
);
int32_t
taskId
=
req
.
taskId
;
...
...
@@ -78,6 +79,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
pTask
->
outputStatus
=
TASK_OUTPUT_STATUS__NORMAL
;
pTask
->
pMsgCb
=
&
pSnode
->
msgCb
;
pTask
->
chkInfo
.
version
=
ver
;
pTask
->
pMeta
=
pSnode
->
pMeta
;
pTask
->
pState
=
streamStateOpen
(
pSnode
->
path
,
pTask
,
false
,
-
1
,
-
1
);
if
(
pTask
->
pState
==
NULL
)
{
...
...
@@ -138,6 +140,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
if
(
pTask
==
NULL
)
{
return
-
1
;
}
SDecoder
decoder
;
tDecoderInit
(
&
decoder
,
(
uint8_t
*
)
msg
,
msgLen
);
code
=
tDecodeStreamTask
(
&
decoder
,
pTask
);
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
964db517
...
...
@@ -168,7 +168,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids,
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
if
(
pRSmaInfo
->
taskInfo
[
i
])
{
if
((
terrno
=
qUpdateTableListForStreamScanner
(
pRSmaInfo
->
taskInfo
[
i
],
tbUids
,
isAdd
,
NULL
))
<
0
)
{
if
((
terrno
=
qUpdateTableListForStreamScanner
(
pRSmaInfo
->
taskInfo
[
i
],
tbUids
,
isAdd
))
<
0
)
{
tdReleaseRSmaInfo
(
pSma
,
pRSmaInfo
);
smaError
(
"vgId:%d, update tbUidList failed for uid:%"
PRIi64
" level %d since %s"
,
SMA_VID
(
pSma
),
*
suid
,
i
,
terrstr
());
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
964db517
...
...
@@ -567,6 +567,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask
->
outputStatus
=
TASK_OUTPUT_STATUS__NORMAL
;
pTask
->
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
;
pTask
->
pMeta
=
pTq
->
pStreamMeta
;
pTask
->
chkInfo
.
version
=
ver
;
// expand executor
if
(
pTask
->
fillHistory
)
{
...
...
@@ -628,18 +629,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
}
if
(
pTask
->
taskLevel
==
TASK_LEVEL__SOURCE
)
{
pTask
->
exec
.
pTqReader
=
tqOpenReader
(
pTq
->
pVnode
);
if
(
pTask
->
exec
.
pTqReader
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pTask
->
exec
.
pWalReader
=
walOpenReader
(
pTq
->
pVnode
->
pWal
,
NULL
);
pTask
->
freeFp
=
(
_free_reader_fn_t
)
tqCloseReader
;
SArray
*
pList
=
qGetQueriedTableListInfo
(
pTask
->
exec
.
pExecutor
);
tqReaderAddTbUidList
(
pTask
->
exec
.
pTqReader
,
pList
);
taosArrayDestroy
(
pList
);
}
streamSetupTrigger
(
pTask
);
...
...
@@ -669,17 +659,22 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
};
SStreamTask
*
pTask
=
streamMetaAcquireTask
(
pTq
->
pStreamMeta
,
taskId
);
if
(
pTask
&&
atomic_load_8
(
&
pTask
->
status
.
taskStatus
)
==
TASK_STATUS__NORMAL
)
{
rsp
.
status
=
1
;
if
(
pTask
)
{
rsp
.
status
=
(
atomic_load_8
(
&
pTask
->
status
.
taskStatus
)
==
TASK_STATUS__NORMAL
)
?
1
:
0
;
streamMetaReleaseTask
(
pTq
->
pStreamMeta
,
pTask
);
tqDebug
(
"tq recv task check req(reqId:0x%"
PRIx64
") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d"
,
rsp
.
reqId
,
rsp
.
downstreamTaskId
,
rsp
.
downstreamNodeId
,
pTask
->
status
.
taskStatus
,
rsp
.
upstreamTaskId
,
rsp
.
upstreamNodeId
,
rsp
.
status
);
}
else
{
rsp
.
status
=
0
;
tqDebug
(
"tq recv task check(taskId:%d not built yet) req(reqId:0x%"
PRIx64
") %d at node %d, check req from task %d at node %d, rsp status %d"
,
taskId
,
rsp
.
reqId
,
rsp
.
downstreamTaskId
,
rsp
.
downstreamNodeId
,
rsp
.
upstreamTaskId
,
rsp
.
upstreamNodeId
,
rsp
.
status
);
}
if
(
pTask
)
streamMetaReleaseTask
(
pTq
->
pStreamMeta
,
pTask
);
tqDebug
(
"tq recv task check req(reqId:0x%"
PRIx64
") %d at node %d check req from task %d at node %d, status %d"
,
rsp
.
reqId
,
rsp
.
downstreamTaskId
,
rsp
.
downstreamNodeId
,
rsp
.
upstreamTaskId
,
rsp
.
upstreamNodeId
,
rsp
.
status
);
SEncoder
encoder
;
int32_t
code
;
int32_t
len
;
...
...
@@ -697,13 +692,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
tEncodeSStreamTaskCheckRsp
(
&
encoder
,
&
rsp
);
tEncoderClear
(
&
encoder
);
SRpcMsg
rspMsg
=
{
.
code
=
0
,
.
pCont
=
buf
,
.
contLen
=
sizeof
(
SMsgHead
)
+
len
,
.
info
=
pMsg
->
info
,
};
SRpcMsg
rspMsg
=
{
.
code
=
0
,
.
pCont
=
buf
,
.
contLen
=
sizeof
(
SMsgHead
)
+
len
,
.
info
=
pMsg
->
info
};
tmsgSendRsp
(
&
rspMsg
);
return
0
;
}
...
...
@@ -719,8 +708,8 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32
tDecoderClear
(
&
decoder
);
return
-
1
;
}
tDecoderClear
(
&
decoder
);
tDecoderClear
(
&
decoder
);
tqDebug
(
"tq recv task check rsp(reqId:0x%"
PRIx64
") %d at node %d check req from task %d at node %d, status %d"
,
rsp
.
reqId
,
rsp
.
downstreamTaskId
,
rsp
.
downstreamNodeId
,
rsp
.
upstreamTaskId
,
rsp
.
upstreamNodeId
,
rsp
.
status
);
...
...
@@ -774,8 +763,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
streamTaskCheckDownstream
(
pTask
,
sversion
);
}
tqDebug
(
"vgId:%d s-task:%s is deployed
from mnd, status:%d, total:%d"
,
TD_VID
(
pTq
->
pVnode
),
pTask
->
id
.
idStr
,
pTask
->
status
.
taskStatus
,
streamMetaGetNumOfTasks
(
pTq
->
pStreamMeta
));
tqDebug
(
"vgId:%d s-task:%s is deployed
and add meta from mnd, status:%d, total:%d"
,
TD_VID
(
pTq
->
pVnode
)
,
pTask
->
id
.
idStr
,
pTask
->
status
.
taskStatus
,
streamMetaGetNumOfTasks
(
pTq
->
pStreamMeta
));
return
0
;
}
...
...
@@ -1127,7 +1116,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
return
0
;
}
SStreamTask
*
pTask
=
streamMetaAcquireTask
Ex
(
pTq
->
pStreamMeta
,
taskId
);
SStreamTask
*
pTask
=
streamMetaAcquireTask
(
pTq
->
pStreamMeta
,
taskId
);
if
(
pTask
!=
NULL
)
{
if
(
pTask
->
status
.
taskStatus
==
TASK_STATUS__NORMAL
)
{
tqDebug
(
"vgId:%d s-task:%s start to process run req"
,
vgId
,
pTask
->
id
.
idStr
);
...
...
@@ -1141,7 +1130,6 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
}
streamMetaReleaseTask
(
pTq
->
pStreamMeta
,
pTask
);
tqStartStreamTasks
(
pTq
);
return
0
;
}
else
{
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
964db517
...
...
@@ -973,7 +973,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
STqHandle
*
pTqHandle
=
(
STqHandle
*
)
pIter
;
if
(
pTqHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
int32_t
code
=
qUpdateTableListForStreamScanner
(
pTqHandle
->
execHandle
.
task
,
tbUidList
,
isAdd
,
NULL
);
int32_t
code
=
qUpdateTableListForStreamScanner
(
pTqHandle
->
execHandle
.
task
,
tbUidList
,
isAdd
);
if
(
code
!=
0
)
{
tqError
(
"update qualified table error for %s"
,
pTqHandle
->
subKey
);
continue
;
...
...
@@ -1031,18 +1031,11 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
SStreamTask
*
pTask
=
*
(
SStreamTask
**
)
pIter
;
if
(
pTask
->
taskLevel
==
TASK_LEVEL__SOURCE
)
{
SArray
*
pList
=
NULL
;
int32_t
code
=
qUpdateTableListForStreamScanner
(
pTask
->
exec
.
pExecutor
,
tbUidList
,
isAdd
,
pList
);
int32_t
code
=
qUpdateTableListForStreamScanner
(
pTask
->
exec
.
pExecutor
,
tbUidList
,
isAdd
);
if
(
code
!=
0
)
{
tqError
(
"vgId:%d, s-task:%s update qualified table error for stream task"
,
vgId
,
pTask
->
id
.
idStr
);
continue
;
}
if
(
isAdd
)
{
// only add qualified tables
tqReaderAddTbUidList
(
pTask
->
exec
.
pTqReader
,
pList
);
}
else
{
tqReaderRemoveTbUidList
(
pTask
->
exec
.
pTqReader
,
tbUidList
);
}
}
}
...
...
source/dnode/vnode/src/tq/tqUtil.c
浏览文件 @
964db517
...
...
@@ -49,32 +49,6 @@ int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueI
return
TSDB_CODE_SUCCESS
;
}
int32_t
launchTaskForWalBlock
(
SStreamTask
*
pTask
,
SFetchRet
*
pRet
,
STqOffset
*
pOffset
)
{
SStreamDataBlock
*
pBlocks
=
taosAllocateQitem
(
sizeof
(
SStreamDataBlock
),
DEF_QITEM
,
0
);
if
(
pBlocks
==
NULL
)
{
// failed, do nothing
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pRet
->
data
.
info
.
type
=
STREAM_NORMAL
;
pBlocks
->
type
=
STREAM_INPUT__DATA_BLOCK
;
pBlocks
->
sourceVer
=
pOffset
->
val
.
version
;
pBlocks
->
blocks
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
taosArrayPush
(
pBlocks
->
blocks
,
&
pRet
->
data
);
// int64_t* ts = (int64_t*)(((SColumnInfoData*)ret.data.pDataBlock->pData)->pData);
// tqDebug("-----------%ld\n", ts[0]);
int32_t
code
=
tqAddInputBlockNLaunchTask
(
pTask
,
(
SStreamQueueItem
*
)
pBlocks
,
pBlocks
->
sourceVer
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
pOffset
->
val
.
version
=
walReaderGetCurrentVer
(
pTask
->
exec
.
pTqReader
->
pWalReader
);
tqDebug
(
"s-task:%s set the ver:%"
PRId64
" from WALReader after extract block from WAL"
,
pTask
->
id
.
idStr
,
pOffset
->
val
.
version
);
}
return
0
;
}
void
initOffsetForAllRestoreTasks
(
STQ
*
pTq
)
{
void
*
pIter
=
NULL
;
...
...
@@ -90,8 +64,7 @@ void initOffsetForAllRestoreTasks(STQ* pTq) {
}
if
(
pTask
->
status
.
taskStatus
==
TASK_STATUS__RECOVER_PREPARE
||
pTask
->
status
.
taskStatus
==
TASK_STATUS__WAIT_DOWNSTREAM
)
{
tqDebug
(
"stream task:%d skip push data, not ready for processing, status %d"
,
pTask
->
id
.
taskId
,
pTask
->
status
.
taskStatus
);
tqDebug
(
"s-task:%s skip push data, since not ready, status %d"
,
pTask
->
id
.
idStr
,
pTask
->
status
.
taskStatus
);
continue
;
}
...
...
@@ -120,7 +93,7 @@ void saveOffsetForAllTasks(STQ* pTq, int64_t ver) {
}
if
(
pTask
->
status
.
taskStatus
==
TASK_STATUS__RECOVER_PREPARE
||
pTask
->
status
.
taskStatus
==
TASK_STATUS__WAIT_DOWNSTREAM
)
{
tqDebug
(
"s
tream task:%d skip push data, not ready for processing, status %d"
,
pTask
->
id
.
taskId
,
tqDebug
(
"s
-task:%s skip push data, not ready for processing, status %d"
,
pTask
->
id
.
idStr
,
pTask
->
status
.
taskStatus
);
continue
;
}
...
...
source/libs/executor/src/executor.c
浏览文件 @
964db517
...
...
@@ -370,7 +370,7 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
return
qa
;
}
int32_t
qUpdateTableListForStreamScanner
(
qTaskInfo_t
tinfo
,
const
SArray
*
tableIdList
,
bool
isAdd
,
SArray
*
pList
)
{
int32_t
qUpdateTableListForStreamScanner
(
qTaskInfo_t
tinfo
,
const
SArray
*
tableIdList
,
bool
isAdd
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
const
char
*
id
=
GET_TASKID
(
pTaskInfo
);
int32_t
code
=
0
;
...
...
@@ -386,11 +386,6 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI
if
(
isAdd
)
{
// add new table id
SArray
*
qa
=
filterUnqualifiedTables
(
pScanInfo
,
tableIdList
,
GET_TASKID
(
pTaskInfo
));
int32_t
numOfQualifiedTables
=
taosArrayGetSize
(
qa
);
if
(
pList
!=
NULL
)
{
taosArrayAddAll
(
pList
,
qa
);
}
qDebug
(
"%d qualified child tables added into stream scanner, %s"
,
numOfQualifiedTables
,
id
);
code
=
tqReaderAddTbUidList
(
pScanInfo
->
tqReader
,
qa
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
source/libs/stream/src/stream.c
浏览文件 @
964db517
...
...
@@ -16,7 +16,7 @@
#include "streamInc.h"
#include "ttimer.h"
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 2
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 2
000
int32_t
streamInit
()
{
int8_t
old
;
...
...
source/libs/stream/src/streamMeta.c
浏览文件 @
964db517
...
...
@@ -57,11 +57,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto
_err
;
}
pMeta
->
pWalReadTasks
=
taosHashInit
(
64
,
fp
,
true
,
HASH_ENTRY_LOCK
);
if
(
pMeta
->
pWalReadTasks
==
NULL
)
{
goto
_err
;
}
if
(
streamMetaBegin
(
pMeta
)
<
0
)
{
goto
_err
;
}
...
...
@@ -75,7 +70,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
_err:
taosMemoryFree
(
pMeta
->
path
);
if
(
pMeta
->
pTasks
)
taosHashCleanup
(
pMeta
->
pTasks
);
if
(
pMeta
->
pWalReadTasks
)
taosHashCleanup
(
pMeta
->
pWalReadTasks
);
if
(
pMeta
->
pTaskDb
)
tdbTbClose
(
pMeta
->
pTaskDb
);
if
(
pMeta
->
pCheckpointDb
)
tdbTbClose
(
pMeta
->
pCheckpointDb
);
if
(
pMeta
->
db
)
tdbClose
(
pMeta
->
db
);
...
...
@@ -112,7 +106,6 @@ void streamMetaClose(SStreamMeta* pMeta) {
}
taosHashCleanup
(
pMeta
->
pTasks
);
taosHashCleanup
(
pMeta
->
pWalReadTasks
);
taosMemoryFree
(
pMeta
->
path
);
taosMemoryFree
(
pMeta
);
}
...
...
@@ -190,15 +183,12 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
return
-
1
;
}
pTask
->
status
.
taskStatus
=
STREAM_STATUS__NORMAL
;
taosHashPut
(
pMeta
->
pTasks
,
&
pTask
->
id
.
taskId
,
sizeof
(
int32_t
),
&
pTask
,
POINTER_BYTES
);
return
0
;
}
int32_t
streamMetaGetNumOfTasks
(
const
SStreamMeta
*
pMeta
)
{
int32_t
numOfReady
=
taosHashGetSize
(
pMeta
->
pTasks
);
int32_t
numOfRestoring
=
taosHashGetSize
(
pMeta
->
pWalReadTasks
);
return
numOfReady
+
numOfRestoring
;
return
(
int32_t
)
taosHashGetSize
(
pMeta
->
pTasks
);
}
SStreamTask
*
streamMetaAcquireTask
(
SStreamMeta
*
pMeta
,
int32_t
taskId
)
{
...
...
@@ -224,37 +214,6 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
}
}
SStreamTask
*
streamMetaAcquireTaskEx
(
SStreamMeta
*
pMeta
,
int32_t
taskId
)
{
taosRLockLatch
(
&
pMeta
->
lock
);
SStreamTask
*
pTask
=
NULL
;
int32_t
numOfRestored
=
taosHashGetSize
(
pMeta
->
pWalReadTasks
);
if
(
numOfRestored
>
0
)
{
SStreamTask
**
p
=
(
SStreamTask
**
)
taosHashGet
(
pMeta
->
pWalReadTasks
,
&
taskId
,
sizeof
(
taskId
));
if
(
p
!=
NULL
)
{
pTask
=
*
p
;
if
(
pTask
!=
NULL
&&
(
atomic_load_8
(
&
(
pTask
->
status
.
taskStatus
))
!=
TASK_STATUS__DROPPING
))
{
atomic_add_fetch_32
(
&
pTask
->
refCnt
,
1
);
taosRUnLockLatch
(
&
pMeta
->
lock
);
return
pTask
;
}
}
}
SStreamTask
**
p
=
(
SStreamTask
**
)
taosHashGet
(
pMeta
->
pTasks
,
&
taskId
,
sizeof
(
int32_t
));
if
(
p
!=
NULL
)
{
pTask
=
*
p
;
if
(
pTask
!=
NULL
&&
atomic_load_8
(
&
(
pTask
->
status
.
taskStatus
))
!=
TASK_STATUS__DROPPING
)
{
atomic_add_fetch_32
(
&
pTask
->
refCnt
,
1
);
taosRUnLockLatch
(
&
pMeta
->
lock
);
return
pTask
;
}
}
taosRUnLockLatch
(
&
pMeta
->
lock
);
return
NULL
;
}
void
streamMetaRemoveTask
(
SStreamMeta
*
pMeta
,
int32_t
taskId
)
{
SStreamTask
**
ppTask
=
(
SStreamTask
**
)
taosHashGet
(
pMeta
->
pTasks
,
&
taskId
,
sizeof
(
int32_t
));
if
(
ppTask
)
{
...
...
@@ -344,7 +303,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
return
-
1
;
}
if
(
taosHashPut
(
pMeta
->
p
WalRead
Tasks
,
&
pTask
->
id
.
taskId
,
sizeof
(
int32_t
),
&
pTask
,
sizeof
(
void
*
))
<
0
)
{
if
(
taosHashPut
(
pMeta
->
pTasks
,
&
pTask
->
id
.
taskId
,
sizeof
(
int32_t
),
&
pTask
,
sizeof
(
void
*
))
<
0
)
{
tdbFree
(
pKey
);
tdbFree
(
pVal
);
tdbTbcClose
(
pCur
);
...
...
source/libs/stream/src/streamRecover.c
浏览文件 @
964db517
...
...
@@ -17,6 +17,7 @@
int32_t
streamTaskLaunchRecover
(
SStreamTask
*
pTask
,
int64_t
version
)
{
qDebug
(
"s-task:%s at node %d launch recover"
,
pTask
->
id
.
idStr
,
pTask
->
nodeId
);
if
(
pTask
->
taskLevel
==
TASK_LEVEL__SOURCE
)
{
atomic_store_8
(
&
pTask
->
status
.
taskStatus
,
TASK_STATUS__RECOVER_PREPARE
);
streamSetParamForRecover
(
pTask
);
...
...
@@ -33,12 +34,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
memcpy
(
serializedReq
,
&
req
,
len
);
SRpcMsg
rpcMsg
=
{
.
contLen
=
len
,
.
pCont
=
serializedReq
,
.
msgType
=
TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE
,
};
SRpcMsg
rpcMsg
=
{
.
contLen
=
len
,
.
pCont
=
serializedReq
,
.
msgType
=
TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE
};
if
(
tmsgPutToQueue
(
pTask
->
pMsgCb
,
STREAM_QUEUE
,
&
rpcMsg
)
<
0
)
{
/*ASSERT(0);*/
}
...
...
@@ -50,6 +46,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
}
else
if
(
pTask
->
taskLevel
==
TASK_LEVEL__SINK
)
{
atomic_store_8
(
&
pTask
->
status
.
taskStatus
,
TASK_STATUS__NORMAL
);
}
return
0
;
}
...
...
@@ -61,6 +58,7 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
.
upstreamNodeId
=
pTask
->
nodeId
,
.
childId
=
pTask
->
selfChildId
,
};
// serialize
if
(
pTask
->
outputType
==
TASK_OUTPUT__FIXED_DISPATCH
)
{
req
.
reqId
=
tGenIdPI64
();
...
...
@@ -128,6 +126,7 @@ int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq*
int32_t
streamProcessTaskCheckRsp
(
SStreamTask
*
pTask
,
const
SStreamTaskCheckRsp
*
pRsp
,
int64_t
version
)
{
qDebug
(
"task %d at node %d recv check rsp from task %d at node %d: status %d"
,
pRsp
->
upstreamTaskId
,
pRsp
->
upstreamNodeId
,
pRsp
->
downstreamTaskId
,
pRsp
->
downstreamNodeId
,
pRsp
->
status
);
if
(
pRsp
->
status
==
1
)
{
if
(
pTask
->
outputType
==
TASK_OUTPUT__SHUFFLE_DISPATCH
)
{
bool
found
=
false
;
...
...
@@ -138,7 +137,11 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
break
;
}
}
if
(
!
found
)
return
-
1
;
if
(
!
found
)
{
return
-
1
;
}
int32_t
left
=
atomic_sub_fetch_32
(
&
pTask
->
recoverTryingDownstream
,
1
);
ASSERT
(
left
>=
0
);
if
(
left
==
0
)
{
...
...
@@ -147,7 +150,10 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
streamTaskLaunchRecover
(
pTask
,
version
);
}
}
else
if
(
pTask
->
outputType
==
TASK_OUTPUT__FIXED_DISPATCH
)
{
if
(
pRsp
->
reqId
!=
pTask
->
checkReqId
)
return
-
1
;
if
(
pRsp
->
reqId
!=
pTask
->
checkReqId
)
{
return
-
1
;
}
streamTaskLaunchRecover
(
pTask
,
version
);
}
else
{
ASSERT
(
0
);
...
...
@@ -167,6 +173,7 @@ int32_t streamRestoreParam(SStreamTask* pTask) {
void
*
exec
=
pTask
->
exec
.
pExecutor
;
return
qStreamRestoreParam
(
exec
);
}
int32_t
streamSetStatusNormal
(
SStreamTask
*
pTask
)
{
atomic_store_8
(
&
pTask
->
status
.
taskStatus
,
TASK_STATUS__NORMAL
);
return
0
;
...
...
@@ -227,8 +234,8 @@ int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {
// agg
int32_t
streamAggRecoverPrepare
(
SStreamTask
*
pTask
)
{
void
*
exec
=
pTask
->
exec
.
pExecutor
;
pTask
->
recoverWaitingUpstream
=
taosArrayGetSize
(
pTask
->
childEpInfo
);
qDebug
(
"s-task:%s wait for %d upstreams"
,
pTask
->
id
.
idStr
,
pTask
->
recoverWaitingUpstream
);
return
0
;
}
...
...
@@ -247,6 +254,7 @@ int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) {
int32_t
streamProcessRecoverFinishReq
(
SStreamTask
*
pTask
,
int32_t
childId
)
{
if
(
pTask
->
taskLevel
==
TASK_LEVEL__AGG
)
{
int32_t
left
=
atomic_sub_fetch_32
(
&
pTask
->
recoverWaitingUpstream
,
1
);
qDebug
(
"s-task:%s remain unfinished child tasks:%d"
,
pTask
->
id
.
idStr
,
left
);
ASSERT
(
left
>=
0
);
if
(
left
==
0
)
{
streamAggChildrenRecoverFinish
(
pTask
);
...
...
source/libs/stream/src/streamTask.c
浏览文件 @
964db517
...
...
@@ -187,11 +187,6 @@ void tFreeStreamTask(SStreamTask* pTask) {
pTask
->
exec
.
pExecutor
=
NULL
;
}
if
(
pTask
->
exec
.
pTqReader
!=
NULL
&&
pTask
->
freeFp
!=
NULL
)
{
pTask
->
freeFp
(
pTask
->
exec
.
pTqReader
);
pTask
->
exec
.
pTqReader
=
NULL
;
}
if
(
pTask
->
exec
.
pWalReader
!=
NULL
)
{
walCloseReader
(
pTask
->
exec
.
pWalReader
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录