提交 0d0e30b7 编写于 作者: H Haojun Liao

fix(stream): fix the error.

上级 ce721a01
...@@ -152,6 +152,7 @@ enum { ...@@ -152,6 +152,7 @@ enum {
STREAM_INPUT__DATA_RETRIEVE, STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__GET_RES, STREAM_INPUT__GET_RES,
STREAM_INPUT__CHECKPOINT, STREAM_INPUT__CHECKPOINT,
STREAM_INPUT__CHECKPOINT_TRIGGER,
STREAM_INPUT__TRANS_STATE, STREAM_INPUT__TRANS_STATE,
STREAM_INPUT__REF_DATA_BLOCK, STREAM_INPUT__REF_DATA_BLOCK,
STREAM_INPUT__DESTROY, STREAM_INPUT__DESTROY,
......
...@@ -334,6 +334,9 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { ...@@ -334,6 +334,9 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr); qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
} }
// todo check the output queue for fill-history task, and wait for it complete
// 1. expand the query time window for stream task of WAL scanner // 1. expand the query time window for stream task of WAL scanner
pTimeWindow->skey = INT64_MIN; pTimeWindow->skey = INT64_MIN;
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor); qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
...@@ -389,8 +392,6 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { ...@@ -389,8 +392,6 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
int32_t level = pTask->info.taskLevel; int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) { if (level == TASK_LEVEL__SOURCE) {
streamTaskFillHistoryFinished(pTask); streamTaskFillHistoryFinished(pTask);
streamTaskEndScanWAL(pTask);
code = streamDoTransferStateToStreamTask(pTask); code = streamDoTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle this if (code != TSDB_CODE_SUCCESS) { // todo handle this
return code; return code;
...@@ -405,14 +406,41 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { ...@@ -405,14 +406,41 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
return code; return code;
} }
static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, static int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) {
const char* id) { int32_t retryTimes = 0;
int32_t retryTimes = 0; int32_t MAX_RETRY_TIMES = 5;
int32_t MAX_RETRY_TIMES = 5; const char* id = pTask->id.idStr;
if (pTask->info.taskLevel == TASK_LEVEL__SINK) { // extract block from inputQ, one-by-one
while (1) {
if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
return TSDB_CODE_SUCCESS;
}
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
taosMsleep(10);
qDebug("===stream===try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id);
continue;
}
qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id);
return TSDB_CODE_SUCCESS;
}
qDebug("s-task:%s sink task handle result block one-by-one", id);
*numOfBlocks = 1;
*pInput = qItem;
return TSDB_CODE_SUCCESS;
}
}
// non sink task
while (1) { while (1) {
if (streamTaskShouldPause(&pTask->status)) { if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, *numOfBlocks); qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -420,47 +448,52 @@ static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu ...@@ -420,47 +448,52 @@ static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
if (qItem == NULL) { if (qItem == NULL) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
taosMsleep(10); taosMsleep(10);
qDebug("===stream===try again batchSize:%d, retry:%d", *numOfBlocks, retryTimes); qDebug("===stream===try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id);
continue; continue;
} }
qDebug("===stream===break batchSize:%d", *numOfBlocks); qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id);
return TSDB_CODE_SUCCESS;
}
// do not merge blocks for sink node
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
*numOfBlocks = 1;
*pInput = qItem;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (*pInput == NULL) { // do not merge blocks for sink node and check point data block
ASSERT((*numOfBlocks) == 0); if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
*pInput = qItem; qItem->type == STREAM_INPUT__TRANS_STATE) {
} else { if (*pInput == NULL) {
// todo we need to sort the data block, instead of just appending into the array list. qDebug("s-task:%s checkpoint/transtate msg extracted, start to process immediately", id);
void* newRet = streamMergeQueueItem(*pInput, qItem); *numOfBlocks = 1;
if (newRet == NULL) { *pInput = qItem;
if (terrno == 0) { return TSDB_CODE_SUCCESS;
qDebug("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks); } else {
} else { // previous existed blocks needs to be handle, before handle the checkpoint msg block
qDebug("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks, qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous block first, numOfBlocks:%d", id,
tstrerror(terrno)); *numOfBlocks);
}
streamQueueProcessFail(pTask->inputQueue); streamQueueProcessFail(pTask->inputQueue);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} else {
if (*pInput == NULL) {
ASSERT((*numOfBlocks) == 0);
*pInput = qItem;
} else {
// todo we need to sort the data block, instead of just appending into the array list.
void* newRet = streamMergeQueueItem(*pInput, qItem);
if (newRet == NULL) {
qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
streamQueueProcessFail(pTask->inputQueue);
return TSDB_CODE_SUCCESS;
}
*pInput = newRet; *pInput = newRet;
} }
*numOfBlocks += 1; *numOfBlocks += 1;
streamQueueProcessSuccess(pTask->inputQueue); streamQueueProcessSuccess(pTask->inputQueue);
if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) { if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
}
} }
} }
} }
...@@ -476,14 +509,18 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock ...@@ -476,14 +509,18 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
qDebug("s-task:%s receive upstream transfer state msg, remain:%d", id, remain); qDebug("s-task:%s receive upstream transfer state msg, remain:%d", id, remain);
return 0; return 0;
} }
}
// transfer the ownership of executor state // transfer the ownership of executor state
if (level == TASK_LEVEL__SOURCE) {
qDebug("s-task:%s open transfer state flag for source task", id);
} else {
qDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", id); qDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", id);
ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1); ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
pTask->status.transferState = true;
} }
pTask->status.transferState = true;
// dispatch the tran-state block to downstream task immediately // dispatch the tran-state block to downstream task immediately
int32_t type = pTask->outputInfo.type; int32_t type = pTask->outputInfo.type;
if ((level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) && if ((level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) &&
...@@ -518,7 +555,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -518,7 +555,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
// merge multiple input data if possible in the input queue. // merge multiple input data if possible in the input queue.
qDebug("s-task:%s start to extract data block from inputQ", id); qDebug("s-task:%s start to extract data block from inputQ", id);
/*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id); /*int32_t code = */extractBlocksFromInputQ(pTask, &pInput, &batchSize);
if (pInput == NULL) { if (pInput == NULL) {
ASSERT(batchSize == 0); ASSERT(batchSize == 0);
break; break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册