diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index ecc3ca4aa327512cba5a87ed52ce1ff44c306374..9e15f3b12d3a17e9877dac147773821148867b87 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -375,30 +375,34 @@ typedef struct SessionWindowSupporter { } SessionWindowSupporter; typedef struct SStreamBlockScanInfo { - SArray* pBlockLists; // multiple SSDatablock. - SSDataBlock* pRes; // result SSDataBlock - SSDataBlock* pUpdateRes; // update SSDataBlock - int32_t updateResIndex; - int32_t blockType; // current block type - int32_t validBlockIndex; // Is current data has returned? - SColumnInfo* pCols; // the output column info - uint64_t numOfExec; // execution times - void* streamBlockReader;// stream block reader handle - SArray* pColMatchInfo; // - SNode* pCondition; - SArray* tsArray; - SUpdateInfo* pUpdateInfo; - int32_t primaryTsIndex; // primary time stamp slot id - void* pDataReader; - SReadHandle readHandle; - uint64_t tableUid; // queried super table uid + SArray* pBlockLists; // multiple SSDatablock. + SSDataBlock* pRes; // result SSDataBlock + SSDataBlock* pUpdateRes; // update SSDataBlock + int32_t updateResIndex; + int32_t blockType; // current block type + int32_t validBlockIndex; // Is current data has returned? + SColumnInfo* pCols; // the output column info + uint64_t numOfExec; // execution times + void* streamBlockReader;// stream block reader handle + SArray* pColMatchInfo; // + SNode* pCondition; + SArray* tsArray; + SUpdateInfo* pUpdateInfo; + + SExprInfo* pPseudoExpr; + int32_t numOfPseudoExpr; + + int32_t primaryTsIndex; // primary time stamp slot id + void* pDataReader; + SReadHandle readHandle; + uint64_t tableUid; // queried super table uid EStreamScanMode scanMode; - SOperatorInfo* pOperatorDumy; - SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. + SOperatorInfo* pOperatorDumy; + SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. SCatchSupporter childAggSup; - SArray* childIds; + SArray* childIds; SessionWindowSupporter sessionSup; - bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA. + bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA. } SStreamBlockScanInfo; typedef struct SSysTableScanInfo { @@ -737,10 +741,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SReadHandle* pHandle, - uint64_t uid, SSDataBlock* pResBlock, SArray* pColList, - SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pCondition, - SOperatorInfo* pOperatorDumy); + +SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo); + SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 2208c94e9c0aecc377a5502ebe9f639ea28a2562..3e52ff1836ad55132b63c6d1f9ba020a4751b4cc 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4493,8 +4493,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - int32_t numOfCols = 0; - tsdbReaderT pDataReader = NULL; if (pHandle->vnode) { pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); @@ -4503,24 +4501,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } if (pDataReader == NULL && terrno != 0) { - qDebug("pDataReader is NULL"); + qDebug("%s pDataReader is NULL", GET_TASKID(pTaskInfo)); // return NULL; } else { - qDebug("pDataReader is not NULL"); + qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo)); } - SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; - SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); - SArray* tableIdList = extractTableIdList(pTableListInfo); + SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, tableIdList, pTableScanNode, pTaskInfo); - SSDataBlock* pResBlock = createResDataBlock(pDescNode); - SArray* pCols = - extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID); - - SOperatorInfo* pOperator = - createStreamScanOperatorInfo(pHandle->reader, pDataReader, pHandle, pScanPhyNode->uid, pResBlock, pCols, - tableIdList, pTaskInfo, pScanPhyNode->node.pConditions, pOperatorDumy); taosArrayDestroy(tableIdList); return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { @@ -4915,8 +4904,7 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod return pList; } -int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, - STableListInfo* pListInfo, SNode* pTagCond) { +int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo, SNode* pTagCond) { int32_t code = TSDB_CODE_SUCCESS; pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo)); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index bbdb3b2b7e00ca32da52a7b7b794abb63a5e64b8..fe7668f3705f53d986f028cd024dfa06c15e0f48 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -158,7 +158,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn return false; } -static void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock); +static void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock); static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { @@ -250,7 +250,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca // currently only the tbname pseudo column if (pTableScanInfo->numOfPseudoExpr > 0) { - addTagPseudoColumnData(pTableScanInfo, pBlock); + addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, pTableScanInfo->numOfPseudoExpr, pBlock); } int64_t st = taosGetTimestampMs(); @@ -287,18 +287,18 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction compareTimeWindow); } -void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) { +void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock) { // currently only the tbname pseudo column - if (pTableScanInfo->numOfPseudoExpr == 0) { + if (numOfPseudoExpr == 0) { return; } SMetaReader mr = {0}; - metaReaderInit(&mr, pTableScanInfo->readHandle.meta, 0); + metaReaderInit(&mr, pHandle->meta, 0); metaGetTableEntryByUid(&mr, pBlock->info.uid); - for (int32_t j = 0; j < pTableScanInfo->numOfPseudoExpr; ++j) { - SExprInfo* pExpr = &pTableScanInfo->pPseudoExpr[j]; + for (int32_t j = 0; j < numOfPseudoExpr; ++j) { + SExprInfo* pExpr = &pPseudoExpr[j]; int32_t dstSlotId = pExpr->base.resSchema.slotId; @@ -309,7 +309,7 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) // this is to handle the tbname if (fmIsScanPseudoColumnFunc(functionId)) { - setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId); + setTbNameColData(pHandle->meta, pBlock, pColInfoData, functionId); } else { // these are tags const char* p = NULL; if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) { @@ -910,8 +910,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { pInfo->pRes->info.groupId = groupId; } - int32_t numOfCols = pInfo->pRes->info.numOfCols; - for (int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) { SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i); if (!pColMatchInfo->output) { continue; @@ -941,10 +940,16 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { pTaskInfo->code = terrno; return NULL; } + rows = pBlockInfo->rows; + + // currently only the tbname pseudo column + if (pInfo->numOfPseudoExpr > 0) { + addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes); + } + doFilter(pInfo->pCondition, pInfo->pRes, NULL); blockDataUpdateTsWindow(pInfo->pRes, 0); - break; } @@ -972,10 +977,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } } -SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SReadHandle* pHandle, - uint64_t uid, SSDataBlock* pResBlock, SArray* pColList, - SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pCondition, - SOperatorInfo* pOperatorDumy) { +SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo) { SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -983,22 +985,28 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR goto _error; } - STableScanInfo* pSTInfo = (STableScanInfo*)pOperatorDumy->info; + SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan; - int32_t numOfOutput = taosArrayGetSize(pColList); + SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; + SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); - SArray* pColIds = taosArrayInit(4, sizeof(int16_t)); + STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info; + + int32_t numOfCols = 0; + pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID); + + int32_t numOfOutput = taosArrayGetSize(pInfo->pColMatchInfo); + SArray* pColIds = taosArrayInit(numOfOutput, sizeof(int16_t)); for (int32_t i = 0; i < numOfOutput; ++i) { - SColMatchInfo* id = taosArrayGet(pColList, i); - int16_t colId = id->colId; + SColMatchInfo* id = taosArrayGet(pInfo->pColMatchInfo, i); + + int16_t colId = id->colId; taosArrayPush(pColIds, &colId); } - pInfo->pColMatchInfo = pColList; - // set the extract column id to streamHandle - tqReadHandleSetColIdList((STqReadHandle*)streamReadHandle, pColIds); - int32_t code = tqReadHandleSetTbUidList(streamReadHandle, pTableIdList); + tqReadHandleSetColIdList((STqReadHandle*)pHandle->reader, pColIds); + int32_t code = tqReadHandleSetTbUidList(pHandle->reader, pTableIdList); if (code != 0) { goto _error; } @@ -1021,27 +1029,32 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR pInfo->pUpdateInfo = NULL; } - pInfo->readHandle = *pHandle; - pInfo->tableUid = uid; - pInfo->streamBlockReader = streamReadHandle; - pInfo->pRes = pResBlock; - pInfo->pCondition = pCondition; - pInfo->pDataReader = pDataReader; - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - pInfo->pOperatorDumy = pOperatorDumy; - pInfo->interval = pSTInfo->interval; - pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1}; - - initCacheSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval", - "/tmp/"); // TODO(liuyao) get row size from phy plan + // create the pseduo columns info + if (pTableScanNode->scan.pScanPseudoCols != NULL) { + pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr); + } - pOperator->name = "StreamBlockScanOperator"; + pInfo->readHandle = *pHandle; + pInfo->tableUid = pScanPhyNode->uid; + pInfo->streamBlockReader = pHandle->reader; + pInfo->pRes = createResDataBlock(pDescNode); + pInfo->pCondition = pScanPhyNode->node.pConditions; + pInfo->pDataReader = pDataReader; + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + pInfo->pOperatorDumy = pTableScanDummy; + pInfo->interval = pSTInfo->interval; + pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1}; + + // TODO(liuyao) get row size from phy plan + initCacheSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval", TD_TMP_DIR_PATH); + + pOperator->name = "StreamBlockScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = pResBlock->info.numOfCols; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = pInfo->pRes->info.numOfCols; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);