diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index cac8f7304223d10315638485750d5a3a51bfdaea..a65f30f0235e60c4aaf4a2535efe4993afe07c69 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1035,6 +1035,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { return TSDB_CODE_SUCCESS; } +#if 0 typedef struct SHelper { int32_t index; union { @@ -1083,59 +1084,20 @@ SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock* int32_t dataBlockCompar_rv(const void* p1, const void* p2, const void* param) { const SSDataBlockSortHelper* pHelper = (const SSDataBlockSortHelper*)param; - // SSDataBlock* pDataBlock = pHelper->pDataBlock; - SHelper* left = (SHelper*)p1; SHelper* right = (SHelper*)p2; SArray* pInfo = pHelper->orderInfo; int32_t offset = 0; - // for(int32_t i = 0; i < pInfo->size; ++i) { - // SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, 0); - // SColumnInfoData* pColInfoData = pOrder->pColData;//TARRAY_GET_ELEM(pDataBlock->pDataBlock, pOrder->colIndex); - - // if (pColInfoData->hasNull) { - // bool leftNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, left, pDataBlock->pBlockAgg); - // bool rightNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, right, pDataBlock->pBlockAgg); - // if (leftNull && rightNull) { - // continue; // continue to next slot - // } - // - // if (rightNull) { - // return pHelper->nullFirst? 1:-1; - // } - // - // if (leftNull) { - // return pHelper->nullFirst? -1:1; - // } - // } - - // void* left1 = colDataGetData(pColInfoData, left); - // void* right1 = colDataGetData(pColInfoData, right); - - // switch(pColInfoData->info.type) { - // case TSDB_DATA_TYPE_INT: { int32_t leftx = *(int32_t*)left->pData; //*(int32_t*)(left->pData + offset); int32_t rightx = *(int32_t*)right->pData; //*(int32_t*)(right->pData + offset); - // offset += pColInfoData->info.bytes; if (leftx == rightx) { - // break; return 0; } else { - // if (pOrder->order == TSDB_ORDER_ASC) { return (leftx < rightx) ? -1 : 1; - // } else { - // return (leftx < rightx)? 1:-1; - // } - } - // } - // default: - // assert(0); - // } - // } - + } return 0; } @@ -1179,6 +1141,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF // destroyTupleIndex(index); return 0; } +#endif void blockDataCleanup(SSDataBlock* pDataBlock) { blockDataEmpty(pDataBlock); @@ -1887,6 +1850,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { return buf; } +#if 0 void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag) { SArray* dataBlocks = taosArrayInit(1, sizeof(SSDataBlock*)); taosArrayPush(dataBlocks, &pBlock); @@ -1979,6 +1943,8 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) { } } +#endif + // for debug char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) { int32_t size = 2048; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4eceefcd51781f50314f1d0d5b37e8352b42c05e..cea7216dd5e7937fb29d32a3b2d9bda038895517 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -175,7 +175,7 @@ struct SExecTaskInfo { int64_t version; // used for stream to record wal version, why not move to sschemainfo SStreamTaskInfo streamInfo; SSchemaInfo schemaInfo; - STableListInfo* pTableInfoList; // this is a table list +// STableListInfo* pTableInfoList; // this is a table list const char* sql; // query sql string jmp_buf env; // jump to this position when error happens. EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] @@ -323,6 +323,8 @@ typedef struct STableScanBase { int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t dataBlockLoadFlag; SLimitInfo limitInfo; + // there are more than one table list exists in one task, if only one vnode exists. + STableListInfo* pTableInfoList; } STableScanBase; typedef struct STableScanInfo { @@ -363,11 +365,12 @@ typedef struct STableMergeScanInfo { } STableMergeScanInfo; typedef struct STagScanInfo { - SColumnInfo* pCols; - SSDataBlock* pRes; - SColMatchInfo matchInfo; - int32_t curPos; - SReadHandle readHandle; + SColumnInfo* pCols; + SSDataBlock* pRes; + SColMatchInfo matchInfo; + int32_t curPos; + SReadHandle readHandle; + STableListInfo* pTableInfoList; } STagScanInfo; typedef enum EStreamScanMode { @@ -753,9 +756,9 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // clang-format off SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableList, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); @@ -773,7 +776,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); @@ -787,9 +790,9 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index d42b348fd8c911b3915ad381f1270a290796f5aa..226b3dacd0f456b1ce387d298a5eee8054e52260 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -37,6 +37,7 @@ typedef struct SCacheRowsScanInfo { SSDataBlock* pBufferredRes; SArray* pUidList; int32_t indexOfBufferedRes; + STableListInfo* pTableList; } SCacheRowsScanInfo; static SSDataBlock* doScanCache(SOperatorInfo* pOperator); @@ -47,7 +48,7 @@ static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColM #define SCAN_ROW_TYPE(_t) ((_t)? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW) SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, - SExecTaskInfo* pTaskInfo) { + STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { int32_t code = TSDB_CODE_SUCCESS; SCacheRowsScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SCacheRowsScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -75,20 +76,18 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe goto _error; } - STableListInfo* pTableList = pTaskInfo->pTableInfoList; - - int32_t totalTables = tableListGetSize(pTableList); + int32_t totalTables = tableListGetSize(pTableListInfo); int32_t capacity = 0; pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); // partition by tbname - if (oneTableForEachGroup(pTableList) || (totalTables == 1)) { + if (oneTableForEachGroup(pTableListInfo) || (totalTables == 1)) { pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull); - STableKeyInfo* pList = tableListGetInfo(pTableList, 0); + STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0); - uint64_t suid = tableListGetSuid(pTableList); + uint64_t suid = tableListGetSuid(pTableListInfo); code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables, taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { @@ -136,7 +135,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { SCacheRowsScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - STableListInfo* pTableList = pTaskInfo->pTableInfoList; + STableListInfo* pTableList = pInfo->pTableList; uint64_t suid = tableListGetSuid(pTableList); int32_t size = tableListGetSize(pTableList); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index ff11dadf22abbb9d9a342bfb2e1fe27ab25ebac4..c8fce05341f7fc25820f7272bacf4d8ed19a02ce 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -404,7 +404,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo } } - STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; + STableListInfo* pTableListInfo = ((STableScanInfo*)pScanInfo->pTableScanOp->info)->base.pTableInfoList; taosWLockLatch(&pTaskInfo->lock); for (int32_t i = 0; i < numOfQualifiedTables; ++i) { @@ -1083,7 +1083,7 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) { int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; - STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; + STableListInfo* pTableListInfo = NULL;//pTaskInfo->pTableInfoList; const char* id = GET_TASKID(pTaskInfo); pTaskInfo->streamInfo.prepareStatus = *pOffset; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 8068590dadc201effa2afb9d311ca2aba766e7a5..6e14643e60380d797479ea232ea5a6fb4d022982 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1986,7 +1986,6 @@ SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t v pTaskInfo->schemaInfo.dbname = taosStrdup(dbFName); pTaskInfo->execModel = model; - pTaskInfo->pTableInfoList = tableListCreate(); pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo)); pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES); @@ -2101,7 +2100,6 @@ bool groupbyTbname(SNodeList* pGroupList) { SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond, SNode* pTagIndexCond, const char* pUser) { int32_t type = nodeType(pPhyNode); - STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; const char* idstr = GET_TASKID(pTaskInfo); if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { @@ -2115,6 +2113,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pTableScanNode->groupSort = true; } + STableListInfo* pTableListInfo = tableListCreate(); int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); @@ -2130,7 +2129,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); + pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo); if (NULL == pOperator) { pTaskInfo->code = terrno; return NULL; @@ -2140,6 +2139,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder; } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; + STableListInfo* pTableListInfo = tableListCreate(); int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); @@ -2155,7 +2155,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); + pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo); if (NULL == pOperator) { pTaskInfo->code = terrno; return NULL; @@ -2168,6 +2168,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; + STableListInfo* pTableListInfo = tableListCreate(); + if (pHandle->vnode) { int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, @@ -2190,7 +2192,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan); - pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo); + pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo); @@ -2199,7 +2201,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; - + STableListInfo* pTableListInfo = tableListCreate(); int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); if (code != TSDB_CODE_SUCCESS) { @@ -2211,6 +2213,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; + STableListInfo* pTableListInfo = tableListCreate(); if (pBlockNode->tableType == TSDB_SUPER_TABLE) { SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo)); @@ -2231,9 +2234,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0); } - pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTaskInfo); + pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTableListInfo, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; + STableListInfo* pTableListInfo = tableListCreate(); int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); @@ -2248,7 +2252,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTaskInfo); + pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTableListInfo, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo); } else { @@ -2416,6 +2420,7 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT if (NULL == pDeleterParam) { return TSDB_CODE_OUT_OF_MEMORY; } +#if 0 int32_t tbNum = tableListGetSize(pTask->pTableInfoList); pDeleterParam->suid = tableListGetSuid(pTask->pTableInfoList); @@ -2432,6 +2437,8 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT } *pParam = pDeleterParam; +#endif + break; } default: @@ -2481,8 +2488,6 @@ static void freeBlock(void* pParam) { void doDestroyTask(SExecTaskInfo* pTaskInfo) { qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); - - pTaskInfo->pTableInfoList = tableListDestroy(pTaskInfo->pTableInfoList); destroyOperatorInfo(pTaskInfo->pRoot); cleanupTableSchemaInfo(&pTaskInfo->schemaInfo); cleanupStreamInfo(&pTaskInfo->streamInfo); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2907c8d056b95afa3b96ae23cc2b25e4349c540b..1fffe14b02459e1aa3eac1e7a1408eecbfbc8900 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -676,7 +676,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { } if (pBlock->info.id.uid) { - pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); + pBlock->info.id.groupId = 0;//getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); } uint32_t status = 0; @@ -784,7 +784,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { pInfo->currentTable++; taosRLockLatch(&pTaskInfo->lock); - numOfTables = tableListGetSize(pTaskInfo->pTableInfoList); + numOfTables = tableListGetSize(pInfo->base.pTableInfoList); if (pInfo->currentTable >= numOfTables) { qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo)); @@ -792,7 +792,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { return NULL; } - tInfo = *(STableKeyInfo*) tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable); + tInfo = *(STableKeyInfo*) tableListGetInfo(pInfo->base.pTableInfoList, pInfo->currentTable); taosRUnLockLatch(&pTaskInfo->lock); tsdbSetTableList(pInfo->base.dataReader, &tInfo, 1); @@ -804,14 +804,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { } } else { // scan table group by group sequentially if (pInfo->currentGroupId == -1) { - if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) { + if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableInfoList)) { setOperatorCompleted(pOperator); return NULL; } int32_t num = 0; STableKeyInfo* pList = NULL; - tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num); + tableListGetGroupList(pInfo->base.pTableInfoList, pInfo->currentGroupId, &pList, &num); ASSERT(pInfo->base.dataReader == NULL); int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, @@ -830,7 +830,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { return result; } - if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) { + if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableInfoList)) { setOperatorCompleted(pOperator); return NULL; } @@ -841,7 +841,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { int32_t num = 0; STableKeyInfo* pList = NULL; - tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num); + tableListGetGroupList(pInfo->base.pTableInfoList, pInfo->currentGroupId, &pList, &num); tsdbSetTableList(pInfo->base.dataReader, pList, num); tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond); @@ -866,25 +866,30 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr return 0; } -static void destroyTableScanOperatorInfo(void* param) { - STableScanInfo* pTableScanInfo = (STableScanInfo*)param; - blockDataDestroy(pTableScanInfo->pResBlock); - cleanupQueryTableDataCond(&pTableScanInfo->base.cond); +static void destroyTableScanBase(STableScanBase* pBase) { + cleanupQueryTableDataCond(&pBase->cond); - tsdbReaderClose(pTableScanInfo->base.dataReader); - pTableScanInfo->base.dataReader = NULL; + tsdbReaderClose(pBase->dataReader); + pBase->dataReader = NULL; - if (pTableScanInfo->base.matchInfo.pList != NULL) { - taosArrayDestroy(pTableScanInfo->base.matchInfo.pList); + if (pBase->matchInfo.pList != NULL) { + taosArrayDestroy(pBase->matchInfo.pList); } - taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache); - cleanupExprSupp(&pTableScanInfo->base.pseudoSup); + tableListDestroy(pBase->pTableInfoList); + taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache); + cleanupExprSupp(&pBase->pseudoSup); +} + +static void destroyTableScanOperatorInfo(void* param) { + STableScanInfo* pTableScanInfo = (STableScanInfo*)param; + blockDataDestroy(pTableScanInfo->pResBlock); + destroyTableScanBase(&pTableScanInfo->base); taosMemoryFreeClear(param); } SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, - SExecTaskInfo* pTaskInfo) { + STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -940,6 +945,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pTaskInfo); pOperator->exprSupp.numOfExprs = numOfCols; + pInfo->base.pTableInfoList = pTableListInfo; pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5); if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) { code = terrno; @@ -1059,7 +1065,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU if (hasNext) { /*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL); doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows); - pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); + pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableInfoList, pBlock->info.id.uid); } tsdbReaderClose(pReader); @@ -1080,7 +1086,8 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, } static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) { - return getTableGroupId(pInfo->pTableScanOp->pTaskInfo->pTableInfoList, uid); + STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; + return getTableGroupId(pTableScanInfo->base.pTableInfoList, uid); } static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) { @@ -1554,7 +1561,8 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock pInfo->pRes->info.type = STREAM_NORMAL; pInfo->pRes->info.version = pBlock->info.version; - pInfo->pRes->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); + STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; + pInfo->pRes->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableInfoList, pBlock->info.id.uid); // todo extract method for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { @@ -2314,6 +2322,7 @@ _end: static void destroyStreamScanOperatorInfo(void* param) { SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param; + if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) { destroyOperatorInfo(pStreamScan->pTableScanOp); } @@ -2343,7 +2352,7 @@ static void destroyStreamScanOperatorInfo(void* param) { } SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, - SExecTaskInfo* pTaskInfo) { + STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { SArray* pColIds = NULL; SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -2411,7 +2420,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys } if (pHandle->vnode) { - SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); + SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo); STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info; if (pHandle->version > 0) { pTSInfo->base.cond.endVersion = pHandle->version; @@ -2419,7 +2428,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys STableKeyInfo* pList = NULL; int32_t num = 0; - tableListGetGroupList(pTaskInfo->pTableInfoList, 0, &pList, &num); + tableListGetGroupList(pTableListInfo, 0, &pList, &num); if (pHandle->initTableReader) { pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; @@ -2450,7 +2459,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys // set the extract column id to streamHandle tqReaderSetColIdList(pInfo->tqReader, pColIds); - SArray* tableIdList = extractTableIdList(pTaskInfo->pTableInfoList); + SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableInfoList); code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList); if (code != 0) { taosArrayDestroy(tableIdList); @@ -2525,7 +2534,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->pRes; blockDataCleanup(pRes); - int32_t size = tableListGetSize(pTaskInfo->pTableInfoList); + int32_t size = tableListGetSize(pInfo->pTableInfoList); if (size == 0) { setTaskStatus(pTaskInfo, TASK_COMPLETED); return NULL; @@ -2537,7 +2546,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { metaReaderInit(&mr, pInfo->readHandle.meta, 0); while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) { - STableKeyInfo* item = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->curPos); + STableKeyInfo* item = tableListGetInfo(pInfo->pTableInfoList, pInfo->curPos); int32_t code = metaGetTableEntryByUid(&mr, item->uid); tDecoderClear(&mr.coder); if (code != TSDB_CODE_SUCCESS) { @@ -2657,7 +2666,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx); int64_t st = taosGetTimestampUs(); - void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex); + void* p = tableListGetInfo(pInfo->base.pTableInfoList, readIdx + pInfo->tableStartIndex); SReadHandle* pHandle = &pInfo->base.readHandle; if (NULL == source->dataReader || !source->multiReader) { @@ -2714,7 +2723,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { continue; } - pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); + pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableInfoList, pBlock->info.id.uid); pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; @@ -2770,10 +2779,10 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; { - size_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList); + size_t numOfTables = tableListGetSize(pInfo->base.pTableInfoList); int32_t i = pInfo->tableStartIndex + 1; for (; i < numOfTables; ++i) { - STableKeyInfo* tableKeyInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i); + STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableInfoList, i); if (tableKeyInfo->groupId != pInfo->groupId) { break; } @@ -2907,7 +2916,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } - size_t tableListSize = tableListGetSize(pTaskInfo->pTableInfoList); + size_t tableListSize = tableListGetSize(pInfo->base.pTableInfoList); if (!pInfo->hasGroupId) { pInfo->hasGroupId = true; @@ -2916,7 +2925,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { return NULL; } pInfo->tableStartIndex = 0; - pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex))->groupId; + pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableInfoList, pInfo->tableStartIndex))->groupId; startGroupTableMergeScan(pOperator); } @@ -2941,7 +2950,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { } pInfo->tableStartIndex = pInfo->tableEndIndex + 1; - pInfo->groupId = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex)->groupId; + pInfo->groupId = tableListGetInfo(pInfo->base.pTableInfoList, pInfo->tableStartIndex)->groupId; startGroupTableMergeScan(pOperator); resetLimitInfoForNextGroup(&pInfo->limitInfo); } @@ -2963,9 +2972,6 @@ void destroyTableMergeScanOperatorInfo(void* param) { p->dataReader = NULL; } - tsdbReaderClose(pTableScanInfo->base.dataReader); - pTableScanInfo->base.dataReader = NULL; - taosArrayDestroy(pTableScanInfo->sortSourceParams); tsortDestroySortHandle(pTableScanInfo->pSortHandle); pTableScanInfo->pSortHandle = NULL; @@ -2974,20 +2980,14 @@ void destroyTableMergeScanOperatorInfo(void* param) { SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i); taosMemoryFree(pCond->colList); } - taosArrayDestroy(pTableScanInfo->queryConds); - if (pTableScanInfo->base.matchInfo.pList != NULL) { - taosArrayDestroy(pTableScanInfo->base.matchInfo.pList); - } + taosArrayDestroy(pTableScanInfo->queryConds); + destroyTableScanBase(&pTableScanInfo->base); pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock); pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock); taosArrayDestroy(pTableScanInfo->pSortInfo); - cleanupExprSupp(&pTableScanInfo->base.pseudoSup); - - taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache); - taosMemoryFreeClear(param); } @@ -3006,7 +3006,7 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla } SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, - SExecTaskInfo* pTaskInfo) { + STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -3048,6 +3048,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->base.limitInfo.limit.limit = -1; pInfo->base.limitInfo.slimit.limit = -1; + pInfo->base.pTableInfoList = pTableListInfo; pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.seed = taosGetTimestampSec(); diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 68e5cf6ef8f124e5e9757f446601f0782cd04af1..161811531ae10dc657705bfd6fa182ebf626604f 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -83,10 +83,11 @@ typedef struct MergeIndex { } MergeIndex; typedef struct SBlockDistInfo { - SSDataBlock* pResBlock; - STsdbReader* pHandle; - SReadHandle readHandle; - uint64_t uid; // table uid + SSDataBlock* pResBlock; + STsdbReader* pHandle; + SReadHandle readHandle; + STableListInfo* pTableListInfo; + uint64_t uid; // table uid } SBlockDistInfo; static int32_t sysChkFilter__Comm(SNode* pNode); @@ -2245,7 +2246,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC } SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, - SExecTaskInfo* pTaskInfo) { + STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -2263,7 +2264,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi goto _error; } - STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; + pInfo->pTableListInfo = pTableListInfo; size_t num = tableListGetSize(pTableListInfo); void* pList = tableListGetInfo(pTableListInfo, 0);