提交 34e4980a 编写于 作者: H Haojun Liao

refactor(query): do some internal refactor.

上级 e3bb1021
......@@ -215,8 +215,6 @@ static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
}
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
ASSERT(numOfTables >= 1);
// allocate buffer in order to load data blocks from file
// todo use simple hash instead
SHashObj* pTableMap = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
......@@ -265,23 +263,9 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
// }
// }
// // only one table, not need to sort again
// static SArray* createCheckInfoFromCheckInfo(STableBlockScanInfo* pCheckInfo, TSKEY skey, SArray** psTable) {
// SArray* pNew = taosArrayInit(1, sizeof(STableBlockScanInfo));
// STableBlockScanInfo info = {.lastKey = skey};
// info.tableId = pCheckInfo->tableId;
// taosArrayPush(pNew, &info);
// return pNew;
// }
// todo
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow, int32_t order) {
ASSERT(pWindow != NULL);
bool asc = ASCENDING_TRAVERSE(order);
return false;
// return ((asc && pWindow->skey > pWindow->ekey) || (!asc && pWindow->ekey > pWindow->skey));
return pWindow->skey > pWindow->ekey;
}
// // Update the query time window according to the data time to live(TTL) information, in order to avoid to return
......@@ -347,8 +331,8 @@ static int32_t initFileIterator(SFilesetIter* pIter, const STsdbFSState* pFState
}
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
bool asc = ASCENDING_TRAVERSE(pIter->order);
int32_t step = asc? 1:-1;
bool asc = ASCENDING_TRAVERSE(pIter->order);
int32_t step = asc ? 1 : -1;
pIter->index += step;
if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
......@@ -357,26 +341,36 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
// check file the time range of coverage
STimeWindow win = {0};
pReader->status.pCurrentFileset = (SDFileSet *)taosArrayGet(pIter->pFileList, pIter->index);
int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
while(1) {
pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);
// todo file range check
// tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
// current file are not overlapped with query time window, ignore remain files
// if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.ekey)) {
// tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
// pReader->window.skey, pReader->window.ekey, pReader->idStr);
// return false;
// }
int32_t fid = pReader->status.pCurrentFileset->fid;
tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);
// current file are no longer overlapped with query time window, ignore remain files
if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
pReader->window.skey, pReader->window.ekey, pReader->idStr);
return false;
}
if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
pIter->index += step;
continue;
}
return true;
tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader, fid,
pReader->window.skey, pReader->window.ekey, pReader->idStr);
return true;
}
_err:
_err:
return false;
}
......@@ -394,6 +388,29 @@ static void initReaderStatus(SReaderStatus* pStatus) {
pStatus->loadFromFile = true;
}
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
SSDataBlock* pResBlock = createDataBlock();
if (pResBlock == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
SColumnInfoData colInfo = {{0}, 0};
colInfo.info = pCond->colList[i];
blockDataAppendColInfo(pResBlock, &colInfo);
}
int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
taosMemoryFree(pResBlock);
return NULL;
}
return pResBlock;
}
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, const char* idstr) {
int32_t code = 0;
STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
......@@ -427,30 +444,27 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
// todo remove this
setQueryTimewindow(pReader, pCond, 0);
ASSERT (pCond->numOfCols > 0);
if (pCond->numOfCols > 0) {
limitOutputBufferSize(pCond, &pReader->capacity);
// allocate buffer in order to load data blocks from file
pReader->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
if (pReader->suppInfo.pstatis == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
pReader->pResBlock = createDataBlock();
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
SColumnInfoData colInfo = {{0}, 0};
colInfo.info = pCond->colList[i];
blockDataAppendColInfo(pReader->pResBlock, &colInfo);
}
limitOutputBufferSize(pCond, &pReader->capacity);
blockDataEnsureCapacity(pReader->pResBlock, pReader->capacity);
// allocate buffer in order to load data blocks from file
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
pSup->pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
if (pSup->pstatis == NULL || pSup->plist == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
setColumnIdSlotList(pReader, pReader->pResBlock);
pReader->suppInfo.plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
pReader->pResBlock = createResBlock(pCond, pReader->capacity);
if (pReader->pResBlock == NULL) {
code = terrno;
goto _end;
}
setColumnIdSlotList(pReader, pReader->pResBlock);
STsdbFSState* pFState = pReader->pTsdb->fs->cState;
initFileIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
......@@ -829,7 +843,6 @@ static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, SBlock* pBlock, int
int32_t step = ASCENDING_TRAVERSE(order)? 1:-1;
pDumpInfo->allDumped = true;
pDumpInfo->rowIndex = ASCENDING_TRAVERSE(order)? 0:pBlock->nRow-1;
pDumpInfo->lastKey = pBlock->maxKey.ts + step;
}
......@@ -847,10 +860,11 @@ static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_
}
}
// todo consider the output buffer size
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
int64_t st = taosGetTimestampUs();
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
SBlockData* pBlockData = &pStatus->fileBlockData;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
SSDataBlock* pResBlock = pReader->pResBlock;
......@@ -859,25 +873,29 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
uint8_t *pb = NULL, *pb1 = NULL;
int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols, pBlockData, &pb, &pb1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
int64_t st = taosGetTimestampUs();
SColVal cv = {0};
int32_t colIndex = 0;
bool asc = ASCENDING_TRAVERSE(pReader->order);
int32_t step = asc ? 1 : -1;
int32_t rowIndex = 0;
int32_t rowIndex = 0;
int32_t remain = asc ? (pBlockData->nRow - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex + 1);
int32_t endIndex = 0;
if (remain <= pReader->capacity) {
endIndex = pBlockData->nRow;
} else {
endIndex = pDumpInfo->rowIndex + step * pReader->capacity;
remain = pReader->capacity;
}
int32_t i = 0;
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
for (int32_t j = pDumpInfo->rowIndex; j < pBlockData->nRow && j >= 0; j += step) {
for (int32_t j = pDumpInfo->rowIndex; j < endIndex && j >= 0; j += step) {
colDataAppend(pColData, rowIndex++, (const char*)&pBlockData->aTSKEY[j], false);
}
i += 1;
......@@ -890,7 +908,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, colIndex);
if (pData->cid == pColData->info.colId) {
for (int32_t j = pDumpInfo->rowIndex; j < pBlockData->nRow && j >= 0; j += step) {
for (int32_t j = pDumpInfo->rowIndex; j < endIndex && j >= 0; j += step) {
tColDataGetValue(pData, j, &cv);
doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
}
......@@ -909,15 +927,48 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
i += 1;
}
pResBlock->info.rows = pBlockData->nRow;
setBlockAllDumped(&pReader->status.fBlockDumpInfo, pBlock, pReader->order);
pResBlock->info.rows = remain;
pDumpInfo->rowIndex += step*remain;
setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
int64_t elapsedTime = (taosGetTimestampUs() - st);
pReader->cost.blockLoadTime += elapsedTime;
int32_t unDumpedRows = asc? pBlock->nRow - pDumpInfo->rowIndex: pDumpInfo->rowIndex + 1;
tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s",
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlockData->nRow,
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s",
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);
return TSDB_CODE_SUCCESS;
}
// todo consider the output buffer size
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
int64_t st = taosGetTimestampUs();
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
SSDataBlock* pResBlock = pReader->pResBlock;
int32_t numOfCols = blockDataGetNumOfCols(pResBlock);
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
uint8_t *pb = NULL, *pb1 = NULL;
int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols, pBlockData, &pb, &pb1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
int64_t elapsedTime = (taosGetTimestampUs() - st);
pReader->cost.blockLoadTime += elapsedTime;
pDumpInfo->allDumped = false;
tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s",
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);
return TSDB_CODE_SUCCESS;
......@@ -2022,29 +2073,6 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
return true;
}
//static int32_t getDataBlock(STsdbReader* pTsdbReadHandle, SFileBlockInfo* pNext, bool* exists) {
// int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
//
// while (1) {
// int32_t code = loadFileDataBlock(pTsdbReadHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
// if (code != TSDB_CODE_SUCCESS || *exists) {
// return code;
// }
//
// if ((cur->slot == pTsdbReadHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
// (cur->slot == 0 && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
// // all data blocks in current file has been checked already, try next file if exists
// return getFirstFileDataBlock(pTsdbReadHandle, exists);
// } else { // next block of the same file
// cur->slot += step;
// cur->mixBlock = false;
// cur->blockCompleted = false;
// pNext = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
// }
// }
//}
// static int32_t getFirstFileDataBlock(STsdbReader* pTsdbReadHandle, bool* exists) {
// pTsdbReadHandle->numOfBlocks = 0;
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
......@@ -2283,52 +2311,65 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockIn
return TSDB_CODE_SUCCESS;
}
}
} else if (pBlockScanInfo->imemHasVal) {
TSDBKEY ik = TSDBROW_KEY(piRow);
if (key <= ik.ts) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
} else {
if (pBlockScanInfo->imemHasVal) {
TSDBKEY ik = TSDBROW_KEY(piRow);
if (key <= ik.ts) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge);
doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge);
if (ik.ts == mergeTs) {
if (ik.ts == mergeTs) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
if (ik.ts < key) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
if (ik.ts < key) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
} else { // pBlockScanInfo->memHasVal != NULL
TSDBKEY k = TSDBROW_KEY(pRow);
if (key <= k.ts) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
if (pBlockScanInfo->memHasVal) { // pBlockScanInfo->memHasVal != NULL
TSDBKEY k = TSDBROW_KEY(pRow);
if (key <= k.ts) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge);
doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge);
if (k.ts == mergeTs) {
if (k.ts == mergeTs) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
if (k.ts < key) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
if (k.ts < key) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
// imem & mem are all empty
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
}
return TSDB_CODE_SUCCESS;
......@@ -2343,8 +2384,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, SFileDataBlockInfo*
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
if (pBlockInfo->tbBlockIdx == pFBlock->tbBlockIdx) { // still in the same file block now
if (pDumpInfo->rowIndex >= pBlock->nRow) {
if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
break;
}
......@@ -2497,10 +2538,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader);
if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) {
SBlockData data = {0};
tBlockDataInit(&data);
code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &data);
tBlockDataInit(&pStatus->fileBlockData);
code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2736,32 +2775,64 @@ int32_t doLoadRowsOfIdenticalTs(STbDataIter *pIter, bool* hasVal, int64_t ts, SR
int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock* pBlock, SBlockData* pBlockData,
STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
bool asc = ASCENDING_TRAVERSE(pReader->order);
int32_t step = asc? 1:-1;
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
if (pDumpInfo->rowIndex < pBlockData->nRow - 1) {
if (pBlockData->aTSKEY[pDumpInfo->rowIndex + 1] == key) {
int32_t rowIndex = pDumpInfo->rowIndex + 1;
while (pBlockData->aTSKEY[rowIndex] == key) {
if (pBlockData->aVersion[rowIndex] > pReader->verRange.maxVer) {
continue;
if (asc) { // todo refactor
if (pDumpInfo->rowIndex < pBlockData->nRow - 1) {
if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) {
int32_t rowIndex = pDumpInfo->rowIndex + step;
while (pBlockData->aTSKEY[rowIndex] == key) {
if (pBlockData->aVersion[rowIndex] > pReader->verRange.maxVer || pBlockData->aVersion[rowIndex] < pReader->verRange.minVer) {
continue;
}
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
tRowMerge(pMerger, &fRow);
rowIndex += step;
}
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
tRowMerge(pMerger, &fRow);
rowIndex += 1;
pDumpInfo->rowIndex = rowIndex;
} else {
pDumpInfo->rowIndex += step;
}
} else { // last row of current block, check if current block is overlapped with neighbor block
pDumpInfo->rowIndex += step;
bool overlap = overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo);
if (overlap) { // load next block
ASSERT(0);
}
pDumpInfo->rowIndex = rowIndex;
}
} else { // last row of current block, check if current block is overlapped with neighbor block
pDumpInfo->rowIndex += 1;
bool overlap = overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo);
if (overlap) {
// load next block
} else {
if (pDumpInfo->rowIndex > 0) {
if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) {
int32_t rowIndex = pDumpInfo->rowIndex + step;
while (pBlockData->aTSKEY[rowIndex] == key) {
if (pBlockData->aVersion[rowIndex] > pReader->verRange.maxVer ||
pBlockData->aVersion[rowIndex] < pReader->verRange.minVer) {
continue;
}
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
tRowMerge(pMerger, &fRow);
rowIndex += step;
}
pDumpInfo->rowIndex = rowIndex;
} else {
pDumpInfo->rowIndex += step;
}
} else { // last row of current block, check if current block is overlapped with previous neighbor block
pDumpInfo->rowIndex += step;
bool overlap = overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo);
if (overlap) { // load next block
ASSERT(0);
}
}
}
return TSDB_CODE_SUCCESS;
......@@ -2889,23 +2960,11 @@ int32_t buildInmemDataBlockImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY max
do {
STSRow* pTSRow = NULL;
tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow);
doAppendOneRow(pBlock, pReader, pTSRow);
if (pBlockScanInfo->memHasVal) {
TSDBROW* pRow = tsdbTbDataIterGet(pBlockScanInfo->iter);
TSDBKEY k = TSDBROW_KEY(pRow);
if (k.ts >= maxKey.ts) {
break;
}
if (pTSRow == NULL) {
break;
}
if (pBlockScanInfo->imemHasVal) {
TSDBROW* pRow = tsdbTbDataIterGet(pBlockScanInfo->iiter);
TSDBKEY k = TSDBROW_KEY(pRow);
if (k.ts >= maxKey.ts) {
break;
}
}
doAppendOneRow(pBlock, pReader, pTSRow);
// no data in buffer, return immediately
if (!(pBlockScanInfo->memHasVal || pBlockScanInfo->imemHasVal)) {
......@@ -3127,34 +3186,6 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
goto _err;
}
#if 0
// int32_t code = setCurrentSchema(pVnode, pReader);
// if (code != TSDB_CODE_SUCCESS) {
// terrno = code;
// return NULL;
// }
// int32_t numOfCols = taosArrayGetSize(pReader->suppInfo.defaultLoadColumn);
// int16_t* ids = pReader->suppInfo.defaultLoadColumn->pData;
// STSchema* pSchema = pReader->pSchema;
// int32_t i = 0, j = 0;
// while (i < numOfCols && j < pSchema->numOfCols) {
// if (ids[i] == pSchema->columns[j].colId) {
// pReader->suppInfo.slotIds[i] = j;
// i++;
// j++;
// } else if (ids[i] > pSchema->columns[j].colId) {
// j++;
// } else {
// // tsdbReaderClose(pTsdbReadHandle);
// terrno = TSDB_CODE_INVALID_PARA;
// return NULL;
// }
// }
#endif
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
return code;
......@@ -3362,24 +3393,25 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
if (pStatus->composedDataBlock) {
return pReader->pResBlock->pDataBlock;
} else {
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pStatus->blockIter);
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
}
int32_t code = tBlockDataInit(&pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pStatus->blockIter);
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
int32_t code = tBlockDataInit(&pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
return pReader->pResBlock->pDataBlock;
code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
return pReader->pResBlock->pDataBlock;
}
void tsdbResetReadHandle(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_t tWinIdx) {
......
......@@ -285,13 +285,16 @@ static bool isTableOk(STableKeyInfo* info, SNode *pTagCond, SMeta *metaHandle){
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) {
int32_t code = TSDB_CODE_SUCCESS;
pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo));
if(pListInfo->pTableList == NULL) return TSDB_CODE_OUT_OF_MEMORY;
if (pListInfo->pTableList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
uint64_t tableUid = pScanNode->uid;
pListInfo->suid = pScanNode->suid;
SNode* pTagCond = (SNode*)pListInfo->pTagCond;
SNode* pTagIndexCond = (SNode*)pListInfo->pTagIndexCond;
if (pScanNode->tableType == TSDB_SUPER_TABLE) {
......@@ -300,7 +303,7 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
.metaEx = metaHandle, .idx = vnodeGetIdx(metaHandle), .ivtIdx = vnodeGetIvtIdx(metaHandle), .suid = tableUid};
SArray* res = taosArrayInit(8, sizeof(uint64_t));
//code = doFilterTag(pTagIndexCond, &metaArg, res);
// code = doFilterTag(pTagIndexCond, &metaArg, res);
code = TSDB_CODE_INDEX_REBUILDING;
if (code == TSDB_CODE_INDEX_REBUILDING) {
code = vnodeGetAllTableList(pVnode, tableUid, pListInfo->pTableList);
......@@ -322,24 +325,27 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
code = vnodeGetAllTableList(pVnode, tableUid, pListInfo->pTableList);
}
if(pTagCond){
if (pTagCond) {
int32_t i = 0;
while(i < taosArrayGetSize(pListInfo->pTableList)) {
while (i < taosArrayGetSize(pListInfo->pTableList)) {
STableKeyInfo* info = taosArrayGet(pListInfo->pTableList, i);
bool isOk = isTableOk(info, pTagCond, metaHandle);
if(!isOk){
bool isOk = isTableOk(info, pTagCond, metaHandle);
if (!isOk) {
taosArrayRemove(pListInfo->pTableList, i);
continue;
}
i++;
}
}
}else { // Create one table group.
} else { // Create one table group.
STableKeyInfo info = {.lastKey = 0, .uid = tableUid, .groupId = 0};
taosArrayPush(pListInfo->pTableList, &info);
}
pListInfo->pGroupList = taosArrayInit(4, POINTER_BYTES);
if(pListInfo->pGroupList == NULL) return TSDB_CODE_OUT_OF_MEMORY;
if(pListInfo->pGroupList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
//put into list as default group, remove it if grouping sorting is required later
taosArrayPush(pListInfo->pGroupList, &pListInfo->pTableList);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册