diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 71d3fd9cc40091f78e3e082992de0422a5aadc67..1d158609122206dc87188d627185bdcfdef2c382 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -198,38 +198,38 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab // allocate buffer in order to load data blocks from file int32_t numOfCols = pCond->numOfCols; - + pQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis)); pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array? - + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData colInfo = {{0}, 0}; - + colInfo.info = pCond->colList[i]; colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes); taosArrayPush(pQueryHandle->pColumns, &colInfo); pQueryHandle->statis[i].colId = colInfo.info.colId; } - + pQueryHandle->pTableCheckInfo = taosArrayInit(groupList->numOfTables, sizeof(STableCheckInfo)); STsdbMeta* pMeta = tsdbGetMeta(tsdb); assert(pMeta != NULL); - + for (int32_t i = 0; i < sizeOfGroup; ++i) { SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i); - + size_t gsize = taosArrayGetSize(group); assert(gsize > 0); - + for (int32_t j = 0; j < gsize; ++j) { STable* pTable = (STable*) taosArrayGetP(group, j); - + STableCheckInfo info = { .lastKey = pQueryHandle->window.skey, .tableId = pTable->tableId, .pTableObj = pTable, }; - + assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE || info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE)); @@ -259,17 +259,17 @@ TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) { assert(pHandle != NULL); - + STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) pHandle; - + size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo); SArray* res = taosArrayInit(size, POINTER_BYTES); - + for(int32_t i = 0; i < size; ++i) { STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); taosArrayPush(res, &pCheckInfo->pTableObj); } - + return res; } @@ -285,11 +285,11 @@ TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCheckInfo) { STable* pTable = pCheckInfo->pTableObj; assert(pTable != NULL); - + if (pCheckInfo->initBuf) { return true; } - + pCheckInfo->initBuf = true; int32_t order = pHandle->order; @@ -297,34 +297,34 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh if (pHandle->mem == NULL && pHandle->imem == NULL) { return false; } - + assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL); - + if (pHandle->mem && pHandle->mem->tData[pCheckInfo->tableId.tid] != NULL) { pCheckInfo->iter = tSkipListCreateIterFromVal(pHandle->mem->tData[pCheckInfo->tableId.tid]->pData, (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order); } - + if (pHandle->imem && pHandle->imem->tData[pCheckInfo->tableId.tid] != NULL) { pCheckInfo->iiter = tSkipListCreateIterFromVal(pHandle->imem->tData[pCheckInfo->tableId.tid]->pData, (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order); } - + // both iterators are NULL, no data in buffer right now if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) { return false; } - + bool memEmpty = (pCheckInfo->iter == NULL) || (pCheckInfo->iter != NULL && !tSkipListIterNext(pCheckInfo->iter)); bool imemEmpty = (pCheckInfo->iiter == NULL) || (pCheckInfo->iiter != NULL && !tSkipListIterNext(pCheckInfo->iiter)); if (memEmpty && imemEmpty) { // buffer is empty return false; } - + if (!memEmpty) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); assert(node != NULL); - + SDataRow row = SL_GET_NODE_DATA(node); TSKEY key = dataRowKey(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle, @@ -333,11 +333,11 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh tsdbDebug("%p uid:%"PRId64", tid:%d no data in mem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pHandle->qinfo); } - + if (!imemEmpty) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); assert(node != NULL); - + SDataRow row = SL_GET_NODE_DATA(node); TSKEY key = dataRowKey(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle, @@ -346,7 +346,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh tsdbDebug("%p uid:%"PRId64", tid:%d no data in imem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pHandle->qinfo); } - + return true; } @@ -449,7 +449,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { size_t size = taosArrayGetSize(pHandle->pTableCheckInfo); assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1); pHandle->cur.fid = -1; - + STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); STable* pTable = pCheckInfo->pTableObj; @@ -467,17 +467,17 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { pCheckInfo->lastKey = dataRowKey(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qinfo); - + // all data in mem are checked already. if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_TRAVERSE(pHandle->order)) || (pCheckInfo->lastKey < pHandle->window.ekey && !ASCENDING_TRAVERSE(pHandle->order))) { return false; } - + int32_t step = ASCENDING_TRAVERSE(pHandle->order)? 1:-1; STimeWindow* win = &pHandle->cur.win; pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity, win, pHandle); - + // update the last key value pCheckInfo->lastKey = win->ekey + step; pHandle->cur.lastKey = win->ekey + step; @@ -486,7 +486,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { if (!ASCENDING_TRAVERSE(pHandle->order)) { SWAP(win->skey, win->ekey, TSKEY); } - + return true; } @@ -495,31 +495,31 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio if (key == TSKEY_INITIAL_VAL) { return INT32_MIN; } - + int64_t fid = (int64_t)(key / (daysPerFile * tsMsPerDay[precision])); // set the starting fileId if (fid < 0L && llabs(fid) > INT32_MAX) { // data value overflow for INT32 fid = INT32_MIN; } - + if (fid > 0L && fid > INT32_MAX) { fid = INT32_MAX; } - + return fid; } static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) { int32_t firstSlot = 0; int32_t lastSlot = numOfBlocks - 1; - + int32_t midSlot = firstSlot; - + while (1) { numOfBlocks = lastSlot - firstSlot + 1; midSlot = (firstSlot + (numOfBlocks >> 1)); - + if (numOfBlocks == 1) break; - + if (skey > pBlock[midSlot].keyLast) { if (numOfBlocks == 2) break; if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].keyFirst)) break; @@ -531,7 +531,7 @@ static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSK break; // got the slot } } - + return midSlot; } @@ -669,10 +669,10 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL; cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(binfo.rows-1); - + if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) || (!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) { - + if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) || (!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) { @@ -688,12 +688,12 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* if (!ASCENDING_TRAVERSE(pQueryHandle->order)) { SWAP(cur->win.skey, cur->win.ekey, TSKEY); } - + cur->mixBlock = true; cur->blockCompleted = false; return; } - + doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); } else { @@ -727,14 +727,14 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0]; assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows); - + if (pCheckInfo->lastKey > pBlock->keyFirst) { cur->pos = binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order); } else { cur->pos = 0; } - + doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); } else { // the whole block is loaded in to buffer handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); @@ -744,14 +744,14 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) { return false; } - + SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0]; if (pCheckInfo->lastKey < pBlock->keyLast) { cur->pos = binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order); } else { cur->pos = pBlock->numOfRows - 1; } - + doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); } else { handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); @@ -767,7 +767,7 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) { TSKEY* keyList; assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); - + if (num <= 0) return -1; keyList = (TSKEY*)pValue; @@ -826,13 +826,13 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) { static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) { char* pData = NULL; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1; - + SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; TSKEY* tsArray = pCols->cols[0].pData; - + int32_t num = end - start + 1; int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns); - + //data in buffer has greater timestamp, copy data in file block int32_t i = 0, j = 0; while(i < requiredNumOfCols && j < pCols->numOfCols) { @@ -905,7 +905,7 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap i++; } - + pQueryHandle->cur.win.ekey = tsArray[end]; pQueryHandle->cur.lastKey = tsArray[end] + step; @@ -1027,7 +1027,7 @@ static void updateInfoAfterMerge(STsdbQueryHandle* pQueryHandle, STableCheckInfo static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) { SQueryFilePos* cur = &pQueryHandle->cur; SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); - + initTableMemIterator(pQueryHandle, pCheckInfo); SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; @@ -1038,7 +1038,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; - int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns); + int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); STable* pTable = pCheckInfo->pTableObj; @@ -1054,12 +1054,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order); cur->mixBlock = true; } - + // compared with the data from in-memory buffer, to generate the correct timestamp array list - int32_t pos = cur->pos; - int32_t numOfRows = 0; - pQueryHandle->cur.win = TSWINDOW_INITIALIZER; + int32_t pos = cur->pos; + cur->win = TSWINDOW_INITIALIZER; // no data in buffer, load data from file directly if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) { @@ -1069,13 +1068,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* if (!ASCENDING_TRAVERSE(pQueryHandle->order)) { SWAP(start, end, int32_t); } - + numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end); - cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]}; + // the time window should always be right order: skey <= ekey + cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]}; pos += (end - start + 1) * step; - cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) || - ((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))); + + cur->blockCompleted = + (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) || + ((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))); // if the buffer is not full in case of descending order query, move the data in the front of the buffer moveDataToFront(pQueryHandle, numOfRows, numOfCols); @@ -1133,11 +1135,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend); pos += (qend - qstart + 1) * step; - cur->win.ekey = tsArray[end]; + cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[qend]:tsArray[qstart]; cur->lastKey = cur->win.ekey + step; } } while (numOfRows < pQueryHandle->outputCapacity); - + if (numOfRows < pQueryHandle->outputCapacity) { /** * if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT @@ -1157,14 +1159,15 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end); pos += (end - start + 1) * step; - cur->win.ekey = tsArray[end]; + cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[end]:tsArray[start]; cur->lastKey = cur->win.ekey + step; } } } - - cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) || - ((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))); + + cur->blockCompleted = + (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) || + ((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))); if (!ASCENDING_TRAVERSE(pQueryHandle->order)) { SWAP(cur->win.skey, cur->win.ekey, TSKEY); @@ -1179,6 +1182,9 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* assert(cur->win.skey >= pQueryHandle->window.ekey && cur->win.ekey <= pQueryHandle->window.skey); } + SColumnInfoData* pColInfoData = taosArrayGet(pQueryHandle->pColumns, 0); + assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] && cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows-1]); + tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, brange:%"PRIu64"-%"PRIu64" rows:%d, %p", pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->win.skey, cur->win.ekey, cur->rows, pQueryHandle->qinfo); } @@ -1314,16 +1320,16 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO cleanBlockOrderSupporter(&sup, 0); return TSDB_CODE_TDB_OUT_OF_MEMORY; } - + int32_t cnt = 0; int32_t numOfQualTables = 0; - + for (int32_t j = 0; j < numOfTables; ++j) { STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pQueryHandle->pTableCheckInfo, j); if (pTableCheck->numOfBlocks <= 0) { continue; } - + SCompBlock* pBlock = pTableCheck->pCompInfo->blocks; sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks; @@ -1428,26 +1434,26 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) { break; } - + tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %p", pQueryHandle, numOfBlocks, numOfTables, pQueryHandle->pFileGroup->fileId, pQueryHandle->qinfo); - + assert(numOfBlocks >= 0); if (numOfBlocks == 0) { continue; } - + // todo return error code to query engine if (createDataBlocksInfo(pQueryHandle, numOfBlocks, &pQueryHandle->numOfBlocks) != TSDB_CODE_SUCCESS) { break; } - + assert(numOfBlocks >= pQueryHandle->numOfBlocks); if (pQueryHandle->numOfBlocks > 0) { break; } } - + // no data in file anymore if (pQueryHandle->numOfBlocks <= 0) { if (code == TSDB_CODE_SUCCESS) { @@ -1458,10 +1464,10 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex *exists = false; return code; } - + cur->slot = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1; cur->fid = pQueryHandle->pFileGroup->fileId; - + STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; *exists = loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo); @@ -1477,7 +1483,7 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists pQueryHandle->locateStart = true; STsdbCfg* pCfg = &pQueryHandle->pTsdb->config; int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision); - + tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order); tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid); @@ -1486,7 +1492,7 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists // check if current file block is all consumed STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo; - + // current block is done, try next if (!cur->mixBlock || cur->blockCompleted) { if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) || @@ -1497,10 +1503,10 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists // next block of the same file int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1; cur->slot += step; - + cur->mixBlock = false; cur->blockCompleted = false; - + STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot]; *exists = loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo); @@ -1518,15 +1524,15 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) { size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); assert(numOfTables <= ((STsdbRepo*)pQueryHandle->pTsdb)->config.maxTables); - + while (pQueryHandle->activeIndex < numOfTables) { if (hasMoreDataInCache(pQueryHandle)) { return true; } - + pQueryHandle->activeIndex += 1; } - + return false; } @@ -1544,14 +1550,14 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { if (pQueryHandle->type == TSDB_QUERY_TYPE_EXTERNAL) { pQueryHandle->type = TSDB_QUERY_TYPE_ALL; pQueryHandle->order = TSDB_ORDER_DESC; - + if (!tsdbNextDataBlock(pHandle)) { return false; } - + /*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo(pHandle, &blockInfo); /*SArray *pDataBlock = */tsdbRetrieveDataBlock(pHandle, pQueryHandle->defaultLoadColumn); - + if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) { // data already retrieve, discard other data rows and return int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); @@ -1559,7 +1565,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); memcpy(pCol->pData, pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows-1), pCol->info.bytes); } - + pQueryHandle->cur.win = (STimeWindow){pQueryHandle->window.skey, pQueryHandle->window.skey}; pQueryHandle->window = pQueryHandle->cur.win; pQueryHandle->cur.rows = 1; @@ -1576,7 +1582,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { pSecQueryHandle->checkFiles = true; pSecQueryHandle->activeIndex = 0; pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock; - + if (tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb) != 0) { free(pSecQueryHandle); return false; @@ -1586,24 +1592,24 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { // allocate buffer in order to load data blocks from file int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); - + pSecQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis)); pSecQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData colInfo = {{0}, 0}; SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); - + colInfo.info = pCol->info; colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCol->info.bytes); taosArrayPush(pSecQueryHandle->pColumns, &colInfo); } - + size_t si = taosArrayGetSize(pQueryHandle->pTableCheckInfo); pSecQueryHandle->pTableCheckInfo = taosArrayInit(si, sizeof(STableCheckInfo)); STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb); assert(pMeta != NULL); - + for (int32_t j = 0; j < si; ++j) { STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j); STableCheckInfo info = { @@ -1611,10 +1617,10 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { .tableId = pCheckInfo->tableId, .pTableObj = pCheckInfo->pTableObj, }; - + taosArrayPush(pSecQueryHandle->pTableCheckInfo, &info); } - + tsdbInitDataBlockLoadInfo(&pSecQueryHandle->dataBlockLoadInfo); tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo); pSecQueryHandle->defaultLoadColumn = taosArrayClone(pQueryHandle->defaultLoadColumn); @@ -1624,17 +1630,17 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo); tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn); - + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); memcpy(pCol->pData, pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows-1), pCol->info.bytes); - + SColumnInfoData* pCol1 = taosArrayGet(pSecQueryHandle->pColumns, i); assert(pCol->info.colId == pCol1->info.colId); - + memcpy(pCol->pData + pCol->info.bytes, pCol1->pData, pCol1->info.bytes); } - + SColumnInfoData* pTSCol = taosArrayGet(pQueryHandle->pColumns, 0); // it is ascending order @@ -1658,7 +1664,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { pQueryHandle->checkFiles = false; return true; } - + if (pQueryHandle->checkFiles) { bool exists = true; int32_t code = getDataBlocksInFiles(pQueryHandle, &exists); @@ -1671,11 +1677,11 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { pQueryHandle->cost.checkForNextTime += elapsedTime; return exists; } - + pQueryHandle->activeIndex = 0; pQueryHandle->checkFiles = false; } - + // TODO: opt by consider the scan order bool ret = doHasDataInBuffer(pQueryHandle); terrno = TSDB_CODE_SUCCESS; @@ -1688,15 +1694,15 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) { STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pqHandle; assert(!ASCENDING_TRAVERSE(pQueryHandle->order)); - + // starts from the buffer in case of descending timestamp order check data blocks - + // todo consider the query time window, current last_row does not apply the query time window size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); - + TSKEY key = TSKEY_INITIAL_VAL; int32_t index = -1; - + for(int32_t i = 0; i < numOfTables; ++i) { STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); if (pCheckInfo->pTableObj->lastKey > key) { @@ -1704,36 +1710,36 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) { index = i; } } - + if (index == -1) { // todo add failure test cases return; } - + // erase all other elements in array list size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo); for (int32_t i = 0; i < size; ++i) { if (i == index) { continue; } - + STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); tSkipListDestroyIter(pTableCheckInfo->iter); - + if (pTableCheckInfo->pDataCols != NULL) { tfree(pTableCheckInfo->pDataCols->buf); } - + tfree(pTableCheckInfo->pDataCols); tfree(pTableCheckInfo->pCompInfo); } - + STableCheckInfo info = *(STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, index); taosArrayClear(pQueryHandle->pTableCheckInfo); - + info.lastKey = key; taosArrayPush(pQueryHandle->pTableCheckInfo, &info); - + // update the query time window according to the chosen last timestamp pQueryHandle->window = (STimeWindow) {key, key}; } @@ -1742,13 +1748,13 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) { // filter the queried time stamp in the first place STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle; pQueryHandle->order = TSDB_ORDER_DESC; - + assert(pQueryHandle->window.skey == pQueryHandle->window.ekey); - + // starts from the buffer in case of descending timestamp order check data blocks // todo consider the query time window, current last_row does not apply the query time window size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); - + int32_t i = 0; while(i < numOfTables) { STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); @@ -1756,21 +1762,21 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) { pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL) { break; } - + i++; } - + // there are no data in all the tables if (i == numOfTables) { return; } - + STableCheckInfo info = *(STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, i); taosArrayClear(pQueryHandle->pTableCheckInfo); - + info.lastKey = pQueryHandle->window.skey; taosArrayPush(pQueryHandle->pTableCheckInfo, &info); - + // update the query time window according to the chosen last timestamp pQueryHandle->window = (STimeWindow) {info.lastKey, TSKEY_INITIAL_VAL}; } @@ -1794,7 +1800,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) { tsdbDebug("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey, pQueryHandle->window.ekey); - + break; } @@ -1809,21 +1815,21 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int moveToNextRowInMem(pCheckInfo); break; } - + } while(moveToNextRowInMem(pCheckInfo)); assert(numOfRows <= maxRowsToRead); - + // if the buffer is not full in case of descending order query, move the data in the front of the buffer if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < maxRowsToRead) { int32_t emptySize = maxRowsToRead - numOfRows; - + for(int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); } } - + int64_t elapsedTime = taosGetTimestampUs() - st; tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d", pQueryHandle, elapsedTime, numOfRows, numOfCols); @@ -1835,7 +1841,7 @@ void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle, SDataBlockInfo* p STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle; SQueryFilePos* cur = &pHandle->cur; STable* pTable = NULL; - + // there are data in file if (pHandle->cur.fid >= 0) { STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot]; @@ -1857,13 +1863,13 @@ void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle, SDataBlockInfo* p */ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataStatis** pBlockStatis) { STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; - + SQueryFilePos* c = &pHandle->cur; if (c->mixBlock) { *pBlockStatis = NULL; return TSDB_CODE_SUCCESS; } - + STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[c->slot]; assert((c->slot >= 0 && c->slot < pHandle->numOfBlocks) || ((c->slot == pHandle->numOfBlocks) && (c->slot == 0))); @@ -1883,7 +1889,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta for(int32_t i = 0; i < numOfCols; ++i) { pHandle->statis[i].colId = colIds[i]; } - + tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, numOfCols); // always load the first primary timestamp column data @@ -1932,31 +1938,31 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { } else { SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock); assert(pHandle->realNumOfRows <= binfo.rows); - + // data block has been loaded, todo extract method SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo; - + if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fileId == pHandle->cur.fid && pBlockLoadInfo->tid == pCheckInfo->pTableObj->tableId.tid) { return pHandle->pColumns; } else { // only load the file block SCompBlock* pBlock = pBlockInfo->compBlock; doLoadFileDataBlock(pHandle, pBlock, pCheckInfo); - + // todo refactor int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1); - + // if the buffer is not full in case of descending order query, move the data in the front of the buffer if (!ASCENDING_TRAVERSE(pHandle->order) && numOfRows < pHandle->outputCapacity) { int32_t emptySize = pHandle->outputCapacity - numOfRows; int32_t reqNumOfCols = taosArrayGetSize(pHandle->pColumns); - + for(int32_t i = 0; i < reqNumOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pHandle->pColumns, i); memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); } } - + return pHandle->pColumns; } } @@ -1967,11 +1973,11 @@ static int32_t getAllTableList(STable* pSuperTable, SArray* list) { SSkipListIterator* iter = tSkipListCreateIter(pSuperTable->pIndex); while (tSkipListIterNext(iter)) { SSkipListNode* pNode = tSkipListIterGet(iter); - + STable** pTable = (STable**) SL_GET_NODE_DATA((SSkipListNode*) pNode); taosArrayPush(list, pTable); } - + tSkipListDestroyIter(iter); return TSDB_CODE_SUCCESS; } @@ -1981,12 +1987,12 @@ static void destroyHelper(void* param) { return; } - + tQueryInfo* pInfo = (tQueryInfo*)param; if (pInfo->optr != TSDB_RELATION_IN) { tfree(pInfo->q); } - + // tVariantDestroy(&(pInfo->q)); free(param); } @@ -1998,7 +2004,7 @@ void filterPrepare(void* expr, void* param) { } pExpr->_node.info = calloc(1, sizeof(tQueryInfo)); - + STSchema* pTSSchema = (STSchema*) param; tQueryInfo* pInfo = pExpr->_node.info; tVariant* pCond = pExpr->_node.pRight->pVal; @@ -2008,7 +2014,7 @@ void filterPrepare(void* expr, void* param) { pInfo->optr = pExpr->_node.optr; pInfo->compare = getComparFunc(pSchema->type, pInfo->optr); pInfo->param = pTSSchema; - + if (pInfo->optr == TSDB_RELATION_IN) { pInfo->q = (char*) pCond->arr; } else { @@ -2028,18 +2034,18 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param; STable* pTable1 = *(STable**) p1; STable* pTable2 = *(STable**) p2; - + for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) { SColIndex* pColIndex = &pTableGroupSupp->pCols[i]; int32_t colIndex = pColIndex->colIndex; - + assert(colIndex >= TSDB_TBNAME_COLUMN_INDEX); - + char * f1 = NULL; char * f2 = NULL; int32_t type = 0; int32_t bytes = 0; - + if (colIndex == TSDB_TBNAME_COLUMN_INDEX) { f1 = (char*) TABLE_NAME(pTable1); f2 = (char*) TABLE_NAME(pTable2); @@ -2073,14 +2079,14 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { return ret; } } - + return 0; } void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, STableGroupSupporter* pSupp, __ext_compar_fn_t compareFn) { STable* pTable = taosArrayGetP(pTableList, 0); - + SArray* g = taosArrayInit(16, POINTER_BYTES); taosArrayPush(g, &pTable); tsdbRefTable(pTable); @@ -2088,10 +2094,10 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable for (int32_t i = 1; i < numOfTables; ++i) { STable** prev = taosArrayGet(pTableList, i - 1); STable** p = taosArrayGet(pTableList, i); - + int32_t ret = compareFn(prev, p, pSupp); assert(ret == 0 || ret == -1); - + tsdbRefTable(*p); assert((*p)->type == TSDB_CHILD_TABLE); @@ -2103,20 +2109,20 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable taosArrayPush(g, p); } } - + taosArrayPush(pGroups, &g); } SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols) { assert(pTableList != NULL); SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES); - + size_t size = taosArrayGetSize(pTableList); if (size == 0) { tsdbDebug("no qualified tables"); return pTableGroup; } - + if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table SArray* sa = taosArrayInit(size, POINTER_BYTES); for(int32_t i = 0; i < size; ++i) { @@ -2126,7 +2132,7 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC tsdbRefTable(*pTable); taosArrayPush(sa, pTable); } - + taosArrayPush(pTableGroup, &sa); tsdbDebug("all %zu tables belong to one group", size); } else { @@ -2134,18 +2140,18 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC pSupp->numOfCols = numOfOrderCols; pSupp->pTagSchema = pTagSchema; pSupp->pCols = pCols; - + taosqsort(pTableList->pData, size, POINTER_BYTES, pSupp, tableGroupComparFn); createTableGroupImpl(pTableGroup, pTableList, size, pSupp, tableGroupComparFn); tfree(pSupp); } - + return pTableGroup; } bool indexedNodeFilterFp(const void* pNode, void* param) { tQueryInfo* pInfo = (tQueryInfo*) param; - + STable* pTable = *(STable**)(SL_GET_NODE_DATA((SSkipListNode*)pNode)); char* val = NULL; @@ -2155,7 +2161,7 @@ bool indexedNodeFilterFp(const void* pNode, void* param) { } else { val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId); } - + int32_t ret = 0; if (val == NULL) { //the val is possible to be null, so check it out carefully ret = -1; // val is missing in table tags value pairs @@ -2192,7 +2198,7 @@ bool indexedNodeFilterFp(const void* pNode, void* param) { default: assert(false); } - + return true; } @@ -2222,7 +2228,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT goto _error; } - + if (pTable->type != TSDB_SUPER_TABLE) { tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s", tsdb, uid, pTable->tableId.tid, pTable->name->data); @@ -2235,7 +2241,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT //NOTE: not add ref count for super table SArray* res = taosArrayInit(8, POINTER_BYTES); STSchema* pTagSchema = tsdbGetTableTagSchema(pTable); - + // no tags and tbname condition, all child tables of this stable are involved if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) { int32_t ret = getAllTableList(pTable, res); @@ -2246,7 +2252,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT pGroupInfo->numOfTables = taosArrayGetSize(res); pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols); - + tsdbDebug("%p no table name/tag condition, all tables belong to one group, numOfTables:%zu", tsdb, pGroupInfo->numOfTables); taosArrayDestroy(res); @@ -2282,7 +2288,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT } CATCH( code ) { CLEANUP_EXECUTE(); terrno = code; - goto _error; + goto _error; // TODO: more error handling } END_TRY @@ -2318,12 +2324,12 @@ int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, STableGroupInfo* p pGroupInfo->numOfTables = 1; pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES); - + SArray* group = taosArrayInit(1, POINTER_BYTES); - + taosArrayPush(group, &pTable); taosArrayPush(pGroupInfo->pGroupList, &group); - + return TSDB_CODE_SUCCESS; _error: @@ -2375,7 +2381,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { if (pQueryHandle == NULL) { return; } - + size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo); for (int32_t i = 0; i < size; ++i) { STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);