diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 83e0bd33a666c0378cc345881e91f91c3aefe7f9..e33ff6b778269d3b4fff398aee74c6486b643b65 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -89,6 +89,16 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n */ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type); +/** + * Set block for sma + * @param tinfo + * @param pBlocks + * @param numOfInputBlock + * @param type + * @return + */ +int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type); + /** * Update the table id list, add or remove. * diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index c56b32514a3195f311d504c28d66609f937e8545..19a0fbd6296885453e583956fa6b2d2e2aebcfff 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -15,11 +15,11 @@ #include "sma.h" -#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt -#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt -#define RSMA_FETCH_DELAY_MAX (120000) // ms -#define RSMA_FETCH_ACTIVE_MAX (1000) // ms -#define RSMA_FETCH_INTERVAL (5000) // ms +#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt +#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt +#define RSMA_FETCH_DELAY_MAX (120000) // ms +#define RSMA_FETCH_ACTIVE_MAX (1000) // ms +#define RSMA_FETCH_INTERVAL (5000) // ms SSmaMgmt smaMgmt = { .inited = 0, @@ -839,7 +839,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, tdRsmaPrintSubmitReq(pSma, pReq); } #endif - if (qSetMultiStreamInput(qTaskInfo, pMsg, msgSize, inputType) < 0) { + if (qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType) < 0) { smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); return TSDB_CODE_FAILED; } @@ -1404,7 +1404,7 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { pItem->nScanned = 0; - if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { + if ((terrno = qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { goto _err; } if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index fddaeff8a50653a334cb6fea8010aa821fe8c5db..3e1e68a56ad52b98484e21191d920c9d3386e398 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -30,6 +30,46 @@ static void cleanupRefPool() { taosCloseRef(ref); } +static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { + ASSERT(pOperator != NULL); + if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + if (pOperator->numOfDownstream == 0) { + qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id); + return TSDB_CODE_QRY_APP_ERROR; + } + + if (pOperator->numOfDownstream > 1) { // not handle this in join query + qError("join not supported for stream block scan, %s" PRIx64, id); + return TSDB_CODE_QRY_APP_ERROR; + } + pOperator->status = OP_NOT_OPENED; + return doSetSMABlock(pOperator->pDownstream[0], input, numOfBlocks, type, id); + } else { + pOperator->status = OP_NOT_OPENED; + + SStreamScanInfo* pInfo = pOperator->info; + + if (type == STREAM_INPUT__MERGED_SUBMIT) { + for (int32_t i = 0; i < numOfBlocks; i++) { + SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*)); + taosArrayPush(pInfo->pBlockLists, &pReq); + } + pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; + } else if (type == STREAM_INPUT__DATA_SUBMIT) { + taosArrayPush(pInfo->pBlockLists, &input); + pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; + } else if (type == STREAM_INPUT__DATA_BLOCK) { + for (int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; + taosArrayPush(pInfo->pBlockLists, &pDataBlock); + } + pInfo->blockType = STREAM_INPUT__DATA_BLOCK; + } + + return TSDB_CODE_SUCCESS; + } +} + static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { ASSERT(pOperator != NULL); if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { @@ -100,6 +140,27 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO return code; } +int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { + if (tinfo == NULL) { + return TSDB_CODE_QRY_APP_ERROR; + } + + if (pBlocks == NULL || numOfBlocks == 0) { + return TSDB_CODE_SUCCESS; + } + + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + + int32_t code = doSetSMABlock(pTaskInfo->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo)); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed to set the sma block data", GET_TASKID(pTaskInfo)); + } else { + qDebug("%s set the sma block successfully", GET_TASKID(pTaskInfo)); + } + + return code; +} + qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) { if (msg == NULL) { // create raw scan