diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index a8b58227498c63b0322745d4f5e672cf048749fe..a32465d4e4f0ad0a895051b4a4c194813a0380ec 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -251,9 +251,6 @@ int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(SLRUCache *pCache); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row); -// bug api, deprecated, USE H version -int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow); - int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h); int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h); int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 558c3f6e59553829b982ee7d63d0fbdc66f89bdf..56797b61afd7d8165cd6a1afc8e0084c818969b8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -22,12 +22,6 @@ .rows = (_block)->numOfRows, \ .uid = (_checkInfo)->tableId}) -enum { - TSDB_CACHED_TYPE_NONE = 0, - TSDB_CACHED_TYPE_LASTROW = 1, - TSDB_CACHED_TYPE_LAST = 2, -}; - typedef struct SQueryFilePos { int32_t fid; int32_t slot; @@ -46,12 +40,6 @@ typedef struct SDataBlockLoadInfo { SArray* pLoadedCols; } SDataBlockLoadInfo; -enum { - CHECKINFO_CHOSEN_MEM = 0, - CHECKINFO_CHOSEN_IMEM = 1, - CHECKINFO_CHOSEN_BOTH = 2 // for update=2(merge case) -}; - typedef struct STableBlockScanInfo { uint64_t uid; TSKEY lastKey; @@ -74,10 +62,10 @@ typedef struct SBlockOrderWrapper { } SBlockOrderWrapper; typedef struct SBlockOrderSupporter { - int32_t numOfTables; SBlockOrderWrapper** pDataBlockInfo; int32_t* indexPerTable; int32_t* numOfBlocksPerTable; + int32_t numOfTables; } SBlockOrderSupporter; typedef struct SIOCostSummary { @@ -123,11 +111,6 @@ typedef struct SVersionRange { uint64_t maxVer; } SVersionRange; -typedef struct SComposedDataBlock { - bool composed; - int32_t rows; -} SComposedDataBlock; - typedef struct SReaderStatus { SQueryFilePos cur; // current position bool loadFromFile; // check file stage @@ -289,13 +272,10 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK // return pNew; // } -static bool isEmptyQueryTimeWindow(STsdbReader* pTsdbReader) { - ASSERT(pTsdbReader != NULL); - - STimeWindow* w = &pTsdbReader->window; - bool asc = ASCENDING_TRAVERSE(pTsdbReader->order); - - return ((asc && w->skey > w->ekey) || (!asc && w->ekey > w->skey)); +static bool isEmptyQueryTimeWindow(STimeWindow* pWindow, int32_t order) { + ASSERT(pWindow != NULL); + bool asc = ASCENDING_TRAVERSE(order); + return ((asc && pWindow->skey > pWindow->ekey) || (!asc && pWindow->ekey > pWindow->skey)); } // // Update the query time window according to the data time to live(TTL) information, in order to avoid to return @@ -334,34 +314,29 @@ static void setQueryTimewindow(STsdbReader* pReader, SQueryTableDataCond* pCond, // } } -static void checkResultSize(const SQueryTableDataCond* pCond, STsdbReader* pReader) { +static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) { int32_t rowLen = 0; for (int32_t i = 0; i < pCond->numOfCols; ++i) { rowLen += pCond->colList[i].bytes; } // make sure the output SSDataBlock size be less than 2MB. - int32_t TWOMB = 2 * 1024 * 1024; - if (pReader->capacity * rowLen > TWOMB) { - pReader->capacity = TWOMB / rowLen; + const int32_t TWOMB = 2 * 1024 * 1024; + if ((*capacity) * rowLen > TWOMB) { + (*capacity) = TWOMB / rowLen; } } // init file iterator -static int32_t initFileIterator(SFileSetIter* pIter, const STsdbFSState* pFState) { +static int32_t initFileIterator(SFileSetIter* pIter, const STsdbFSState* pFState, const char* idstr) { pIter->index = -1; pIter->numOfFiles = taosArrayGetSize(pFState->aDFileSet); pIter->pFileList = taosArrayDup(pFState->aDFileSet); + tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, idstr); return TSDB_CODE_SUCCESS; } -static void resetDataBlockIterator(SDataBlockIter* pIter) { - pIter->numOfBlocks = -1; - pIter->index = -1; - pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); -} - static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* pReader) { pIter->index += 1; if (pIter->index >= pIter->numOfFiles) { @@ -394,6 +369,12 @@ static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* return false; } +static void resetDataBlockIterator(SDataBlockIter* pIter) { + pIter->index = -1; + pIter->numOfBlocks = -1; + pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); +} + static void initReaderStatus(SReaderStatus* pStatus) { pStatus->cur.fid = INT32_MIN; pStatus->cur.win = TSWINDOW_INITIALIZER; @@ -427,7 +408,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd setQueryTimewindow(pReader, pCond, 0); if (pCond->numOfCols > 0) { - checkResultSize(pCond, pReader); + limitOutputBufferSize(pCond, &pReader->capacity); // allocate buffer in order to load data blocks from file pReader->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg)); @@ -450,7 +431,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd } STsdbFSState* pFState = pReader->pTsdb->fs->cState; - initFileIterator(&pReader->status.fileIter, pFState); + initFileIterator(&pReader->status.fileIter, pFState, pReader->idStr); resetDataBlockIterator(&pReader->status.blockIter); // no data in files, let's try buffer in memory @@ -520,93 +501,6 @@ _end: // return res; // } -// static bool initTableMemIterator(STsdbReader* pHandle, STableBlockScanInfo* pCheckInfo) { -// if (pCheckInfo->initBuf) { -// return true; -// } - -// pCheckInfo->initBuf = true; -// int32_t order = pHandle->order; - -// STbData* pMem = NULL; -// STbData* pIMem = NULL; -// int8_t backward = (pHandle->order == TSDB_ORDER_DESC) ? 1 : 0; - -// TSKEY tLastKey = keyToTkey(pCheckInfo->lastKey); -// if (pHandle->pTsdb->mem != NULL) { -// tsdbGetTbDataFromMemTable(pHandle->pTsdb->mem, pCheckInfo->suid, pCheckInfo->tableId, &pMem); -// if (pMem != NULL) { -// tsdbTbDataIterCreate(pMem, &(TSDBKEY){.version = 0, .ts = tLastKey}, backward, &pCheckInfo->iter); -// } -// } - -// if (pHandle->pTsdb->imem != NULL) { -// tsdbGetTbDataFromMemTable(pHandle->pTsdb->mem, pCheckInfo->suid, pCheckInfo->tableId, &pIMem); -// if (pIMem != NULL) { -// tsdbTbDataIterCreate(pIMem, &(TSDBKEY){.version = 0, .ts = tLastKey}, backward, &pCheckInfo->iiter); -// } -// } - -// // both iterators are NULL, no data in buffer right now -// if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) { -// return false; -// } - -// bool memEmpty = -// (pCheckInfo->iter == NULL) || (pCheckInfo->iter != NULL && !tsdbTbDataIterGet(pCheckInfo->iter, NULL)); -// bool imemEmpty = -// (pCheckInfo->iiter == NULL) || (pCheckInfo->iiter != NULL && !tsdbTbDataIterGet(pCheckInfo->iiter, NULL)); -// if (memEmpty && imemEmpty) { // buffer is empty -// return false; -// } - -// if (!memEmpty) { -// TSDBROW row; - -// tsdbTbDataIterGet(pCheckInfo->iter, &row); -// TSKEY key = row.pTSRow->ts; // first timestamp in buffer -// tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 -// "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s", -// pHandle, pCheckInfo->tableId, key, order, pMem->minKey.ts, pMem->maxKey.ts, pCheckInfo->lastKey, -// pMem->sl.size, pHandle->idStr); - -// if (ASCENDING_TRAVERSE(order)) { -// assert(pCheckInfo->lastKey <= key); -// } else { -// assert(pCheckInfo->lastKey >= key); -// } - -// } else { -// tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr); -// } - -// if (!imemEmpty) { -// TSDBROW row; - -// tsdbTbDataIterGet(pCheckInfo->iter, &row); -// TSKEY key = row.pTSRow->ts; // first timestamp in buffer -// tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 -// "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s", -// pHandle, pCheckInfo->tableId, key, order, pIMem->minKey.ts, pIMem->maxKey.ts, pCheckInfo->lastKey, -// pIMem->sl.size, pHandle->idStr); - -// if (ASCENDING_TRAVERSE(order)) { -// assert(pCheckInfo->lastKey <= key); -// } else { -// assert(pCheckInfo->lastKey >= key); -// } -// } else { -// tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr); -// } - -// return true; -// } - -// static void destroyTableMemIterator(STableBlockScanInfo* pCheckInfo) { -// tsdbTbDataIterDestroy(pCheckInfo->iter); -// tsdbTbDataIterDestroy(pCheckInfo->iiter); -// } - // static TSKEY extractFirstTraverseKey(STableBlockScanInfo* pCheckInfo, int32_t order, int32_t update, TDRowVerT maxVer) { // TSDBROW row = {0}; // STSRow *rmem = NULL, *rimem = NULL; @@ -958,77 +852,79 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_ return TSDB_CODE_SUCCESS; } -static void setBlockDumpCompleted(SFileBlockDumpInfo* pDumpInfo, SBlockData* pBlockData) { - pDumpInfo->rowIndex = pBlockData->nRow; - pDumpInfo->totalRows = pBlockData->nRow; - pDumpInfo->lastKey = pBlockData->aTSKEY[pBlockData->nRow - 1] + 1; // todo step value +static void setBlockDumpCompleted(SFileBlockDumpInfo* pDumpInfo, SBlock* pBlock, int32_t order) { + int32_t step = ASCENDING_TRAVERSE(order)? 1:-1; + + pDumpInfo->rowIndex = pBlock->nRow; + pDumpInfo->totalRows = pBlock->nRow; + pDumpInfo->lastKey = pBlock->maxKey.ts + step; +} + +static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal, SBlockLoadSuppInfo* pSup) { + if (IS_VAR_DATA_TYPE(pColVal->type)) { + if (pColVal->isNull) { + colDataAppendNULL(pColInfoData, rowIndex); + } else { + varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData); + memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData); + colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false); + } + } else { + colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, pColVal->isNull); + } } static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { - int64_t st = taosGetTimestampUs(); + int64_t st = taosGetTimestampUs(); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); - SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx); - - uint8_t *pb = NULL, *pb1 = NULL; - int32_t code = tsdbReadBlockData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pBlockData, &pb, &pb1); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - for(int32_t i = 0; i < taosArrayGetSize(pReader->pResBlock->pDataBlock); ++i) { - SColumnInfoData* pColData = taosArrayGet(pReader->pResBlock->pDataBlock, i); - if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - for (int32_t j = 0; j < pBlockData->nRow; ++j) { - colDataAppend(pColData, j, (const char*)&pBlockData->aTSKEY[j], false); - } - } else { - SColVal cv = {0}; - SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, pReader->suppInfo.slotIds[i] - 1); - for (int32_t j = 0; j < pBlockData->nRow; ++j) { - tColDataGetValue(pData, j, &cv); - colDataAppend(pColData, j, (const char*)&cv.value, cv.isNull); - } - } - } + SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx); + SSDataBlock* pResBlock = pReader->pResBlock; - pReader->pResBlock->info.rows = pBlockData->nRow; - setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlockData); - -/* - int32_t ret = tsdbLoadBlockDataCols(&(pReader->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, - (int)(QH_GET_NUM_OF_COLS(pReader)), true); - if (ret != TSDB_CODE_SUCCESS) { - int32_t c = terrno; - assert(c != TSDB_CODE_SUCCESS); - goto _error; - } + SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; - SDataBlockLoadInfo* pBlockLoadInfo = &pReader->dataBlockLoadInfo; + uint8_t *pb = NULL, *pb1 = NULL; + int32_t code = tsdbReadBlockData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pBlockData, &pb, &pb1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } - pBlockLoadInfo->fileGroup = pReader->pFileGroup; - pBlockLoadInfo->slot = pReader->cur.slot; - pBlockLoadInfo->uid = pCheckInfo->tableId; + int32_t numOfCols = blockDataGetNumOfCols(pResBlock); - SDataCols* pCols = pReader->rhelper.pDCols[0]; - assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows); + SColVal cv = {0}; + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i); + if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + for (int32_t j = 0; j < pBlockData->nRow; ++j) { + colDataAppend(pColData, j, (const char*)&pBlockData->aTSKEY[j], false); + } + } else { + SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, pSupInfo->slotIds[i] - 1); + for (int32_t j = 0; j < pBlockData->nRow; ++j) { + tColDataGetValue(pData, j, &cv); + doCopyColVal(pColData, j, i, &cv, pSupInfo); + } + } + } - pBlock->numOfRows = pCols->numOfRows; -*/ + pResBlock->info.rows = pBlockData->nRow; + setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlock, pReader->order); - int64_t elapsedTime = (taosGetTimestampUs() - st); - pReader->cost.blockLoadTime += elapsedTime; + int64_t elapsedTime = (taosGetTimestampUs() - st); + pReader->cost.blockLoadTime += elapsedTime; - tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 - ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s", - pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlockData->nRow, - pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr); - return TSDB_CODE_SUCCESS; + tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 + ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s", + pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlockData->nRow, + pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr); + return TSDB_CODE_SUCCESS; _error: - tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, %s", - pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, pReader->idStr); - return code; + tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 + ", rows:%d, %s", + pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, + pReader->idStr); + return code; } // static int32_t handleDataMergeIfNeeded(STsdbReader* pTsdbReadHandle, SBlock* pBlock, STableBlockScanInfo* pCheckInfo) { @@ -2492,9 +2388,18 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* if (d != NULL) { code = tsdbTbDataIterCreate(d, &startKey, 0, &pBlockScanInfo->iter); if (code != TSDB_CODE_SUCCESS) { + tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s", + pReader, pBlockScanInfo->uid, tstrerror(code), pReader->idStr); return code; + } else { + tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 + "-%" PRId64 " %s", + pReader, pBlockScanInfo->uid, pReader->window.skey, pReader->order, d->minKey, d->maxKey, + pReader->idStr); } } + } else { + tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr); } STbData* di = NULL; @@ -2503,9 +2408,18 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* if (di != NULL) { code = tsdbTbDataIterCreate(di, &startKey, 0, &pBlockScanInfo->iiter); if (code != TSDB_CODE_SUCCESS) { + tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s", + pReader, pBlockScanInfo->uid, tstrerror(code), pReader->idStr); return code; + } else { + tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 + "-%" PRId64 " %s", + pReader, pBlockScanInfo->uid, pReader->window.skey, pReader->order, di->minKey, di->maxKey, + pReader->idStr); } } + } else { + tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr); } pBlockScanInfo->iterInit = true; @@ -2605,24 +2519,24 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { // 2. current block should not overlap with next neighbor block // 3. current timestamp should not be overlap with each other SDataBlockInfo* pInfo = &pReader->pResBlock->info; - pInfo->rows = pBlock->nRow; - pInfo->uid = pScanInfo->uid; - pInfo->window.skey = pBlock->minKey.ts; - pInfo->window.ekey = pBlock->maxKey.ts; + pInfo->rows = pBlock->nRow; + pInfo->uid = pScanInfo->uid; + pInfo->window = (STimeWindow) {.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts}; setComposedBlockFlag(pReader, false); + setBlockDumpCompleted(&pStatus->fBlockDumpInfo, pBlock, pReader->order); } return code; } -static int32_t buildBlockFromBufferSeqentially(STsdbReader* pReader) { +static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; while(1) { if (pStatus->pTableIter == NULL) { pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL); if (pStatus->pTableIter == NULL) { - return false; + return TSDB_CODE_SUCCESS; } } @@ -2630,20 +2544,26 @@ static int32_t buildBlockFromBufferSeqentially(STsdbReader* pReader) { initMemIterator(pBlockScanInfo, pReader); TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; - buildInmemDataBlock(pReader, pBlockScanInfo, &maxKey); + int32_t code = buildInmemDataBlock(pReader, pBlockScanInfo, &maxKey); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + if (pReader->pResBlock->info.rows > 0) { - return true; + return TSDB_CODE_SUCCESS; } // current table is exhausted, let's try the next table pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); if (pStatus->pTableIter == NULL) { - return false; + return TSDB_CODE_SUCCESS; } } } static int32_t buildBlockFromFiles(STsdbReader* pReader) { + int32_t code = TSDB_CODE_SUCCESS; + SReaderStatus* pStatus = &pReader->status; SFileSetIter* pFIter = &pStatus->fileIter; @@ -2652,7 +2572,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { if (pFIter->index < pFIter->numOfFiles) { if (pReader->status.blockIter.index == -1) { int32_t numOfBlocks = 0; - int32_t code = moveToNextFile(pReader, &numOfBlocks); + code = moveToNextFile(pReader, &numOfBlocks); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2664,7 +2584,9 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } code = doBuildDataBlock(pReader); - + if (code != TSDB_CODE_SUCCESS) { + return code; + } } else { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); @@ -2676,7 +2598,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { bool hasNext = blockIteratorNext(&pReader->status.blockIter); if (!hasNext) { // current file is exhausted, let's try the next file int32_t numOfBlocks = 0; - int32_t code = moveToNextFile(pReader, &numOfBlocks); + code = moveToNextFile(pReader, &numOfBlocks); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2692,15 +2614,21 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { return code; } - doBuildDataBlock(pReader); + code = doBuildDataBlock(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } else { // try next data block in current file blockIteratorNext(pBlockIter); - doBuildDataBlock(pReader); + code = doBuildDataBlock(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } else { - buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo); - return TSDB_CODE_SUCCESS; + code = buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo); + return code; } // repeat the previous procedure. @@ -2708,7 +2636,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } } - return TSDB_CODE_SUCCESS; + return code; } // // todo not unref yet, since it is not support multi-group interpolation query @@ -2901,18 +2829,7 @@ int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow colDataAppend(pColInfoData, numOfRows, (const char*) &pTSRow->ts, false); } else { tTSRowGetVal(pTSRow, pReader->pSchema, slotId, &colVal); - - if (IS_VAR_DATA_TYPE(colVal.type)) { - if (colVal.isNull) { - colDataAppendNULL(pColInfoData, numOfRows); - } else { - varDataSetLen(pSupInfo->buildBuf[i], colVal.value.nData); - memcpy(varDataVal(pSupInfo->buildBuf[i]), colVal.value.pData, colVal.value.nData); - colDataAppend(pColInfoData, numOfRows, pSupInfo->buildBuf[i], false); - } - } else { - colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, colVal.isNull); - } + doCopyColVal(pColInfoData, i, numOfRows, &colVal, pSupInfo); } } @@ -3198,7 +3115,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl } STsdbReader* pReader = *ppReader; - if (isEmptyQueryTimeWindow(pReader)) { + if (isEmptyQueryTimeWindow(&pReader->window, pReader->order)) { tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr); return TSDB_CODE_SUCCESS; } @@ -3259,7 +3176,7 @@ void tsdbReaderClose(STsdbReader* pReader) { taosMemoryFreeClear(pReader->suppInfo.plist); taosMemoryFree(pReader->suppInfo.slotIds); - if (!isEmptyQueryTimeWindow(pReader)) { + if (!isEmptyQueryTimeWindow(&pReader->window, pReader->order)) { // tsdbMayUnTakeMemSnapshot(pTsdbReadHandle); } else { ASSERT(pReader->status.pTableMap == NULL); @@ -3291,7 +3208,7 @@ void tsdbReaderClose(STsdbReader* pReader) { } bool tsdbNextDataBlock(STsdbReader* pReader) { - if (isEmptyQueryTimeWindow(pReader)) { + if (isEmptyQueryTimeWindow(&pReader->window, pReader->order)) { return false; } @@ -3313,11 +3230,11 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { if (pBlock->info.rows > 0) { return true; } else { - buildBlockFromBufferSeqentially(pReader); + buildBlockFromBufferSequentially(pReader); return pBlock->info.rows > 0; } } else { // no data in files, let's try the buffer - buildBlockFromBufferSeqentially(pReader); + buildBlockFromBufferSequentially(pReader); return pBlock->info.rows > 0; } } else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) { @@ -3370,13 +3287,12 @@ void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockI int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) { int32_t code = 0; - // *allHave = false; + *allHave = false; - // SQueryFilePos* c = &pReader->cur; - // if (c->mixBlock) { - // *pBlockStatis = NULL; - // return TSDB_CODE_SUCCESS; - // } + if (pReader->status.composedDataBlock) { + *pBlockStatis = NULL; + return TSDB_CODE_SUCCESS; + } // SFileBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[c->slot]; // assert((c->slot >= 0 && c->slot < pReader->numOfBlocks) || ((c->slot == pReader->numOfBlocks) && (c->slot == 0)));