提交 fcc706c4 编写于 作者: H Haojun Liao

enh(stream): refactor the fill history task.

上级 077e1a1e
......@@ -221,7 +221,7 @@ void* qExtractReaderFromStreamScanner(void* scanner);
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo);
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver);
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver, int64_t ekey);
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver);
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qStreamRestoreParam(qTaskInfo_t tinfo);
......
......@@ -553,7 +553,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
// recover and fill history
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version);
int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version);
int32_t streamTaskLaunchRecover(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask);
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version);
......@@ -562,7 +562,7 @@ int32_t streamSetParamForRecover(SStreamTask* pTask);
int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask);
// source level
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver);
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver, int64_t ekey);
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq);
int32_t streamSourceRecoverScanStep1(SStreamTask* pTask);
int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq);
......
......@@ -312,11 +312,11 @@ static SArray* addNewTaskList(SArray* pTasksList) {
// set the history task id
static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) {
for(int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
SStreamTask* pStreamTask = taosArrayGet(pTaskList, i);
SStreamTask* pHTask = taosArrayGet(pHTaskList, i);
SStreamTask** pStreamTask = taosArrayGet(pTaskList, i);
SStreamTask** pHTask = taosArrayGet(pHTaskList, i);
pStreamTask->historyTaskId.taskId = pHTask->id.taskId;
pStreamTask->historyTaskId.streamId = pHTask->id.streamId;
(*pStreamTask)->historyTaskId.taskId = (*pHTask)->id.taskId;
(*pStreamTask)->historyTaskId.streamId = (*pHTask)->id.streamId;
}
}
......@@ -359,7 +359,7 @@ static int32_t addSourceTasksForSingleLevelStream(SMnode* pMnode, const SQueryPl
// new stream task
SArray** pSinkTaskList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL);
int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid,
pStream->conf.fillHistory, hasExtraSink);
0, hasExtraSink);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pSdb, pVgroup);
return -1;
......@@ -367,7 +367,7 @@ static int32_t addSourceTasksForSingleLevelStream(SMnode* pMnode, const SQueryPl
if (pStream->conf.fillHistory) {
SArray** pHSinkTaskList = taosArrayGet(pStream->pHTasksList, SINK_NODE_LEVEL);
code = addSourceStreamTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid, 0,
code = addSourceStreamTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid, pStream->conf.fillHistory,
hasExtraSink);
setHTasksId(pTaskList, pHTaskList);
}
......
......@@ -299,6 +299,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pObj->smaId = 0;
pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
char p[TSDB_STREAM_FNAME_LEN + 32] = {0};
snprintf(p, tListLen(p), "%s_%s", pObj->name, "fillhistory");
pObj->hTaskUid = mndGenerateUid(pObj->name, strlen(pObj->name));
pObj->status = 0;
pObj->conf.igExpired = pCreate->igExpired;
......
......@@ -1027,13 +1027,36 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
taosWUnLockLatch(&pStreamMeta->lock);
// 3.go through recover steps to fill history
// 3. for fill history task, do nothing. wait for the main task to start it
if (pTask->fillHistory) {
streamTaskCheckDownstream(pTask, sversion);
tqDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
} else {
if (pTask->historyTaskId.taskId != 0) {
// todo fix the bug: 1. maybe failed to located the fill history task, since it is not built yet. 2. race condition
// an fill history task needs to be started.
// Set the execute conditions, including the query time window and the version range
SStreamTask* pHTask = taosHashGet(pStreamMeta->pTasks, &pTask->historyTaskId.taskId, sizeof(pTask->historyTaskId.taskId));
pHTask->dataRange.range.minVer = 0;
pHTask->dataRange.range.maxVer = sversion;
pHTask->dataRange.window.skey = INT64_MIN;
pHTask->dataRange.window.ekey = 1000000;
tqDebug("s-task:%s set the launch condition for fill history task:%s, window:%" PRId64 " - %" PRId64
" verrange:%" PRId64 " - %" PRId64,
pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey,
pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer);
// check if downstream tasks have been ready
streamTaskCheckDownstream(pHTask, sversion);
}
}
tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr,
pTask->status.taskStatus, numOfTasks);
return 0;
}
......
......@@ -65,6 +65,7 @@ typedef struct {
int8_t recoverScanFinished;
SQueryTableDataCond tableCond;
int64_t fillHistoryVer1;
int64_t fillHisotryeKey1;
int64_t fillHistoryVer2;
SStreamState* pState;
int64_t dataVersion;
......
......@@ -869,10 +869,11 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
}
}
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver) {
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver, int64_t ekey) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
pTaskInfo->streamInfo.fillHistoryVer1 = ver;
pTaskInfo->streamInfo.fillHisotryeKey1 = ekey;
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE1;
return 0;
}
......
......@@ -1779,7 +1779,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SStreamScanInfo* pInfo = pOperator->info;
qDebug("stream scan started, %s", GET_TASKID(pTaskInfo));
qDebug("stream scan started, %s", id);
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
......@@ -1788,14 +1788,14 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
pTSInfo->base.cond.startVersion = 0;
pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
qDebug("stream recover step1, verRange:%" PRId64 " - %" PRId64, pTSInfo->base.cond.startVersion,
pTSInfo->base.cond.endVersion);
qDebug("stream recover step1, verRange:%" PRId64 " - %" PRId64 ", %s", pTSInfo->base.cond.startVersion,
pTSInfo->base.cond.endVersion, id);
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1;
} else {
pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64, pTSInfo->base.cond.startVersion,
pTSInfo->base.cond.endVersion);
qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64", %s", pTSInfo->base.cond.startVersion,
pTSInfo->base.cond.endVersion, id);
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
}
......
......@@ -386,6 +386,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
return -1;
}
// todo handle the fill history task
if (pTask->fillHistory) {
ASSERT(pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM);
streamTaskCheckDownstream(pTask, ver);
......
......@@ -15,7 +15,7 @@
#include "streamInc.h"
int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
int32_t streamTaskLaunchRecover(SStreamTask* pTask) {
qDebug("s-task:%s at node %d launch recover", pTask->id.idStr, pTask->nodeId);
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
......@@ -23,7 +23,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
qDebug("s-task:%s set task status:%d and start to recover", pTask->id.idStr, pTask->status.taskStatus);
streamSetParamForRecover(pTask);
streamSourceRecoverPrepareStep1(pTask, version);
streamSourceRecoverPrepareStep1(pTask, pTask->dataRange.range.maxVer, pTask->dataRange.window.ekey);
SStreamRecoverStep1Req req;
streamBuildSourceRecover1Req(pTask, &req);
......@@ -54,8 +54,8 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
}
// checkstatus
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
qDebug("s-task:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, version);
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t ver) {
qDebug("s-task:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, ver);
SStreamTaskCheckReq req = {
.streamId = pTask->id.streamId,
......@@ -88,13 +88,13 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
taosArrayPush(pTask->checkReqIds, &req.reqId);
req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId;
qDebug("s-task:%s at node %d check downstream task:0x%x at node %d (shuffle)", pTask->id.idStr, pTask->nodeId,
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x at node %d (shuffle)", pTask->id.idStr, pTask->nodeId,
req.downstreamTaskId, req.downstreamNodeId);
streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
} else {
qDebug("s-task:%s at node %d direct launch recover since no downstream", pTask->id.idStr, pTask->nodeId);
streamTaskLaunchRecover(pTask, version);
qDebug("s-task:%s (vgId:%d) direct launch recover since no downstream", pTask->id.idStr, pTask->nodeId);
streamTaskLaunchRecover(pTask);
}
return 0;
......@@ -135,7 +135,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask) {
return atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL? 1:0;
}
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) {
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t ver) {
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
qDebug("s-task:%s at node %d recv check rsp from task:0x%x at node %d: status %d", pTask->id.idStr,
......@@ -166,14 +166,14 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
pTask->checkReqIds = NULL;
qDebug("s-task:%s all %d downstream tasks are ready, now enter into recover stage", pTask->id.idStr, numOfReqs);
streamTaskLaunchRecover(pTask, version);
streamTaskLaunchRecover(pTask);
}
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
if (pRsp->reqId != pTask->checkReqId) {
return -1;
}
streamTaskLaunchRecover(pTask, version);
streamTaskLaunchRecover(pTask);
} else {
ASSERT(0);
}
......@@ -204,9 +204,9 @@ int32_t streamSetStatusNormal(SStreamTask* pTask) {
}
// source
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver) {
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver, int64_t ekey) {
void* exec = pTask->exec.pExecutor;
return qStreamSourceRecoverStep1(exec, ver);
return qStreamSourceRecoverStep1(exec, ver, ekey);
}
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册