提交 3a566d5e 编写于 作者: H Haojun Liao

refactor(query): do some internal refactor.

上级 7678b6bb
...@@ -984,8 +984,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI ...@@ -984,8 +984,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
} }
} else { } else {
SColVal cv = {0}; SColVal cv = {0};
SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, pReader->suppInfo.slotIds[i] - 1);
SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, pReader->suppInfo.slotIds[i]);
for (int32_t j = 0; j < pBlockData->nRow; ++j) { for (int32_t j = 0; j < pBlockData->nRow; ++j) {
tColDataGetValue(pData, j, &cv); tColDataGetValue(pData, j, &cv);
colDataAppend(pColData, j, (const char*)&cv.value, cv.isNull); colDataAppend(pColData, j, (const char*)&cv.value, cv.isNull);
...@@ -994,7 +993,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI ...@@ -994,7 +993,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
} }
pReader->pResBlock->info.rows = pBlockData->nRow; pReader->pResBlock->info.rows = pBlockData->nRow;
setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlockData); setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlockData);
/* /*
int32_t ret = tsdbLoadBlockDataCols(&(pReader->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, int32_t ret = tsdbLoadBlockDataCols(&(pReader->rhelper), pBlock, pCheckInfo->pCompInfo, colIds,
...@@ -2283,7 +2282,7 @@ static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) ...@@ -2283,7 +2282,7 @@ static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock)
} }
static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) { static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) {
return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) /*&& (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer)*/; return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer);
} }
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBlock, SBlock* pBlock, STableBlockScanInfo *pScanInfo, TSDBKEY key) { static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBlock, SBlock* pBlock, STableBlockScanInfo *pScanInfo, TSDBKEY key) {
...@@ -2616,7 +2615,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { ...@@ -2616,7 +2615,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
return code; return code;
} }
static int32_t buildInmemBlockSeqentially(STsdbReader* pReader) { static int32_t buildBlockFromBufferSeqentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
while(1) { while(1) {
...@@ -2644,7 +2643,7 @@ static int32_t buildInmemBlockSeqentially(STsdbReader* pReader) { ...@@ -2644,7 +2643,7 @@ static int32_t buildInmemBlockSeqentially(STsdbReader* pReader) {
} }
} }
static int32_t loadDataInFiles(STsdbReader* pReader) { static int32_t buildBlockFromFiles(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SFileSetIter* pFIter = &pStatus->fileIter; SFileSetIter* pFIter = &pStatus->fileIter;
...@@ -3018,7 +3017,7 @@ int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) { ...@@ -3018,7 +3017,7 @@ int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
// // check if the query range overlaps with the file data block // // check if the query range overlaps with the file data block
// bool exists = true; // bool exists = true;
// int32_t code = loadDataInFiles(pTsdbReadHandle, &exists); // int32_t code = buildBlockFromFiles(pTsdbReadHandle, &exists);
// if (code != TSDB_CODE_SUCCESS) { // if (code != TSDB_CODE_SUCCESS) {
// pTsdbReadHandle->checkFiles = false; // pTsdbReadHandle->checkFiles = false;
// return false; // return false;
...@@ -3306,7 +3305,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { ...@@ -3306,7 +3305,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
if (pReader->type == BLOCK_LOAD_OFFSET_ORDER) { if (pReader->type == BLOCK_LOAD_OFFSET_ORDER) {
if (pStatus->loadFromFile) { if (pStatus->loadFromFile) {
int32_t code = loadDataInFiles(pReader); int32_t code = buildBlockFromFiles(pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return false; return false;
} }
...@@ -3314,11 +3313,11 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { ...@@ -3314,11 +3313,11 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
if (pBlock->info.rows > 0) { if (pBlock->info.rows > 0) {
return true; return true;
} else { } else {
buildInmemBlockSeqentially(pReader); buildBlockFromBufferSeqentially(pReader);
return pBlock->info.rows > 0; return pBlock->info.rows > 0;
} }
} else { // no data in files, let's try the buffer } else { // no data in files, let's try the buffer
buildInmemBlockSeqentially(pReader); buildBlockFromBufferSeqentially(pReader);
return pBlock->info.rows > 0; return pBlock->info.rows > 0;
} }
} else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) { } else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) {
...@@ -3334,7 +3333,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { ...@@ -3334,7 +3333,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
// if (pReader->checkFiles) { // if (pReader->checkFiles) {
// // check if the query range overlaps with the file data block // // check if the query range overlaps with the file data block
// bool exists = true; // bool exists = true;
// int32_t code = loadDataInFiles(pReader, &exists); // int32_t code = buildBlockFromFiles(pReader, &exists);
// if (code != TSDB_CODE_SUCCESS) { // if (code != TSDB_CODE_SUCCESS) {
// pReader->activeIndex = 0; // pReader->activeIndex = 0;
// pReader->checkFiles = false; // pReader->checkFiles = false;
...@@ -3454,7 +3453,16 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { ...@@ -3454,7 +3453,16 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
int32_t code = tBlockDataInit(&pStatus->fileBlockData); int32_t code = tBlockDataInit(&pStatus->fileBlockData);
doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData); if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
return pReader->pResBlock->pDataBlock; return pReader->pResBlock->pDataBlock;
} }
......
...@@ -107,7 +107,7 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); ...@@ -107,7 +107,7 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode); SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext); EDealRes doTranslateTagExpr(SNode** pNode, void* pContext);
int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo); int32_t getTableList(void* metaHandle, void* vnode, SScanPhysiNode* pScanNode, STableListInfo* pListInfo);
SArray* createSortInfo(SNodeList* pNodeList); SArray* createSortInfo(SNodeList* pNodeList);
SArray* extractPartitionColInfo(SNodeList* pNodeList); SArray* extractPartitionColInfo(SNodeList* pNodeList);
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type); SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type);
......
...@@ -283,7 +283,7 @@ static bool isTableOk(STableKeyInfo* info, SNode *pTagCond, SMeta *metaHandle){ ...@@ -283,7 +283,7 @@ static bool isTableOk(STableKeyInfo* info, SNode *pTagCond, SMeta *metaHandle){
return result; return result;
} }
int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) { int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo)); pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo));
if(pListInfo->pTableList == NULL) return TSDB_CODE_OUT_OF_MEMORY; if(pListInfo->pTableList == NULL) return TSDB_CODE_OUT_OF_MEMORY;
...@@ -303,7 +303,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo ...@@ -303,7 +303,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
//code = doFilterTag(pTagIndexCond, &metaArg, res); //code = doFilterTag(pTagIndexCond, &metaArg, res);
code = TSDB_CODE_INDEX_REBUILDING; code = TSDB_CODE_INDEX_REBUILDING;
if (code == TSDB_CODE_INDEX_REBUILDING) { if (code == TSDB_CODE_INDEX_REBUILDING) {
code = vnodeGetAllTableList(metaHandle, tableUid, pListInfo->pTableList); code = vnodeGetAllTableList(pVnode, tableUid, pListInfo->pTableList);
} else if (code != TSDB_CODE_SUCCESS) { } else if (code != TSDB_CODE_SUCCESS) {
qError("failed to get tableIds, reason: %s, suid: %" PRIu64 "", tstrerror(code), tableUid); qError("failed to get tableIds, reason: %s, suid: %" PRIu64 "", tstrerror(code), tableUid);
taosArrayDestroy(res); taosArrayDestroy(res);
...@@ -319,7 +319,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo ...@@ -319,7 +319,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
} }
taosArrayDestroy(res); taosArrayDestroy(res);
} else { } else {
code = vnodeGetAllTableList(metaHandle, tableUid, pListInfo->pTableList); code = vnodeGetAllTableList(pVnode, tableUid, pListInfo->pTableList);
} }
if(pTagCond){ if(pTagCond){
......
...@@ -4121,7 +4121,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4121,7 +4121,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
int32_t code = getTableList(pHandle->meta, pScanPhyNode, pTableListInfo); int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
return NULL; return NULL;
...@@ -4133,7 +4133,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4133,7 +4133,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
if (pBlockNode->tableType == TSDB_SUPER_TABLE) { if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
int32_t code = vnodeGetAllTableList(pHandle->meta, pBlockNode->uid, pTableListInfo->pTableList); int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pTableListInfo->pTableList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
return NULL; return NULL;
...@@ -4183,7 +4183,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4183,7 +4183,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
if (pScanNode->tableType == TSDB_SUPER_TABLE) { if (pScanNode->tableType == TSDB_SUPER_TABLE) {
code = vnodeGetAllTableList(pHandle->meta, pScanNode->uid, pTableListInfo->pTableList); code = vnodeGetAllTableList(pHandle->vnode, pScanNode->uid, pTableListInfo->pTableList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
return NULL; return NULL;
...@@ -4399,7 +4399,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { ...@@ -4399,7 +4399,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
} }
STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableListInfo, const char* idstr) { STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableListInfo, const char* idstr) {
int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo); int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -2033,7 +2033,7 @@ typedef struct STableMergeScanInfo { ...@@ -2033,7 +2033,7 @@ typedef struct STableMergeScanInfo {
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) { STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) {
int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo); int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
......
...@@ -545,6 +545,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { ...@@ -545,6 +545,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
return 0; return 0;
} }
// TODO consider the page meta size
int32_t getProperSortPageSize(size_t rowSize) { int32_t getProperSortPageSize(size_t rowSize) {
uint32_t defaultPageSize = 4096; uint32_t defaultPageSize = 4096;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册