diff --git a/src/query/inc/qResultbuf.h b/src/query/inc/qResultbuf.h index 41fccd177dc7b384b199028a60b17ffafa061cfc..d76659cc919c89eeabb6dd8c091edd660555b285 100644 --- a/src/query/inc/qResultbuf.h +++ b/src/query/inc/qResultbuf.h @@ -87,12 +87,11 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId); * @param id * @return */ -//#define getResBufPage(buf, id) ((tFilePage*)((buf)->pBuf + (buf)->pageSize * (id))) static FORCE_INLINE tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) { if (id < pResultBuf->inMemPages) { - return pResultBuf->iBuf + id * pResultBuf->pageSize; + return (tFilePage*) ((char*) pResultBuf->iBuf + id * pResultBuf->pageSize); } else { - return pResultBuf->pBuf + (id - pResultBuf->inMemPages) * pResultBuf->pageSize; + return (tFilePage*) ((char*) pResultBuf->pBuf + (id - pResultBuf->inMemPages) * pResultBuf->pageSize); } } /** diff --git a/src/query/tests/resultBufferTest.cpp b/src/query/tests/resultBufferTest.cpp index f822deeb7351db64a59c4b4e7b493cdd951f4efc..63ed89ab9fe1de2222c6f5ea8ae05b331437f76d 100644 --- a/src/query/tests/resultBufferTest.cpp +++ b/src/query/tests/resultBufferTest.cpp @@ -10,7 +10,7 @@ namespace { // simple test void simpleTest() { SDiskbasedResultBuf* pResultBuf = NULL; - int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1000, 64, NULL); + int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1000, 64, 1024, 4, NULL); int32_t pageId = 0; int32_t groupId = 0; @@ -22,8 +22,7 @@ void simpleTest() { ASSERT_EQ(getResBufSize(pResultBuf), 1000*16384L); SIDList list = getDataBufPagesIdList(pResultBuf, groupId); - ASSERT_EQ(list.size, 1); - + ASSERT_EQ(taosArrayGetSize(list), 1); ASSERT_EQ(getNumOfResultBufGroupId(pResultBuf), 1); destroyResultBuf(pResultBuf, NULL); diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 40f2dac660fe0d7995015bfeb8b8b9303bd80086..210d95853cf216e93dd09feb4f8e03e15b269471 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -426,6 +426,7 @@ int tsdbUpdateFileHeader(SFile* pFile, uint32_t version); int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo); void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo); void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup); +void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); // ------------------ tsdbRWHelper.c #define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index b29cec3cf9cb11a2059165599ee730f07347d155..1b7db635b337e2576ac61ac100dabad91be793df 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -31,7 +31,6 @@ static int tsdbCommitMeta(STsdbRepo *pRepo); static void tsdbEndCommit(STsdbRepo *pRepo); static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey); static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols); -static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo); static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables); @@ -544,7 +543,7 @@ static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSK return 0; } -static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) { +void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) { *minKey = fileId * daysPerFile * tsMsPerDay[precision]; *maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1; } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 5b1afe3da8f9afc58163a26d4a1d1939f0069998..37784577c498da13f15e2489c5c1aecc38106ad6 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -128,8 +128,7 @@ typedef struct STsdbQueryHandle { static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle); static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); -static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock, - SArray* sa); +static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock); static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, STsdbQueryHandle* pQueryHandle); @@ -695,7 +694,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* } doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo); - doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn); + doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); } else { /* * no data in cache, only load data from file @@ -711,6 +710,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* cur->mixBlock = false; cur->blockCompleted = true; cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1); + pCheckInfo->lastKey = cur->lastKey; } } @@ -734,7 +734,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock cur->pos = 0; } - doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn); + doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); } else { // the whole block is loaded in to buffer handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); } @@ -751,7 +751,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock cur->pos = pBlock->numOfRows - 1; } - doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn); + doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); } else { handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); } @@ -907,12 +907,12 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap pQueryHandle->cur.win.ekey = tsArray[end]; pQueryHandle->cur.lastKey = tsArray[end] + step; - + return numOfRows + num; } static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row, - STsdbMeta *pMeta, int32_t numOfCols, STable* pTable) { + int32_t numOfCols, STable* pTable) { char* pData = NULL; // the schema version info is embeded in SDataRow @@ -973,8 +973,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, // only return the qualified data to client in terms of query time window, data rows in the same block but do not // be included in the query time window will be discarded -static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock, - SArray* sa) { +static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) { SQueryFilePos* cur = &pQueryHandle->cur; SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); @@ -987,7 +986,6 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns); - STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb); STable* pTable = pCheckInfo->pTableObj; int32_t endPos = cur->pos; @@ -1066,7 +1064,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { - copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, pMeta, numOfCols, pTable); + copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable); numOfRows += 1; if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = key; @@ -1406,8 +1404,21 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex int32_t numOfBlocks = 0; int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); - + + STsdbCfg* pCfg = &pQueryHandle->pTsdb->config; + STimeWindow win = TSWINDOW_INITIALIZER; + while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) { + tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pQueryHandle->pFileGroup->fileId, &win.skey, &win.ekey); + + // current file are not overlapped with query time window, ignore remain files + if ((ASCENDING_TRAVERSE(pQueryHandle->order) && win.skey > pQueryHandle->window.ekey) || + (!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) { + tsdbDebug("%p remain files are not qualified for qrange:%"PRId64"-%"PRId64", ignore, %p", pQueryHandle, pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qinfo) + pQueryHandle->pFileGroup = NULL; + break; + } + if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) { break; } @@ -1765,7 +1776,6 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int win->skey = TSKEY_INITIAL_VAL; int64_t st = taosGetTimestampUs(); - STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb); STable* pTable = pCheckInfo->pTableObj; do { @@ -1787,7 +1797,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int } win->ekey = key; - copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, pMeta, numOfCols, pTable); + copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, numOfCols, pTable); if (++numOfRows >= maxRowsToRead) { moveToNextRowInMem(pCheckInfo); diff --git a/src/util/inc/tutil.h b/src/util/inc/tutil.h index f7c69e3973ad6fdefd4ef33f4a813221eedebe6d..34f35c380745172f5fc6a6d4aaf094cdf82c68ac 100644 --- a/src/util/inc/tutil.h +++ b/src/util/inc/tutil.h @@ -35,12 +35,12 @@ extern "C" { #define WCHAR wchar_t #define tfree(x) \ - { \ + do { \ if (x) { \ free((void *)(x)); \ x = 0; \ } \ - } + } while(0); #define tstrncpy(dst, src, size) \ do { \