提交 aed99da6 编写于 作者: H Haojun Liao

refactor: refactor the stream task starting order.

上级 97c44e7c
...@@ -46,7 +46,7 @@ enum { ...@@ -46,7 +46,7 @@ enum {
TASK_STATUS__STOP, TASK_STATUS__STOP,
TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner
TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused
TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, // pause
}; };
enum { enum {
...@@ -315,7 +315,7 @@ struct SStreamTask { ...@@ -315,7 +315,7 @@ struct SStreamTask {
SArray* pUpstreamEpInfoList; // SArray<SStreamChildEpInfo*>, // children info SArray* pUpstreamEpInfoList; // SArray<SStreamChildEpInfo*>, // children info
int32_t nextCheckId; int32_t nextCheckId;
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo> SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
int64_t initTs;
// output // output
union { union {
STaskDispatcherFixedEp fixedEpDispatcher; STaskDispatcherFixedEp fixedEpDispatcher;
...@@ -572,12 +572,14 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); ...@@ -572,12 +572,14 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
// recover and fill history // recover and fill history
void streamPrepareNdoCheckDownstream(SStreamTask* pTask); void streamTaskCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask); int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask); int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask);
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId);
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask); int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask); void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask);
......
...@@ -72,6 +72,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { ...@@ -72,6 +72,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
return -1; return -1;
} }
pTask->initTs = taosGetTimestampMs();
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMsgCb = &pSnode->msgCb; pTask->pMsgCb = &pSnode->msgCb;
...@@ -166,11 +167,10 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { ...@@ -166,11 +167,10 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
taosWUnLockLatch(&pSnode->pMeta->lock); taosWUnLockLatch(&pSnode->pMeta->lock);
streamPrepareNdoCheckDownstream(pTask);
qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr, qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);
streamTaskCheckDownstreamTasks(pTask);
return 0; return 0;
} }
......
...@@ -817,11 +817,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -817,11 +817,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
return -1; return -1;
} }
pTask->initTs = taosGetTimestampMs();
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->pMsgCb = &pTq->pVnode->msgCb;
pTask->pMeta = pTq->pStreamMeta; pTask->pMeta = pTq->pStreamMeta;
// backup the initial status, and set it to be TASK_STATUS__INIT
pTask->chkInfo.version = ver; pTask->chkInfo.version = ver;
pTask->chkInfo.currentVer = ver; pTask->chkInfo.currentVer = ver;
...@@ -880,7 +882,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -880,7 +882,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
if (pTask->exec.pExecutor == NULL) { if (pTask->exec.pExecutor == NULL) {
return -1; return -1;
} }
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
} }
...@@ -963,28 +964,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -963,28 +964,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} }
SEncoder encoder; return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, pTask->id.taskId);
int32_t code;
int32_t len;
tEncodeSize(tEncodeStreamTaskCheckRsp, &rsp, len, code);
if (code < 0) {
tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
return -1;
}
void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t*)abuf, len);
tEncodeStreamTaskCheckRsp(&encoder, &rsp);
tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};
tmsgSendRsp(&rspMsg);
return 0;
} }
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) { int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
...@@ -1062,13 +1042,11 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms ...@@ -1062,13 +1042,11 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
} }
taosWUnLockLatch(&pStreamMeta->lock); taosWUnLockLatch(&pStreamMeta->lock);
// 3. It's an fill history task, do nothing. wait for the main task to start it
streamPrepareNdoCheckDownstream(pTask);
tqDebug("vgId:%d s-task:%s is deployed and add into meta, status:%s, numOfTasks:%d", vgId, pTask->id.idStr, tqDebug("vgId:%d s-task:%s is deployed and add into meta, status:%s, numOfTasks:%d", vgId, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);
// 3. It's an fill history task, do nothing. wait for the main task to start it
streamTaskCheckDownstreamTasks(pTask);
return 0; return 0;
} }
...@@ -1087,8 +1065,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1087,8 +1065,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
} }
// do recovery step 1 // do recovery step 1
const char* pId = pTask->id.idStr; const char* id = pTask->id.idStr;
tqDebug("s-task:%s start history data scan stage(step 1), status:%s", pId, tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id,
streamGetTaskStatusStr(pTask->status.taskStatus)); streamGetTaskStatusStr(pTask->status.taskStatus));
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
...@@ -1104,14 +1082,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1104,14 +1082,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
} }
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) { if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId); tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return 0; return 0;
} }
double el = (taosGetTimestampMs() - st) / 1000.0; double el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", pId, el); tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", id, el);
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
SVersionRange* pRange = NULL; SVersionRange* pRange = NULL;
...@@ -1125,7 +1103,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1125,7 +1103,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
pTask->streamTaskId.taskId, pTask->id.idStr); pTask->streamTaskId.taskId, pTask->id.idStr);
pTask->status.taskStatus = TASK_STATUS__DROPPING; pTask->status.taskStatus = TASK_STATUS__DROPPING;
tqDebug("s-task:%s scan-history-task set status to be dropping", pId); tqDebug("s-task:%s scan-history-task set status to be dropping", id);
streamMetaSaveTask(pMeta, pTask); streamMetaSaveTask(pMeta, pTask);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
...@@ -1139,14 +1117,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1139,14 +1117,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
tqDebug( tqDebug(
"s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms", "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms",
pId, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus)); id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus));
taosMsleep(100); taosMsleep(100);
} }
// now we can stop the stream task execution // now we can stop the stream task execution
pStreamTask->status.taskStatus = TASK_STATUS__HALT; pStreamTask->status.taskStatus = TASK_STATUS__HALT;
tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr, tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr,
pStreamTask->info.taskLevel, pId); pStreamTask->info.taskLevel, id);
// if it's an source task, extract the last version in wal. // if it's an source task, extract the last version in wal.
streamHistoryTaskSetVerRangeStep2(pTask); streamHistoryTaskSetVerRangeStep2(pTask);
...@@ -1154,7 +1132,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1154,7 +1132,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (!streamTaskRecoverScanStep1Finished(pTask)) { if (!streamTaskRecoverScanStep1Finished(pTask)) {
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " do secondary scan-history-data after halt the related stream task:%s", tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " do secondary scan-history-data after halt the related stream task:%s",
pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pId); id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, id);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
st = taosGetTimestampMs(); st = taosGetTimestampMs();
...@@ -1165,7 +1143,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1165,7 +1143,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamSourceScanHistoryData(pTask); streamSourceScanHistoryData(pTask);
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) { if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId); tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return 0; return 0;
} }
...@@ -1174,7 +1152,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1174,7 +1152,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
} }
el = (taosGetTimestampMs() - st) / 1000.0; el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pId, el); tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", id, el);
// 3. notify downstream tasks to transfer executor state after handle all history blocks. // 3. notify downstream tasks to transfer executor state after handle all history blocks.
if (!pTask->status.transferState) { if (!pTask->status.transferState) {
...@@ -1191,7 +1169,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1191,7 +1169,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamTryExec(pTask); streamTryExec(pTask);
pTask->status.taskStatus = TASK_STATUS__DROPPING; pTask->status.taskStatus = TASK_STATUS__DROPPING;
tqDebug("s-task:%s scan-history-task set status to be dropping", pId); tqDebug("s-task:%s scan-history-task set status to be dropping", id);
streamMetaSaveTask(pMeta, pTask); streamMetaSaveTask(pMeta, pTask);
streamMetaSaveTask(pMeta, pStreamTask); streamMetaSaveTask(pMeta, pStreamTask);
...@@ -1212,13 +1190,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1212,13 +1190,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (pTask->historyTaskId.taskId == 0) { if (pTask->historyTaskId.taskId == 0) {
*pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; *pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
tqDebug("s-task:%s no related scan-history-data task, reset the time window:%" PRId64 " - %" PRId64, pId, tqDebug(
pWindow->skey, pWindow->ekey); "s-task:%s scan history in current time window completed, no related fill history task, reset the time "
"window:%" PRId64 " - %" PRId64,
id, pWindow->skey, pWindow->ekey);
} else { } else {
tqDebug( tqDebug(
"s-task:%s history data in current time window scan completed, now start to handle data from WAL, start " "s-task:%s scan history in current time window completed, now start to handle data from WAL, start "
"ver:%" PRId64 ", window:%" PRId64 " - %" PRId64, "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64,
pId, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey); id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
} }
// notify the downstream agg tasks that upstream tasks are ready to processing the WAL data, update the // notify the downstream agg tasks that upstream tasks are ready to processing the WAL data, update the
...@@ -1452,31 +1432,56 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL ...@@ -1452,31 +1432,56 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL
return 0; return 0;
} }
int32_t tqProcessTaskPauseImpl(SStreamMeta* pStreamMeta, SStreamTask* pTask) { // todo rule out the status when pause not suitable.
if (pTask) { static int32_t tqProcessTaskPauseImpl(SStreamMeta* pStreamMeta, SStreamTask* pTask) {
if (!streamTaskShouldPause(&pTask->status)) { if (!streamTaskShouldPause(&pTask->status)) {
tqDebug("vgId:%d s-task:%s set pause flag", pStreamMeta->vgId, pTask->id.idStr); tqDebug("vgId:%d s-task:%s set pause flag", pStreamMeta->vgId, pTask->id.idStr);
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
}
streamMetaReleaseTask(pStreamMeta, pTask);
} else {
return -1;
} }
return 0; return 0;
} }
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
int32_t code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pTask); SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
pReq->taskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
int32_t code = tqProcessTaskPauseImpl(pMeta, pTask);
if (code != 0) { if (code != 0) {
streamMetaReleaseTask(pMeta, pTask);
return code; return code;
} }
SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.taskId);
if (pHistoryTask) { SStreamTask* pHistoryTask = NULL;
code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pHistoryTask); if (pTask->historyTaskId.taskId != 0) {
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
if (pHistoryTask == NULL) {
tqError("vgId:%d failed to acquire fill-history task:0x%x, it may have been dropped already", pMeta->vgId,
pTask->historyTaskId.taskId);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
tqDebug("s-task:%s fill-history task handle pause along with related stream task", pHistoryTask->id.idStr);
code = tqProcessTaskPauseImpl(pMeta, pHistoryTask);
}
streamMetaReleaseTask(pMeta, pTask);
if (pHistoryTask != NULL) {
streamMetaReleaseTask(pMeta, pHistoryTask);
} }
return code; return code;
} }
......
...@@ -80,11 +80,17 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) { ...@@ -80,11 +80,17 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) {
continue; continue;
} }
streamTaskCheckDownstreamTasks(pTask); if (pTask->info.fillHistory == 1) {
tqDebug("s-task:%s fill-history task, wait for related stream task:0x%x to launch it", pTask->id.idStr,
pTask->streamTaskId.taskId);
continue;
}
streamTaskDoCheckDownstreamTasks(pTask);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} }
taosArrayDestroy(pTaskList);
taosArrayDestroy(pTaskList);
return 0; return 0;
} }
......
...@@ -266,7 +266,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { ...@@ -266,7 +266,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
if (!streamTaskShouldStop(&(*ppTask)->status)) { if (!streamTaskShouldStop(&(*ppTask)->status)) {
int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1); int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
taosRUnLockLatch(&pMeta->lock); taosRUnLockLatch(&pMeta->lock);
qDebug("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref); qTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref);
return *ppTask; return *ppTask;
} }
} }
...@@ -278,7 +278,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { ...@@ -278,7 +278,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1); int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1);
if (ref > 0) { if (ref > 0) {
qDebug("s-task:%s release task, ref:%d", pTask->id.idStr, ref); qTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref);
} else if (ref == 0) { } else if (ref == 0) {
ASSERT(streamTaskShouldStop(&pTask->status)); ASSERT(streamTaskShouldStop(&pTask->status));
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
......
...@@ -17,6 +17,18 @@ ...@@ -17,6 +17,18 @@
#include "ttimer.h" #include "ttimer.h"
#include "wal.h" #include "wal.h"
static void launchFillHistoryTask(SStreamTask* pTask);
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) {
ASSERT(pTask->status.downstreamReady == 0);
pTask->status.downstreamReady = 1;
int64_t el = (taosGetTimestampMs() - pTask->initTs);
qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s",
pTask->id.idStr, numOfReqs, el, streamGetTaskStatusStr(pTask->status.taskStatus));
}
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) { int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) {
SStreamScanHistoryReq req; SStreamScanHistoryReq req;
streamBuildSourceRecover1Req(pTask, &req, igUntreated); streamBuildSourceRecover1Req(pTask, &req, igUntreated);
...@@ -51,9 +63,9 @@ const char* streamGetTaskStatusStr(int32_t status) { ...@@ -51,9 +63,9 @@ const char* streamGetTaskStatusStr(int32_t status) {
static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
SVersionRange* pRange = &pTask->dataRange.range; SVersionRange* pRange = &pTask->dataRange.range;
qDebug("s-task:%s vgId:%d status:%s, start scan-history-data task, verRange:%" PRId64 " - %" PRId64, pTask->id.idStr, // qDebug("s-task:%s vgId:%d status:%s, start scan-history-data task, verRange:%" PRId64 " - %" PRId64, pTask->id.idStr,
pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus), pRange->minVer, pRange->maxVer); // pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus), pRange->minVer, pRange->maxVer);
//
streamSetParamForScanHistory(pTask); streamSetParamForScanHistory(pTask);
streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window); streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
...@@ -84,7 +96,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { ...@@ -84,7 +96,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
} }
// check status // check status
int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
SHistDataRange* pRange = &pTask->dataRange; SHistDataRange* pRange = &pTask->dataRange;
STimeWindow* pWindow = &pRange->window; STimeWindow* pWindow = &pRange->window;
...@@ -129,11 +141,13 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { ...@@ -129,11 +141,13 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
} }
} else { } else {
pTask->status.downstreamReady = 1; qDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
qDebug("s-task:%s (vgId:%d) no downstream tasks, set downstream checked, try to launch scan-history-data, status:%s",
pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus)); streamTaskSetForReady(pTask, 0);
streamTaskSetRangeStreamCalc(pTask);
streamTaskLaunchScanHistory(pTask); streamTaskLaunchScanHistory(pTask);
launchFillHistoryTask(pTask);
} }
return 0; return 0;
...@@ -171,7 +185,28 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p ...@@ -171,7 +185,28 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p
} }
int32_t streamTaskCheckStatus(SStreamTask* pTask) { int32_t streamTaskCheckStatus(SStreamTask* pTask) {
return atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL? 1:0; return (pTask->status.downstreamReady == 1)? 1:0;
}
static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
streamTaskSetForReady(pTask, numOfReqs);
const char* id = pTask->id.idStr;
int8_t status = pTask->status.taskStatus;
const char* str = streamGetTaskStatusStr(status);
ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__NORMAL);
streamTaskSetRangeStreamCalc(pTask);
if (status == TASK_STATUS__SCAN_HISTORY) {
qDebug("s-task:%s enter into scan-history-data stage, status:%s", id, numOfReqs, str);
streamTaskLaunchScanHistory(pTask);
} else {
qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
}
// when current stream task is ready, check the related fill history task.
launchFillHistoryTask(pTask);
} }
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
...@@ -201,17 +236,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs ...@@ -201,17 +236,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
if (left == 0) { if (left == 0) {
taosArrayDestroy(pTask->checkReqIds); taosArrayDestroy(pTask->checkReqIds);
pTask->checkReqIds = NULL; pTask->checkReqIds = NULL;
pTask->status.downstreamReady = 1;
doProcessDownstreamReadyRsp(pTask, numOfReqs);
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id,
numOfReqs, streamGetTaskStatusStr(pTask->status.taskStatus));
streamTaskLaunchScanHistory(pTask);
} else {
ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
qDebug("s-task:%s fixed downstream task is ready, now ready for data from wal, status:%s", id,
streamGetTaskStatusStr(pTask->status.taskStatus));
}
} else { } else {
int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
...@@ -223,19 +249,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs ...@@ -223,19 +249,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
return -1; return -1;
} }
// set the downstream tasks have been checked flag doProcessDownstreamReadyRsp(pTask, 1);
ASSERT(pTask->status.downstreamReady == 0);
pTask->status.downstreamReady = 1;
ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY || pTask->status.taskStatus == TASK_STATUS__NORMAL);
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
qDebug("s-task:%s fixed downstream task is ready, now enter into scan-history-data stage, status:%s", id,
streamGetTaskStatusStr(pTask->status.taskStatus));
streamTaskLaunchScanHistory(pTask);
} else {
qDebug("s-task:%s fixed downstream task is ready, ready for data from inputQ, status:%s", id,
streamGetTaskStatusStr(pTask->status.taskStatus));
}
} }
} else { // not ready, wait for 100ms and retry } else { // not ready, wait for 100ms and retry
qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", id, pRsp->downstreamTaskId, qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", id, pRsp->downstreamTaskId,
...@@ -248,6 +262,32 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs ...@@ -248,6 +262,32 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
return 0; return 0;
} }
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo *pRpcInfo, int32_t taskId) {
SEncoder encoder;
int32_t code;
int32_t len;
tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
if (code < 0) {
qError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
return -1;
}
void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t*)abuf, len);
tEncodeStreamTaskCheckRsp(&encoder, pRsp);
tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
tmsgSendRsp(&rspMsg);
return 0;
}
// common // common
int32_t streamSetParamForScanHistory(SStreamTask* pTask) { int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
qDebug("s-task:%s set operator option for scan-history-data", pTask->id.idStr); qDebug("s-task:%s set operator option for scan-history-data", pTask->id.idStr);
...@@ -434,7 +474,7 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) { ...@@ -434,7 +474,7 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
} }
// check if downstream tasks have been ready // check if downstream tasks have been ready
streamTaskCheckDownstreamTasks(pHTask); streamTaskDoCheckDownstreamTasks(pHTask);
} }
typedef struct SStreamTaskRetryInfo { typedef struct SStreamTaskRetryInfo {
...@@ -500,7 +540,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { ...@@ -500,7 +540,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
// todo fix the bug: 2. race condition // todo fix the bug: 2. race condition
// an fill history task needs to be started. // an fill history task needs to be started.
int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask) { int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
int32_t hTaskId = pTask->historyTaskId.taskId; int32_t hTaskId = pTask->historyTaskId.taskId;
...@@ -675,40 +715,78 @@ int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistory ...@@ -675,40 +715,78 @@ int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistory
return 0; return 0;
} }
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
if (pTask->historyTaskId.taskId == 0) {
SHistDataRange* pRange = &pTask->dataRange;
qDebug("s-task:%s no related fill-history task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64
" - %" PRId64,
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
} else {
SHistDataRange* pRange = &pTask->dataRange;
int64_t ekey = pRange->window.ekey + 1;
int64_t ver = pRange->range.minVer;
pRange->window.skey = ekey;
pRange->window.ekey = INT64_MAX;
pRange->range.minVer = 0;
pRange->range.maxVer = ver;
qDebug("s-task:%s level:%d related-fill-history task exists, update stream calc time window:%" PRId64 " - %" PRId64
", verRang:%" PRId64 " - %" PRId64,
pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
pRange->range.maxVer);
}
}
void launchFillHistoryTask(SStreamTask* pTask) {
int32_t tId = pTask->historyTaskId.taskId;
if (tId == 0) {
return;
}
ASSERT(pTask->status.downstreamReady == 1);
qDebug("s-task:%s start to launch related fill-history task:0x%x", pTask->id.idStr, tId);
// launch associated fill history task
streamLaunchFillHistoryTask(pTask);
}
// todo handle race condition, this task may be destroyed // todo handle race condition, this task may be destroyed
void streamPrepareNdoCheckDownstream(SStreamTask* pTask) { void streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
return;
}
// calculate the correct start time window, and start the handle the history data for the main task.
/* if (pTask->historyTaskId.taskId != 0) {
// check downstream tasks for associated scan-history-data tasks
streamLaunchFillHistoryTask(pTask);
// launch current task
SHistDataRange* pRange = &pTask->dataRange;
int64_t ekey = pRange->window.ekey + 1;
int64_t ver = pRange->range.minVer;
pRange->window.skey = ekey;
pRange->window.ekey = INT64_MAX;
pRange->range.minVer = 0;
pRange->range.maxVer = ver;
qDebug("s-task:%s level:%d fill-history task exists, update stream time window:%" PRId64 " - %" PRId64
", ver range:%" PRId64 " - %" PRId64,
pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
pRange->range.maxVer);
} else { } else {
// calculate the correct start time window, and start the handle the history data for the main task. SHistDataRange* pRange = &pTask->dataRange;
if (pTask->historyTaskId.taskId != 0) { qDebug("s-task:%s no associated scan-history task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64
// check downstream tasks for associated scan-history-data tasks " - %" PRId64,
streamCheckHistoryTaskDownstream(pTask); pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
}*/
// launch current task
SHistDataRange* pRange = &pTask->dataRange;
int64_t ekey = pRange->window.ekey + 1;
int64_t ver = pRange->range.minVer;
pRange->window.skey = ekey;
pRange->window.ekey = INT64_MAX;
pRange->range.minVer = 0;
pRange->range.maxVer = ver;
qDebug("s-task:%s level:%d fill-history task exists, update stream time window:%" PRId64 " - %" PRId64
", ver range:%" PRId64 " - %" PRId64,
pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
pRange->range.maxVer);
} else {
SHistDataRange* pRange = &pTask->dataRange;
qDebug("s-task:%s no associated scan-history task, stream time window:%" PRId64 " - %" PRId64
", ver range:%" PRId64 " - %" PRId64,
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
}
ASSERT(pTask->status.downstreamReady == 0); ASSERT(pTask->status.downstreamReady == 0);
// check downstream tasks for itself // check downstream tasks for itself
streamTaskCheckDownstreamTasks(pTask); streamTaskDoCheckDownstreamTasks(pTask);
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册