提交 0dd0b168 编写于 作者: D dapan1121

enh: optimize table scan for last query

上级 cacbc7d3
...@@ -230,6 +230,7 @@ typedef enum EFuncDataRequired { ...@@ -230,6 +230,7 @@ typedef enum EFuncDataRequired {
FUNC_DATA_REQUIRED_SMA_LOAD, FUNC_DATA_REQUIRED_SMA_LOAD,
FUNC_DATA_REQUIRED_NOT_LOAD, FUNC_DATA_REQUIRED_NOT_LOAD,
FUNC_DATA_REQUIRED_FILTEROUT, FUNC_DATA_REQUIRED_FILTEROUT,
FUNC_DATA_REQUIRED_ALL_FILTEROUT,
} EFuncDataRequired; } EFuncDataRequired;
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow); EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
......
...@@ -183,7 +183,7 @@ typedef struct STsdbReader STsdbReader; ...@@ -183,7 +183,7 @@ typedef struct STsdbReader STsdbReader;
int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num); int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly); SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly, SHashObj** pIgnoreTables);
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr); void tsdbReaderSetId(STsdbReader* pReader, const char* idstr);
void tsdbReaderClose(STsdbReader *pReader); void tsdbReaderClose(STsdbReader *pReader);
......
...@@ -193,6 +193,7 @@ struct STsdbReader { ...@@ -193,6 +193,7 @@ struct STsdbReader {
SBlockInfoBuf blockInfoBuf; SBlockInfoBuf blockInfoBuf;
int32_t step; int32_t step;
STsdbReader* innerReader[2]; STsdbReader* innerReader[2];
SHashObj** pIgnoreTables;
}; };
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
...@@ -2704,15 +2705,21 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { ...@@ -2704,15 +2705,21 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
int32_t step = asc ? 1 : -1; int32_t step = asc ? 1 : -1;
double el = 0; double el = 0;
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
STableBlockScanInfo* pBlockScanInfo = NULL; STableBlockScanInfo* pBlockScanInfo = NULL;
if (pBlockInfo != NULL) { if (pBlockInfo != NULL) {
if (*pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockScanInfo->uid, sizeof(pBlockScanInfo->uid))) {
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
return code;
}
pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
if (pBlockScanInfo == NULL) { if (pBlockScanInfo == NULL) {
goto _end; goto _end;
} }
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader); TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);
// it is a clean block, load it directly // it is a clean block, load it directly
...@@ -2731,9 +2738,12 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { ...@@ -2731,9 +2738,12 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
} }
} else { // file blocks not exist } else { // file blocks not exist
pBlockScanInfo = *pReader->status.pTableIter; pBlockScanInfo = *pReader->status.pTableIter;
if (*pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockScanInfo->uid, sizeof(pBlockScanInfo->uid))) {
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
return code;
}
} }
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData; SBlockData* pBlockData = &pReader->status.fileBlockData;
while (1) { while (1) {
...@@ -3009,6 +3019,14 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -3009,6 +3019,14 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
while (1) { while (1) {
// load the last data block of current table // load the last data block of current table
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
if (*pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) {
bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) {
return TSDB_CODE_SUCCESS;
}
continue;
}
bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
if (!hasVal) { if (!hasVal) {
...@@ -3060,20 +3078,24 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -3060,20 +3078,24 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
static int32_t doBuildDataBlock(STsdbReader* pReader) { static int32_t doBuildDataBlock(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SDataBlk* pBlock = NULL;
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter; SDataBlockIter* pBlockIter = &pStatus->blockIter;
STableBlockScanInfo* pScanInfo = NULL; STableBlockScanInfo* pScanInfo = NULL;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
if (*pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) {
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);
return code;
}
pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
if (pScanInfo == NULL) { if (pScanInfo == NULL) {
return terrno; return terrno;
} }
pBlock = getCurrentBlock(pBlockIter);
initLastBlockReader(pLastBlockReader, pScanInfo, pReader); initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader); TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
...@@ -3309,6 +3331,13 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { ...@@ -3309,6 +3331,13 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
// } // }
STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter; STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter;
if (*pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &(*pBlockScanInfo)->uid, sizeof((*pBlockScanInfo)->uid))) {
bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) {
return TSDB_CODE_SUCCESS;
}
}
initMemDataIterator(*pBlockScanInfo, pReader); initMemDataIterator(*pBlockScanInfo, pReader);
int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN; int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;
...@@ -4257,7 +4286,7 @@ static void freeSchemaFunc(void* param) { ...@@ -4257,7 +4286,7 @@ static void freeSchemaFunc(void* param) {
// ====================================== EXPOSED APIs ====================================== // ====================================== EXPOSED APIs ======================================
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr, bool countOnly) { SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr, bool countOnly, SHashObj** pIgnoreTables) {
STimeWindow window = pCond->twindows; STimeWindow window = pCond->twindows;
int32_t capacity = pVnode->config.tsdbCfg.maxRows; int32_t capacity = pVnode->config.tsdbCfg.maxRows;
...@@ -4357,6 +4386,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL ...@@ -4357,6 +4386,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
pReader->readMode = READ_MODE_COUNT_ONLY; pReader->readMode = READ_MODE_COUNT_ONLY;
} }
pReader->pIgnoreTables = pIgnoreTables;
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
return code; return code;
......
...@@ -89,7 +89,25 @@ typedef struct SColMatchInfo { ...@@ -89,7 +89,25 @@ typedef struct SColMatchInfo {
} SColMatchInfo; } SColMatchInfo;
typedef struct SExecTaskInfo SExecTaskInfo; typedef struct SExecTaskInfo SExecTaskInfo;
typedef struct STableListInfo STableListInfo;
typedef struct STableListIdInfo {
uint64_t suid;
uint64_t uid;
int32_t tableType;
} STableListIdInfo;
// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly
// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups
typedef struct STableListInfo {
bool oneTableForEachGroup;
int32_t numOfOuputGroups; // the data block will be generated one by one
int32_t* groupOffset; // keep the offset value for each group in the tableList
SArray* pTableList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
STableListIdInfo idInfo; // this maybe the super table or ordinary table
} STableListInfo;
struct SqlFunctionCtx; struct SqlFunctionCtx;
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
......
...@@ -327,6 +327,7 @@ typedef struct STableScanInfo { ...@@ -327,6 +327,7 @@ typedef struct STableScanInfo {
SScanInfo scanInfo; SScanInfo scanInfo;
int32_t scanTimes; int32_t scanTimes;
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
SHashObj* pIgnoreTables;
SSampleExecInfo sample; // sample execution info SSampleExecInfo sample; // sample execution info
int32_t currentGroupId; int32_t currentGroupId;
int32_t currentTable; int32_t currentTable;
......
...@@ -27,23 +27,6 @@ ...@@ -27,23 +27,6 @@
#include "executorimpl.h" #include "executorimpl.h"
#include "tcompression.h" #include "tcompression.h"
typedef struct STableListIdInfo {
uint64_t suid;
uint64_t uid;
int32_t tableType;
} STableListIdInfo;
// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly
// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups
struct STableListInfo {
bool oneTableForEachGroup;
int32_t numOfOuputGroups; // the data block will be generated one by one
int32_t* groupOffset; // keep the offset value for each group in the tableList
SArray* pTableList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
STableListIdInfo idInfo; // this maybe the super table or ordinary table
};
typedef struct tagFilterAssist { typedef struct tagFilterAssist {
SHashObj* colHash; SHashObj* colHash;
int32_t index; int32_t index;
......
...@@ -1170,7 +1170,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1170,7 +1170,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
if (pScanBaseInfo->dataReader == NULL) { if (pScanBaseInfo->dataReader == NULL) {
int32_t code = tsdbReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1, int32_t code = tsdbReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1,
pScanInfo->pResBlock, &pScanBaseInfo->dataReader, id, false); pScanInfo->pResBlock, &pScanBaseInfo->dataReader, id, false, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id); qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
terrno = code; terrno = code;
...@@ -1229,7 +1229,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1229,7 +1229,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
int32_t size = tableListGetSize(pTableListInfo); int32_t size = tableListGetSize(pTableListInfo);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL, tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL,
false); false, NULL);
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
......
...@@ -192,8 +192,23 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro ...@@ -192,8 +192,23 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro
return (SResultRow*)((char*)(*pPage) + p1->offset); return (SResultRow*)((char*)(*pPage) + p1->offset);
} }
static int32_t insertTableToScanIgnoreList(STableScanInfo* pTableScanInfo, uint64_t uid) {
if (NULL == pTableScanInfo->pIgnoreTables) {
int32_t tableNum = taosArrayGetSize(pTableScanInfo->base.pTableListInfo->pTableList);
pTableScanInfo->pIgnoreTables = taosHashInit(tableNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (NULL == pTableScanInfo->pIgnoreTables) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
taosHashPut(pTableScanInfo->pIgnoreTables, &uid, sizeof(uid), &pTableScanInfo->scanTimes, sizeof(pTableScanInfo->scanTimes));
return TSDB_CODE_SUCCESS;
}
static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) { static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) {
STableScanInfo* pTableScanInfo = pOperator->info; STableScanInfo* pTableScanInfo = pOperator->info;
int32_t code = TSDB_CODE_SUCCESS;
if (pTableScanInfo->base.pdInfo.pExprSup == NULL) { if (pTableScanInfo->base.pdInfo.pExprSup == NULL) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -226,9 +241,10 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* ...@@ -226,9 +241,10 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo*
if (notLoadBlock) { if (notLoadBlock) {
*status = FUNC_DATA_REQUIRED_NOT_LOAD; *status = FUNC_DATA_REQUIRED_NOT_LOAD;
code = insertTableToScanIgnoreList(pTableScanInfo, pBlockInfo->id.uid);
} }
return TSDB_CODE_SUCCESS; return code;
} }
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols, static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols,
...@@ -380,7 +396,13 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca ...@@ -380,7 +396,13 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->skipBlocks += 1; pCost->skipBlocks += 1;
tsdbReleaseDataBlock(pTableScanInfo->dataReader); tsdbReleaseDataBlock(pTableScanInfo->dataReader);
STableScanInfo* pTableScanInfo = pOperator->info;
if (taosHashGetSize(pTableScanInfo->pIgnoreTables) == taosArrayGetSize(pTableScanInfo->base.pTableListInfo->pTableList)) {
*status = FUNC_DATA_REQUIRED_ALL_FILTEROUT;
} else {
*status = FUNC_DATA_REQUIRED_FILTEROUT; *status = FUNC_DATA_REQUIRED_FILTEROUT;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -695,6 +717,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { ...@@ -695,6 +717,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
break;
}
// current block is filter out according to filter condition, continue load the next block // current block is filter out according to filter condition, continue load the next block
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
continue; continue;
...@@ -734,6 +760,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { ...@@ -734,6 +760,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
} }
pTableScanInfo->scanTimes += 1; pTableScanInfo->scanTimes += 1;
taosHashClear(pTableScanInfo->pIgnoreTables);
if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
...@@ -761,6 +788,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { ...@@ -761,6 +788,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
} }
pTableScanInfo->scanTimes += 1; pTableScanInfo->scanTimes += 1;
taosHashClear(pTableScanInfo->pIgnoreTables);
if (pTableScanInfo->scanTimes < total) { if (pTableScanInfo->scanTimes < total) {
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
...@@ -825,7 +853,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -825,7 +853,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
ASSERT(pInfo->base.dataReader == NULL); ASSERT(pInfo->base.dataReader == NULL);
int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
(STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly); (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly, &pInfo->pIgnoreTables);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
...@@ -894,6 +922,7 @@ static void destroyTableScanBase(STableScanBase* pBase) { ...@@ -894,6 +922,7 @@ static void destroyTableScanBase(STableScanBase* pBase) {
static void destroyTableScanOperatorInfo(void* param) { static void destroyTableScanOperatorInfo(void* param) {
STableScanInfo* pTableScanInfo = (STableScanInfo*)param; STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
blockDataDestroy(pTableScanInfo->pResBlock); blockDataDestroy(pTableScanInfo->pResBlock);
taosHashCleanup(pTableScanInfo->pIgnoreTables);
destroyTableScanBase(&pTableScanInfo->base); destroyTableScanBase(&pTableScanInfo->base);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
...@@ -1059,7 +1088,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU ...@@ -1059,7 +1088,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
SSDataBlock* pBlock = pTableScanInfo->pResBlock; SSDataBlock* pBlock = pTableScanInfo->pResBlock;
STsdbReader* pReader = NULL; STsdbReader* pReader = NULL;
int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock, int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
(STsdbReader**)&pReader, GET_TASKID(pTaskInfo), false); (STsdbReader**)&pReader, GET_TASKID(pTaskInfo), false, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
...@@ -2662,7 +2691,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { ...@@ -2662,7 +2691,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
SReadHandle* pHandle = &pInfo->base.readHandle; SReadHandle* pHandle = &pInfo->base.readHandle;
if (NULL == source->dataReader || !source->multiReader) { if (NULL == source->dataReader || !source->multiReader) {
code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo), false); code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo), false, NULL);
if (code != 0) { if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
...@@ -2710,6 +2739,10 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { ...@@ -2710,6 +2739,10 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
break;
}
// current block is filter out according to filter condition, continue load the next block // current block is filter out according to filter condition, continue load the next block
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
continue; continue;
......
...@@ -2269,7 +2269,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi ...@@ -2269,7 +2269,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
size_t num = tableListGetSize(pTableListInfo); size_t num = tableListGetSize(pTableListInfo);
void* pList = tableListGetInfo(pTableListInfo, 0); void* pList = tableListGetInfo(pTableListInfo, 0);
code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, &pInfo->pHandle, pTaskInfo->id.str, false); code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, &pInfo->pHandle, pTaskInfo->id.str, false, NULL);
cleanupQueryTableDataCond(&cond); cleanupQueryTableDataCond(&cond);
if (code != 0) { if (code != 0) {
goto _error; goto _error;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册