diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 8c106d1067f92d9eabcb3dbf8eb89081a93f015a..0c85c5ef46548df5fd6d1d5866c2ddff411040a7 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -20,6 +20,7 @@ #include "dataformat.h" #include "taosdef.h" #include "tglobalcfg.h" +#include "tsdb.h" #ifdef __cplusplus extern "C" { @@ -148,6 +149,8 @@ typedef struct { SCompCol cols[]; } SCompData; +STsdbFileH* tsdbGetFile(tsdb_repo_t* pRepo); + int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, SDataCols *pCols); int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables); diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index c45a8407cc1216f17bd74a57ce3e80f31ccf5464..b1a531455dcd15ba4510d4e6e4ada75e66c51f15 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -551,6 +551,11 @@ STsdbMeta* tsdbGetMeta(tsdb_repo_t* pRepo) { return tsdb->tsdbMeta; } +STsdbFileH* tsdbGetFile(tsdb_repo_t* pRepo) { + STsdbRepo* tsdb = (STsdbRepo*) pRepo; + return tsdb->tsdbFileH; +} + // Check the configuration and set default options static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { // Check precision diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index 7b996c8f7a170fdf278c1cb844ca85686f65fa6a..3ed6e22ad8874e27313f95ca4c7f8919a14de66f 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -13,8 +13,9 @@ * along with this program. If not, see . */ -#include #include "os.h" + +#include "tlog.h" #include "tutil.h" #include "../../../query/inc/qast.h" @@ -28,6 +29,11 @@ #define QUERY_IS_ASC_QUERY(o) (o == TSQL_SO_ASC) #define QH_GET_NUM_OF_COLS(handle) (taosArrayGetSize((handle)->pColumns)) +enum { + QUERY_RANGE_LESS_EQUAL = 0, + QUERY_RANGE_GREATER_EQUAL = 1, +}; + typedef struct SField { // todo need the definition } SField; @@ -36,12 +42,12 @@ typedef struct SHeaderFileInfo { int32_t fileId; } SHeaderFileInfo; -typedef struct SQueryHandlePos { - int32_t fileId; +typedef struct SQueryFilePos { + int32_t fid; int32_t slot; int32_t pos; - int32_t fileIndex; -} SQueryHandlePos; + int64_t lastKey; +} SQueryFilePos; typedef struct SDataBlockLoadInfo { int32_t fileListIndex; @@ -78,8 +84,12 @@ typedef struct STableCheckInfo { TSKEY lastKey; STable * pTableObj; int64_t offsetInHeaderFile; - int32_t numOfBlocks; +// int32_t numOfBlocks; int32_t start; + bool checkFirstFileBlock; + + SCompIdx* compIndex; + SCompBlock *pBlock; SSkipListIterator* iter; } STableCheckInfo; @@ -104,8 +114,8 @@ enum { typedef struct STsdbQueryHandle { struct STsdbRepo* pTsdb; int8_t model; // access model, single table model or multi-table model - SQueryHandlePos cur; // current position - SQueryHandlePos start; // the start position, used for secondary/third iteration + SQueryFilePos cur; // current position + SQueryFilePos start; // the start position, used for secondary/third iteration int32_t unzipBufSize; char *unzipBuffer; char *secondaryUnzipBuffer; @@ -342,6 +352,344 @@ static bool hasMoreDataInCacheForSingleModel(STsdbQueryHandle* pHandle) { return true; } +// todo dynamic get the daysperfile +static int32_t getFileIdFromKey(TSKEY key) { + return (int32_t)(key / 10); // set the starting fileId +} + +static int32_t getFileCompInfo(STableCheckInfo* pCheckInfo, SFileGroup* fileGroup) { + tsdbLoadCompIdx(fileGroup, pCheckInfo->compIndex, 10000); // todo set dynamic max tables + SCompIdx* compIndex = &pCheckInfo->compIndex[pCheckInfo->tableId.tid]; + + if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file + + } else { + tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pBlock); + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t binarySearchForBlockImpl(SCompBlock *pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) { + int32_t firstSlot = 0; + int32_t lastSlot = numOfBlocks - 1; + + int32_t midSlot = firstSlot; + + while (1) { + numOfBlocks = lastSlot - firstSlot + 1; + midSlot = (firstSlot + (numOfBlocks >> 1)); + + if (numOfBlocks == 1) break; + + if (skey > pBlock[midSlot].keyLast) { + if (numOfBlocks == 2) break; + if ((order == TSQL_SO_DESC) && (skey < pBlock[midSlot + 1].keyFirst)) break; + firstSlot = midSlot + 1; + } else if (skey < pBlock[midSlot].keyFirst) { + if ((order == TSQL_SO_ASC) && (skey > pBlock[midSlot - 1].keyLast)) break; + lastSlot = midSlot - 1; + } else { + break; // got the slot + } + } + + return midSlot; +} + +static SDataBlockInfo getTrueBlockInfo(STsdbQueryHandle* pHandle, STableCheckInfo* pCheckInfo) { + SDataBlockInfo info = {{0}, 0}; + + SCompBlock *pDiskBlock = &pCheckInfo->pBlock[pHandle->cur.slot]; + + info.window.skey = pDiskBlock->keyFirst; + info.window.ekey = pDiskBlock->keyLast; + info.size = pDiskBlock->numOfPoints; + info.numOfCols = pDiskBlock->numOfCols; + + return info; +} + +bool moveToNextBlock(STsdbQueryHandle *pQueryHandle, int32_t step) { + SQueryFilePos *cur = &pQueryHandle->cur; + + if (pQueryHandle->cur.fid >= 0) { + int32_t fileIndex = -1; + + /* + * 1. ascending order. The last data block of data file + * 2. descending order. The first block of file + */ + if ((step == QUERY_ASC_FORWARD_STEP && (pQueryHandle->cur.slot == pQueryHandle->numOfBlocks - 1)) || + (step == QUERY_DESC_FORWARD_STEP && (pQueryHandle->cur.slot == 0))) { + // temporarily keep the position value, in case of no data qualified when move forwards(backwards) + SQueryFilePos save = pQueryHandle->cur; + +// fileIndex = getNextDataFileCompInfo_(pQueryHandle, &pQueryHandle->cur, &pQueryHandle->vnodeFileInfo, step); + + // first data block in the next file + if (fileIndex >= 0) { + cur->slot = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQueryHandle->numOfBlocks - 1; + cur->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQueryHandle->pBlock[cur->slot].numOfPoints - 1; +// return loadQaulifiedData(pQueryHandle); + } else {// try data in cache + assert(cur->fid == -1); + + if (step == QUERY_ASC_FORWARD_STEP) { +// TSKEY nextTimestamp = +// getQueryStartPositionInCache_rv(pQueryHandle, &pQueryHandle->cur.slot, &pQueryHandle->cur.pos, true); +// if (nextTimestamp < 0) { +// pQueryHandle->cur = save; +// } + +// return (nextTimestamp > 0); + } + + // no data to check for desc order query, restore the saved position value + pQueryHandle->cur = save; + return false; + } + } + + // next block in the same file + int32_t fid = cur->fid; +// fileIndex = vnodeGetVnodeHeaderFileIndex(&fid, pQueryHandle->order, &pQueryHandle->vnodeFileInfo); + cur->slot += step; + + SCompBlock *pBlock = &pQueryHandle->pBlock[cur->slot]; + cur->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pBlock->numOfPoints - 1; +// return loadQaulifiedData(pQueryHandle); + } else { // data in cache +// todo continue; + } +} + + +int vnodeBinarySearchKey(char *pValue, int num, TSKEY key, int order) { + int firstPos, lastPos, midPos = -1; + int numOfPoints; + TSKEY *keyList; + + if (num <= 0) return -1; + + keyList = (TSKEY *)pValue; + firstPos = 0; + lastPos = num - 1; + + if (order == 0) { + // find the first position which is smaller than the key + while (1) { + if (key >= keyList[lastPos]) return lastPos; + if (key == keyList[firstPos]) return firstPos; + if (key < keyList[firstPos]) return firstPos - 1; + + numOfPoints = lastPos - firstPos + 1; + midPos = (numOfPoints >> 1) + firstPos; + + if (key < keyList[midPos]) { + lastPos = midPos - 1; + } else if (key > keyList[midPos]) { + firstPos = midPos + 1; + } else { + break; + } + } + + } else { + // find the first position which is bigger than the key + while (1) { + if (key <= keyList[firstPos]) return firstPos; + if (key == keyList[lastPos]) return lastPos; + + if (key > keyList[lastPos]) { + lastPos = lastPos + 1; + if (lastPos >= num) + return -1; + else + return lastPos; + } + + numOfPoints = lastPos - firstPos + 1; + midPos = (numOfPoints >> 1) + firstPos; + + if (key < keyList[midPos]) { + lastPos = midPos - 1; + } else if (key > keyList[midPos]) { + firstPos = midPos + 1; + } else { + break; + } + } + } + + return midPos; +} + +static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SArray *sa) { + // only return the qualified data to client in terms of query time window, data rows in the same block but do not + // be included in the query time window will be discarded + SQueryFilePos *cur = &pQueryHandle->cur; + STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex); + SDataBlockInfo blockInfo = getTrueBlockInfo(pQueryHandle, pCheckInfo); + + int32_t endPos = cur->pos; + if (QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) { + endPos = blockInfo.size - 1; + pQueryHandle->realNumOfRows = endPos - cur->pos + 1; + } else if (!QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) { + endPos = 0; + pQueryHandle->realNumOfRows = cur->pos + 1; + } else { +// endPos = vnodeBinarySearchKey(pQueryHandle->tsBuf->data, blockInfo.size, pQueryHandle->window.ekey, pQueryHandle->order); + + if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) { + if (endPos < cur->pos) { + pQueryHandle->realNumOfRows = 0; + return; + } else { + pQueryHandle->realNumOfRows = endPos - cur->pos; + } + } else { + if (endPos > cur->pos) { + pQueryHandle->realNumOfRows = 0; + return; + } else { + pQueryHandle->realNumOfRows = cur->pos - endPos; + } + } + } + + int32_t start = MIN(cur->pos, endPos); + + // move the data block in the front to data block if needed + if (start != 0) { + int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); + + for (int32_t i = 0; i < taosArrayGetSize(sa); ++i) { + int16_t colId = *(int16_t *)taosArrayGet(sa, i); + + for (int32_t j = 0; j < numOfCols; ++j) { + SColumnInfoEx *pCol = taosArrayGet(pQueryHandle->pColumns, j); + + if (pCol->info.colId == colId) { + memmove(pCol->pData, ((char *)pCol->pData) + pCol->info.bytes * start, pQueryHandle->realNumOfRows * pCol->info.bytes); + break; + } + } + } + } + + + + assert(pQueryHandle->realNumOfRows <= blockInfo.size); + + // forward(backward) the position for cursor + cur->pos = endPos; +} + +static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInfo* pCheckInfo, int32_t type) { + STsdbFileH* pFileHandle = tsdbGetFile(pQueryHandle->pTsdb); + int32_t fid = getFileIdFromKey(pCheckInfo->lastKey); + + SFileGroup* fileGroup = tsdbSearchFGroup(pFileHandle, fid); + pCheckInfo->checkFirstFileBlock = true; + + SQueryFilePos* cur = &pQueryHandle->cur; + + TSKEY key = pCheckInfo->lastKey; + int32_t index = -1; + + // todo add iterator for filegroup + while (1) { + if ((fid = getFileCompInfo(pCheckInfo, fileGroup)) < 0) { + break; + } + + int32_t tid = pCheckInfo->tableId.tid; + index = binarySearchForBlockImpl(pCheckInfo->pBlock, pCheckInfo->compIndex[tid].numOfSuperBlocks, pQueryHandle->order, key); + + if (type == QUERY_RANGE_GREATER_EQUAL) { + if (key <= pCheckInfo->pBlock[index].keyLast) { + break; + } else { + index = -1; + } + } else { + if (key >= pCheckInfo->pBlock[index].keyFirst) { + break; + } else { + index = -1; + } + } + } + + // failed to find qualified point in file, abort + if (index == -1) { + return false; + } + + assert(index >= 0 && index < pQueryHandle->numOfBlocks); + + // load first data block into memory failed, caused by disk block error + bool blockLoaded = false; + SArray *sa = NULL; + + // todo no need to loaded at all + cur->slot = index; + +// sa = getDefaultLoadColumns(pQueryHandle, true); + if (tsdbLoadDataBlock(&fileGroup->files[2], &pCheckInfo->pBlock[cur->slot], 1, fid, sa) == 0) { + blockLoaded = true; + } + + // dError("QInfo:%p fileId:%d total numOfBlks:%d blockId:%d load into memory failed due to error in disk files", + // GET_QINFO_ADDR(pQuery), pQuery->fileId, pQuery->numOfBlocks, blkIdx); + + // failed to load data from disk, abort current query + if (blockLoaded == false) { + return false; + } + + // todo search qualified points in blk, according to primary key (timestamp) column +// cur->pos = binarySearchForBlockImpl(ptsBuf->data, pBlocks->numOfPoints, key, pQueryHandle->order); + assert(cur->pos >= 0 && cur->fid >= 0 && cur->slot >= 0); + + filterDataInDataBlock(pQueryHandle, sa); + return pQueryHandle->realNumOfRows > 0; +} + +static bool hasMoreDataInFileForSingleTableModel(STsdbQueryHandle* pHandle) { + assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1); + STsdbFileH* pFileHandle = tsdbGetFile(pHandle->pTsdb); + SQueryFilePos* cur = &pHandle->cur; + + STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); + + if (!pCheckInfo->checkFirstFileBlock && pFileHandle != NULL) { + int32_t fid = getFileIdFromKey(pCheckInfo->lastKey); + SFileGroup* fileGroup = tsdbSearchFGroup(pFileHandle, fid); + pCheckInfo->checkFirstFileBlock = true; + + if (fileGroup != NULL) { + return getQualifiedDataBlock(pHandle, pCheckInfo, 1); + } else { // no data in file, try cache + return hasMoreDataInCacheForSingleModel(pHandle); + } + } else { + pCheckInfo->checkFirstFileBlock = true; + if (pFileHandle == NULL) { + cur->fid = -1; + } + + if (cur->fid == -1 || pFileHandle != NULL) { // try data in cache + return hasMoreDataInCacheForSingleModel(pHandle); + } else { + return true; + } + } + +} + static bool hasMoreDataInCacheForMultiModel(STsdbQueryHandle* pHandle) { size_t numOfTables = taosArrayGetSize(pHandle->pTableCheckInfo); assert(numOfTables > 0); @@ -372,7 +720,7 @@ static bool hasMoreDataInCacheForMultiModel(STsdbQueryHandle* pHandle) { bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) { STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; if (pHandle->model == SINGLE_TABLE_MODEL) { - return hasMoreDataInCacheForSingleModel(pHandle); + return hasMoreDataInFileForSingleTableModel(pHandle); } else { return hasMoreDataInCacheForMultiModel(pHandle); } @@ -704,8 +1052,6 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) { STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode)); - char buf[TSDB_MAX_TAGS_LEN] = {0}; - char* val = dataRowTuple(pTable->tagVal); // todo not only the first column int8_t type = pInfo->sch.type; @@ -765,9 +1111,11 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, const char* pCond // query according to the binary expression SSyntaxTreeFilterSupporter s = {.pTagSchema = stcol, .numOfTags = schemaNCols(pSTable->tagSchema)}; - SBinaryFilterSupp supp = {.fp = (__result_filter_fn_t)tSkipListNodeFilterCallback, + SBinaryFilterSupp supp = { + .fp = (__result_filter_fn_t)tSkipListNodeFilterCallback, .setupInfoFn = (__do_filter_suppl_fn_t)filterPrepare, - .pExtInfo = &s}; + .pExtInfo = &s + }; tSQLBinaryExprTraverse(pExpr, pSTable->pIndex, pRes, &supp); tSQLBinaryExprDestroy(&pExpr, tSQLListTraverseDestroyInfo); diff --git a/tests/examples/c/demo.c b/tests/examples/c/demo.c index 6fcedb8123cb2ea8e6bb6b89a3b7d45c96bcd3f6..b621781a3c81e4155841ac5d4ec31aae6a38518e 100644 --- a/tests/examples/c/demo.c +++ b/tests/examples/c/demo.c @@ -46,7 +46,8 @@ int main(int argc, char *argv[]) { } printf("success to connect to server\n"); - int32_t code = taos_query(taos, "select * from test.t1"); +// int32_t code = taos_query(taos, "insert into test.tm2 values(now, 1)(now+1m,2)(now+2m,3) (now+3m, 4) (now+4m, 5);"); + int32_t code = taos_query(taos, "insert into test.tm2 values(now, 99)"); if (code != 0) { printf("failed to execute query, reason:%s\n", taos_errstr(taos)); }