diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 9c0b0802ab4dec1bf4e317c06313cfde50f8317a..344b4fa6557b2c704288c6ac6aeca3282150a096 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -167,7 +167,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) { break; } - if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) { + if (colDataAppend(pColData, curRow, sVal.val, false) < 0) { taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock); return NULL; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 598647f797efb3b2ce835a1f170abd28e8a468c9..d377ec20397bb708b5eb6cc2df1128e0830151e8 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -69,9 +69,9 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_VND_TASK_PIPE_EXEC: case TDMT_VND_TASK_MERGE_EXEC: - return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, pInfo->workerId); + return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0); case TDMT_VND_STREAM_TRIGGER: - return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, pInfo->workerId); + return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0); case TDMT_VND_QUERY_HEARTBEAT: return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg); default: diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1d7023930daa8ea8801b5ee52c989c011c2a0cbf..ce0069fdfa6e680c49dd0776c3eab340642a4c0f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -422,6 +422,7 @@ typedef struct SStreamBlockScanInfo { uint64_t numOfRows; // total scanned rows uint64_t numOfExec; // execution times void* readerHandle; // stream block reader handle + SArray* pColMatchInfo; // } SStreamBlockScanInfo; typedef struct SSysTableScanInfo { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 9fb009c91769ce3fbeb7e833e61566916ea06c93..93fbfe43ce79b0b45fc4162d4d1267623da5f1d4 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -227,7 +227,7 @@ void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL taosArrayDestroy(pGroupResInfo->pRows); } - pGroupResInfo->pRows = pArrayList->pData; + pGroupResInfo->pRows = pArrayList; pGroupResInfo->index = 0; ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f734c8185835a3b5748a31c28be22fb1ac79f694..6265fe612339218bbf0dcbc057a97b0739337f54 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -352,7 +352,7 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) { continue; } - idata.info.type = pDescNode->dataType.type; + idata.info.type = pDescNode->dataType.type; idata.info.bytes = pDescNode->dataType.bytes; idata.info.scale = pDescNode->dataType.scale; idata.info.slotId = pDescNode->slotId; @@ -1551,8 +1551,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe if (pSDataBlock->pDataBlock != NULL) { SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0); tsCols = (int64_t*)pColDataInfo->pData; - assert(tsCols[0] == pSDataBlock->info.window.skey && - tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey); +// assert(tsCols[0] == pSDataBlock->info.window.skey && +// tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey); } int32_t startPos = ascScan? 0 : (pSDataBlock->info.rows - 1); @@ -3660,6 +3660,7 @@ void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbased if (pCtx[j].fpSet.process) { // TODO set the dummy function. pCtx[j].fpSet.finalize(&pCtx[j]); + pResInfo->initialized = true; } if (pRow->numOfRows < pResInfo->numOfRes) { @@ -3778,10 +3779,6 @@ void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf* pBuf, SResultRow* pResult, S continue; } - // int32_t functionId = pCtx[i].functionId; - // if (functionId < 0) { - // continue; - // } // if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) { // if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput; // } @@ -4971,7 +4968,20 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) return NULL; } - pInfo->pRes->pDataBlock = tqRetrieveDataBlock(pInfo->readerHandle); + SArray* pCols = tqRetrieveDataBlock(pInfo->readerHandle); + + int32_t numOfCols = pInfo->pRes->info.numOfCols; + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* p = taosArrayGet(pCols, i); + SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i); + if (!pColMatchInfo->output) { + continue; + } + + ASSERT(pColMatchInfo->colId == p->info.colId); + taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, p); + } + if (pInfo->pRes->pDataBlock == NULL) { // TODO add log pTaskInfo->code = terrno; @@ -5625,8 +5635,18 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* return NULL; } + int32_t numOfOutput = taosArrayGetSize(pColList); + + SArray* pColIds = taosArrayInit(4, sizeof(int16_t)); + for(int32_t i = 0; i < numOfOutput; ++i) { + int16_t* id = taosArrayGet(pColList, i); + taosArrayPush(pColIds, id); + } + + pInfo->pColMatchInfo = pColList; + // set the extract column id to streamHandle - tqReadHandleSetColIdList((STqReadHandle*)streamReadHandle, pColList); + tqReadHandleSetColIdList((STqReadHandle*)streamReadHandle, pColIds); int32_t code = tqReadHandleSetTbUidList(streamReadHandle, pTableIdList); if (code != 0) { taosMemoryFreeClear(pInfo); @@ -6972,7 +6992,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } - return pInfo->binfo.pRes; + return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; } // STimeWindow win = {0}; @@ -8801,8 +8821,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SArray* tableIdList = extractTableIdList(pTableGroupInfo); SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc); - SArray* colList = extractScanColumnId(pScanPhyNode->pScanCols); - SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, colList, tableIdList, pTaskInfo); + + int32_t numOfCols = 0; + SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); + SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pColList, tableIdList, pTaskInfo); taosArrayDestroy(tableIdList); return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == nodeType(pPhyNode)) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 9ef1c01cd1d6584cb3d2ea074b56aa0095a81ebb..c46f98ef55d2872a8b1576a2389c59a505577960 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -719,6 +719,9 @@ static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExc if (NULL == pScan->pScanCols) { code = TSDB_CODE_OUT_OF_MEMORY; } + if (TSDB_CODE_SUCCESS == code) { + code = sortScanCols(pScan->pScanCols); + } if (TSDB_CODE_SUCCESS == code) { code = addDataBlockSlots(pCxt, pScan->pScanCols, pScan->node.pOutputDataBlockDesc); } diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index fb6c0f6c122ea3412d0773c83a1e02665d252a25..5223aa1e3a5c6c36bb0cc646c91d72912a5b0102 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -152,6 +152,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in // sink if (pTask->sinkType == TASK_SINK__TABLE) { // + blockDebugShowData(pRes); } else if (pTask->sinkType == TASK_SINK__SMA) { pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes); //