diff --git a/src/vnode/tsdb/inc/tsdbMain.h b/src/vnode/tsdb/inc/tsdbMain.h index 06f62ea6f728620f01b70a16794b566d6eae273b..077bdf45c3522870c952aa25643b889f13c982f9 100644 --- a/src/vnode/tsdb/inc/tsdbMain.h +++ b/src/vnode/tsdb/inc/tsdbMain.h @@ -222,8 +222,9 @@ int tsdbOpenFile(SFile *pFile, int oflag); int tsdbCloseFile(SFile *pFile); SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid); int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid); -#define TSDB_FGROUP_ITER_FORWARD 0 -#define TSDB_FGROUP_ITER_BACKWARD 1 +#define TSDB_FGROUP_ITER_FORWARD TSDB_ORDER_ASC +#define TSDB_FGROUP_ITER_BACKWARD TSDB_ORDER_DESC + typedef struct { int numOfFGroups; SFileGroup *base; diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index 8bdfe63002e392476c9ddb27395e5181d324f180..d025144ba93ce239529a1c872065945d62c6ea7a 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -154,7 +154,7 @@ void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { } int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE; - void *ptr = taosbsearch(&fid, pIter->base, sizeof(SFileGroup), pIter->numOfFGroups, compFGroupKey, flags); + void *ptr = taosbsearch(&fid, pIter->base, pIter->numOfFGroups, sizeof(SFileGroup), compFGroupKey, flags); if (ptr == NULL) { pIter->pFileGroup = NULL; } else { @@ -173,7 +173,7 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) { pIter->pFileGroup += 1; } } else { - if (pIter->pFileGroup - 1 == pIter->base) { + if (pIter->pFileGroup == pIter->base) { pIter->pFileGroup = NULL; } else { pIter->pFileGroup -= 1; diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index e5f0aed05ee9e7a6d855f4d3812fe1ead868f184..2fce73e547a93508944c3859108b4631a7ccc15f 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -27,7 +27,7 @@ #define EXTRA_BYTES 2 #define PRIMARY_TSCOL_REQUIRED(c) (((SColumnInfoData*)taosArrayGet(c, 0))->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) #define QUERY_IS_ASC_QUERY(o) (o == TSDB_ORDER_ASC) -#define QH_GET_NUM_OF_COLS(handle) (taosArrayGetSize((handle)->pColumns)) +#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns))) enum { QUERY_RANGE_LESS_EQUAL = 0, @@ -87,14 +87,8 @@ typedef struct STableBlockInfo { int32_t groupIdx; /* number of group is less than the total number of tables */ } STableBlockInfo; -enum { - SINGLE_TABLE_MODEL = 1, - MULTI_TABLE_MODEL = 2, -}; - typedef struct STsdbQueryHandle { STsdbRepo* pTsdb; - int8_t model; // access model, single table model or multi-table model SQueryFilePos cur; // current position SQueryFilePos start; // the start position, used for secondary/third iteration @@ -128,21 +122,6 @@ typedef struct STsdbQueryHandle { SCompIdx* compIndex; } STsdbQueryHandle; -int32_t doAllocateBuf(STsdbQueryHandle* pQueryHandle, int32_t rowsPerFileBlock) { - // record the maximum column width among columns of this meter/metric - SColumnInfoData* pColumn = taosArrayGet(pQueryHandle->pColumns, 0); - - int32_t maxColWidth = pColumn->info.bytes; - for (int32_t i = 1; i < QH_GET_NUM_OF_COLS(pQueryHandle); ++i) { - int32_t bytes = pColumn[i].info.bytes; - if (bytes > maxColWidth) { - maxColWidth = bytes; - } - } - - return TSDB_CODE_SUCCESS; -} - static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) { pBlockLoadInfo->slot = -1; pBlockLoadInfo->sid = -1; @@ -161,9 +140,9 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond // todo 2. add the reference count for each table that is involved in query STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); - pQueryHandle->order = pCond->order; + pQueryHandle->order = pCond->order; pQueryHandle->window = pCond->twindow; - pQueryHandle->pTsdb = tsdb; + pQueryHandle->pTsdb = tsdb; pQueryHandle->compIndex = calloc(10000, sizeof(SCompIdx)), pQueryHandle->loadDataAfterSeek = false; @@ -174,25 +153,33 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond pQueryHandle->pTableCheckInfo = taosArrayInit(size, sizeof(STableCheckInfo)); for (int32_t i = 0; i < size; ++i) { - STableId id = *(STableId*)taosArrayGet(idList, i); - + STableId id = *(STableId*) taosArrayGet(idList, i); + + STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), id.uid); + if (pTable == NULL) { + dError("%p failed to get table, error uid:%" PRIu64, pQueryHandle, id.uid); + continue; + } + STableCheckInfo info = { - .lastKey = pQueryHandle->window.skey, - .tableId = id, - .pTableObj = tsdbGetTableByUid(tsdbGetMeta(tsdb), id.uid), // todo this may be failed - .pCompInfo = NULL, + .lastKey = pQueryHandle->window.skey, + .tableId = id, + .pTableObj = pTable, }; - assert(info.pTableObj != NULL); taosArrayPush(pQueryHandle->pTableCheckInfo, &info); } - pQueryHandle->model = (size > 1) ? MULTI_TABLE_MODEL : SINGLE_TABLE_MODEL; - pQueryHandle->checkFiles = 1; - + dTrace("%p total numOfTable:%d in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo)); + + /* + * For ascending timestamp order query, query starts from data files. In contrast, buffer will be checked in the first place + * in case of descending timestamp order query. + */ + pQueryHandle->checkFiles = QUERY_IS_ASC_QUERY(pQueryHandle->order); pQueryHandle->activeIndex = 0; - // malloc buffer in order to load data from file + // allocate buffer in order to load data blocks from file int32_t numOfCols = taosArrayGetSize(pColumnInfo); size_t bufferCapacity = 4096; @@ -206,10 +193,6 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond taosArrayPush(pQueryHandle->pColumns, &pDest); } - if (doAllocateBuf(pQueryHandle, bufferCapacity) != TSDB_CODE_SUCCESS) { - return NULL; - } - tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo); tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo); @@ -239,7 +222,12 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { // todo dynamic get the daysperfile static int32_t getFileIdFromKey(TSKEY key) { - return (int32_t)(key / 10); // set the starting fileId + int64_t fid = (int64_t)(key / 10); // set the starting fileId + if (fid > INT32_MAX) { + fid = INT32_MAX; + } + + return fid; } static int32_t binarySearchForBlockImpl(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order); @@ -247,8 +235,12 @@ static int32_t binarySearchForBlockImpl(SCompBlock* pBlock, int32_t numOfBlocks, static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks, int32_t type) { // todo check open file failed SFileGroup* fileGroup = pQueryHandle->pFileGroup; + + assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0); if (fileGroup->files[TSDB_FILE_TYPE_HEAD].fd == FD_INITIALIZER) { fileGroup->files[TSDB_FILE_TYPE_HEAD].fd = open(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname, O_RDONLY); + } else { + assert(FD_VALID(fileGroup->files[TSDB_FILE_TYPE_HEAD].fd)); } // load all the comp offset value for all tables in this file @@ -262,7 +254,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo SCompIdx* compIndex = &pQueryHandle->compIndex[pCheckInfo->tableId.tid]; if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file - + assert(0); } else { if (pCheckInfo->compSize < compIndex->len) { assert(compIndex->len > 0); @@ -271,46 +263,35 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo assert(t != NULL); pCheckInfo->pCompInfo = (SCompInfo*) t; + pCheckInfo->compSize = compIndex->len; } tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo); - - int32_t index = 0; + + SCompInfo* pCompInfo = pCheckInfo->pCompInfo; + + TSKEY s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey); + TSKEY e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey); + // discard the unqualified data block based on the query time window - int32_t start = binarySearchForBlockImpl(pCheckInfo->pCompInfo->blocks, compIndex->numOfSuperBlocks, - pQueryHandle->order, pCheckInfo->lastKey); - - if (type == QUERY_RANGE_GREATER_EQUAL) { - if (pCheckInfo->lastKey <= pCheckInfo->pCompInfo->blocks[start].keyLast) { - // break; - } else { - index = -1; - } - } else { - if (pCheckInfo->lastKey >= pCheckInfo->pCompInfo->blocks[start].keyFirst) { - // break; - } else { - index = -1; - } - } - - // not found in data blocks in current file - if (index == -1) { + int32_t start = binarySearchForBlockImpl(pCompInfo->blocks, compIndex->numOfSuperBlocks, s, TSDB_ORDER_ASC); + int32_t end = start; + + if (s > pCompInfo->blocks[start].keyLast) { continue; } - // todo speedup the procedure of locating end block - int32_t e = start; - while (e < compIndex->numOfSuperBlocks && - (pCheckInfo->pCompInfo->blocks[e].keyFirst <= pQueryHandle->window.ekey)) { - e += 1; + // todo speedup the procedure of located end block + while (end < compIndex->numOfSuperBlocks && (pCompInfo->blocks[end].keyFirst <= e)) { + end += 1; } + pCheckInfo->numOfBlocks = (end - start); + if (start > 0) { - memmove(pCheckInfo->pCompInfo->blocks, &pCheckInfo->pCompInfo->blocks[start], (e - start) * sizeof(SCompBlock)); + memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SCompBlock)); } - pCheckInfo->numOfBlocks = (e - start); (*numOfBlocks) += pCheckInfo->numOfBlocks; } } @@ -413,7 +394,6 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock } SDataCols* pDataCols = pCheckInfo->pDataCols; - if (pCheckInfo->lastKey > pBlock->keyFirst) { cur->pos = binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfPoints, pCheckInfo->lastKey, pQueryHandle->order); @@ -425,8 +405,24 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock } else { // the whole block is loaded in to buffer pQueryHandle->realNumOfRows = pBlock->numOfPoints; } - } else { // todo desc query + } else { + // query ended in current block if (pQueryHandle->window.ekey > pBlock->keyFirst) { + if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) { + return false; + } + + SDataCols* pDataCols = pCheckInfo->pDataCols; + if (pCheckInfo->lastKey < pBlock->keyLast) { + cur->pos = + binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfPoints, pCheckInfo->lastKey, pQueryHandle->order); + } else { + cur->pos = pBlock->numOfPoints - 1; + } + + filterDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa); + } else { + pQueryHandle->realNumOfRows = pBlock->numOfPoints; } } @@ -619,7 +615,9 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf } static SArray* getColumnIdList(STsdbQueryHandle* pQueryHandle) { - int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); + size_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); + assert(numOfCols <= TSDB_MAX_COLUMNS); + SArray* pIdList = taosArrayInit(numOfCols, sizeof(int16_t)); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); @@ -923,8 +921,8 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { pQueryHandle->locateStart = true; int32_t fid = getFileIdFromKey(pQueryHandle->window.skey); - - tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, TSDB_FGROUP_ITER_FORWARD); + + tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order); tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid); int32_t numOfBlocks = -1; @@ -934,13 +932,14 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) { - if (getFileCompInfo(pQueryHandle, &numOfBlocks, 1) != TSDB_CODE_SUCCESS) { + int32_t type = QUERY_IS_ASC_QUERY(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL; + if (getFileCompInfo(pQueryHandle, &numOfBlocks, type) != TSDB_CODE_SUCCESS) { break; } assert(numOfBlocks >= 0); dTrace("%p %d blocks found in file for %d table(s), fid:%d", pQueryHandle, numOfBlocks, - pQueryHandle->pFileGroup->fileId, numOfTables); + numOfTables, pQueryHandle->pFileGroup->fileId); // todo return error code to query engine if (createDataBlocksInfo(pQueryHandle, numOfBlocks, &pQueryHandle->numOfBlocks) != TSDB_CODE_SUCCESS) { @@ -961,7 +960,7 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { return false; } - cur->slot = 0; + cur->slot = QUERY_IS_ASC_QUERY(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1; cur->fid = pQueryHandle->pFileGroup->fileId; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; @@ -970,8 +969,10 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo); } else { - if (cur->slot == pQueryHandle->numOfBlocks - 1) { // all blocks + if ((cur->slot == pQueryHandle->numOfBlocks - 1 && QUERY_IS_ASC_QUERY(pQueryHandle->order)) || + (cur->slot == 0 && !QUERY_IS_ASC_QUERY(pQueryHandle->order))) { // all blocks int32_t numOfBlocks = -1; + int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); pQueryHandle->numOfBlocks = 0; @@ -1001,67 +1002,84 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { cur->fid = -1; return false; } - - cur->slot = 0; + + cur->slot = QUERY_IS_ASC_QUERY(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1; cur->fid = pQueryHandle->pFileGroup->fileId; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; - STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo; SCompBlock* pBlock = pBlockInfo->pBlock.compBlock; return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo); } else { // next block of the same file - cur->slot += 1; - cur->pos = 0; - + int32_t step = QUERY_IS_ASC_QUERY(pQueryHandle->order)? 1:-1; + cur->slot += step; + STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; + if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) { + cur->pos = 0; + } else { + cur->pos = pBlockInfo->pBlock.compBlock->numOfPoints - 1; + } + return loadFileDataBlock(pQueryHandle, pBlockInfo->pBlock.compBlock, pBlockInfo->pTableCheckInfo); } } } -// handle data in cache situation -bool tsdbNextDataBlock(tsdb_query_handle_t* pQueryHandle) { - STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle; - - size_t numOfTables = taosArrayGetSize(pHandle->pTableCheckInfo); + +static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) { + size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); + // todo add assert, the value of numOfTables should be less than the maximum value for each vnode capacity assert(numOfTables > 0); - if (pHandle->checkFiles) { - if (getDataBlocksInFiles(pHandle)) { + while (pQueryHandle->activeIndex < numOfTables) { + if (hasMoreDataInCache(pQueryHandle)) { return true; } - pHandle->activeIndex = 0; - pHandle->checkFiles = 0; - - while (pHandle->activeIndex < numOfTables) { - if (hasMoreDataInCache(pHandle)) { + pQueryHandle->activeIndex += 1; + } + + return false; +} + +// handle data in cache situation +bool tsdbNextDataBlock(tsdb_query_handle_t* pqHandle) { + STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pqHandle; + + size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); + assert(numOfTables > 0); + + if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) { + if (pQueryHandle->checkFiles) { + if (getDataBlocksInFiles(pQueryHandle)) { return true; } - - pHandle->activeIndex += 1; + + pQueryHandle->activeIndex = 0; + pQueryHandle->checkFiles = false; } - return false; - } else { - while (pHandle->activeIndex < numOfTables) { - if (hasMoreDataInCache(pHandle)) { + return doHasDataInBuffer(pQueryHandle); + } else { // starts from the buffer in case of descending timestamp order check data blocks + if (!pQueryHandle->checkFiles) { + if (doHasDataInBuffer(pQueryHandle)) { return true; } - pHandle->activeIndex += 1; + pQueryHandle->checkFiles = true; } - - return false; + + return getDataBlocksInFiles(pQueryHandle); } + } static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey, - STsdbQueryHandle* pHandle) { + STsdbQueryHandle* pQueryHandle) { int numOfRows = 0; - int32_t numOfCols = taosArrayGetSize(pHandle->pColumns); + int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns); *skey = INT64_MIN; while (tSkipListIterNext(pIter)) { @@ -1079,7 +1097,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int max int32_t offset = 0; for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pHandle->pColumns, i); + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); memcpy(pColInfo->pData + numOfRows * pColInfo->info.bytes, dataRowTuple(row) + offset, pColInfo->info.bytes); offset += pColInfo->info.bytes; }