diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 5f034e3177ade53b317dd2a722e4833672dfbb5e..cc9bc4295e0eb77115abfcf231439e6fd44f4e89 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -248,7 +248,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S ASSERT(pInfo != NULL); if (!pInfo->dataAllowed) { - qWarn("s-task:%s data from task:0x%x is denied", pTask->id.idStr, pReq->upstreamTaskId); + qWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", pTask->id.idStr, pReq->upstreamTaskId); status = TASK_INPUT_STATUS__BLOCKED; } else { // Current task has received the checkpoint req from the upstream task, from which the message should all be blocked diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 1fc2e96161e568235ab1ddc58a298092f1f513db..4e7a291d8b72d8bc84e3a71417ee352f4866da6b 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -174,6 +174,16 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc // set task status pTask->status.taskStatus = TASK_STATUS__CK; + { // todo: remove this when the pipeline checkpoint generating is used. + SStreamMeta* pMeta = pTask->pMeta; + taosWLockLatch(&pMeta->lock); + if (pMeta->chkptNotReadyTasks == 0) { + pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList); + } + + taosWUnLockLatch(&pMeta->lock); + } + //todo fix race condition: set the status and append checkpoint block int32_t taskLevel = pTask->info.taskLevel; if (taskLevel == TASK_LEVEL__SOURCE) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index b896b47ee42b39c420c24cad6d52ebbca59fe377..ce2747a5c7ca59c1db1a6d24abbf9df9758436d5 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -545,6 +545,8 @@ int32_t streamTryExec(SStreamTask* pTask) { if (pTask->status.taskStatus == TASK_STATUS__CK_READY) { // check for all tasks, and do generate the vnode-wide checkpoint data. // todo extract method + + SStreamMeta* pMeta = pTask->pMeta; int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); ASSERT(remain >= 0); @@ -554,6 +556,9 @@ int32_t streamTryExec(SStreamTask* pTask) { streamSaveTasks(pMeta, pTask->checkpointingId); qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId, pTask->checkpointingId); + } else { + qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status:%d, total:%d", pMeta->vgId, remain, + (int32_t)taosArrayGetSize(pMeta->pTaskList)); } // send check point response to upstream task