提交 9a3708e1 编写于 作者: L liuyao

pause&resume fill history

上级 7c6fbd77
...@@ -451,7 +451,8 @@ typedef struct { ...@@ -451,7 +451,8 @@ typedef struct {
SMsgHead msgHead; SMsgHead msgHead;
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t taskId;
} SStreamScanHistoryReq, SStreamRecoverStep2Req; int8_t igUntreated;
} SStreamScanHistoryReq;
typedef struct { typedef struct {
int64_t streamId; int64_t streamId;
...@@ -574,6 +575,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask); ...@@ -574,6 +575,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask);
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
int32_t streamTaskStartHistoryTask(SStreamTask* pTask); int32_t streamTaskStartHistoryTask(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
// common // common
int32_t streamSetParamForScanHistoryData(SStreamTask* pTask); int32_t streamSetParamForScanHistoryData(SStreamTask* pTask);
...@@ -583,7 +585,7 @@ const char* streamGetTaskStatusStr(int32_t status); ...@@ -583,7 +585,7 @@ const char* streamGetTaskStatusStr(int32_t status);
// source level // source level
int32_t streamSetParamForStreamScanner(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScanner(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq); int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
int32_t streamSourceScanHistoryData(SStreamTask* pTask); int32_t streamSourceScanHistoryData(SStreamTask* pTask);
//int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver); //int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
......
...@@ -1468,7 +1468,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms ...@@ -1468,7 +1468,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus); atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);
// no lock needs to secure the access of the version // no lock needs to secure the access of the version
if (pReq->igUntreated && pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pReq->igUntreated && pTask->info.taskLevel == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
// discard all the data when the stream task is suspended. // discard all the data when the stream task is suspended.
walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion); walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64 tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
...@@ -1479,7 +1479,9 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms ...@@ -1479,7 +1479,9 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
} }
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) { if (pTask->info.fillHistory && pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
streamStartRecoverTask(pTask, pReq->igUntreated);
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
tqStartStreamTasks(pTq); tqStartStreamTasks(pTq);
} else { } else {
streamSchedExec(pTask); streamSchedExec(pTask);
......
...@@ -17,6 +17,26 @@ ...@@ -17,6 +17,26 @@
#include "ttimer.h" #include "ttimer.h"
#include "wal.h" #include "wal.h"
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) {
SStreamScanHistoryReq req;
streamBuildSourceRecover1Req(pTask, &req, igUntreated);
int32_t len = sizeof(SStreamScanHistoryReq);
void* serializedReq = rpcMallocCont(len);
if (serializedReq == NULL) {
return -1;
}
memcpy(serializedReq, &req, len);
SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY};
if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
/*ASSERT(0);*/
}
return 0;
}
const char* streamGetTaskStatusStr(int32_t status) { const char* streamGetTaskStatusStr(int32_t status) {
switch(status) { switch(status) {
case TASK_STATUS__NORMAL: return "normal"; case TASK_STATUS__NORMAL: return "normal";
...@@ -38,23 +58,8 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { ...@@ -38,23 +58,8 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
streamSetParamForScanHistoryData(pTask); streamSetParamForScanHistoryData(pTask);
streamSetParamForStreamScanner(pTask, pRange, &pTask->dataRange.window); streamSetParamForStreamScanner(pTask, pRange, &pTask->dataRange.window);
SStreamScanHistoryReq req; int32_t code = streamStartRecoverTask(pTask, 0);
streamBuildSourceRecover1Req(pTask, &req); return code;
int32_t len = sizeof(SStreamScanHistoryReq);
void* serializedReq = rpcMallocCont(len);
if (serializedReq == NULL) {
return -1;
}
memcpy(serializedReq, &req, len);
SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY};
if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
/*ASSERT(0);*/
}
return 0;
} }
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
...@@ -262,10 +267,11 @@ int32_t streamSetParamForStreamScanner(SStreamTask* pTask, SVersionRange *pVerRa ...@@ -262,10 +267,11 @@ int32_t streamSetParamForStreamScanner(SStreamTask* pTask, SVersionRange *pVerRa
return qStreamSourceScanParamForHistoryScan(pTask->exec.pExecutor, pVerRange, pWindow); return qStreamSourceScanParamForHistoryScan(pTask->exec.pExecutor, pVerRange, pWindow);
} }
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq) { int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
pReq->msgHead.vgId = pTask->info.nodeId; pReq->msgHead.vgId = pTask->info.nodeId;
pReq->streamId = pTask->id.streamId; pReq->streamId = pTask->id.streamId;
pReq->taskId = pTask->id.taskId; pReq->taskId = pTask->id.taskId;
pReq->igUntreated = igUntreated;
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册