diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index c84b55f11081fda7ea97bb6e8a06cd84ad50b2ab..f716bb01baf0df6c016d660eee947aea50cc0093 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -51,6 +51,7 @@ typedef struct SQueryFilePos { int64_t lastKey; int32_t rows; bool mixBlock; + bool blockCompleted; STimeWindow win; } SQueryFilePos; @@ -187,7 +188,7 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable * For ascending timestamp order query, query starts from data files. In contrast, buffer will be checked in the first place * in case of descending timestamp order query. */ - pQueryHandle->checkFiles = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order); + pQueryHandle->checkFiles = true;//ASCENDING_ORDER_TRAVERSE(pQueryHandle->order); pQueryHandle->activeIndex = 0; // allocate buffer in order to load data blocks from file @@ -229,7 +230,7 @@ TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* return pQueryHandle; } -static bool initSkipListIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCheckInfo) { +static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCheckInfo) { STable* pTable = pCheckInfo->pTableObj; assert(pTable != NULL); @@ -254,7 +255,7 @@ static bool initSkipListIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh if (pTable->imem) { pCheckInfo->iiter = tSkipListCreateIterFromVal(pTable->imem->pData, (const char*) &pCheckInfo->lastKey, - TSDB_DATA_TYPE_TIMESTAMP, order); + TSDB_DATA_TYPE_TIMESTAMP, order); } // both iterators are NULL, no data in buffer right now @@ -539,10 +540,51 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa); } else { // the whole block is loaded in to buffer - pQueryHandle->realNumOfRows = pBlock->numOfPoints; - cur->pos = 0; + SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlock); + /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo); + + TSKEY k1 = TSKEY_INITIAL_VAL, k2 = TSKEY_INITIAL_VAL; + if (pCheckInfo->iter != NULL) { + SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); + SDataRow row = SL_GET_NODE_DATA(node); + k1 = dataRowKey(row); + + if (k1 == binfo.window.skey) { + if (tSkipListIterNext(pCheckInfo->iter)) { + node = tSkipListIterGet(pCheckInfo->iter); + row = SL_GET_NODE_DATA(node); + k1 = dataRowKey(row); + } else { + k1 = TSKEY_INITIAL_VAL; + } + } + } + + if (pCheckInfo->iiter != NULL) { + SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); + SDataRow row = SL_GET_NODE_DATA(node); + k2 = dataRowKey(row); + + if (k2 == binfo.window.skey) { + if (tSkipListIterNext(pCheckInfo->iiter)) { + node = tSkipListIterGet(pCheckInfo->iiter); + row = SL_GET_NODE_DATA(node); + k2 = dataRowKey(row); + } else { + k2 = TSKEY_INITIAL_VAL; + } + } + } + + if ((k1 != TSKEY_INITIAL_VAL && k1 < binfo.window.ekey) || (k2 != TSKEY_INITIAL_VAL && k2 < binfo.window.ekey)) { + doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo); + mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa); + } else { + pQueryHandle->realNumOfRows = binfo.rows; + cur->pos = 0; + } } - } else { + } else { //desc order // query ended in current block if (pQueryHandle->window.ekey > pBlock->keyFirst) { if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) { @@ -557,13 +599,53 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock cur->pos = pBlock->numOfPoints - 1; } - mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa); } else { - pQueryHandle->realNumOfRows = pBlock->numOfPoints; - cur->pos = pBlock->numOfPoints - 1; + SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlock); + /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo); + + TSKEY k1 = TSKEY_INITIAL_VAL, k2 = TSKEY_INITIAL_VAL; + if (pCheckInfo->iter != NULL) { + SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); + SDataRow row = SL_GET_NODE_DATA(node); + k1 = dataRowKey(row); + + if (k1 == binfo.window.skey) { + if (tSkipListIterNext(pCheckInfo->iter)) { + node = tSkipListIterGet(pCheckInfo->iter); + row = SL_GET_NODE_DATA(node); + k1 = dataRowKey(row); + } else { + k1 = TSKEY_INITIAL_VAL; + } + } + } + + if (pCheckInfo->iiter != NULL) { + SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); + SDataRow row = SL_GET_NODE_DATA(node); + k2 = dataRowKey(row); + + if (k2 == binfo.window.skey) { + if (tSkipListIterNext(pCheckInfo->iiter)) { + node = tSkipListIterGet(pCheckInfo->iiter); + row = SL_GET_NODE_DATA(node); + k2 = dataRowKey(row); + } else { + k2 = TSKEY_INITIAL_VAL; + } + } + } + + cur->pos = binfo.rows - 1; + if ((k1 != TSKEY_INITIAL_VAL && k1 < binfo.window.ekey) || (k2 != TSKEY_INITIAL_VAL && k2 < binfo.window.ekey)) { + doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo); + mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa); + } + } +// pQueryHandle->realNumOfRows = pBlock->numOfPoints; +// cur->pos = pBlock->numOfPoints - 1; } - } taosArrayDestroy(sa); return pQueryHandle->realNumOfRows > 0; @@ -632,17 +714,14 @@ static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) { } static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBlockInfo, int32_t capacity, - int32_t numOfRows, int32_t* pos, int32_t endPos) { + int32_t numOfRows, int32_t start, int32_t end) { char* pData = NULL; + int32_t step = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? 1 : -1; + SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; TSKEY* tsArray = pCols->cols[0].pData; - int32_t numOfCols = pCols->numOfCols; - - int32_t n = (*pos); // todo: the output buffer limitation and the query time window? - while(n < pBlockInfo->rows && n <= endPos && ((n - (*pos) + numOfRows) < capacity)) { n++;} - - int32_t num = n - (*pos); + int32_t num = end - start + 1; int32_t reqiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns); //data in buffer has greater timestamp, copy data in file block @@ -653,21 +732,21 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, SDataBlockI if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; } else { - pData = pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; + pData = pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes; } - for (int32_t j = 0; j < numOfCols; ++j) { // todo opt performance + for (int32_t j = 0; j < pCols->numOfCols; ++j) { // todo opt performance SDataCol* src = &pCols->cols[j]; if (pColInfo->info.colId == src->colId) { if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) { - memmove(pData, src->pData + bytes * (*pos), bytes * num); + memmove(pData, src->pData + bytes * start, bytes * num); } else { // handle the var-string char* dst = pData; // todo refactor, only copy one-by-one - for (int32_t k = (*pos); k < num + (*pos); ++k) { + for (int32_t k = start; k < num + start; ++k) { char* p = tdGetColDataOfRow(src, k); memcpy(dst, p, varDataTLen(p)); dst += bytes; @@ -679,13 +758,10 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, SDataBlockI } } - *pos += num; - numOfRows += num; - - pQueryHandle->cur.win.ekey = tsArray[(*pos) - 1]; - pQueryHandle->cur.lastKey = pQueryHandle->cur.win.ekey + 1; // todo ??? + pQueryHandle->cur.win.ekey = tsArray[end]; + pQueryHandle->cur.lastKey = tsArray[end] + step; - return numOfRows; + return numOfRows + num; } static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, int32_t capacity, @@ -729,103 +805,212 @@ static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo SQueryFilePos* cur = &pQueryHandle->cur; SDataBlockInfo blockInfo = getTrueDataBlockInfo(pCheckInfo, pBlock); - initSkipListIterator(pQueryHandle, pCheckInfo); + initTableMemIterator(pQueryHandle, pCheckInfo); SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; int32_t endPos = cur->pos; if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) { endPos = blockInfo.rows - 1; + cur->mixBlock = (cur->pos != 0); } else if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) { endPos = 0; + cur->mixBlock = (cur->pos != blockInfo.rows - 1); } else { int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, order); - -// if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { -// if (endPos < cur->pos) { -// pQueryHandle->realNumOfRows = 0; -// return; -// } else { -// pQueryHandle->realNumOfRows = endPos - cur->pos + 1; -// } -// -// pCheckInfo->lastKey = ((int64_t*)(pCols->cols[0].pData))[endPos] + 1; -// } else { -// if (endPos > cur->pos) { -// pQueryHandle->realNumOfRows = 0; -// return; -// } else { -// pQueryHandle->realNumOfRows = cur->pos - endPos + 1; -// } -// } + cur->mixBlock = true; } // compared with the data from in-memory buffer, to generate the correct timestamp array list - int32_t pos = MIN(cur->pos, endPos); + int32_t pos = cur->pos; assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == 0); TSKEY* tsArray = pCols->cols[0].pData; int32_t numOfRows = 0; pQueryHandle->cur.win = TSWINDOW_INITIALIZER; + int32_t step = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? 1:-1; // no data in buffer, load data from file directly if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) { - cur->win.skey = tsArray[pos]; - copyDataFromFileBlock(pQueryHandle, &blockInfo, pQueryHandle->outputCapacity, numOfRows, &pos, endPos); + int32_t start = cur->pos; + int32_t end = endPos; + if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { + end = cur->pos; + start = endPos; + } + + cur->win.skey = tsArray[start]; + cur->win.ekey = tsArray[end]; + + // todo opt in case of no data in buffer + numOfRows = copyDataFromFileBlock(pQueryHandle, &blockInfo, pQueryHandle->outputCapacity, numOfRows, start, end); + + // if the buffer is not full in case of descending order query, move the data in the front of the buffer + if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && numOfRows < pQueryHandle->outputCapacity) { + int32_t emptySize = pQueryHandle->outputCapacity - numOfRows; + int32_t reqNumOfCols = taosArrayGetSize(pQueryHandle->pColumns); + + for(int32_t i = 0; i < reqNumOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); + } + } + + pCheckInfo->lastKey = cur->lastKey; + pQueryHandle->realNumOfRows = numOfRows; + cur->rows = numOfRows; return; } else if (pCheckInfo->iter != NULL && pCheckInfo->iiter == NULL) { // } else if (pCheckInfo->iter == NULL && pCheckInfo->iiter != NULL) { // } else { // iter and iiter are all not NULL, three-way merge data block STSchema* pSchema = tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj); - - while (1) { - SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); + SSkipListNode* node = NULL; + + do { + node = tSkipListIterGet(pCheckInfo->iter); if (node == NULL) { - if (cur->win.skey == TSKEY_INITIAL_VAL) { - cur->win.skey = tsArray[pos]; - } - - numOfRows = copyDataFromFileBlock(pQueryHandle, &blockInfo, pQueryHandle->outputCapacity, numOfRows, &pos, endPos); break; } - + SDataRow row = SL_GET_NODE_DATA(node); TSKEY key = dataRowKey(row); - - if (key < tsArray[pos]) { + if ((key > pQueryHandle->window.ekey && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || + (key < pQueryHandle->window.ekey && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) { + break; + } + + if (((tsArray[pos] > pQueryHandle->window.ekey || pos > endPos) && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || + ((tsArray[pos] < pQueryHandle->window.ekey || pos < endPos) && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) { + break; + } + + if ((key < tsArray[pos] && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || + (key > tsArray[pos] && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) { copyOneRowFromMem(pQueryHandle, pCheckInfo, pQueryHandle->outputCapacity, numOfRows, row, pSchema); numOfRows += 1; - cur->mixBlock = true; - if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = key; } - + cur->win.ekey = key; + cur->lastKey = key + step; + cur->mixBlock = true; + tSkipListIterNext(pCheckInfo->iter); - - if (numOfRows >= pQueryHandle->outputCapacity) { - break; - } - } else if (key == tsArray[pos]) { //data in buffer has the same timestamp of data in file block, ignore it + } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it tSkipListIterNext(pCheckInfo->iter); - } else if (key > tsArray[pos]) { + } else if ((key > tsArray[pos] && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || + (key < tsArray[pos] && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) { if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = tsArray[pos]; } + + int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; + int32_t end = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, key, order); + + int32_t start = -1; + if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { + int32_t remain = end - pos + 1; + if (remain + numOfRows > pQueryHandle->outputCapacity) { + end = (pQueryHandle->outputCapacity - numOfRows) + pos - 1; + } + + start = pos; + } else { + int32_t remain = (pos - end) + 1; + if (remain + numOfRows > pQueryHandle->outputCapacity) { + end = pos + 1 - (pQueryHandle->outputCapacity - numOfRows); + } + + start = end; + end = pos; + } + + numOfRows = + copyDataFromFileBlock(pQueryHandle, &blockInfo, pQueryHandle->outputCapacity, numOfRows, start, end); + pos += (end - start + 1) * step; + } + } while (numOfRows < pQueryHandle->outputCapacity); + + if (numOfRows < pQueryHandle->outputCapacity) { + if (node == NULL || + ((dataRowKey(SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || + ((dataRowKey(SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) { + // no data in cache or data in cache is greater than the ekey of time window, load data from file block + if (cur->win.skey == TSKEY_INITIAL_VAL) { + cur->win.skey = tsArray[pos]; + } + + int32_t start = -1; + int32_t end = -1; + + // all remain data are qualified, but check the remain capacity in the first place. + if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { + int32_t remain = endPos - pos + 1; + if (remain + numOfRows > pQueryHandle->outputCapacity) { + endPos = (pQueryHandle->outputCapacity - numOfRows) + pos - 1; + } + + start = pos; + end = endPos; + } else { + int32_t remain = pos + 1; + if (remain + numOfRows > pQueryHandle->outputCapacity) { + endPos = pos + 1 - (pQueryHandle->outputCapacity - numOfRows); + } + + start = endPos; + end = pos; + } + + numOfRows = + copyDataFromFileBlock(pQueryHandle, &blockInfo, pQueryHandle->outputCapacity, numOfRows, start, end); + pos += (end - start + 1) * step; + } else { - numOfRows = copyDataFromFileBlock(pQueryHandle, &blockInfo, pQueryHandle->outputCapacity, numOfRows, &pos, endPos); - - if (numOfRows >= pQueryHandle->outputCapacity || - pQueryHandle->cur.lastKey >= blockInfo.window.ekey || - pQueryHandle->cur.lastKey > pQueryHandle->window.ekey) { - break; + while(numOfRows < pQueryHandle->outputCapacity && node != NULL && + (((dataRowKey(SL_GET_NODE_DATA(node)) <= pQueryHandle->window.ekey) && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || + ((dataRowKey(SL_GET_NODE_DATA(node)) >= pQueryHandle->window.ekey) && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)))) { + SDataRow row = SL_GET_NODE_DATA(node); + TSKEY key = dataRowKey(row); + + copyOneRowFromMem(pQueryHandle, pCheckInfo, pQueryHandle->outputCapacity, numOfRows, row, pSchema); + numOfRows += 1; + + if (cur->win.skey == TSKEY_INITIAL_VAL) { + cur->win.skey = key; + } + + cur->win.ekey = key; + cur->lastKey = key + step; + cur->mixBlock = true; + + tSkipListIterNext(pCheckInfo->iter); + node = tSkipListIterGet(pCheckInfo->iter); } } } } + + cur->blockCompleted = ((pos >= endPos && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || + (pos <= endPos && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))); + if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { + SWAP(cur->win.skey, cur->win.ekey, TSKEY); + + // if the buffer is not full in case of descending order query, move the data in the front of the buffer + if (numOfRows < pQueryHandle->outputCapacity) { + int32_t emptySize = pQueryHandle->outputCapacity - numOfRows; + + int32_t reqiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns); + for(int32_t i = 0; i < reqiredNumOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); + } + } + } + pCheckInfo->lastKey = cur->lastKey; pQueryHandle->realNumOfRows = numOfRows; cur->rows = numOfRows; @@ -1103,7 +1288,7 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo; // current block is done, try next - if (!cur->mixBlock || cur->pos >= pBlockInfo->pBlock.compBlock->numOfPoints) { + if (!cur->mixBlock || cur->blockCompleted) { if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || (cur->slot == 0 && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) { // all data blocks in current file has been checked already, try next file if exists @@ -1111,14 +1296,17 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { } else { // next block of the same file int32_t step = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) ? 1 : -1; cur->slot += step; - + + cur->mixBlock = false; + cur->blockCompleted = false; + STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot]; return loadFileDataBlock(pQueryHandle, pNext->pBlock.compBlock, pNext->pTableCheckInfo); } } else { SArray* sa = getDefaultLoadColumns(pQueryHandle, true); mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlockInfo->pBlock.compBlock, sa); - return pQueryHandle->pColumns; + return true; } } } @@ -1145,28 +1333,44 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pqHandle) { size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); assert(numOfTables > 0); - if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { - if (pQueryHandle->checkFiles) { - if (getDataBlocksInFiles(pQueryHandle)) { - return true; - } - - pQueryHandle->activeIndex = 0; - pQueryHandle->checkFiles = false; + if (pQueryHandle->checkFiles) { + if (getDataBlocksInFiles(pQueryHandle)) { + return true; } - + + pQueryHandle->activeIndex = 0; + pQueryHandle->checkFiles = false; + } + + if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { return doHasDataInBuffer(pQueryHandle); - } else { // starts from the buffer in case of descending timestamp order check data blocks - if (!pQueryHandle->checkFiles) { - if (doHasDataInBuffer(pQueryHandle)) { - return true; - } - - pQueryHandle->checkFiles = true; - } - - return getDataBlocksInFiles(pQueryHandle); + } else { +// assert(0); + return false; } + +// if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { +// if (pQueryHandle->checkFiles) { +// if (getDataBlocksInFiles(pQueryHandle)) { +// return true; +// } +// +// pQueryHandle->activeIndex = 0; +// pQueryHandle->checkFiles = false; +// } +// +// return doHasDataInBuffer(pQueryHandle); +// } else { // starts from the buffer in case of descending timestamp order check data blocks +// if (!pQueryHandle->checkFiles) { +// if (doHasDataInBuffer(pQueryHandle)) { +// return true; +// } +// +// pQueryHandle->checkFiles = true; +// } +// +// return getDataBlocksInFiles(pQueryHandle); +// } } void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) { @@ -1321,51 +1525,8 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) { STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo; pTable = pCheckInfo->pTableObj; - - SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlockInfo->pBlock.compBlock); - /*bool hasData = */initSkipListIterator(pHandle, pCheckInfo); - - TSKEY k1 = TSKEY_INITIAL_VAL, k2 = TSKEY_INITIAL_VAL; - if (pCheckInfo->iter != NULL) { - SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); - SDataRow row = SL_GET_NODE_DATA(node); - k1 = dataRowKey(row); - - if (k1 == binfo.window.skey) { - if (tSkipListIterNext(pCheckInfo->iter)) { - node = tSkipListIterGet(pCheckInfo->iter); - row = SL_GET_NODE_DATA(node); - k1 = dataRowKey(row); - } else { - k1 = TSKEY_INITIAL_VAL; - } - } - } - - if (pCheckInfo->iiter != NULL) { - SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); - SDataRow row = SL_GET_NODE_DATA(node); - k2 = dataRowKey(row); - - if (k2 == binfo.window.skey) { - if (tSkipListIterNext(pCheckInfo->iiter)) { - node = tSkipListIterGet(pCheckInfo->iiter); - row = SL_GET_NODE_DATA(node); - k2 = dataRowKey(row); - } else { - k2 = TSKEY_INITIAL_VAL; - } - } - } - - assert(0); - if ((k1 != TSKEY_INITIAL_VAL && k1 < binfo.window.ekey) || (k2 != TSKEY_INITIAL_VAL && k2 < binfo.window.ekey)) { - doLoadFileDataBlock(pHandle, pBlockInfo->pBlock.compBlock, pCheckInfo); - SArray* sa = getDefaultLoadColumns(pHandle, true); - mergeDataInDataBlock(pHandle, pCheckInfo, pBlockInfo->pBlock.compBlock, sa); - taosArrayDestroy(sa); - + if (pHandle->cur.mixBlock) { SDataBlockInfo blockInfo = { .uid = pTable->tableId.uid, .tid = pTable->tableId.tid, @@ -1375,26 +1536,30 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) { return blockInfo; } else { + SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlockInfo->pBlock.compBlock); + return binfo; + } +// else { /* - * no data in mem or imem, or data in mem|imem with greater timestamp, no need to load data in buffer + * no data in mem or imem, or data in mem/imem with greater timestamp, no need to load data in buffer * return the file block info directly */ - if (!pHandle->cur.mixBlock && pHandle->cur.rows == pBlockInfo->pBlock.compBlock->numOfPoints) { - pBlockInfo->pTableCheckInfo->lastKey = pBlockInfo->pBlock.compBlock->keyLast + step; - assert(pHandle->outputCapacity >= pBlockInfo->pBlock.compBlock->numOfPoints); - - return binfo; - } else { - SDataBlockInfo blockInfo = { - .uid = pTable->tableId.uid, - .tid = pTable->tableId.tid, - .rows = pHandle->cur.rows, - .window = pHandle->cur.win, - }; - - return blockInfo; - } - } +// if (!pHandle->cur.mixBlock || pHandle->cur.rows == pBlockInfo->pBlock.compBlock->numOfPoints) { +// pBlockInfo->pTableCheckInfo->lastKey = pBlockInfo->pBlock.compBlock->keyLast + step; +// assert(pHandle->outputCapacity >= pBlockInfo->pBlock.compBlock->numOfPoints); +// +// return binfo; +// } else { +// SDataBlockInfo blockInfo = { +// .uid = pTable->tableId.uid, +// .tid = pTable->tableId.tid, +// .rows = pHandle->cur.rows, +// .window = pHandle->cur.win, +// }; +// +// return blockInfo; +// } +// } } else { STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); pTable = pCheckInfo->pTableObj;