提交 39a0443e 编写于 作者: H hjxilinx

[td-98] fix bug in descending order query

上级 2ff928db
......@@ -222,8 +222,9 @@ int tsdbOpenFile(SFile *pFile, int oflag);
int tsdbCloseFile(SFile *pFile); SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid);
int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid);
#define TSDB_FGROUP_ITER_FORWARD 0
#define TSDB_FGROUP_ITER_BACKWARD 1
#define TSDB_FGROUP_ITER_FORWARD TSDB_ORDER_ASC
#define TSDB_FGROUP_ITER_BACKWARD TSDB_ORDER_DESC
typedef struct {
int numOfFGroups;
SFileGroup *base;
......
......@@ -154,7 +154,7 @@ void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) {
}
int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE;
void *ptr = taosbsearch(&fid, pIter->base, sizeof(SFileGroup), pIter->numOfFGroups, compFGroupKey, flags);
void *ptr = taosbsearch(&fid, pIter->base, pIter->numOfFGroups, sizeof(SFileGroup), compFGroupKey, flags);
if (ptr == NULL) {
pIter->pFileGroup = NULL;
} else {
......@@ -173,7 +173,7 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {
pIter->pFileGroup += 1;
}
} else {
if (pIter->pFileGroup - 1 == pIter->base) {
if (pIter->pFileGroup == pIter->base) {
pIter->pFileGroup = NULL;
} else {
pIter->pFileGroup -= 1;
......
......@@ -27,7 +27,7 @@
#define EXTRA_BYTES 2
#define PRIMARY_TSCOL_REQUIRED(c) (((SColumnInfoData*)taosArrayGet(c, 0))->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)
#define QUERY_IS_ASC_QUERY(o) (o == TSDB_ORDER_ASC)
#define QH_GET_NUM_OF_COLS(handle) (taosArrayGetSize((handle)->pColumns))
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
enum {
QUERY_RANGE_LESS_EQUAL = 0,
......@@ -87,14 +87,8 @@ typedef struct STableBlockInfo {
int32_t groupIdx; /* number of group is less than the total number of tables */
} STableBlockInfo;
enum {
SINGLE_TABLE_MODEL = 1,
MULTI_TABLE_MODEL = 2,
};
typedef struct STsdbQueryHandle {
STsdbRepo* pTsdb;
int8_t model; // access model, single table model or multi-table model
SQueryFilePos cur; // current position
SQueryFilePos start; // the start position, used for secondary/third iteration
......@@ -128,21 +122,6 @@ typedef struct STsdbQueryHandle {
SCompIdx* compIndex;
} STsdbQueryHandle;
int32_t doAllocateBuf(STsdbQueryHandle* pQueryHandle, int32_t rowsPerFileBlock) {
// record the maximum column width among columns of this meter/metric
SColumnInfoData* pColumn = taosArrayGet(pQueryHandle->pColumns, 0);
int32_t maxColWidth = pColumn->info.bytes;
for (int32_t i = 1; i < QH_GET_NUM_OF_COLS(pQueryHandle); ++i) {
int32_t bytes = pColumn[i].info.bytes;
if (bytes > maxColWidth) {
maxColWidth = bytes;
}
}
return TSDB_CODE_SUCCESS;
}
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
pBlockLoadInfo->slot = -1;
pBlockLoadInfo->sid = -1;
......@@ -161,9 +140,9 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond
// todo 2. add the reference count for each table that is involved in query
STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
pQueryHandle->order = pCond->order;
pQueryHandle->order = pCond->order;
pQueryHandle->window = pCond->twindow;
pQueryHandle->pTsdb = tsdb;
pQueryHandle->pTsdb = tsdb;
pQueryHandle->compIndex = calloc(10000, sizeof(SCompIdx)),
pQueryHandle->loadDataAfterSeek = false;
......@@ -174,25 +153,33 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond
pQueryHandle->pTableCheckInfo = taosArrayInit(size, sizeof(STableCheckInfo));
for (int32_t i = 0; i < size; ++i) {
STableId id = *(STableId*)taosArrayGet(idList, i);
STableId id = *(STableId*) taosArrayGet(idList, i);
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), id.uid);
if (pTable == NULL) {
dError("%p failed to get table, error uid:%" PRIu64, pQueryHandle, id.uid);
continue;
}
STableCheckInfo info = {
.lastKey = pQueryHandle->window.skey,
.tableId = id,
.pTableObj = tsdbGetTableByUid(tsdbGetMeta(tsdb), id.uid), // todo this may be failed
.pCompInfo = NULL,
.lastKey = pQueryHandle->window.skey,
.tableId = id,
.pTableObj = pTable,
};
assert(info.pTableObj != NULL);
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
}
pQueryHandle->model = (size > 1) ? MULTI_TABLE_MODEL : SINGLE_TABLE_MODEL;
pQueryHandle->checkFiles = 1;
dTrace("%p total numOfTable:%d in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo));
/*
* For ascending timestamp order query, query starts from data files. In contrast, buffer will be checked in the first place
* in case of descending timestamp order query.
*/
pQueryHandle->checkFiles = QUERY_IS_ASC_QUERY(pQueryHandle->order);
pQueryHandle->activeIndex = 0;
// malloc buffer in order to load data from file
// allocate buffer in order to load data blocks from file
int32_t numOfCols = taosArrayGetSize(pColumnInfo);
size_t bufferCapacity = 4096;
......@@ -206,10 +193,6 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond
taosArrayPush(pQueryHandle->pColumns, &pDest);
}
if (doAllocateBuf(pQueryHandle, bufferCapacity) != TSDB_CODE_SUCCESS) {
return NULL;
}
tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo);
......@@ -239,7 +222,12 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
// todo dynamic get the daysperfile
static int32_t getFileIdFromKey(TSKEY key) {
return (int32_t)(key / 10); // set the starting fileId
int64_t fid = (int64_t)(key / 10); // set the starting fileId
if (fid > INT32_MAX) {
fid = INT32_MAX;
}
return fid;
}
static int32_t binarySearchForBlockImpl(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order);
......@@ -247,8 +235,12 @@ static int32_t binarySearchForBlockImpl(SCompBlock* pBlock, int32_t numOfBlocks,
static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks, int32_t type) {
// todo check open file failed
SFileGroup* fileGroup = pQueryHandle->pFileGroup;
assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0);
if (fileGroup->files[TSDB_FILE_TYPE_HEAD].fd == FD_INITIALIZER) {
fileGroup->files[TSDB_FILE_TYPE_HEAD].fd = open(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname, O_RDONLY);
} else {
assert(FD_VALID(fileGroup->files[TSDB_FILE_TYPE_HEAD].fd));
}
// load all the comp offset value for all tables in this file
......@@ -262,7 +254,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
SCompIdx* compIndex = &pQueryHandle->compIndex[pCheckInfo->tableId.tid];
if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file
assert(0);
} else {
if (pCheckInfo->compSize < compIndex->len) {
assert(compIndex->len > 0);
......@@ -271,46 +263,35 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
assert(t != NULL);
pCheckInfo->pCompInfo = (SCompInfo*) t;
pCheckInfo->compSize = compIndex->len;
}
tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo);
int32_t index = 0;
SCompInfo* pCompInfo = pCheckInfo->pCompInfo;
TSKEY s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey);
TSKEY e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey);
// discard the unqualified data block based on the query time window
int32_t start = binarySearchForBlockImpl(pCheckInfo->pCompInfo->blocks, compIndex->numOfSuperBlocks,
pQueryHandle->order, pCheckInfo->lastKey);
if (type == QUERY_RANGE_GREATER_EQUAL) {
if (pCheckInfo->lastKey <= pCheckInfo->pCompInfo->blocks[start].keyLast) {
// break;
} else {
index = -1;
}
} else {
if (pCheckInfo->lastKey >= pCheckInfo->pCompInfo->blocks[start].keyFirst) {
// break;
} else {
index = -1;
}
}
// not found in data blocks in current file
if (index == -1) {
int32_t start = binarySearchForBlockImpl(pCompInfo->blocks, compIndex->numOfSuperBlocks, s, TSDB_ORDER_ASC);
int32_t end = start;
if (s > pCompInfo->blocks[start].keyLast) {
continue;
}
// todo speedup the procedure of locating end block
int32_t e = start;
while (e < compIndex->numOfSuperBlocks &&
(pCheckInfo->pCompInfo->blocks[e].keyFirst <= pQueryHandle->window.ekey)) {
e += 1;
// todo speedup the procedure of located end block
while (end < compIndex->numOfSuperBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
end += 1;
}
pCheckInfo->numOfBlocks = (end - start);
if (start > 0) {
memmove(pCheckInfo->pCompInfo->blocks, &pCheckInfo->pCompInfo->blocks[start], (e - start) * sizeof(SCompBlock));
memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SCompBlock));
}
pCheckInfo->numOfBlocks = (e - start);
(*numOfBlocks) += pCheckInfo->numOfBlocks;
}
}
......@@ -413,7 +394,6 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
}
SDataCols* pDataCols = pCheckInfo->pDataCols;
if (pCheckInfo->lastKey > pBlock->keyFirst) {
cur->pos =
binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfPoints, pCheckInfo->lastKey, pQueryHandle->order);
......@@ -425,8 +405,24 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
} else { // the whole block is loaded in to buffer
pQueryHandle->realNumOfRows = pBlock->numOfPoints;
}
} else { // todo desc query
} else {
// query ended in current block
if (pQueryHandle->window.ekey > pBlock->keyFirst) {
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
return false;
}
SDataCols* pDataCols = pCheckInfo->pDataCols;
if (pCheckInfo->lastKey < pBlock->keyLast) {
cur->pos =
binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfPoints, pCheckInfo->lastKey, pQueryHandle->order);
} else {
cur->pos = pBlock->numOfPoints - 1;
}
filterDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa);
} else {
pQueryHandle->realNumOfRows = pBlock->numOfPoints;
}
}
......@@ -619,7 +615,9 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf
}
static SArray* getColumnIdList(STsdbQueryHandle* pQueryHandle) {
int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
size_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
assert(numOfCols <= TSDB_MAX_COLUMNS);
SArray* pIdList = taosArrayInit(numOfCols, sizeof(int16_t));
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
......@@ -923,8 +921,8 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
pQueryHandle->locateStart = true;
int32_t fid = getFileIdFromKey(pQueryHandle->window.skey);
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, TSDB_FGROUP_ITER_FORWARD);
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
int32_t numOfBlocks = -1;
......@@ -934,13 +932,14 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
if (getFileCompInfo(pQueryHandle, &numOfBlocks, 1) != TSDB_CODE_SUCCESS) {
int32_t type = QUERY_IS_ASC_QUERY(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL;
if (getFileCompInfo(pQueryHandle, &numOfBlocks, type) != TSDB_CODE_SUCCESS) {
break;
}
assert(numOfBlocks >= 0);
dTrace("%p %d blocks found in file for %d table(s), fid:%d", pQueryHandle, numOfBlocks,
pQueryHandle->pFileGroup->fileId, numOfTables);
numOfTables, pQueryHandle->pFileGroup->fileId);
// todo return error code to query engine
if (createDataBlocksInfo(pQueryHandle, numOfBlocks, &pQueryHandle->numOfBlocks) != TSDB_CODE_SUCCESS) {
......@@ -961,7 +960,7 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
return false;
}
cur->slot = 0;
cur->slot = QUERY_IS_ASC_QUERY(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1;
cur->fid = pQueryHandle->pFileGroup->fileId;
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
......@@ -970,8 +969,10 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
} else {
if (cur->slot == pQueryHandle->numOfBlocks - 1) { // all blocks
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && QUERY_IS_ASC_QUERY(pQueryHandle->order)) ||
(cur->slot == 0 && !QUERY_IS_ASC_QUERY(pQueryHandle->order))) { // all blocks
int32_t numOfBlocks = -1;
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
pQueryHandle->numOfBlocks = 0;
......@@ -1001,67 +1002,84 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
cur->fid = -1;
return false;
}
cur->slot = 0;
cur->slot = QUERY_IS_ASC_QUERY(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1;
cur->fid = pQueryHandle->pFileGroup->fileId;
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
SCompBlock* pBlock = pBlockInfo->pBlock.compBlock;
return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
} else { // next block of the same file
cur->slot += 1;
cur->pos = 0;
int32_t step = QUERY_IS_ASC_QUERY(pQueryHandle->order)? 1:-1;
cur->slot += step;
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) {
cur->pos = 0;
} else {
cur->pos = pBlockInfo->pBlock.compBlock->numOfPoints - 1;
}
return loadFileDataBlock(pQueryHandle, pBlockInfo->pBlock.compBlock, pBlockInfo->pTableCheckInfo);
}
}
}
// handle data in cache situation
bool tsdbNextDataBlock(tsdb_query_handle_t* pQueryHandle) {
STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle;
size_t numOfTables = taosArrayGetSize(pHandle->pTableCheckInfo);
static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) {
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
// todo add assert, the value of numOfTables should be less than the maximum value for each vnode capacity
assert(numOfTables > 0);
if (pHandle->checkFiles) {
if (getDataBlocksInFiles(pHandle)) {
while (pQueryHandle->activeIndex < numOfTables) {
if (hasMoreDataInCache(pQueryHandle)) {
return true;
}
pHandle->activeIndex = 0;
pHandle->checkFiles = 0;
while (pHandle->activeIndex < numOfTables) {
if (hasMoreDataInCache(pHandle)) {
pQueryHandle->activeIndex += 1;
}
return false;
}
// handle data in cache situation
bool tsdbNextDataBlock(tsdb_query_handle_t* pqHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pqHandle;
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
assert(numOfTables > 0);
if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) {
if (pQueryHandle->checkFiles) {
if (getDataBlocksInFiles(pQueryHandle)) {
return true;
}
pHandle->activeIndex += 1;
pQueryHandle->activeIndex = 0;
pQueryHandle->checkFiles = false;
}
return false;
} else {
while (pHandle->activeIndex < numOfTables) {
if (hasMoreDataInCache(pHandle)) {
return doHasDataInBuffer(pQueryHandle);
} else { // starts from the buffer in case of descending timestamp order check data blocks
if (!pQueryHandle->checkFiles) {
if (doHasDataInBuffer(pQueryHandle)) {
return true;
}
pHandle->activeIndex += 1;
pQueryHandle->checkFiles = true;
}
return false;
return getDataBlocksInFiles(pQueryHandle);
}
}
static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
STsdbQueryHandle* pHandle) {
STsdbQueryHandle* pQueryHandle) {
int numOfRows = 0;
int32_t numOfCols = taosArrayGetSize(pHandle->pColumns);
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
*skey = INT64_MIN;
while (tSkipListIterNext(pIter)) {
......@@ -1079,7 +1097,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int max
int32_t offset = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pHandle->pColumns, i);
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
memcpy(pColInfo->pData + numOfRows * pColInfo->info.bytes, dataRowTuple(row) + offset, pColInfo->info.bytes);
offset += pColInfo->info.bytes;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册