提交 69c9eda7 编写于 作者: H Haojun Liao

fix(stream): fix race condition.

上级 63ef0459
......@@ -309,6 +309,7 @@ struct SStreamTask {
STaskExec exec;
SHistDataRange dataRange;
SStreamId historyTaskId;
SStreamId streamTaskId;
SArray* pUpstreamEpInfoList; // SArray<SStreamChildEpInfo*>, // children info
int32_t nextCheckId;
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
......
......@@ -319,6 +319,10 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) {
(*pStreamTask)->historyTaskId.taskId = (*pHTask)->id.taskId;
(*pStreamTask)->historyTaskId.streamId = (*pHTask)->id.streamId;
(*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId;
(*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId;
mDebug("s-task:0x%x related history task:0x%x", (*pStreamTask)->id.taskId, (*pHTask)->id.taskId);
}
}
......
......@@ -1088,7 +1088,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
}
// do recovery step 1
tqDebug("s-task:%s start non-blocking recover stage(step 1) scan", pTask->id.idStr);
tqDebug("s-task:%s start history data scan stage(step 1)", pTask->id.idStr);
int64_t st = taosGetTimestampMs();
streamSourceRecoverScanStep1(pTask);
......@@ -1100,10 +1100,31 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
}
double el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s history scan stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el);
tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el);
if (pTask->info.fillHistory) {
// todo transfer the executor status, and then destroy this stream task
if (pTask->info.fillHistory) {/*
// 1. stop the related stream task, get the current scan wal version of stream task, ver1.
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) {
// todo handle error
}
pStreamTask->status.taskStatus = TASK_STATUS__PAUSE;
// if it's an source task, extract the last version in wal.
// 2. wait for downstream tasks to completed
// 3. do secondary scan of the history data scan, the time window remain, and the version range is updated to [pTask->dataRange.range.maxVer, ver1]
// 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
// 5. resume the related stream task.
*/
} else {
// todo update the chkInfo version for current task.
// this task has an associated history stream task, so we need to scan wal from the end version of
......
......@@ -264,13 +264,12 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
}
pTask->msgInfo.retryCount = 0;
int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT);
qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputStatus);
// the input queue of the (down stream) task that receive the output data is full, so the TASK_INPUT_STATUS_BLOCKED is rsp
if (pTask->outputStatus == TASK_INPUT_STATUS__BLOCKED) {
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
int32_t waitDuration = 300; // 300 ms
......
......@@ -440,8 +440,8 @@ int32_t streamExecForAll(SStreamTask* pTask) {
streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
double el = (taosGetTimestampMs() - st) / 1000.0;
qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
id, el, resSize / 1048576.0, totalBlocks);
qDebug("s-task:%s batch of (%d)input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
id, batchSize, el, resSize / 1048576.0, totalBlocks);
streamFreeQitem(pInput);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册