提交 addabf3b 编写于 作者: H Haojun Liao

refactor: push down the tableListinfo to scanner.

上级 23e8edd1
......@@ -1035,6 +1035,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
return TSDB_CODE_SUCCESS;
}
#if 0
typedef struct SHelper {
int32_t index;
union {
......@@ -1083,59 +1084,20 @@ SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock*
int32_t dataBlockCompar_rv(const void* p1, const void* p2, const void* param) {
const SSDataBlockSortHelper* pHelper = (const SSDataBlockSortHelper*)param;
// SSDataBlock* pDataBlock = pHelper->pDataBlock;
SHelper* left = (SHelper*)p1;
SHelper* right = (SHelper*)p2;
SArray* pInfo = pHelper->orderInfo;
int32_t offset = 0;
// for(int32_t i = 0; i < pInfo->size; ++i) {
// SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, 0);
// SColumnInfoData* pColInfoData = pOrder->pColData;//TARRAY_GET_ELEM(pDataBlock->pDataBlock, pOrder->colIndex);
// if (pColInfoData->hasNull) {
// bool leftNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, left, pDataBlock->pBlockAgg);
// bool rightNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, right, pDataBlock->pBlockAgg);
// if (leftNull && rightNull) {
// continue; // continue to next slot
// }
//
// if (rightNull) {
// return pHelper->nullFirst? 1:-1;
// }
//
// if (leftNull) {
// return pHelper->nullFirst? -1:1;
// }
// }
// void* left1 = colDataGetData(pColInfoData, left);
// void* right1 = colDataGetData(pColInfoData, right);
// switch(pColInfoData->info.type) {
// case TSDB_DATA_TYPE_INT: {
int32_t leftx = *(int32_t*)left->pData; //*(int32_t*)(left->pData + offset);
int32_t rightx = *(int32_t*)right->pData; //*(int32_t*)(right->pData + offset);
// offset += pColInfoData->info.bytes;
if (leftx == rightx) {
// break;
return 0;
} else {
// if (pOrder->order == TSDB_ORDER_ASC) {
return (leftx < rightx) ? -1 : 1;
// } else {
// return (leftx < rightx)? 1:-1;
// }
}
// }
// default:
// assert(0);
// }
// }
}
return 0;
}
......@@ -1179,6 +1141,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
// destroyTupleIndex(index);
return 0;
}
#endif
void blockDataCleanup(SSDataBlock* pDataBlock) {
blockDataEmpty(pDataBlock);
......@@ -1887,6 +1850,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
return buf;
}
#if 0
void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag) {
SArray* dataBlocks = taosArrayInit(1, sizeof(SSDataBlock*));
taosArrayPush(dataBlocks, &pBlock);
......@@ -1979,6 +1943,8 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
}
}
#endif
// for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) {
int32_t size = 2048;
......
......@@ -175,7 +175,7 @@ struct SExecTaskInfo {
int64_t version; // used for stream to record wal version, why not move to sschemainfo
SStreamTaskInfo streamInfo;
SSchemaInfo schemaInfo;
STableListInfo* pTableInfoList; // this is a table list
// STableListInfo* pTableInfoList; // this is a table list
const char* sql; // query sql string
jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
......@@ -323,6 +323,8 @@ typedef struct STableScanBase {
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
SLimitInfo limitInfo;
// there are more than one table list exists in one task, if only one vnode exists.
STableListInfo* pTableInfoList;
} STableScanBase;
typedef struct STableScanInfo {
......@@ -363,11 +365,12 @@ typedef struct STableMergeScanInfo {
} STableMergeScanInfo;
typedef struct STagScanInfo {
SColumnInfo* pCols;
SSDataBlock* pRes;
SColMatchInfo matchInfo;
int32_t curPos;
SReadHandle readHandle;
SColumnInfo* pCols;
SSDataBlock* pRes;
SColMatchInfo matchInfo;
int32_t curPos;
SReadHandle readHandle;
STableListInfo* pTableInfoList;
} STagScanInfo;
typedef enum EStreamScanMode {
......@@ -753,9 +756,9 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// clang-format off
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableList, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
......@@ -773,7 +776,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
......@@ -787,9 +790,9 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
......
......@@ -37,6 +37,7 @@ typedef struct SCacheRowsScanInfo {
SSDataBlock* pBufferredRes;
SArray* pUidList;
int32_t indexOfBufferedRes;
STableListInfo* pTableList;
} SCacheRowsScanInfo;
static SSDataBlock* doScanCache(SOperatorInfo* pOperator);
......@@ -47,7 +48,7 @@ static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColM
#define SCAN_ROW_TYPE(_t) ((_t)? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
SExecTaskInfo* pTaskInfo) {
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
SCacheRowsScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SCacheRowsScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
......@@ -75,20 +76,18 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
goto _error;
}
STableListInfo* pTableList = pTaskInfo->pTableInfoList;
int32_t totalTables = tableListGetSize(pTableList);
int32_t totalTables = tableListGetSize(pTableListInfo);
int32_t capacity = 0;
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
// partition by tbname
if (oneTableForEachGroup(pTableList) || (totalTables == 1)) {
if (oneTableForEachGroup(pTableListInfo) || (totalTables == 1)) {
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull);
STableKeyInfo* pList = tableListGetInfo(pTableList, 0);
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
uint64_t suid = tableListGetSuid(pTableList);
uint64_t suid = tableListGetSuid(pTableListInfo);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
......@@ -136,7 +135,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
SCacheRowsScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableListInfo* pTableList = pTaskInfo->pTableInfoList;
STableListInfo* pTableList = pInfo->pTableList;
uint64_t suid = tableListGetSuid(pTableList);
int32_t size = tableListGetSize(pTableList);
......
......@@ -404,7 +404,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
}
}
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
STableListInfo* pTableListInfo = ((STableScanInfo*)pScanInfo->pTableScanOp->info)->base.pTableInfoList;
taosWLockLatch(&pTaskInfo->lock);
for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
......@@ -1083,7 +1083,7 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
STableListInfo* pTableListInfo = NULL;//pTaskInfo->pTableInfoList;
const char* id = GET_TASKID(pTaskInfo);
pTaskInfo->streamInfo.prepareStatus = *pOffset;
......
......@@ -1986,7 +1986,6 @@ SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t v
pTaskInfo->schemaInfo.dbname = taosStrdup(dbFName);
pTaskInfo->execModel = model;
pTaskInfo->pTableInfoList = tableListCreate();
pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);
......@@ -2101,7 +2100,6 @@ bool groupbyTbname(SNodeList* pGroupList) {
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
SNode* pTagIndexCond, const char* pUser) {
int32_t type = nodeType(pPhyNode);
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
const char* idstr = GET_TASKID(pTaskInfo);
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
......@@ -2115,6 +2113,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pTableScanNode->groupSort = true;
}
STableListInfo* pTableListInfo = tableListCreate();
int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
......@@ -2130,7 +2129,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
if (NULL == pOperator) {
pTaskInfo->code = terrno;
return NULL;
......@@ -2140,6 +2139,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
......@@ -2155,7 +2155,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
if (NULL == pOperator) {
pTaskInfo->code = terrno;
return NULL;
......@@ -2168,6 +2168,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
if (pHandle->vnode) {
int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
......@@ -2190,7 +2192,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo);
pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
......@@ -2199,7 +2201,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
pTagIndexCond, pTaskInfo);
if (code != TSDB_CODE_SUCCESS) {
......@@ -2211,6 +2213,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo));
......@@ -2231,9 +2234,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0);
}
pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTaskInfo);
pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTableListInfo, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
pTagCond, pTagIndexCond, pTaskInfo);
......@@ -2248,7 +2252,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTaskInfo);
pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTableListInfo, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
} else {
......@@ -2416,6 +2420,7 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT
if (NULL == pDeleterParam) {
return TSDB_CODE_OUT_OF_MEMORY;
}
#if 0
int32_t tbNum = tableListGetSize(pTask->pTableInfoList);
pDeleterParam->suid = tableListGetSuid(pTask->pTableInfoList);
......@@ -2432,6 +2437,8 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT
}
*pParam = pDeleterParam;
#endif
break;
}
default:
......@@ -2481,8 +2488,6 @@ static void freeBlock(void* pParam) {
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));
pTaskInfo->pTableInfoList = tableListDestroy(pTaskInfo->pTableInfoList);
destroyOperatorInfo(pTaskInfo->pRoot);
cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
cleanupStreamInfo(&pTaskInfo->streamInfo);
......
......@@ -676,7 +676,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
}
if (pBlock->info.id.uid) {
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
pBlock->info.id.groupId = 0;//getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
}
uint32_t status = 0;
......@@ -784,7 +784,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
pInfo->currentTable++;
taosRLockLatch(&pTaskInfo->lock);
numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
numOfTables = tableListGetSize(pInfo->base.pTableInfoList);
if (pInfo->currentTable >= numOfTables) {
qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
......@@ -792,7 +792,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return NULL;
}
tInfo = *(STableKeyInfo*) tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable);
tInfo = *(STableKeyInfo*) tableListGetInfo(pInfo->base.pTableInfoList, pInfo->currentTable);
taosRUnLockLatch(&pTaskInfo->lock);
tsdbSetTableList(pInfo->base.dataReader, &tInfo, 1);
......@@ -804,14 +804,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
}
} else { // scan table group by group sequentially
if (pInfo->currentGroupId == -1) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableInfoList)) {
setOperatorCompleted(pOperator);
return NULL;
}
int32_t num = 0;
STableKeyInfo* pList = NULL;
tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);
tableListGetGroupList(pInfo->base.pTableInfoList, pInfo->currentGroupId, &pList, &num);
ASSERT(pInfo->base.dataReader == NULL);
int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
......@@ -830,7 +830,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return result;
}
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableInfoList)) {
setOperatorCompleted(pOperator);
return NULL;
}
......@@ -841,7 +841,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
int32_t num = 0;
STableKeyInfo* pList = NULL;
tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);
tableListGetGroupList(pInfo->base.pTableInfoList, pInfo->currentGroupId, &pList, &num);
tsdbSetTableList(pInfo->base.dataReader, pList, num);
tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
......@@ -866,25 +866,30 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr
return 0;
}
static void destroyTableScanOperatorInfo(void* param) {
STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
blockDataDestroy(pTableScanInfo->pResBlock);
cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
static void destroyTableScanBase(STableScanBase* pBase) {
cleanupQueryTableDataCond(&pBase->cond);
tsdbReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL;
tsdbReaderClose(pBase->dataReader);
pBase->dataReader = NULL;
if (pTableScanInfo->base.matchInfo.pList != NULL) {
taosArrayDestroy(pTableScanInfo->base.matchInfo.pList);
if (pBase->matchInfo.pList != NULL) {
taosArrayDestroy(pBase->matchInfo.pList);
}
taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache);
cleanupExprSupp(&pTableScanInfo->base.pseudoSup);
tableListDestroy(pBase->pTableInfoList);
taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);
cleanupExprSupp(&pBase->pseudoSup);
}
static void destroyTableScanOperatorInfo(void* param) {
STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
blockDataDestroy(pTableScanInfo->pResBlock);
destroyTableScanBase(&pTableScanInfo->base);
taosMemoryFreeClear(param);
}
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
SExecTaskInfo* pTaskInfo) {
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -940,6 +945,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pTaskInfo);
pOperator->exprSupp.numOfExprs = numOfCols;
pInfo->base.pTableInfoList = pTableListInfo;
pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
code = terrno;
......@@ -1059,7 +1065,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
if (hasNext) {
/*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL);
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableInfoList, pBlock->info.id.uid);
}
tsdbReaderClose(pReader);
......@@ -1080,7 +1086,8 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts,
}
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
return getTableGroupId(pInfo->pTableScanOp->pTaskInfo->pTableInfoList, uid);
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
return getTableGroupId(pTableScanInfo->base.pTableInfoList, uid);
}
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
......@@ -1554,7 +1561,8 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
pInfo->pRes->info.type = STREAM_NORMAL;
pInfo->pRes->info.version = pBlock->info.version;
pInfo->pRes->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
pInfo->pRes->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableInfoList, pBlock->info.id.uid);
// todo extract method
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
......@@ -2314,6 +2322,7 @@ _end:
static void destroyStreamScanOperatorInfo(void* param) {
SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
destroyOperatorInfo(pStreamScan->pTableScanOp);
}
......@@ -2343,7 +2352,7 @@ static void destroyStreamScanOperatorInfo(void* param) {
}
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
SExecTaskInfo* pTaskInfo) {
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
SArray* pColIds = NULL;
SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
......@@ -2411,7 +2420,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
}
if (pHandle->vnode) {
SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
if (pHandle->version > 0) {
pTSInfo->base.cond.endVersion = pHandle->version;
......@@ -2419,7 +2428,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
STableKeyInfo* pList = NULL;
int32_t num = 0;
tableListGetGroupList(pTaskInfo->pTableInfoList, 0, &pList, &num);
tableListGetGroupList(pTableListInfo, 0, &pList, &num);
if (pHandle->initTableReader) {
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
......@@ -2450,7 +2459,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
// set the extract column id to streamHandle
tqReaderSetColIdList(pInfo->tqReader, pColIds);
SArray* tableIdList = extractTableIdList(pTaskInfo->pTableInfoList);
SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableInfoList);
code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
if (code != 0) {
taosArrayDestroy(tableIdList);
......@@ -2525,7 +2534,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
int32_t size = tableListGetSize(pTaskInfo->pTableInfoList);
int32_t size = tableListGetSize(pInfo->pTableInfoList);
if (size == 0) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL;
......@@ -2537,7 +2546,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
metaReaderInit(&mr, pInfo->readHandle.meta, 0);
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
STableKeyInfo* item = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->curPos);
STableKeyInfo* item = tableListGetInfo(pInfo->pTableInfoList, pInfo->curPos);
int32_t code = metaGetTableEntryByUid(&mr, item->uid);
tDecoderClear(&mr.coder);
if (code != TSDB_CODE_SUCCESS) {
......@@ -2657,7 +2666,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
int64_t st = taosGetTimestampUs();
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
void* p = tableListGetInfo(pInfo->base.pTableInfoList, readIdx + pInfo->tableStartIndex);
SReadHandle* pHandle = &pInfo->base.readHandle;
if (NULL == source->dataReader || !source->multiReader) {
......@@ -2714,7 +2723,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
continue;
}
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableInfoList, pBlock->info.id.uid);
pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
......@@ -2770,10 +2779,10 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
{
size_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
size_t numOfTables = tableListGetSize(pInfo->base.pTableInfoList);
int32_t i = pInfo->tableStartIndex + 1;
for (; i < numOfTables; ++i) {
STableKeyInfo* tableKeyInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i);
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableInfoList, i);
if (tableKeyInfo->groupId != pInfo->groupId) {
break;
}
......@@ -2907,7 +2916,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code);
}
size_t tableListSize = tableListGetSize(pTaskInfo->pTableInfoList);
size_t tableListSize = tableListGetSize(pInfo->base.pTableInfoList);
if (!pInfo->hasGroupId) {
pInfo->hasGroupId = true;
......@@ -2916,7 +2925,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
return NULL;
}
pInfo->tableStartIndex = 0;
pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex))->groupId;
pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableInfoList, pInfo->tableStartIndex))->groupId;
startGroupTableMergeScan(pOperator);
}
......@@ -2941,7 +2950,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
}
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
pInfo->groupId = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex)->groupId;
pInfo->groupId = tableListGetInfo(pInfo->base.pTableInfoList, pInfo->tableStartIndex)->groupId;
startGroupTableMergeScan(pOperator);
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}
......@@ -2963,9 +2972,6 @@ void destroyTableMergeScanOperatorInfo(void* param) {
p->dataReader = NULL;
}
tsdbReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL;
taosArrayDestroy(pTableScanInfo->sortSourceParams);
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
pTableScanInfo->pSortHandle = NULL;
......@@ -2974,20 +2980,14 @@ void destroyTableMergeScanOperatorInfo(void* param) {
SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
taosMemoryFree(pCond->colList);
}
taosArrayDestroy(pTableScanInfo->queryConds);
if (pTableScanInfo->base.matchInfo.pList != NULL) {
taosArrayDestroy(pTableScanInfo->base.matchInfo.pList);
}
taosArrayDestroy(pTableScanInfo->queryConds);
destroyTableScanBase(&pTableScanInfo->base);
pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
taosArrayDestroy(pTableScanInfo->pSortInfo);
cleanupExprSupp(&pTableScanInfo->base.pseudoSup);
taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache);
taosMemoryFreeClear(param);
}
......@@ -3006,7 +3006,7 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
}
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
SExecTaskInfo* pTaskInfo) {
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -3048,6 +3048,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->base.limitInfo.limit.limit = -1;
pInfo->base.limitInfo.slimit.limit = -1;
pInfo->base.pTableInfoList = pTableListInfo;
pInfo->sample.sampleRatio = pTableScanNode->ratio;
pInfo->sample.seed = taosGetTimestampSec();
......
......@@ -83,10 +83,11 @@ typedef struct MergeIndex {
} MergeIndex;
typedef struct SBlockDistInfo {
SSDataBlock* pResBlock;
STsdbReader* pHandle;
SReadHandle readHandle;
uint64_t uid; // table uid
SSDataBlock* pResBlock;
STsdbReader* pHandle;
SReadHandle readHandle;
STableListInfo* pTableListInfo;
uint64_t uid; // table uid
} SBlockDistInfo;
static int32_t sysChkFilter__Comm(SNode* pNode);
......@@ -2245,7 +2246,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
}
SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode,
SExecTaskInfo* pTaskInfo) {
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -2263,7 +2264,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
goto _error;
}
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
pInfo->pTableListInfo = pTableListInfo;
size_t num = tableListGetSize(pTableListInfo);
void* pList = tableListGetInfo(pTableListInfo, 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册