From ba0c1b368907fe798f63ee4ad7f3bba08985a8e4 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 27 Jul 2022 20:04:31 +0800 Subject: [PATCH] refactor(stream) --- include/libs/executor/executor.h | 9 --------- source/dnode/vnode/src/sma/smaRollup.c | 4 ++-- source/libs/executor/src/executor.c | 4 ---- source/libs/stream/src/streamDispatch.c | 20 ++++++++++++++++---- source/libs/stream/src/streamExec.c | 2 +- 5 files changed, 19 insertions(+), 20 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 500418df97..a7fae403ed 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -66,15 +66,6 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers); */ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema); -/** - * Set the input data block for the stream scan. - * @param tinfo - * @param input - * @param type - * @return - */ -int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type); - /** * Set multiple input data blocks for the stream scan. * @param tinfo diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index cf489d72a8..4b29b13abd 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -644,7 +644,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, pItem->taskInfo, suid); - if (qSetStreamInput(pItem->taskInfo, pMsg, inputType) < 0) { // INPUT__DATA_SUBMIT + if (qSetMultiStreamInput(pItem->taskInfo, pMsg, 1, inputType) < 0) { // INPUT__DATA_SUBMIT smaError("vgId:%d, rsma % " PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); return TSDB_CODE_FAILED; } @@ -1329,7 +1329,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { tdRefRSmaInfo(pSma, pRSmaInfo); SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; - qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_INPUT__DATA_BLOCK); + qSetMultiStreamInput(pItem->taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK); tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK); tdUnRefRSmaInfo(pSma, pRSmaInfo); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 5d4e5c9e8d..1618bffb09 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -94,10 +94,6 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } } -int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) { - return qSetMultiStreamInput(tinfo, input, 1, type); -} - int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { if (tinfo == NULL) { return TSDB_CODE_QRY_APP_ERROR; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 2e33632f12..44f38823ee 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -227,6 +227,8 @@ int32_t streamDispatchOneReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, msg.pCont = buf; msg.msgType = pTask->dispatchMsgType; + qDebug("dispatch from task %d to task %d node %d", pTask->taskId, pReq->taskId, vgId); + tmsgSendReq(pEpSet, &msg); code = 0; @@ -281,8 +283,10 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat return code; } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { - SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - ASSERT(pTask->shuffleDispatcher.waitingRspCnt == 0); + int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt); + ASSERT(rspCnt == 0); + + SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t vgSz = taosArrayGetSize(vgInfo); SStreamDispatchReq* pReqs = taosMemoryCalloc(vgSz, sizeof(SStreamDispatchReq)); if (pReqs == NULL) { @@ -301,7 +305,10 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat if (pReqs[i].data == NULL || pReqs[i].dataLen == NULL) { goto FAIL_SHUFFLE_DISPATCH; } + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + pReqs[i].taskId = pVgInfo->taskId; } + for (int32_t i = 0; i < blockNum; i++) { SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, pDataBlock->info.groupId); @@ -309,6 +316,9 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat // TODO: get hash function by hashMethod uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName)); + taosMemoryFree(ctbName); + + bool found = false; // TODO: optimize search int32_t j; for (j = 0; j < vgSz; j++) { @@ -318,12 +328,14 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) { goto FAIL_SHUFFLE_DISPATCH; } - pReqs[j].taskId = pVgInfo->taskId; pReqs[j].blockNum++; + found = true; break; } } + ASSERT(found); } + for (int32_t i = 0; i < vgSz; i++) { if (pReqs[i].blockNum > 0) { // send @@ -331,7 +343,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat if (streamDispatchOneReq(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) { goto FAIL_SHUFFLE_DISPATCH; } - pTask->shuffleDispatcher.waitingRspCnt++; + atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); } } code = 0; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 2b2c96472d..7c5cd6e391 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -27,7 +27,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) ASSERT(pTask->isDataScan); SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; qDebug("task %d %p set submit input %p %p %d 1", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef); - qSetStreamInput(exec, pSubmit->data, STREAM_INPUT__DATA_SUBMIT); + qSetMultiStreamInput(exec, pSubmit->data, 1, STREAM_INPUT__DATA_SUBMIT); } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { SStreamDataBlock* pBlock = (SStreamDataBlock*)data; SArray* blocks = pBlock->blocks; -- GitLab