From a89ce1a20b2da84a42257c01e7be000bc3ad47ed Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 12 Aug 2023 01:24:07 +0800 Subject: [PATCH] fix(stream): transfer state by using data block. --- include/common/tcommon.h | 1 + include/libs/stream/tstream.h | 1 + source/libs/stream/src/streamExec.c | 60 ++++++++++++++++++++++---- source/libs/stream/src/streamRecover.c | 29 +++++++++++++ 4 files changed, 82 insertions(+), 9 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 705f5b675b..1dfe30af71 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -152,6 +152,7 @@ enum { STREAM_INPUT__DATA_RETRIEVE, STREAM_INPUT__GET_RES, STREAM_INPUT__CHECKPOINT, + STREAM_INPUT__TRANS_STATE, STREAM_INPUT__REF_DATA_BLOCK, STREAM_INPUT__DESTROY, }; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b9b24917f3..b7e323a213 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -627,6 +627,7 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); int32_t streamDispatchTransferStateMsg(SStreamTask* pTask); +int32_t appendTranstateIntoInputQ(SStreamTask* pTask); // agg level int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index b479931cd2..102c8805b5 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -390,6 +390,11 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { if (level == TASK_LEVEL__SOURCE) { streamTaskFillHistoryFinished(pTask); streamTaskEndScanWAL(pTask); + + code = streamDoTransferStateToStreamTask(pTask); + if (code != TSDB_CODE_SUCCESS) { // todo handle this + return code; + } } else if (level == TASK_LEVEL__AGG) { // do transfer task operator states. code = streamDoTransferStateToStreamTask(pTask); if (code != TSDB_CODE_SUCCESS) { // todo handle this @@ -460,6 +465,40 @@ static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu } } +int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { + const char* id = pTask->id.idStr; + int32_t code = TSDB_CODE_SUCCESS; + + int32_t level = pTask->info.taskLevel; + if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SINK) { + int32_t remain = streamAlignTransferState(pTask); + if (remain > 0) { + qDebug("s-task:%s receive upstream transfer state msg, remain:%d", id, remain); + return 0; + } + + // transfer the ownership of executor state + qDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", id); + ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1); + + pTask->status.transferState = true; + } + + // dispatch the transtate block to downstream task immediately + if (level == TASK_LEVEL__SOURCE || level == TASK_LEVEL__AGG) { + // pBlock-> = pTask->id.taskId; + pBlock->srcVgId = pTask->pMeta->vgId; + code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock); + if (code == 0) { + streamDispatchStreamBlock(pTask); + } else { + streamFreeQitem((SStreamQueueItem*)pBlock); + } + } + + return code; +} + /** * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * appropriate batch of blocks should be handled in 5 to 10 sec. @@ -484,6 +523,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { break; } + if (pInput->type == STREAM_INPUT__TRANS_STATE) { + streamProcessTranstateBlock(pTask, (SStreamDataBlock*)pInput); + return 0; + } + if (pTask->info.taskLevel == TASK_LEVEL__SINK) { ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK); qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize); @@ -557,17 +601,15 @@ int32_t streamTaskEndScanWAL(SStreamTask* pTask) { qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); // 1. notify all downstream tasks to transfer executor state after handle all history blocks. - int32_t code = streamDispatchTransferStateMsg(pTask); - if (code != TSDB_CODE_SUCCESS) { - // todo handle error - } +// pTask->status.transferState = true; + appendTranstateIntoInputQ(pTask); // 2. do transfer stream task operator states. - pTask->status.transferState = true; - code = streamDoTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { // todo handle error - return code; - } + // todo remove this +// int32_t code = streamDoTransferStateToStreamTask(pTask); +// if (code != TSDB_CODE_SUCCESS) { // todo handle error +// return code; +// } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index e59b3f682d..b46ded6ca7 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -415,6 +415,35 @@ static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferRe return 0; } +int32_t appendTranstateIntoInputQ(SStreamTask* pTask) { + SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); + if (pTranstate == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (pBlock == NULL) { + taosFreeQitem(pTranstate); + return TSDB_CODE_OUT_OF_MEMORY; + } + + pBlock->info.type = STREAM_TRANS_STATE; + pBlock->info.rows = 1; + pBlock->info.childId = pTask->info.selfChildId; + + pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock));//pBlock; + taosArrayPush(pTranstate->blocks, pBlock); + + taosMemoryFree(pBlock); + if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pTranstate) < 0) { + taosFreeQitem(pTranstate); + return TSDB_CODE_OUT_OF_MEMORY; + } + + streamSchedExec(pTask); + return TSDB_CODE_SUCCESS; +} + int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { SStreamTransferReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId }; -- GitLab