Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3c4631ef
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
3c4631ef
编写于
7月 14, 2023
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add checkpoint
上级
b1643228
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
240 addition
and
197 deletion
+240
-197
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+22
-23
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+143
-142
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+1
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+34
-28
source/libs/stream/src/streamMeta.c
source/libs/stream/src/streamMeta.c
+40
-3
未找到文件。
include/libs/stream/tstream.h
浏览文件 @
3c4631ef
...
...
@@ -47,7 +47,7 @@ enum {
TASK_STATUS__SCAN_HISTORY
,
// stream task scan history data by using tsdbread in the stream scanner
TASK_STATUS__HALT
,
// stream task will handle all data in the input queue, and then paused
TASK_STATUS__PAUSE
,
TASK_STATUS__CK
,
// stream task is in checkpoint status, no data are allowed to put into inputQ anymore
TASK_STATUS__CK
,
// stream task is in checkpoint status, no data are allowed to put into inputQ anymore
TASK_STATUS__CK_READY
,
};
...
...
@@ -103,7 +103,7 @@ typedef struct {
}
SStreamQueueItem
;
typedef
void
FTbSink
(
SStreamTask
*
pTask
,
void
*
vnode
,
int64_t
ver
,
void
*
data
);
typedef
int32_t
FTaskExpand
(
void
*
ahandle
,
SStreamTask
*
pTask
,
int64_t
ver
);
typedef
int32_t
FTaskExpand
(
void
*
ahandle
,
SStreamTask
*
pTask
,
int64_t
ver
,
int64_t
checkpointId
);
typedef
struct
{
int8_t
type
;
...
...
@@ -120,7 +120,7 @@ typedef struct {
}
SStreamMergedSubmit
;
typedef
struct
{
int8_t
type
;
int8_t
type
;
int32_t
srcVgId
;
int32_t
srcTaskId
;
...
...
@@ -249,7 +249,7 @@ typedef struct SStreamChildEpInfo {
int32_t
childId
;
int32_t
taskId
;
SEpSet
epSet
;
bool
dataAllowed
;
// denote if the data from this upstream task is allowed to put into inputQ, not serialize it
bool
dataAllowed
;
// denote if the data from this upstream task is allowed to put into inputQ, not serialize it
}
SStreamChildEpInfo
;
typedef
struct
SStreamId
{
...
...
@@ -260,17 +260,17 @@ typedef struct SStreamId {
typedef
struct
SCheckpointInfo
{
int64_t
checkpointId
;
int64_t
checkpointVer
;
// latest checkpointId version
int64_t
currentVer
;
// current offset in WAL, not serialize it
int64_t
checkpointVer
;
// latest checkpointId version
int64_t
currentVer
;
// current offset in WAL, not serialize it
}
SCheckpointInfo
;
typedef
struct
SStreamStatus
{
int8_t
taskStatus
;
int8_t
downstreamReady
;
// downstream tasks are all ready now, if this flag is set
int8_t
schedStatus
;
int8_t
keepTaskStatus
;
bool
transferState
;
int8_t
timerActive
;
// timer is active
int8_t
taskStatus
;
int8_t
downstreamReady
;
// downstream tasks are all ready now, if this flag is set
int8_t
schedStatus
;
int8_t
keepTaskStatus
;
bool
transferState
;
int8_t
timerActive
;
// timer is active
}
SStreamStatus
;
typedef
struct
SHistDataRange
{
...
...
@@ -311,8 +311,8 @@ struct SStreamTask {
SHistDataRange
dataRange
;
SStreamId
historyTaskId
;
SStreamId
streamTaskId
;
SArray
*
pUpstreamInfoList
;
// SArray<SStreamChildEpInfo*>, // children info
SArray
*
pReadyMsgList
;
// SArray<SStreamChkptReadyInfo*>
SArray
*
pUpstreamInfoList
;
// SArray<SStreamChildEpInfo*>, // children info
SArray
*
pReadyMsgList
;
// SArray<SStreamChkptReadyInfo*>
// output
union
{
...
...
@@ -533,7 +533,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t
tDecodeStreamRetrieveReq
(
SDecoder
*
pDecoder
,
SStreamRetrieveReq
*
pReq
);
void
tDeleteStreamRetrieveReq
(
SStreamRetrieveReq
*
pReq
);
void
tDeleteStreamDispatchReq
(
SStreamDispatchReq
*
pReq
);
void
tDeleteStreamDispatchReq
(
SStreamDispatchReq
*
pReq
);
int32_t
streamSetupScheduleTrigger
(
SStreamTask
*
pTask
);
...
...
@@ -541,10 +541,10 @@ int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t
streamProcessDispatchMsg
(
SStreamTask
*
pTask
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pMsg
,
bool
exec
);
int32_t
streamProcessDispatchRsp
(
SStreamTask
*
pTask
,
SStreamDispatchRsp
*
pRsp
,
int32_t
code
);
int32_t
streamProcessRetrieveReq
(
SStreamTask
*
pTask
,
SStreamRetrieveReq
*
pReq
,
SRpcMsg
*
pMsg
);
void
streamTaskOpenAllUpstreamInput
(
SStreamTask
*
pTask
);
void
streamTaskCloseUpstreamInput
(
SStreamTask
*
pTask
,
int32_t
taskId
);
SStreamChildEpInfo
*
streamTaskGetUpstreamTaskEpInfo
(
SStreamTask
*
pTask
,
int32_t
taskId
);
int32_t
streamProcessRetrieveReq
(
SStreamTask
*
pTask
,
SStreamRetrieveReq
*
pReq
,
SRpcMsg
*
pMsg
);
void
streamTaskOpenAllUpstreamInput
(
SStreamTask
*
pTask
);
void
streamTaskCloseUpstreamInput
(
SStreamTask
*
pTask
,
int32_t
taskId
);
SStreamChildEpInfo
*
streamTaskGetUpstreamTaskEpInfo
(
SStreamTask
*
pTask
,
int32_t
taskId
);
void
streamTaskInputFail
(
SStreamTask
*
pTask
);
int32_t
streamTryExec
(
SStreamTask
*
pTask
);
...
...
@@ -556,7 +556,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask);
int32_t
streamScanExec
(
SStreamTask
*
pTask
,
int32_t
batchSz
);
char
*
createStreamTaskIdStr
(
int64_t
streamId
,
int32_t
taskId
);
char
*
createStreamTaskIdStr
(
int64_t
streamId
,
int32_t
taskId
);
// recover and fill history
void
streamPrepareNdoCheckDownstream
(
SStreamTask
*
pTask
);
...
...
@@ -600,7 +600,7 @@ void streamMetaClose(SStreamMeta* streamMeta);
int32_t
streamMetaSaveTask
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
);
int32_t
streamMetaAddDeployedTask
(
SStreamMeta
*
pMeta
,
int64_t
ver
,
SStreamTask
*
pTask
);
int32_t
streamMetaAddSerializedTask
(
SStreamMeta
*
pMeta
,
int64_t
checkpointVer
,
char
*
msg
,
int32_t
msgLen
);
int32_t
streamMetaGetNumOfTasks
(
const
SStreamMeta
*
pMeta
);
// todo remove it
int32_t
streamMetaGetNumOfTasks
(
const
SStreamMeta
*
pMeta
);
// todo remove it
SStreamTask
*
streamMetaAcquireTask
(
SStreamMeta
*
pMeta
,
int32_t
taskId
);
void
streamMetaReleaseTask
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
);
void
streamMetaRemoveTask
(
SStreamMeta
*
pMeta
,
int32_t
taskId
);
...
...
@@ -617,8 +617,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
int32_t
streamTaskReleaseState
(
SStreamTask
*
pTask
);
int32_t
streamTaskReloadState
(
SStreamTask
*
pTask
);
int32_t
streamAddCheckpointSourceRspMsg
(
SStreamCheckpointSourceReq
*
pReq
,
SRpcHandleInfo
*
pRpcInfo
,
SStreamTask
*
pTask
);
int32_t
streamAddCheckpointSourceRspMsg
(
SStreamCheckpointSourceReq
*
pReq
,
SRpcHandleInfo
*
pRpcInfo
,
SStreamTask
*
pTask
);
int32_t
streamAddCheckpointReadyMsg
(
SStreamTask
*
pTask
,
int32_t
srcTaskId
,
int32_t
index
,
int64_t
checkpointId
);
#ifdef __cplusplus
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
3c4631ef
...
...
@@ -836,48 +836,48 @@ _OVER:
return
code
;
}
static
int32_t
mndCreateCheckpoint
(
SMnode
*
pMnode
,
int32_t
vgId
,
SList
*
pStreamList
)
{
void
*
buf
=
NULL
;
int32_t
tlen
=
0
;
int32_t
checkpointId
=
tGenIdPI64
();
SVgObj
*
pVgObj
=
mndAcquireVgroup
(
pMnode
,
vgId
);
SArray
*
stream
=
taosArrayInit
(
64
,
sizeof
(
void
*
));
SListIter
iter
=
{
0
};
tdListInitIter
(
pStreamList
,
&
iter
,
TD_LIST_FORWARD
);
SListNode
*
pNode
=
NULL
;
while
((
pNode
=
tdListNext
(
&
iter
))
!=
NULL
)
{
char
streamName
[
TSDB_STREAM_FNAME_LEN
]
=
{
0
};
tdListNodeGetData
(
pStreamList
,
pNode
,
streamName
);
SStreamObj
*
pStream
=
mndAcquireStream
(
pMnode
,
streamName
);
taosArrayPush
(
stream
,
&
pStream
);
}
if
(
mndBuildStreamCheckpointSourceReq2
(
&
buf
,
&
tlen
,
vgId
,
checkpointId
,
0
,
0
)
<
0
)
{
mndReleaseVgroup
(
pMnode
,
pVgObj
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
stream
);
i
++
)
{
SStreamObj
*
p
=
taosArrayGetP
(
stream
,
i
);
mndReleaseStream
(
pMnode
,
p
);
}
taosArrayDestroy
(
stream
);
return
-
1
;
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgObj
);
action
.
pCont
=
buf
;
action
.
contLen
=
tlen
;
action
.
msgType
=
TDMT_VND_STREAM_CHECK_POINT_SOURCE
;
}
mndReleaseVgroup
(
pMnode
,
pVgObj
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
stream
);
i
++
)
{
SStreamObj
*
p
=
taosArrayGetP
(
stream
,
i
);
mndReleaseStream
(
pMnode
,
p
);
}
taosArrayDestroy
(
stream
);
return
0
;
}
//
static int32_t mndCreateCheckpoint(SMnode *pMnode, int32_t vgId, SList *pStreamList) {
//
void *buf = NULL;
//
int32_t tlen = 0;
//
int32_t checkpointId = tGenIdPI64();
//
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
//
SArray *stream = taosArrayInit(64, sizeof(void *));
//
SListIter iter = {0};
//
tdListInitIter(pStreamList, &iter, TD_LIST_FORWARD);
//
SListNode *pNode = NULL;
//
while ((pNode = tdListNext(&iter)) != NULL) {
//
char streamName[TSDB_STREAM_FNAME_LEN] = {0};
//
tdListNodeGetData(pStreamList, pNode, streamName);
//
SStreamObj *pStream = mndAcquireStream(pMnode, streamName);
//
taosArrayPush(stream, &pStream);
//
}
//
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, vgId, checkpointId, 0, 0) < 0) {
//
mndReleaseVgroup(pMnode, pVgObj);
//
for (int i = 0; i < taosArrayGetSize(stream); i++) {
//
SStreamObj *p = taosArrayGetP(stream, i);
//
mndReleaseStream(pMnode, p);
//
}
//
taosArrayDestroy(stream);
//
return -1;
//
STransAction action = {0};
//
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
//
action.pCont = buf;
//
action.contLen = tlen;
//
action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE;
//
}
//
mndReleaseVgroup(pMnode, pVgObj);
//
for (int i = 0; i < taosArrayGetSize(stream); i++) {
//
SStreamObj *p = taosArrayGetP(stream, i);
//
mndReleaseStream(pMnode, p);
//
}
//
taosArrayDestroy(stream);
//
return 0;
//
}
static
int32_t
mndProcessStreamCheckpointTmr
(
SRpcMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
...
...
@@ -979,106 +979,107 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
return
0
;
}
static
int32_t
mndProcessStreamCheckpointTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
int64_t
checkpointId
)
{
int64_t
timestampMs
=
taosGetTimestampMs
();
if
(
timestampMs
-
pStream
->
checkpointFreq
<
tsStreamCheckpointTickInterval
*
1000
)
{
return
-
1
;
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_DB_INSIDE
,
NULL
,
"stream-checkpoint"
);
if
(
pTrans
==
NULL
)
return
-
1
;
mndTransSetDbName
(
pTrans
,
pStream
->
sourceDb
,
pStream
->
targetDb
);
if
(
mndTrancCheckConflict
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"failed to checkpoint of stream name%s, checkpointId: %"
PRId64
", reason:%s"
,
pStream
->
name
,
checkpointId
,
tstrerror
(
TSDB_CODE_MND_TRANS_CONFLICT
));
mndTransDrop
(
pTrans
);
return
-
1
;
}
mDebug
(
"start to trigger checkpoint for stream:%s, checkpoint: %"
PRId64
""
,
pStream
->
name
,
checkpointId
);
atomic_store_64
(
&
pStream
->
currentTick
,
1
);
taosWLockLatch
(
&
pStream
->
lock
);
// 1. redo action: broadcast checkpoint source msg for all source vg
int32_t
totLevel
=
taosArrayGetSize
(
pStream
->
tasks
);
for
(
int32_t
i
=
0
;
i
<
totLevel
;
i
++
)
{
SArray
*
pLevel
=
taosArrayGetP
(
pStream
->
tasks
,
i
);
SStreamTask
*
pTask
=
taosArrayGetP
(
pLevel
,
0
);
if
(
pTask
->
info
.
taskLevel
==
TASK_LEVEL__SOURCE
)
{
int32_t
sz
=
taosArrayGetSize
(
pLevel
);
for
(
int32_t
j
=
0
;
j
<
sz
;
j
++
)
{
SStreamTask
*
pTask
=
taosArrayGetP
(
pLevel
,
j
);
/*A(pTask->info.nodeId > 0);*/
SVgObj
*
pVgObj
=
mndAcquireVgroup
(
pMnode
,
pTask
->
info
.
nodeId
);
if
(
pVgObj
==
NULL
)
{
taosWUnLockLatch
(
&
pStream
->
lock
);
mndTransDrop
(
pTrans
);
return
-
1
;
}
void
*
buf
;
int32_t
tlen
;
if
(
mndBuildStreamCheckpointSourceReq2
(
&
buf
,
&
tlen
,
pTask
->
info
.
nodeId
,
checkpointId
,
pTask
->
id
.
streamId
,
pTask
->
id
.
taskId
)
<
0
)
{
mndReleaseVgroup
(
pMnode
,
pVgObj
);
taosWUnLockLatch
(
&
pStream
->
lock
);
mndTransDrop
(
pTrans
);
return
-
1
;
}
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgObj
);
action
.
pCont
=
buf
;
action
.
contLen
=
tlen
;
action
.
msgType
=
TDMT_VND_STREAM_CHECK_POINT_SOURCE
;
mndReleaseVgroup
(
pMnode
,
pVgObj
);
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
buf
);
taosWUnLockLatch
(
&
pStream
->
lock
);
mndReleaseStream
(
pMnode
,
pStream
);
mndTransDrop
(
pTrans
);
return
-
1
;
}
}
}
}
// 2. reset tick
pStream
->
checkpointFreq
=
checkpointId
;
pStream
->
checkpointId
=
checkpointId
;
pStream
->
checkpointFreq
=
taosGetTimestampMs
();
atomic_store_64
(
&
pStream
->
currentTick
,
0
);
// 3. commit log: stream checkpoint info
pStream
->
version
=
pStream
->
version
+
1
;
taosWUnLockLatch
(
&
pStream
->
lock
);
// // code condtion
SSdbRaw
*
pCommitRaw
=
mndStreamActionEncode
(
pStream
);
if
(
pCommitRaw
==
NULL
)
{
mError
(
"failed to prepare trans rebalance since %s"
,
terrstr
());
goto
_ERR
;
}
if
(
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
{
sdbFreeRaw
(
pCommitRaw
);
mError
(
"failed to prepare trans rebalance since %s"
,
terrstr
());
goto
_ERR
;
}
if
(
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
)
!=
0
)
{
sdbFreeRaw
(
pCommitRaw
);
mError
(
"failed to prepare trans rebalance since %s"
,
terrstr
());
goto
_ERR
;
}
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"failed to prepare trans rebalance since %s"
,
terrstr
());
goto
_ERR
;
}
mndTransDrop
(
pTrans
);
return
0
;
_ERR:
mndTransDrop
(
pTrans
);
return
-
1
;
}
// static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId) {
// int64_t timestampMs = taosGetTimestampMs();
// if (timestampMs - pStream->checkpointFreq < tsStreamCheckpointTickInterval * 1000) {
// return -1;
// }
// STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint");
// if (pTrans == NULL) return -1;
// mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
// if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
// mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name,
// checkpointId,
// tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
// mndTransDrop(pTrans);
// return -1;
// }
// mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);
// atomic_store_64(&pStream->currentTick, 1);
// taosWLockLatch(&pStream->lock);
// // 1. redo action: broadcast checkpoint source msg for all source vg
// int32_t totLevel = taosArrayGetSize(pStream->tasks);
// for (int32_t i = 0; i < totLevel; i++) {
// SArray *pLevel = taosArrayGetP(pStream->tasks, i);
// SStreamTask *pTask = taosArrayGetP(pLevel, 0);
// if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// int32_t sz = taosArrayGetSize(pLevel);
// for (int32_t j = 0; j < sz; j++) {
// SStreamTask *pTask = taosArrayGetP(pLevel, j);
// /*A(pTask->info.nodeId > 0);*/
// SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
// if (pVgObj == NULL) {
// taosWUnLockLatch(&pStream->lock);
// mndTransDrop(pTrans);
// return -1;
// }
// void *buf;
// int32_t tlen;
// if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
// pTask->id.taskId) < 0) {
// mndReleaseVgroup(pMnode, pVgObj);
// taosWUnLockLatch(&pStream->lock);
// mndTransDrop(pTrans);
// return -1;
// }
// STransAction action = {0};
// action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
// action.pCont = buf;
// action.contLen = tlen;
// action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE;
// mndReleaseVgroup(pMnode, pVgObj);
// if (mndTransAppendRedoAction(pTrans, &action) != 0) {
// taosMemoryFree(buf);
// taosWUnLockLatch(&pStream->lock);
// mndReleaseStream(pMnode, pStream);
// mndTransDrop(pTrans);
// return -1;
// }
// }
// }
// }
// // 2. reset tick
// pStream->checkpointFreq = checkpointId;
// pStream->checkpointId = checkpointId;
// pStream->checkpointFreq = taosGetTimestampMs();
// atomic_store_64(&pStream->currentTick, 0);
// // 3. commit log: stream checkpoint info
// pStream->version = pStream->version + 1;
// taosWUnLockLatch(&pStream->lock);
// // // code condtion
// SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
// if (pCommitRaw == NULL) {
// mError("failed to prepare trans rebalance since %s", terrstr());
// goto _ERR;
// }
// if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
// sdbFreeRaw(pCommitRaw);
// mError("failed to prepare trans rebalance since %s", terrstr());
// goto _ERR;
// }
// if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) {
// sdbFreeRaw(pCommitRaw);
// mError("failed to prepare trans rebalance since %s", terrstr());
// goto _ERR;
// }
// if (mndTransPrepare(pMnode, pTrans) != 0) {
// mError("failed to prepare trans rebalance since %s", terrstr());
// goto _ERR;
// }
// mndTransDrop(pTrans);
// return 0;
// _ERR:
// mndTransDrop(pTrans);
// return -1;
// }
static
int32_t
mndAddStreamCheckpointToTrans
(
STrans
*
pTrans
,
SStreamObj
*
pStream
,
SMnode
*
pMnode
,
int64_t
checkpointId
)
{
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
3c4631ef
...
...
@@ -168,7 +168,7 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer);
int32_t
tqOffsetRestoreFromFile
(
STqOffsetStore
*
pStore
,
const
char
*
fname
);
// tqStream
int32_t
tqExpandTask
(
STQ
*
pTq
,
SStreamTask
*
pTask
,
int64_t
ver
);
int32_t
tqExpandTask
(
STQ
*
pTq
,
SStreamTask
*
pTask
,
int64_t
ver
,
int64_t
checkpointId
);
int32_t
tqStreamTasksScanWal
(
STQ
*
pTq
);
int32_t
tqStreamTasksStatusCheck
(
STQ
*
pTq
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
3c4631ef
...
...
@@ -162,7 +162,7 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
taosWLockLatch
(
&
pMeta
->
lock
);
void
*
pIter
=
NULL
;
while
(
1
)
{
while
(
1
)
{
pIter
=
taosHashIterate
(
pMeta
->
pTasks
,
pIter
);
if
(
pIter
==
NULL
)
{
break
;
...
...
@@ -207,13 +207,14 @@ void tqNotifyClose(STQ* pTq) {
int64_t
st
=
taosGetTimestampMs
();
while
(
hasStreamTaskInTimer
(
pTq
->
pStreamMeta
))
{
while
(
hasStreamTaskInTimer
(
pTq
->
pStreamMeta
))
{
tqDebug
(
"vgId:%d some tasks in timer, wait for 100ms and recheck"
,
pTq
->
pStreamMeta
->
vgId
);
taosMsleep
(
100
);
}
int64_t
el
=
taosGetTimestampMs
()
-
st
;
tqDebug
(
"vgId:%d all stream tasks are not in timer, continue close, elapsed time:%"
PRId64
" ms"
,
pTq
->
pStreamMeta
->
vgId
,
el
);
tqDebug
(
"vgId:%d all stream tasks are not in timer, continue close, elapsed time:%"
PRId64
" ms"
,
pTq
->
pStreamMeta
->
vgId
,
el
);
}
}
...
...
@@ -249,8 +250,8 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq*
tFormatOffset
(
buf1
,
TSDB_OFFSET_LEN
,
&
pRsp
->
reqOffset
);
tFormatOffset
(
buf2
,
TSDB_OFFSET_LEN
,
&
pRsp
->
rspOffset
);
tqDebug
(
"tmq poll vgId:%d consumer:0x%"
PRIx64
" (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%"
PRIx64
,
vgId
,
pReq
->
consumerId
,
pReq
->
epoch
,
pRsp
->
blockNum
,
buf1
,
buf2
,
pReq
->
reqId
);
tqDebug
(
"tmq poll vgId:%d consumer:0x%"
PRIx64
" (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%"
PRIx64
,
vgId
,
pReq
->
consumerId
,
pReq
->
epoch
,
pRsp
->
blockNum
,
buf1
,
buf2
,
pReq
->
reqId
);
return
0
;
}
...
...
@@ -419,8 +420,11 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
if
(
ASSERT
(
pHandle
->
msg
!=
NULL
))
{
tqError
(
"pHandle->msg should not be null"
);
break
;
}
else
{
SRpcMsg
msg
=
{.
msgType
=
TDMT_VND_TMQ_CONSUME
,
.
pCont
=
pHandle
->
msg
->
pCont
,
.
contLen
=
pHandle
->
msg
->
contLen
,
.
info
=
pHandle
->
msg
->
info
};
}
else
{
SRpcMsg
msg
=
{.
msgType
=
TDMT_VND_TMQ_CONSUME
,
.
pCont
=
pHandle
->
msg
->
pCont
,
.
contLen
=
pHandle
->
msg
->
contLen
,
.
info
=
pHandle
->
msg
->
info
};
tmsgPutToQueue
(
&
pTq
->
pVnode
->
msgCb
,
QUERY_QUEUE
,
&
msg
);
taosMemoryFree
(
pHandle
->
msg
);
pHandle
->
msg
=
NULL
;
...
...
@@ -679,9 +683,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
req
.
oldConsumerId
,
req
.
newConsumerId
);
STqHandle
*
pHandle
=
NULL
;
while
(
1
)
{
while
(
1
)
{
pHandle
=
taosHashGet
(
pTq
->
pHandle
,
req
.
subKey
,
strlen
(
req
.
subKey
));
if
(
pHandle
||
tqMetaGetHandle
(
pTq
,
req
.
subKey
)
<
0
){
if
(
pHandle
||
tqMetaGetHandle
(
pTq
,
req
.
subKey
)
<
0
)
{
break
;
}
}
...
...
@@ -697,7 +701,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
}
STqHandle
handle
=
{
0
};
ret
=
tqCreateHandle
(
pTq
,
&
req
,
&
handle
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
tqDestroyTqHandle
(
&
handle
);
goto
end
;
}
...
...
@@ -739,7 +743,7 @@ end:
void
freePtr
(
void
*
ptr
)
{
taosMemoryFree
(
*
(
void
**
)
ptr
);
}
int32_t
tqExpandTask
(
STQ
*
pTq
,
SStreamTask
*
pTask
,
int64_t
ver
)
{
int32_t
tqExpandTask
(
STQ
*
pTq
,
SStreamTask
*
pTask
,
int64_t
ver
,
int64_t
checkpointId
)
{
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
pTask
->
id
.
idStr
=
createStreamTaskIdStr
(
pTask
->
id
.
streamId
,
pTask
->
id
.
taskId
);
...
...
@@ -758,16 +762,16 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask
->
pMeta
=
pTq
->
pStreamMeta
;
// checkpoint exists, restore from the last checkpoint
if
(
pTask
->
chkInfo
.
checkpointId
!=
0
)
{
ASSERT
(
pTask
->
chkInfo
.
checkpointVer
>
0
);
pTask
->
chkInfo
.
currentVer
=
pTask
->
chkInfo
.
checkpointVer
;
pTask
->
dataRange
.
range
.
maxVer
=
pTask
->
chkInfo
.
checkpointVer
;
pTask
->
dataRange
.
range
.
minVer
=
pTask
->
chkInfo
.
checkpointVer
;
}
else
{
pTask
->
chkInfo
.
currentVer
=
ver
;
pTask
->
dataRange
.
range
.
maxVer
=
ver
;
pTask
->
dataRange
.
range
.
minVer
=
ver
;
}
//
if (pTask->chkInfo.checkpointId != 0) {
//
ASSERT(pTask->chkInfo.checkpointVer > 0);
//
pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer;
//
pTask->dataRange.range.maxVer = pTask->chkInfo.checkpointVer;
//
pTask->dataRange.range.minVer = pTask->chkInfo.checkpointVer;
//
} else {
pTask
->
chkInfo
.
currentVer
=
ver
;
pTask
->
dataRange
.
range
.
maxVer
=
ver
;
pTask
->
dataRange
.
range
.
minVer
=
ver
;
//
}
if
(
pTask
->
info
.
taskLevel
==
TASK_LEVEL__SOURCE
)
{
SStreamTask
*
pSateTask
=
pTask
;
...
...
@@ -915,7 +919,8 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
streamGetTaskStatusStr
(
pTask
->
status
.
taskStatus
),
rsp
.
status
);
}
else
{
rsp
.
status
=
0
;
tqDebug
(
"tq recv task check(taskId:0x%x not built yet) req(reqId:0x%"
PRIx64
") from task:0x%x (vgId:%d), rsp status %d"
,
tqDebug
(
"tq recv task check(taskId:0x%x not built yet) req(reqId:0x%"
PRIx64
") from task:0x%x (vgId:%d), rsp status %d"
,
taskId
,
rsp
.
reqId
,
rsp
.
upstreamTaskId
,
rsp
.
upstreamNodeId
,
rsp
.
status
);
}
...
...
@@ -1092,15 +1097,16 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// now we can stop the stream task execution
pStreamTask
->
status
.
taskStatus
=
TASK_STATUS__HALT
;
tqDebug
(
"s-task:%s level:%d status is set to halt by history scan task:%s"
,
pId
,
p
StreamTask
->
info
.
taskLevel
,
p
Id
);
tqDebug
(
"s-task:%s level:%d status is set to halt by history scan task:%s"
,
pId
,
pStreamTask
->
info
.
taskLevel
,
pId
);
// if it's an source task, extract the last version in wal.
streamHistoryTaskSetVerRangeStep2
(
pTask
);
}
if
(
!
streamTaskRecoverScanStep1Finished
(
pTask
))
{
tqDebug
(
"s-task:%s level:%d verRange:%"
PRId64
" - %"
PRId64
" do secondary scan-history-data after halt the related stream task:%s"
,
tqDebug
(
"s-task:%s level:%d verRange:%"
PRId64
" - %"
PRId64
" do secondary scan-history-data after halt the related stream task:%s"
,
pId
,
pTask
->
info
.
taskLevel
,
pRange
->
minVer
,
pRange
->
maxVer
,
pId
);
ASSERT
(
pTask
->
status
.
schedStatus
==
TASK_SCHED_STATUS__WAITING
);
...
...
@@ -1356,7 +1362,7 @@ int32_t tqProcessTaskPauseImpl(SStreamMeta* pStreamMeta, SStreamTask* pTask) {
int32_t
tqProcessTaskPauseReq
(
STQ
*
pTq
,
int64_t
sversion
,
char
*
msg
,
int32_t
msgLen
)
{
SVPauseStreamTaskReq
*
pReq
=
(
SVPauseStreamTaskReq
*
)
msg
;
SStreamTask
*
pTask
=
streamMetaAcquireTask
(
pTq
->
pStreamMeta
,
pReq
->
taskId
);
int32_t
code
=
tqProcessTaskPauseImpl
(
pTq
->
pStreamMeta
,
pTask
);
int32_t
code
=
tqProcessTaskPauseImpl
(
pTq
->
pStreamMeta
,
pTask
);
if
(
code
!=
0
)
{
return
code
;
}
...
...
@@ -1403,8 +1409,8 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
int32_t
tqProcessTaskResumeReq
(
STQ
*
pTq
,
int64_t
sversion
,
char
*
msg
,
int32_t
msgLen
)
{
SVResumeStreamTaskReq
*
pReq
=
(
SVResumeStreamTaskReq
*
)
msg
;
SStreamTask
*
pTask
=
streamMetaAcquireTask
(
pTq
->
pStreamMeta
,
pReq
->
taskId
);
int32_t
code
=
tqProcessTaskResumeImpl
(
pTq
,
pTask
,
sversion
,
pReq
->
igUntreated
);
SStreamTask
*
pTask
=
streamMetaAcquireTask
(
pTq
->
pStreamMeta
,
pReq
->
taskId
);
int32_t
code
=
tqProcessTaskResumeImpl
(
pTq
,
pTask
,
sversion
,
pReq
->
igUntreated
);
if
(
code
!=
0
)
{
return
code
;
}
...
...
source/libs/stream/src/streamMeta.c
浏览文件 @
3c4631ef
...
...
@@ -241,9 +241,11 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
// add to the ready tasks hash map, not the restored tasks hash map
int32_t
streamMetaAddDeployedTask
(
SStreamMeta
*
pMeta
,
int64_t
ver
,
SStreamTask
*
pTask
)
{
int64_t
checkpointId
=
0
;
void
*
p
=
taosHashGet
(
pMeta
->
pTasks
,
&
pTask
->
id
.
taskId
,
sizeof
(
pTask
->
id
.
taskId
));
if
(
p
==
NULL
)
{
if
(
pMeta
->
expandFunc
(
pMeta
->
ahandle
,
pTask
,
ver
)
<
0
)
{
if
(
pMeta
->
expandFunc
(
pMeta
->
ahandle
,
pTask
,
ver
,
checkpointId
)
<
0
)
{
tFreeStreamTask
(
pTask
);
return
-
1
;
}
...
...
@@ -404,7 +406,43 @@ int32_t streamMetaAbort(SStreamMeta* pMeta) {
return
0
;
}
int64_t
streamGetLatestCheckpointId
(
SStreamMeta
*
pMeta
)
{
int64_t
chkpId
=
0
;
TBC
*
pCur
=
NULL
;
if
(
tdbTbcOpen
(
pMeta
->
pTaskDb
,
&
pCur
,
NULL
)
<
0
)
{
return
chkpId
;
}
void
*
pKey
=
NULL
;
int32_t
kLen
=
0
;
void
*
pVal
=
NULL
;
int32_t
vLen
=
0
;
SDecoder
decoder
;
tdbTbcMoveToFirst
(
pCur
);
while
(
tdbTbcNext
(
pCur
,
&
pKey
,
&
kLen
,
&
pVal
,
&
vLen
)
==
0
)
{
SStreamTask
*
pTask
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
{
goto
_err
;
}
tDecoderInit
(
&
decoder
,
(
uint8_t
*
)
pVal
,
vLen
);
tDecodeStreamTask
(
&
decoder
,
pTask
);
tDecoderClear
(
&
decoder
);
chkpId
=
TMAX
(
chkpId
,
pTask
->
chkInfo
.
checkpointId
);
}
_err:
tdbFree
(
pKey
);
tdbFree
(
pVal
);
tdbTbcClose
(
pCur
);
return
chkpId
;
}
int32_t
streamLoadTasks
(
SStreamMeta
*
pMeta
,
int64_t
ver
)
{
int64_t
checkpointId
=
streamGetLatestCheckpointId
(
pMeta
);
TBC
*
pCur
=
NULL
;
if
(
tdbTbcOpen
(
pMeta
->
pTaskDb
,
&
pCur
,
NULL
)
<
0
)
{
return
-
1
;
...
...
@@ -417,7 +455,6 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
SDecoder
decoder
;
tdbTbcMoveToFirst
(
pCur
);
while
(
tdbTbcNext
(
pCur
,
&
pKey
,
&
kLen
,
&
pVal
,
&
vLen
)
==
0
)
{
SStreamTask
*
pTask
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
{
...
...
@@ -434,7 +471,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
// remove duplicate
void
*
p
=
taosHashGet
(
pMeta
->
pTasks
,
&
pTask
->
id
.
taskId
,
sizeof
(
pTask
->
id
.
taskId
));
if
(
p
==
NULL
)
{
if
(
pMeta
->
expandFunc
(
pMeta
->
ahandle
,
pTask
,
pTask
->
chkInfo
.
checkpointVer
)
<
0
)
{
if
(
pMeta
->
expandFunc
(
pMeta
->
ahandle
,
pTask
,
pTask
->
chkInfo
.
checkpointVer
,
checkpointId
)
<
0
)
{
tdbFree
(
pKey
);
tdbFree
(
pVal
);
tdbTbcClose
(
pCur
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录