From 5e1bbe0e7e0474338b2b5a968150434ac96d8cbf Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 27 Jul 2022 16:55:23 +0800 Subject: [PATCH] refactor(sma): remove option --- include/libs/executor/executor.h | 7 +++---- source/dnode/vnode/src/sma/smaRollup.c | 10 +++++----- source/libs/executor/src/executor.c | 18 ++++++++---------- source/libs/stream/src/streamExec.c | 8 ++++---- 4 files changed, 20 insertions(+), 23 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 65e20336cc..500418df97 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -64,8 +64,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers); * @param SReadHandle * @return */ -qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, - SSchemaWrapper** pSchema); +qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema); /** * Set the input data block for the stream scan. @@ -74,7 +73,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n * @param type * @return */ -int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid); +int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type); /** * Set multiple input data blocks for the stream scan. @@ -84,7 +83,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool * @param type * @return */ -int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type, bool assignUid); +int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type); /** * Update the table id list, add or remove. diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index a6fde1e2d2..cf489d72a8 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -611,8 +611,8 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche goto _err; } - smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version:%"PRIi64, SMA_VID(pSma), - suid, pItem->level, output->info.version); + smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version:%" PRIi64, + SMA_VID(pSma), suid, pItem->level, output->info.version); taosMemoryFreeClear(pReq); taosArrayClear(pResult); @@ -644,7 +644,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, pItem->taskInfo, suid); - if (qSetStreamInput(pItem->taskInfo, pMsg, inputType, true) < 0) { // INPUT__DATA_SUBMIT + if (qSetStreamInput(pItem->taskInfo, pMsg, inputType) < 0) { // INPUT__DATA_SUBMIT smaError("vgId:%d, rsma % " PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); return TSDB_CODE_FAILED; } @@ -1329,7 +1329,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { tdRefRSmaInfo(pSma, pRSmaInfo); SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; - qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_INPUT__DATA_BLOCK, false); + qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_INPUT__DATA_BLOCK); tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK); tdUnRefRSmaInfo(pSma, pRSmaInfo); @@ -1356,4 +1356,4 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { _end: tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); -} \ No newline at end of file +} diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 8b1cbb5ae8..5d4e5c9e8d 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -30,8 +30,7 @@ static void cleanupRefPool() { taosCloseRef(ref); } -static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, bool assignUid, - char* id) { +static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { ASSERT(pOperator != NULL); if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { @@ -44,12 +43,12 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu return TSDB_CODE_QRY_APP_ERROR; } pOperator->status = OP_NOT_OPENED; - return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, assignUid, id); + return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id); } else { pOperator->status = OP_NOT_OPENED; SStreamScanInfo* pInfo = pOperator->info; - pInfo->assignBlockUid = assignUid; + /*pInfo->assignBlockUid = assignUid;*/ // TODO: if a block was set but not consumed, // prevent setting a different type of block @@ -95,11 +94,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } } -int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) { - return qSetMultiStreamInput(tinfo, input, 1, type, assignUid); +int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) { + return qSetMultiStreamInput(tinfo, input, 1, type); } -int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type, bool assignUid) { +int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { if (tinfo == NULL) { return TSDB_CODE_QRY_APP_ERROR; } @@ -110,8 +109,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - int32_t code = - doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, assignUid, GET_TASKID(pTaskInfo)); + int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); } else { @@ -337,7 +335,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, } code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam); - if(code != TSDB_CODE_SUCCESS){ + if (code != TSDB_CODE_SUCCESS) { taosMemoryFreeClear(pSinkParam); } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index f782de95b9..2b2c96472d 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -22,22 +22,22 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) SStreamQueueItem* pItem = (SStreamQueueItem*)data; if (pItem->type == STREAM_INPUT__GET_RES) { SStreamTrigger* pTrigger = (SStreamTrigger*)data; - qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK, false); + qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { ASSERT(pTask->isDataScan); SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; qDebug("task %d %p set submit input %p %p %d 1", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef); - qSetStreamInput(exec, pSubmit->data, STREAM_INPUT__DATA_SUBMIT, false); + qSetStreamInput(exec, pSubmit->data, STREAM_INPUT__DATA_SUBMIT); } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { SStreamDataBlock* pBlock = (SStreamDataBlock*)data; SArray* blocks = pBlock->blocks; qDebug("task %d %p set ssdata input", pTask->taskId, pTask); - qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK, false); + qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data; SArray* blocks = pMerged->reqs; qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)blocks->size); - qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT, false); + qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT); } else { ASSERT(0); } -- GitLab