diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 5cb0dffd46d4fef05619eb47145acd0b5e4a0937..5b1afe3da8f9afc58163a26d4a1d1939f0069998 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -21,7 +21,7 @@ #include "tcompare.h" #include "exception.h" -#include "../../../query/inc/qast.h" // todo move to common module +#include "../../query/inc/qAst.h" // todo move to common module #include "tlosertree.h" #include "tsdb.h" #include "tsdbMain.h" @@ -331,7 +331,8 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh tsdbDebug("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo); } else { - tsdbDebug("%p uid:%" PRId64 ", tid:%d no data in mem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid); + tsdbDebug("%p uid:%"PRId64", tid:%d no data in mem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, + pHandle->qinfo); } if (!imemEmpty) { @@ -343,7 +344,8 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh tsdbDebug("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo); } else { - tsdbDebug("%p uid:%"PRId64", tid:%d no data in imem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid); + tsdbDebug("%p uid:%"PRId64", tid:%d no data in imem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, + pHandle->qinfo); } return true; @@ -354,7 +356,7 @@ static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) { tSkipListDestroyIter(pCheckInfo->iiter); } -SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) { +SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order) { SDataRow rmem = NULL, rimem = NULL; if (pCheckInfo->iter) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); @@ -371,20 +373,35 @@ SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) { } if (rmem != NULL && rimem != NULL) { - if (dataRowKey(rmem) < dataRowKey(rimem)) { - pCheckInfo->chosen = 0; - return rmem; - } else if (dataRowKey(rmem) == dataRowKey(rimem)) { - // data ts are duplicated, ignore the data in mem + TSKEY r1 = dataRowKey(rmem); + TSKEY r2 = dataRowKey(rimem); + + if (r1 == r2) { // data ts are duplicated, ignore the data in mem tSkipListIterNext(pCheckInfo->iter); pCheckInfo->chosen = 1; return rimem; } else { - pCheckInfo->chosen = 1; - return rimem; + if (ASCENDING_TRAVERSE(order)) { + if (r1 < r2) { + pCheckInfo->chosen = 0; + return rmem; + } else { + pCheckInfo->chosen = 1; + return rimem; + } + } else { + if (r1 < r2) { + pCheckInfo->chosen = 1; + return rimem; + } else { + pCheckInfo->chosen = 0; + return rmem; + } + } } } + // at least one (rmem or rimem) is absent here if (rmem != NULL) { pCheckInfo->chosen = 0; return rmem; @@ -398,7 +415,7 @@ SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) { return NULL; } -static bool moveToNextRow(STableCheckInfo* pCheckInfo) { +static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) { bool hasNext = false; if (pCheckInfo->chosen == 0) { if (pCheckInfo->iter != NULL) { @@ -412,19 +429,17 @@ static bool moveToNextRow(STableCheckInfo* pCheckInfo) { if (pCheckInfo->iiter != NULL) { return tSkipListIterGet(pCheckInfo->iiter) != NULL; } - } else { - if (pCheckInfo->chosen == 1) { - if (pCheckInfo->iiter != NULL) { - hasNext = tSkipListIterNext(pCheckInfo->iiter); - } + } else { //pCheckInfo->chosen == 1 + if (pCheckInfo->iiter != NULL) { + hasNext = tSkipListIterNext(pCheckInfo->iiter); + } - if (hasNext) { - return hasNext; - } + if (hasNext) { + return hasNext; + } - if (pCheckInfo->iter != NULL) { - return tSkipListIterGet(pCheckInfo->iter) != NULL; - } + if (pCheckInfo->iter != NULL) { + return tSkipListIterGet(pCheckInfo->iter) != NULL; } } @@ -445,7 +460,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { initTableMemIterator(pHandle, pCheckInfo); } - SDataRow row = getSDataRowInTableMem(pCheckInfo); + SDataRow row = getSDataRowInTableMem(pCheckInfo, pHandle->order); if (row == NULL) { return false; } @@ -650,7 +665,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo); - SDataRow row = getSDataRowInTableMem(pCheckInfo); + SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order); TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL; cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(binfo.rows-1); @@ -1033,7 +1048,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) { SSkipListNode* node = NULL; do { - SDataRow row = getSDataRowInTableMem(pCheckInfo); + SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order); if (row == NULL) { break; } @@ -1061,9 +1076,9 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* cur->lastKey = key + step; cur->mixBlock = true; - moveToNextRow(pCheckInfo); + moveToNextRowInMem(pCheckInfo); } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it - moveToNextRow(pCheckInfo); + moveToNextRowInMem(pCheckInfo); } else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { if (cur->win.skey == TSKEY_INITIAL_VAL) { @@ -1072,7 +1087,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order); if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it - moveToNextRow(pCheckInfo); + moveToNextRowInMem(pCheckInfo); } int32_t start = -1; @@ -1754,7 +1769,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int STable* pTable = pCheckInfo->pTableObj; do { - SDataRow row = getSDataRowInTableMem(pCheckInfo); + SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order); if (row == NULL) { break; } @@ -1775,11 +1790,11 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, pMeta, numOfCols, pTable); if (++numOfRows >= maxRowsToRead) { - moveToNextRow(pCheckInfo); + moveToNextRowInMem(pCheckInfo); break; } - } while(moveToNextRow(pCheckInfo)); + } while(moveToNextRowInMem(pCheckInfo)); assert(numOfRows <= maxRowsToRead);