提交 11f0c3b3 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 52824926
......@@ -339,7 +339,7 @@ struct SStreamTask {
SStreamState* pState; // state backend
// the followings attributes don't be serialized
int32_t recoverTryingDownstream;
int32_t notReadyTasks;
int32_t numOfWaitingUpstream;
int64_t checkReqId;
SArray* checkReqIds; // shuffle
......@@ -576,7 +576,7 @@ int32_t streamTaskStartHistoryTask(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
// common
int32_t streamSetParamForRecover(SStreamTask* pTask);
int32_t streamSetParamForScanHistoryData(SStreamTask* pTask);
int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status);
......@@ -585,8 +585,7 @@ const char* streamGetTaskStatusStr(int32_t status);
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq);
int32_t streamSourceRecoverScanStep1(SStreamTask* pTask);
int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq);
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
//int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);
......
......@@ -165,7 +165,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
// 3.go through recover steps to fill history
if (pTask->info.fillHistory) {
streamSetParamForRecover(pTask);
streamSetParamForScanHistoryData(pTask);
streamAggRecoverPrepare(pTask);
}
......
......@@ -49,7 +49,7 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData);
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
......
......@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <util/ttimer.h>
#include "ttimer.h"
#include "streamInc.h"
#define MAX_BLOCK_NAME_NUM 1024
......@@ -274,8 +274,8 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
return 0;
}
int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet) {
int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet) {
void* buf = NULL;
int32_t code = -1;
SRpcMsg msg = {0};
......
......@@ -33,9 +33,9 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
qDebug("s-task:%s vgId:%d task status:%s and start to scan-history-data task, ver:%" PRId64 " - %" PRId64,
pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->dataRange.range.minVer, pTask->dataRange.range.maxVer);
pRange->minVer, pRange->maxVer);
streamSetParamForRecover(pTask);
streamSetParamForScanHistoryData(pTask);
streamSourceRecoverPrepareStep1(pTask, pRange, &pTask->dataRange.window);
SStreamRecoverStep1Req req;
......@@ -69,11 +69,11 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
}
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
streamSetStatusNormal(pTask);
streamSetParamForRecover(pTask);
streamSetParamForScanHistoryData(pTask);
streamAggRecoverPrepare(pTask);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
streamSetStatusNormal(pTask);
qDebug("s-task:%s sink task convert to normal status immediately", pTask->id.idStr);
qDebug("s-task:%s sink task convert to normal immediately", pTask->id.idStr);
}
return 0;
......@@ -107,7 +107,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
pTask->recoverTryingDownstream = numOfVgs;
pTask->notReadyTasks = numOfVgs;
pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
for (int32_t i = 0; i < numOfVgs; i++) {
......@@ -130,7 +130,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
return 0;
}
int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
SStreamTaskCheckReq req = {
.reqId = pRsp->reqId,
.streamId = pRsp->streamId,
......@@ -186,7 +186,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
return -1;
}
int32_t left = atomic_sub_fetch_32(&pTask->recoverTryingDownstream, 1);
int32_t left = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
ASSERT(left >= 0);
if (left == 0) {
......@@ -203,8 +203,9 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
streamGetTaskStatusStr(pTask->status.taskStatus));
}
} else {
qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, remain not ready:%d", id,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, left);
int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
}
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
if (pRsp->reqId != pTask->checkReqId) {
......@@ -212,8 +213,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
}
ASSERT(pTask->status.checkDownstream == 0);
pTask->status.checkDownstream = 1;
ASSERT(pTask->status.taskStatus != TASK_STATUS__HALT);
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
......@@ -233,17 +234,18 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
pRsp->downstreamNodeId);
taosMsleep(100);
streamRecheckOneDownstream(pTask, pRsp);
streamRecheckDownstream(pTask, pRsp);
}
return 0;
}
// common
int32_t streamSetParamForRecover(SStreamTask* pTask) {
int32_t streamSetParamForScanHistoryData(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
return qStreamSetParamForRecover(exec);
}
int32_t streamRestoreParam(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
return qStreamRestoreParam(exec);
......@@ -272,13 +274,6 @@ int32_t streamSourceRecoverScanStep1(SStreamTask* pTask) {
return streamScanExec(pTask, 100);
}
int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq) {
pReq->msgHead.vgId = pTask->info.nodeId;
pReq->streamId = pTask->id.streamId;
pReq->taskId = pTask->id.taskId;
return 0;
}
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
void* exec = pTask->exec.pExecutor;
const char* id = pTask->id.idStr;
......@@ -305,7 +300,7 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
pTask->fixedEpDispatcher.taskId, streamGetTaskStatusStr(pTask->status.taskStatus));
req.taskId = pTask->fixedEpDispatcher.taskId;
streamDoDispatchRecoverFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
......@@ -315,7 +310,7 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.taskId = pVgInfo->taskId;
streamDoDispatchRecoverFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
streamDoDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册