diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 7e10e7c6d6d963d844585ccfd6a0cc3b8d7fd55a..462d540c5152abcdc986da019a2bcefd1a0e6146 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -737,6 +737,7 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* p static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end); static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, int32_t numOfCols); static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle); +static void copyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos); static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){ SQueryFilePos* cur = &pQueryHandle->cur; @@ -746,10 +747,11 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBloc /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo); SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order); + assert(cur->pos >= 0 && cur->pos <= binfo.rows); + TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL; tsdbDebug("%p key in mem:%"PRId64", %p", pQueryHandle, key, pQueryHandle->qinfo); - if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) || (!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) { @@ -787,7 +789,7 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBloc * * Here the buffer is not enough, so only part of file block can be loaded into memory buffer */ - assert(pQueryHandle->outputCapacity >= binfo.rows); + assert(pQueryHandle->outputCapacity >= binfo.rows); if ((cur->pos == 0 && ASCENDING_TRAVERSE(pQueryHandle->order)) || (cur->pos == (binfo.rows - 1) && (!ASCENDING_TRAVERSE(pQueryHandle->order)))) { @@ -797,45 +799,15 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBloc cur->win = binfo.window; cur->mixBlock = false; cur->lastKey = ASCENDING_TRAVERSE(pQueryHandle->order)? (binfo.window.ekey + 1): (binfo.window.skey -1); - } else { - - SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; - TSKEY* tsArray = pCols->cols[0].pData; - - assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_INDEX && - cur->pos >= 0 && cur->pos < pBlock->numOfRows && - (tsArray[0] == binfo.window.skey && tsArray[binfo.rows - 1] == binfo.window.ekey)); - - if (ASCENDING_TRAVERSE(pQueryHandle->order)) { - cur->rows = binfo.rows - cur->pos; - - cur->win.skey = tsArray[cur->pos]; - cur->win.ekey = binfo.window.ekey; - - cur->lastKey = binfo.window.ekey + 1; - int32_t numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, 0, cur->pos, binfo.rows-1); - assert(numOfRows == cur->rows); - } else { - cur->rows = cur->pos + 1; - - cur->win.skey = binfo.window.skey; - cur->win.ekey = tsArray[cur->pos]; - - cur->lastKey = binfo.window.skey - 1; - int32_t numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, 0, 0, cur->pos); - assert(numOfRows == cur->rows); - - int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle)); - moveDataToFront(pQueryHandle, numOfRows, numOfCols); - doCheckGeneratedBlockRange(pQueryHandle); - } - - pQueryHandle->realNumOfRows = cur->rows; + } else { // partially copy to dest buffer + int32_t endPos = ASCENDING_TRAVERSE(pQueryHandle->order)? (binfo.rows - 1): 0; + copyRowsFromFileBlock(pQueryHandle, pCheckInfo, &binfo, endPos); cur->mixBlock = true; } - cur->blockCompleted = true; - pCheckInfo->lastKey = cur->lastKey; + assert(cur->blockCompleted); + tsdbDebug("create data block from remain file block, brange:%"PRId64"-%"PRId64", rows:%d, lastKey:%"PRId64", %p", + cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pQueryHandle); } return code; @@ -866,7 +838,7 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBl assert(pCheckInfo->lastKey <= pBlock->keyLast); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); } else { // the whole block is loaded in to buffer - cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(pBlock->numOfRows-1); + cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(pBlock->numOfRows - 1); code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); } } else { //desc order, query ended in current block @@ -957,7 +929,7 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) { return midPos; } -static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) { +int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) { char* pData = NULL; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1; @@ -1182,6 +1154,49 @@ static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle) { } } +static void copyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos) { + SQueryFilePos* cur = &pQueryHandle->cur; + + SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; + TSKEY* tsArray = pCols->cols[0].pData; + + int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; + int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle)); + + int32_t pos = cur->pos; + + int32_t start = cur->pos; + int32_t end = endPos; + + if (!ASCENDING_TRAVERSE(pQueryHandle->order)) { + assert(start >= end); + SWAP(start, end, int32_t); + } + + int32_t numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, 0, start, end); + + // the time window should always be ascending order: skey <= ekey + cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]}; + cur->mixBlock = (start == 0 && end == pBlockInfo->rows - 1); + cur->lastKey = tsArray[endPos] + step; + + pos += endPos + step; + + cur->blockCompleted = + (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) || + ((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))); + + // if the buffer is not full in case of descending order query, move the data in the front of the buffer + moveDataToFront(pQueryHandle, numOfRows, numOfCols); + updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos); + doCheckGeneratedBlockRange(pQueryHandle); + + tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, %p", + pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->mixBlock, cur->win.skey, + cur->win.ekey, cur->rows, pQueryHandle->qinfo); +} + + // only return the qualified data to client in terms of query time window, data rows in the same block but do not // be included in the query time window will be discarded static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) { @@ -1224,37 +1239,13 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* // compared with the data from in-memory buffer, to generate the correct timestamp array list int32_t numOfRows = 0; + int32_t pos = cur->pos; cur->win = TSWINDOW_INITIALIZER; // no data in buffer, load data from file directly if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) { - int32_t start = cur->pos; - int32_t end = endPos; - - if (!ASCENDING_TRAVERSE(pQueryHandle->order)) { - SWAP(start, end, int32_t); - } - - numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end); - - // the time window should always be right order: skey <= ekey - cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]}; - cur->lastKey = tsArray[endPos]; - pos += (end - start + 1) * step; - - cur->blockCompleted = - (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) || - ((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))); - - // if the buffer is not full in case of descending order query, move the data in the front of the buffer - moveDataToFront(pQueryHandle, numOfRows, numOfCols); - updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos); - doCheckGeneratedBlockRange(pQueryHandle); - - tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, %p", - pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->mixBlock, cur->win.skey, - cur->win.ekey, cur->rows, pQueryHandle->qinfo); + copyRowsFromFileBlock(pQueryHandle, pCheckInfo, &blockInfo, endPos); return; } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) { SSkipListNode* node = NULL; @@ -1703,7 +1694,7 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists return loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo, exists); } } else { - tsdbDebug("%p continue in current data block, index:%d, %p", pQueryHandle, cur->slot, pQueryHandle->qinfo); + tsdbDebug("%p continue in current data block, index:%d, pos:%d, %p", pQueryHandle, cur->slot, cur->pos, pQueryHandle->qinfo); int32_t code = handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo); *exists = pQueryHandle->realNumOfRows > 0;