提交 9413a9ae 编写于 作者: R root

Merge branch 'feature/query' of https://github.com/taosdata/TDengine into feature/query

...@@ -87,12 +87,11 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId); ...@@ -87,12 +87,11 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId);
* @param id * @param id
* @return * @return
*/ */
//#define getResBufPage(buf, id) ((tFilePage*)((buf)->pBuf + (buf)->pageSize * (id)))
static FORCE_INLINE tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) { static FORCE_INLINE tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) {
if (id < pResultBuf->inMemPages) { if (id < pResultBuf->inMemPages) {
return pResultBuf->iBuf + id * pResultBuf->pageSize; return (tFilePage*) ((char*) pResultBuf->iBuf + id * pResultBuf->pageSize);
} else { } else {
return pResultBuf->pBuf + (id - pResultBuf->inMemPages) * pResultBuf->pageSize; return (tFilePage*) ((char*) pResultBuf->pBuf + (id - pResultBuf->inMemPages) * pResultBuf->pageSize);
} }
} }
/** /**
......
...@@ -10,7 +10,7 @@ namespace { ...@@ -10,7 +10,7 @@ namespace {
// simple test // simple test
void simpleTest() { void simpleTest() {
SDiskbasedResultBuf* pResultBuf = NULL; SDiskbasedResultBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1000, 64, NULL); int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1000, 64, 1024, 4, NULL);
int32_t pageId = 0; int32_t pageId = 0;
int32_t groupId = 0; int32_t groupId = 0;
...@@ -22,8 +22,7 @@ void simpleTest() { ...@@ -22,8 +22,7 @@ void simpleTest() {
ASSERT_EQ(getResBufSize(pResultBuf), 1000*16384L); ASSERT_EQ(getResBufSize(pResultBuf), 1000*16384L);
SIDList list = getDataBufPagesIdList(pResultBuf, groupId); SIDList list = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(list.size, 1); ASSERT_EQ(taosArrayGetSize(list), 1);
ASSERT_EQ(getNumOfResultBufGroupId(pResultBuf), 1); ASSERT_EQ(getNumOfResultBufGroupId(pResultBuf), 1);
destroyResultBuf(pResultBuf, NULL); destroyResultBuf(pResultBuf, NULL);
......
...@@ -426,6 +426,7 @@ int tsdbUpdateFileHeader(SFile* pFile, uint32_t version); ...@@ -426,6 +426,7 @@ int tsdbUpdateFileHeader(SFile* pFile, uint32_t version);
int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo); int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo);
void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo); void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo);
void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup); void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
// ------------------ tsdbRWHelper.c // ------------------ tsdbRWHelper.c
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state #define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
......
...@@ -31,7 +31,6 @@ static int tsdbCommitMeta(STsdbRepo *pRepo); ...@@ -31,7 +31,6 @@ static int tsdbCommitMeta(STsdbRepo *pRepo);
static void tsdbEndCommit(STsdbRepo *pRepo); static void tsdbEndCommit(STsdbRepo *pRepo);
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey); static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols); static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo); static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables); static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
...@@ -544,7 +543,7 @@ static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSK ...@@ -544,7 +543,7 @@ static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSK
return 0; return 0;
} }
static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) { void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) {
*minKey = fileId * daysPerFile * tsMsPerDay[precision]; *minKey = fileId * daysPerFile * tsMsPerDay[precision];
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1; *maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
} }
......
...@@ -128,8 +128,7 @@ typedef struct STsdbQueryHandle { ...@@ -128,8 +128,7 @@ typedef struct STsdbQueryHandle {
static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle); static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle);
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock, static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock);
SArray* sa);
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
STsdbQueryHandle* pQueryHandle); STsdbQueryHandle* pQueryHandle);
...@@ -695,7 +694,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* ...@@ -695,7 +694,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
} }
doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo); doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else { } else {
/* /*
* no data in cache, only load data from file * no data in cache, only load data from file
...@@ -711,6 +710,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* ...@@ -711,6 +710,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
cur->mixBlock = false; cur->mixBlock = false;
cur->blockCompleted = true; cur->blockCompleted = true;
cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1); cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1);
pCheckInfo->lastKey = cur->lastKey;
} }
} }
...@@ -734,7 +734,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock ...@@ -734,7 +734,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
cur->pos = 0; cur->pos = 0;
} }
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else { // the whole block is loaded in to buffer } else { // the whole block is loaded in to buffer
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
} }
...@@ -751,7 +751,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock ...@@ -751,7 +751,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
cur->pos = pBlock->numOfRows - 1; cur->pos = pBlock->numOfRows - 1;
} }
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else { } else {
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
} }
...@@ -907,12 +907,12 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap ...@@ -907,12 +907,12 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap
pQueryHandle->cur.win.ekey = tsArray[end]; pQueryHandle->cur.win.ekey = tsArray[end];
pQueryHandle->cur.lastKey = tsArray[end] + step; pQueryHandle->cur.lastKey = tsArray[end] + step;
return numOfRows + num; return numOfRows + num;
} }
static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row, static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row,
STsdbMeta *pMeta, int32_t numOfCols, STable* pTable) { int32_t numOfCols, STable* pTable) {
char* pData = NULL; char* pData = NULL;
// the schema version info is embeded in SDataRow // the schema version info is embeded in SDataRow
...@@ -973,8 +973,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, ...@@ -973,8 +973,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
// only return the qualified data to client in terms of query time window, data rows in the same block but do not // only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded // be included in the query time window will be discarded
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock, static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
SArray* sa) {
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
...@@ -987,7 +986,6 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -987,7 +986,6 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns); int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
STable* pTable = pCheckInfo->pTableObj; STable* pTable = pCheckInfo->pTableObj;
int32_t endPos = cur->pos; int32_t endPos = cur->pos;
...@@ -1066,7 +1064,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1066,7 +1064,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { (key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, pMeta, numOfCols, pTable); copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable);
numOfRows += 1; numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key; cur->win.skey = key;
...@@ -1406,8 +1404,21 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex ...@@ -1406,8 +1404,21 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
int32_t numOfBlocks = 0; int32_t numOfBlocks = 0;
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
STimeWindow win = TSWINDOW_INITIALIZER;
while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) { while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pQueryHandle->pFileGroup->fileId, &win.skey, &win.ekey);
// current file are not overlapped with query time window, ignore remain files
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && win.skey > pQueryHandle->window.ekey) ||
(!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) {
tsdbDebug("%p remain files are not qualified for qrange:%"PRId64"-%"PRId64", ignore, %p", pQueryHandle, pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qinfo)
pQueryHandle->pFileGroup = NULL;
break;
}
if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) { if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
break; break;
} }
...@@ -1765,7 +1776,6 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int ...@@ -1765,7 +1776,6 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
win->skey = TSKEY_INITIAL_VAL; win->skey = TSKEY_INITIAL_VAL;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
STable* pTable = pCheckInfo->pTableObj; STable* pTable = pCheckInfo->pTableObj;
do { do {
...@@ -1787,7 +1797,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int ...@@ -1787,7 +1797,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
} }
win->ekey = key; win->ekey = key;
copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, pMeta, numOfCols, pTable); copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, numOfCols, pTable);
if (++numOfRows >= maxRowsToRead) { if (++numOfRows >= maxRowsToRead) {
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
......
...@@ -35,12 +35,12 @@ extern "C" { ...@@ -35,12 +35,12 @@ extern "C" {
#define WCHAR wchar_t #define WCHAR wchar_t
#define tfree(x) \ #define tfree(x) \
{ \ do { \
if (x) { \ if (x) { \
free((void *)(x)); \ free((void *)(x)); \
x = 0; \ x = 0; \
} \ } \
} } while(0);
#define tstrncpy(dst, src, size) \ #define tstrncpy(dst, src, size) \
do { \ do { \
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册