diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 6fca2858dcefbe79c2bfe7b91d014745564f5227..75de0129470963c34edb9922f5d8e3c6cd9fc40d 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -539,7 +539,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo taosArrayPush(pTaskInfo->pResultBlockList, &p1); p = p1; } else { - p = *(SSDataBlock**) taosArrayGet(pTaskInfo->pResultBlockList, blockIndex); + p = *(SSDataBlock**)taosArrayGet(pTaskInfo->pResultBlockList, blockIndex); copyDataBlock(p, pRes); } @@ -574,9 +574,9 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - SArray* pList = pTaskInfo->pResultBlockList; - size_t num = taosArrayGetSize(pList); - for(int32_t i = 0; i < num; ++i) { + SArray* pList = pTaskInfo->pResultBlockList; + size_t num = taosArrayGetSize(pList); + for (int32_t i = 0; i < num; ++i) { SSDataBlock** p = taosArrayGet(pTaskInfo->pResultBlockList, i); blockDataDestroy(*p); } @@ -747,11 +747,11 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) { } int32_t nOptrWithVal = 0; -// int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal); -// if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) { -// taosMemoryFreeClear(*pOutput); -// *len = 0; -// } + // int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal); + // if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) { + // taosMemoryFreeClear(*pOutput); + // *len = 0; + // } return 0; } @@ -763,7 +763,7 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le } return 0; -// return decodeOperator(pTaskInfo->pRoot, pInput, len); + // return decodeOperator(pTaskInfo->pRoot, pInput, len); } int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) { @@ -890,35 +890,35 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) { pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; - ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE); - ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX); + /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/ + /*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/ pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; - ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || - pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); + /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/ + /*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/ qInfo("restore stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE); - ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX); + /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/ + /*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/ pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; - ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || - pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); + /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/ + /*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/ qInfo("restore stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { SStreamStateAggOperatorInfo* pInfo = pOperator->info; - ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE); - ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX); + /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/ + /*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/ pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; - ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || - pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); + /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/ + /*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/ qInfo("restore stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); } @@ -926,7 +926,7 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) { if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) { if (pOperator->numOfDownstream > 1) { qError("unexpected stream, multiple downstream"); - ASSERT(0); + /*ASSERT(0);*/ return -1; } return 0; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 8e81a000984da0caad273b0a7f74ee144eecdc10..12308051b735e714b7a7757e1152937e2201189f 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#include "filter.h" #include "executorimpl.h" +#include "filter.h" #include "functionMgt.h" typedef struct SProjectOperatorInfo { @@ -90,7 +90,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys pInfo->binfo.pRes = pResBlock; pInfo->pFinalRes = createOneDataBlock(pResBlock, false); - pInfo->mergeDataBlocks = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM)? false:pProjPhyNode->mergeDataBlock; + pInfo->mergeDataBlocks = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? false : pProjPhyNode->mergeDataBlock; int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; @@ -117,9 +117,10 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols); - setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, - destroyProjectOperatorInfo, optrDefaultBufFn, NULL); + setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo, + pTaskInfo); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo, + optrDefaultBufFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -414,8 +415,10 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy pInfo->binfo.pRes = pResBlock; pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); - setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo, optrDefaultBufFn, NULL); + setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo, + pTaskInfo); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo, + optrDefaultBufFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -697,13 +700,30 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); if (pResult->info.rows > 0 && !createNewColModel) { - colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInputData->pData[0], - pInputData->numOfRows); + if (pInputData->pData[0] == NULL) { + int32_t slotId = pfCtx->param[0].pCol->slotId; + + SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId); + + colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInput, + pSrcBlock->info.rows); + } else { + colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInputData->pData[0], + pInputData->numOfRows); + } } else { - colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info); - } + if (pInputData->pData[0] == NULL) { + int32_t slotId = pfCtx->param[0].pCol->slotId; + + SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId); + colDataAssign(pColInfoData, pInput, pSrcBlock->info.rows, &pResult->info); - numOfRows = pInputData->numOfRows; + numOfRows = pSrcBlock->info.rows; + } else { + colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info); + numOfRows = pInputData->numOfRows; + } + } } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) { SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);