From 6ded3284da6a5e6adc6f35b943d32d3b86373d9c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 May 2023 11:37:17 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 2 +- source/libs/stream/src/stream.c | 7 +- source/libs/stream/src/streamData.c | 4 +- source/libs/stream/src/streamExec.c | 269 ++++++++++++++++------------ source/libs/stream/src/streamMeta.c | 2 + 5 files changed, 169 insertions(+), 115 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8f5d6c38f3..7c7039c1c0 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -536,7 +536,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S void streamTaskInputFail(SStreamTask* pTask); int32_t streamTryExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask); -int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock); +int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); bool streamTaskShouldStop(const SStreamStatus* pStatus); bool streamTaskShouldPause(const SStreamStatus* pStatus); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 128d26bfb1..7f19529d11 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -195,7 +195,8 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; } -int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { +// todo add log +int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { int32_t code = 0; if (pTask->outputType == TASK_OUTPUT__TABLE) { pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); @@ -209,10 +210,14 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); code = taosWriteQitem(pTask->outputQueue->queue, pBlock); if (code != 0) { + taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); + taosFreeQitem(pBlock); return code; } + streamDispatch(pTask); } + return 0; } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 2233e21f60..ca24414db8 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -185,11 +185,13 @@ void streamFreeQitem(SStreamQueueItem* data) { taosFreeQitem(data); } else if (type == STREAM_INPUT__MERGED_SUBMIT) { SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data; - int32_t sz = taosArrayGetSize(pMerge->submits); + + int32_t sz = taosArrayGetSize(pMerge->submits); for (int32_t i = 0; i < sz; i++) { int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i); int32_t ref = atomic_sub_fetch_32(pRef, 1); ASSERT(ref >= 0); + if (ref == 0) { SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i); taosMemoryFree(pSubmit->msgStr); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e888faf4d1..122dbc47f5 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -19,7 +19,10 @@ #define MAX_STREAM_EXEC_BATCH_NUM 128 #define MIN_STREAM_EXEC_BATCH_NUM 16 -bool streamTaskShouldStop(const SStreamStatus* pStatus) { +static int32_t updateCheckPointInfo (SStreamTask* pTask); +static SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes); + + bool streamTaskShouldStop(const SStreamStatus* pStatus) { int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING); } @@ -29,55 +32,17 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus) { return (status == TASK_STATUS__PAUSE); } -static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes, int64_t* resSize) { +static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; - *resSize = 0; - - while (pTask->taskLevel == TASK_LEVEL__SOURCE) { - int8_t status = atomic_load_8(&pTask->status.taskStatus); - if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) { - qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, - atomic_load_8(&pTask->status.taskStatus)); - taosMsleep(2); - } else { - break; - } - } - // set input - const SStreamQueueItem* pItem = (const SStreamQueueItem*)data; - if (pItem->type == STREAM_INPUT__GET_RES) { - const SStreamTrigger* pTrigger = (const SStreamTrigger*)data; - qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); - } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { - ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); - const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)data; - qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); - qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, - pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); - } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { - const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data; - - SArray* pBlockList = pBlock->blocks; - int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); - qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); - } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { - const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)data; - - SArray* pBlockList = pMerged->submits; - int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks); - qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); - } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { - const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)data; - qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK); - } else { - ASSERT(0); - } + *totalBlocks = 0; + *totalSize = 0; + + SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); + int32_t size = 0; + int32_t numOfBlocks = 0; - // pExecutor while (1) { if (streamTaskShouldStop(&pTask->status)) { return 0; @@ -98,7 +63,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { SSDataBlock block = {0}; - const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)data; + const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*) pItem; ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1); assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0)); @@ -124,11 +89,49 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* assignOneDataBlock(&block, output); block.info.childId = pTask->selfChildId; - (*resSize) += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); + size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); + numOfBlocks += 1; + taosArrayPush(pRes, &block); - qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr, pTask->selfChildId, (int32_t)taosArrayGetSize(pRes), - (*resSize)/1048576.0); + qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr, + pTask->selfChildId, numOfBlocks, size / 1048576.0); + + // current output should be dispatched to down stream nodes + if (numOfBlocks > 1000) { + code = updateCheckPointInfo(pTask); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + ASSERT(numOfBlocks == taosArrayGetSize(pRes)); + + if (numOfBlocks > 0) { + SStreamDataBlock* pStreamBlocks = createStreamDataBlockFromResults(pItem, pTask, size, pRes); + if (pStreamBlocks == NULL) { + return -1; + } + + qDebug("s-task:%s output exec stream data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, size/1048576.0); + + code = streamTaskOutputResultBlock(pTask, pStreamBlocks); + if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position + taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); + taosFreeQitem(pStreamBlocks); + return -1; + } + } else { + taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); + } + } + + *totalSize += size; + *totalBlocks += numOfBlocks; + + size = 0; + numOfBlocks = 0; + + ASSERT(taosArrayGetSize(pRes) == 0); } return 0; @@ -205,7 +208,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { qRes->type = STREAM_INPUT__DATA_BLOCK; qRes->blocks = pRes; - code = streamTaskOutput(pTask, qRes); + code = streamTaskOutputResultBlock(pTask, qRes); if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosFreeQitem(qRes); @@ -251,7 +254,7 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { if (pTask->taskLevel == TASK_LEVEL__SINK) { ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK); - streamTaskOutput(pTask, (SStreamDataBlock*)pItem); + streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem); } // exec impl @@ -262,6 +265,57 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { } #endif +int32_t updateCheckPointInfo (SStreamTask* pTask) { + int64_t ckId = 0; + int64_t dataVer = 0; + qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); + + SCheckpointInfo* pCkInfo = &pTask->chkInfo; + if (ckId > pCkInfo->id) { // save it since the checkpoint is updated + qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 + ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId); + + pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pCkInfo->currentVer}; + + taosWLockLatch(&pTask->pMeta->lock); + + streamMetaSaveTask(pTask->pMeta, pTask); + if (streamMetaCommit(pTask->pMeta) < 0) { + taosWUnLockLatch(&pTask->pMeta->lock); + qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr()); + return -1; + } else { + taosWUnLockLatch(&pTask->pMeta->lock); + qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr); + } + } + + return TSDB_CODE_SUCCESS; +} + +SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) { + SStreamDataBlock* pStreamBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize); + if (pStreamBlocks == NULL) { + taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); + return NULL; + } + + pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK; + pStreamBlocks->blocks = pRes; + + if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { + SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem; + pStreamBlocks->childId = pTask->selfChildId; + pStreamBlocks->sourceVer = pSubmit->ver; + } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem; + pStreamBlocks->childId = pTask->selfChildId; + pStreamBlocks->sourceVer = pMerged->ver; + } + + return pStreamBlocks; +} + int32_t streamExecForAll(SStreamTask* pTask) { int32_t code = 0; while (1) { @@ -330,79 +384,70 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (pTask->taskLevel == TASK_LEVEL__SINK) { ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK); qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize); - streamTaskOutput(pTask, (SStreamDataBlock*)pInput); + streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput); continue; } - int64_t resSize = 0; - int64_t st = taosGetTimestampMs(); - SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); - qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize); - - streamTaskExecImpl(pTask, pInput, pRes, &resSize); - - int64_t ckId = 0; - int64_t dataVer = 0; - qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); - if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated - qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 - ", checkPoint id:%" PRId64 " -> %" PRId64, - pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId); - - pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer}; - - taosWLockLatch(&pTask->pMeta->lock); - - streamMetaSaveTask(pTask->pMeta, pTask); - if (streamMetaCommit(pTask->pMeta) < 0) { - taosWUnLockLatch(&pTask->pMeta->lock); - qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr()); - return -1; + // wait for the task to be ready to go + while (pTask->taskLevel == TASK_LEVEL__SOURCE) { + int8_t status = atomic_load_8(&pTask->status.taskStatus); + if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) { + qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, + atomic_load_8(&pTask->status.taskStatus)); + taosMsleep(2); } else { - taosWUnLockLatch(&pTask->pMeta->lock); - qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr); + break; } } - double el = (taosGetTimestampMs() - st) / 1000.0; - - int32_t numOfBlocks = taosArrayGetSize(pRes); - qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, resSize/1048576.0, numOfBlocks); + int64_t st = taosGetTimestampMs(); + qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize); - if (numOfBlocks > 0) { - SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); - if (qRes == NULL) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - streamFreeQitem(pInput); - return -1; + { + // set input + void* pExecutor = pTask->exec.pExecutor; + + const SStreamQueueItem* pItem = pInput; + if (pItem->type == STREAM_INPUT__GET_RES) { + const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput; + qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); + } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { + ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); + const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput; + qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); + qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, + pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); + } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { + const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput; + + SArray* pBlockList = pBlock->blocks; + int32_t numOfBlocks = taosArrayGetSize(pBlockList); + qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); + qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); + } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { + const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput; + + SArray* pBlockList = pMerged->submits; + int32_t numOfBlocks = taosArrayGetSize(pBlockList); + qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks); + qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); + } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { + const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput; + qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK); + } else { + ASSERT(0); } + } - qRes->type = STREAM_INPUT__DATA_BLOCK; - qRes->blocks = pRes; - - if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pInput; - qRes->childId = pTask->selfChildId; - qRes->sourceVer = pSubmit->ver; - } else if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__MERGED_SUBMIT) { - SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pInput; - qRes->childId = pTask->selfChildId; - qRes->sourceVer = pMerged->ver; - } + int64_t resSize = 0; + int32_t totalBlocks = 0; + streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks); - code = streamTaskOutput(pTask, qRes); - if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { - // backpressure and record position - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - streamFreeQitem(pInput); - taosFreeQitem(qRes); - return -1; - } - } else { - taosArrayDestroy(pRes); - } + double el = (taosGetTimestampMs() - st) / 1000.0; + qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, resSize / 1048576.0, totalBlocks); streamFreeQitem(pInput); } + return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 682ce08c7f..e091b6864b 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -296,6 +296,7 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) { return 0; } +// todo add error log int32_t streamMetaCommit(SStreamMeta* pMeta) { if (tdbCommit(pMeta->db, pMeta->txn) < 0) { qError("failed to commit stream meta"); @@ -311,6 +312,7 @@ int32_t streamMetaCommit(SStreamMeta* pMeta) { TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } + return 0; } -- GitLab