diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 7c8d580789dfab050a246fff8b066f480f07c33b..5191331370b91277a7af1eeeafaec1b3d1d320af 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -63,8 +63,8 @@ typedef struct STableBlockScanInfo { // int32_t numOfBlocks : 29; // number of qualified data blocks not the original blocks uint8_t chosen : 2; // indicate which iterator should move forward bool iterInit; // whether to initialize the in-memory skip list iterator or not - STbDataIter iter; // mem buffer skip list iterator - STbDataIter iiter; // imem buffer skip list iterator + STbDataIter* iter; // mem buffer skip list iterator + STbDataIter* iiter; // imem buffer skip list iterator bool memHasVal; bool imemHasVal; } STableBlockScanInfo; @@ -125,18 +125,15 @@ typedef struct SComposedDataBlock { } SComposedDataBlock; typedef struct SReaderStatus { - SQueryFilePos cur; // current position - int32_t tableListIndex; - bool loadFromFile; // check file stage - bool initStartPos; - SHashObj* pTableMap; // SHash - int32_t realNumOfRows; - + SQueryFilePos cur; // current position + bool loadFromFile; // check file stage + SHashObj* pTableMap; // SHash + STableBlockScanInfo* pTableIter; // table iterator used in building in-memory buffer data blocks. SFileBlockDumpInfo fBlockDumpInfo; - SBlockData fileBlockData; - - SFileSetIter fileIter; - SDataBlockIter blockIter; + SBlockData fileBlockData; + SFileSetIter fileIter; + SDataBlockIter blockIter; + bool composedDataBlock;// the returned data block is a composed block or not } SReaderStatus; struct STsdbReader { @@ -173,10 +170,13 @@ struct STsdbReader { }; static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); -static int tsdbReadRowsFromCache(STableBlockScanInfo* pScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader); +static int buildInmemDataBlockImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader); static TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader); -static int32_t doLoadRowsOfIdenticalTsInFileBlock(SBlockData* pData, SFileBlockDumpInfo* pDumpInfo, int64_t ts, SRowMerger *pMerger, - STsdbReader* pReader, STSRow** pRow); +static int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock* pBlock, SBlockData* pBlockData, + STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger); +static int32_t doLoadRowsOfIdenticalTs(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader); +static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow); +static void setComposedBlockFlag(STsdbReader* pReader, bool composed); // static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) { // pBlockLoadInfo->slot = -1; @@ -204,7 +204,7 @@ static int32_t setColumnIdList(STsdbReader* pReader, SSDataBlock* pBlock) { return TSDB_CODE_SUCCESS; } -static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const uint64_t* idList, int32_t numOfTables) { +static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) { ASSERT(numOfTables >= 1); // allocate buffer in order to load data blocks from file @@ -216,7 +216,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const uint64_ // todo apply the lastkey of table check to avoid to load header file for (int32_t j = 0; j < numOfTables; ++j) { - STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j]}; + STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid}; if (ASCENDING_TRAVERSE(pTsdbReader->order)) { if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) { info.lastKey = pTsdbReader->window.skey; @@ -337,7 +337,7 @@ static void resetDataBlockIterator(SDataBlockIter* pIter) { pIter->numOfBlocks = -1; } -static bool nextFilesetIterator(SFileSetIter* pIter, int32_t order, STsdbReader* pReader) { +static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* pReader) { if (pIter->index >= pIter->numOfFiles) { return false; } @@ -370,8 +370,7 @@ static bool nextFilesetIterator(SFileSetIter* pIter, int32_t order, STsdbReader* static void initReaderStatus(SReaderStatus* pStatus) { pStatus->cur.fid = INT32_MIN; pStatus->cur.win = TSWINDOW_INITIALIZER; - pStatus->initStartPos = false; - pStatus->tableListIndex = 0; // current active table index + pStatus->pTableIter = NULL; pStatus->loadFromFile = true; } @@ -384,13 +383,18 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd } initReaderStatus(&pReader->status); + pReader->pTsdb = pVnode->pTsdb; pReader->suid = pCond->suid; pReader->order = pCond->order; pReader->capacity = 4096; pReader->idStr = strdup(idstr); pReader->startVersion= pCond->startVersion; - pReader->endVersion = pCond->endVersion; + pReader->endVersion = 100000;//pCond->endVersion; // todo for test purpose + pReader->type = pCond->type; + pReader->window = *pCond->twindows; + + pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1); // todo remove this setQueryTimewindow(pReader, pCond, 0); @@ -405,7 +409,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd goto _end; } - // todo use new api refactor this + // todo use new api refactor this after merge with 3.0 pReader->pResBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); if (pReader->pResBlock == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -418,13 +422,15 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd colInfo.info = pCond->colList[i]; taosArrayPush(pReader->pResBlock->pDataBlock, &colInfo); } + pReader->pResBlock->info.numOfCols = taosArrayGetSize(pReader->pResBlock->pDataBlock); + + blockDataEnsureCapacity(pReader->pResBlock, pReader->capacity); setColumnIdList(pReader, pReader->pResBlock); pReader->suppInfo.slotIds = taosMemoryCalloc(pCond->numOfCols, sizeof(int32_t)); pReader->suppInfo.plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES); } - // todo refactor STsdbFSState* pFState = pReader->pTsdb->fs->cState; initFileIterator(&pReader->status.fileIter, pFState); resetDataBlockIterator(&pReader->status.blockIter); @@ -443,39 +449,6 @@ _end: return code; } -// static int32_t setCurrentSchema(SVnode* pVnode, STsdbReader* pTsdbReadHandle) { -// STableBlockScanInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0); - -// int32_t sversion = 1; - -// SMetaReader mr = {0}; -// metaReaderInit(&mr, pVnode->pMeta, 0); -// int32_t code = metaGetTableEntryByUid(&mr, pCheckInfo->tableId); -// if (code != TSDB_CODE_SUCCESS) { -// terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; -// metaReaderClear(&mr); -// return terrno; -// } - -// if (mr.me.type == TSDB_CHILD_TABLE) { -// tb_uid_t suid = mr.me.ctbEntry.suid; -// code = metaGetTableEntryByUid(&mr, suid); -// if (code != TSDB_CODE_SUCCESS) { -// terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; -// metaReaderClear(&mr); -// return terrno; -// } -// sversion = mr.me.stbEntry.schemaRow.version; -// } else { -// ASSERT(mr.me.type == TSDB_NORMAL_TABLE); -// sversion = mr.me.ntbEntry.schemaRow.version; -// } - -// metaReaderClear(&mr); -// pTsdbReadHandle->pSchema = metaGetTbTSchema(pVnode->pMeta, pCheckInfo->tableId, sversion); -// return TSDB_CODE_SUCCESS; -// } - // void tsdbResetQueryHandleForNewTable(STsdbReader* queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList, // int32_t tWinIdx) { // STsdbReader* pTsdbReadHandle = queryHandle; @@ -796,7 +769,7 @@ _end: // int32_t step = ASCENDING_TRAVERSE(pHandle->order) ? 1 : -1; // STimeWindow* win = &pHandle->cur.win; -// pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity, win, pHandle); +// pHandle->cur.rows = buildInmemDataBlockImpl(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity, win, pHandle); // // update the last key value // pCheckInfo->lastKey = win->ekey + step; @@ -1036,7 +1009,7 @@ _error: // TSKEY maxKey = ascScan ? (binfo.window.skey - step) : (binfo.window.ekey - step); // cur->rows = -// tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle); +// buildInmemDataBlockImpl(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle); // pTsdbReadHandle->realNumOfRows = cur->rows; // // update the last key value @@ -2083,6 +2056,15 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte return TSDB_CODE_SUCCESS; } +static bool blockIteratorNext(STsdbReader* pReader, SDataBlockIter* pBlockIter) { + if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) { + return false; + } + + pBlockIter->index += 1; + return true; +} + // static int32_t getFirstFileDataBlock(STsdbReader* pTsdbReadHandle, bool* exists); //static int32_t getDataBlock(STsdbReader* pTsdbReadHandle, SFileBlockInfo* pNext, bool* exists) { @@ -2228,77 +2210,271 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) { return pFBlockInfo; } -static bool overlapWithNeighborBlock(SBlock* pBlock, int32_t blockIndex) { - return false; +static bool overlapWithNeighborBlock(SFileDataBlockInfo *pFBlockInfo, SBlock* pBlock, STableBlockScanInfo* pTableBlockScanInfo) { + // it is the last block in current file, no chance to overlap with neighbor blocks. + if(pFBlockInfo->tbBlockIdx == taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) { // last block in current file, + return false; + } + + SBlock* pNext = taosArrayGet(pTableBlockScanInfo->pBlockList, pFBlockInfo->tbBlockIdx + 1); + return (pNext->minKey.ts == pBlock->maxKey.ts); } -static bool bufferDataInFileBlockGap(int32_t order, int64_t key, SBlock* pBlock) { +static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) { bool ascScan = ASCENDING_TRAVERSE(order); - return (ascScan && (key != TSKEY_INITIAL_VAL && key <= pBlock->minKey.ts)) || - (!ascScan && (key != TSKEY_INITIAL_VAL && key >= pBlock->maxKey.ts)); + return (ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts <= pBlock->minKey.ts)) || + (!ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts >= pBlock->maxKey.ts)); } -static int32_t buildInmemDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlock* pBlock, TSDBKEY *key) { - int32_t code = TSDB_CODE_SUCCESS; - - bool ascScan = ASCENDING_TRAVERSE(pReader->order); - bool cacheDataInFileBlockHole = (ascScan && (key->ts != TSKEY_INITIAL_VAL && key->ts < pBlock->minKey.ts)) || - (!ascScan && (key->ts != TSKEY_INITIAL_VAL && key->ts > pBlock->maxKey.ts)); - ASSERT(cacheDataInFileBlockHole); +static int32_t buildInmemDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBKEY *key) { + if (pBlockScanInfo->iter != NULL) { + pBlockScanInfo->memHasVal = tsdbTbDataIterNext(pBlockScanInfo->iter); + } else { + pBlockScanInfo->memHasVal = false; + } - // do not load file block into buffer - int32_t step = ascScan ? 1 : -1; + if (pBlockScanInfo->iiter != NULL) { + pBlockScanInfo->imemHasVal = tsdbTbDataIterNext(pBlockScanInfo->iiter); + } else { + pBlockScanInfo->imemHasVal = false; + } - TSDBKEY maxKey = {.version = pReader->endVersion}; - maxKey.ts = ascScan ? (pBlock->minKey.ts - step) : (pBlock->maxKey.ts - step); + if (!(pBlockScanInfo->imemHasVal || pBlockScanInfo->memHasVal)) { + return TSDB_CODE_SUCCESS; + } - pBlockScanInfo->memHasVal = tsdbTbDataIterNext(&pBlockScanInfo->iter); - pBlockScanInfo->imemHasVal = tsdbTbDataIterNext(&pBlockScanInfo->iiter); + int32_t code = buildInmemDataBlockImpl(pBlockScanInfo, *key, pReader->capacity, pReader); + setComposedBlockFlag(pReader, true); - code = tsdbReadRowsFromCache(pBlockScanInfo, maxKey, pReader->capacity, pReader); + // set the correct block data info return code; } -static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { +static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock, STableBlockScanInfo* pBlockScanInfo) { SFileBlockDumpInfo *pDumpInfo = &pReader->status.fBlockDumpInfo; - SBlockData* pData = &pReader->status.fileBlockData; + SBlockData* pBlockData = &pReader->status.fileBlockData; - STSchema* pSchema = NULL; SRowMerger merge = {0}; + STSRow* pTSRow = NULL; + TSKEY mergeTs = TSKEY_INITIAL_VAL; - int64_t key = pData->aTSKEY[0]; - TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader); - TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader); + int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; + TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader); + TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader); if (pBlockScanInfo->memHasVal && pBlockScanInfo->imemHasVal) { TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY ik = TSDBROW_KEY(piRow); - // todo check version in file - if (key < k.ts || key < ik.ts) { - tRowMergerInit(&merge, NULL, pSchema); - STSRow* pTsRow = NULL; - doLoadRowsOfIdenticalTsInFileBlock(pData, pDumpInfo, key, &merge, pReader, &pTsRow); + // [1&2] key <= [k.ts|ik.ts] + if (key <= k.ts || key <= ik.ts) { + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + tRowMergerInit(&merge, &fRow, pReader->pSchema); + + doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge); + if (ik.ts == mergeTs) { + doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); + } + + if (k.ts == mergeTs) { + doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); + } + + tRowMergerGetRow(&merge, &pTSRow); + doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + } else { + // [3] ik.ts < key <= k.ts + if (ik.ts < k.ts) { + doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); + tRowMergerGetRow(&merge, &pTSRow); + doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + return TSDB_CODE_SUCCESS; + } + + // [4] k.ts < key <= ik.ts + if (k.ts < ik.ts) { + doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); + tRowMergerGetRow(&merge, &pTSRow); + doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + return TSDB_CODE_SUCCESS; + } + + // [5] k.ts == ik.ts < key + if (k.ts == ik.ts) { + doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); + tRowMergerGetRow(&merge, &pTSRow); + + if (k.ts == mergeTs) { + doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); + } + + tRowMergerGetRow(&merge, &pTSRow); + doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + return TSDB_CODE_SUCCESS; + } + + // [6] k.ts < ik.ts < key + if (k.ts < ik.ts) { + doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); + tRowMergerGetRow(&merge, &pTSRow); + doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + + return TSDB_CODE_SUCCESS; + } + + // [6] ik.ts < k.ts < key + if (ik.ts < k.ts) { + doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); + tRowMergerGetRow(&merge, &pTSRow); + doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + return TSDB_CODE_SUCCESS; + } + } + } else if (pBlockScanInfo->imemHasVal) { + TSDBKEY ik = TSDBROW_KEY(piRow); + if (key <= ik.ts) { + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + tRowMergerInit(&merge, &fRow, pReader->pSchema); + + doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge); + + if (ik.ts == mergeTs) { + doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); + } + + tRowMergerGetRow(&merge, &pTSRow); + doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + return TSDB_CODE_SUCCESS; + } + + if (ik.ts < key) { + doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); + tRowMergerGetRow(&merge, &pTSRow); + doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + return TSDB_CODE_SUCCESS; + } + } else { // pBlockScanInfo->memHasVal != NULL + TSDBKEY k = TSDBROW_KEY(pRow); + if (key <= k.ts) { + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + tRowMergerInit(&merge, &fRow, pReader->pSchema); + + doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge); + + if (k.ts == mergeTs) { + doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); + } + + tRowMergerGetRow(&merge, &pTSRow); + doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + return TSDB_CODE_SUCCESS; } + + if (k.ts < key) { + doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); + tRowMergerGetRow(&merge, &pTSRow); + doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + return TSDB_CODE_SUCCESS; + } + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t buildComposedDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock, STableBlockScanInfo* pBlockScanInfo) { + SSDataBlock* pResBlock = pReader->pResBlock; + + while(1) { + buildComposedDataBlockImpl(pReader, pFBlock, pBlock, pBlockScanInfo); + SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + + SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); + if (pBlockInfo->tbBlockIdx == pFBlock->tbBlockIdx) { // still in the same file block now + + if (pDumpInfo->rowIndex >= pBlock->nRow) { + break; + } + + if (pResBlock->info.rows >= pReader->capacity) { + break; + } + } else { // todo traverse to next file due to time window overlap + if (pResBlock->info.rows >= pReader->capacity) { + ASSERT(0); + return TSDB_CODE_SUCCESS; + } + } + } + + pResBlock->info.uid = pBlockScanInfo->uid; + setComposedBlockFlag(pReader, true); + return TSDB_CODE_SUCCESS; +} + +void setComposedBlockFlag(STsdbReader* pReader, bool composed) { pReader->status.composedDataBlock = composed; } + +static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { + if (pBlockScanInfo->iterInit) { + return TSDB_CODE_SUCCESS; + } + + TSDBKEY startKey = {.ts = pReader->window.skey, .version = pReader->startVersion}; + + STbData* d = NULL; + if (pReader->pTsdb->mem != NULL) { + tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d); + tsdbTbDataIterCreate(d, &startKey, 0, &pBlockScanInfo->iter); } + STbData* di = NULL; + if (pReader->pTsdb->imem != NULL) { + tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di); + tsdbTbDataIterCreate(di, &startKey, 0, &pBlockScanInfo->iiter); + } + + pBlockScanInfo->iterInit = true; return TSDB_CODE_SUCCESS; } +static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pReader) { + TSDBKEY key = {.ts = TSKEY_INITIAL_VAL}; + + SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); + STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); + SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); + + initMemIterator(pScanInfo, pReader); + if (pScanInfo->memHasVal) { + TSDBROW* pRow = getValidRow(pScanInfo->iter, &pScanInfo->memHasVal, pReader); + key = TSDBROW_KEY(pRow); + } + + if (pScanInfo->imemHasVal) { + TSDBROW* pRow = getValidRow(pScanInfo->iiter, &pScanInfo->imemHasVal, pReader); + TSDBKEY k = TSDBROW_KEY(pRow); + if (key.ts > k.ts) { + key = k; + } + } + + return key; +} + static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) { SReaderStatus* pStatus = &pReader->status; SFileSetIter* pFIter = &pStatus->fileIter; + SDataBlockIter* pBlockIter = &pReader->status.blockIter; + if (pFIter->index < pFIter->numOfFiles) { if (pReader->status.blockIter.index == -1) { int32_t numOfBlocks = 0; while (1) { - bool hasNext = nextFilesetIterator(&pStatus->fileIter, pReader->order, pReader); - if (!hasNext) { - // no data files on disk + bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader->order, pReader); + if (!hasNext) { // no data files on disk break; } @@ -2323,71 +2499,67 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) { // no blocks in current file, try next files } - SDataBlockIter* pBlockIter = &pReader->status.blockIter; int32_t code = initBlockIterator(pReader, pBlockIter, numOfBlocks); if (code != TSDB_CODE_SUCCESS) { return code; } - int64_t key = 0; // todo get the first qualified key in buffer - + // todo extract method: getCurrentKeyInBuf() SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); - if (pScanInfo->iterInit == false) { - STbData* d = NULL; - tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pScanInfo->uid, &d); - - TSDBKEY startKey = {.ts = pReader->window.skey, .version = pReader->startVersion}; - tsdbTbDataIterOpen(d, &startKey, 0, &pScanInfo->iter); + TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader); - STbData* di = NULL; - tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pScanInfo->uid, &di); - tsdbTbDataIterOpen(di, &startKey, 0, &pScanInfo->iiter); - - pScanInfo->iterInit = true; - } - - if (dataBlockPartialRequired(&pReader->window, pBlock) || overlapWithNeighborBlock(pBlock, pFBlock->tbBlockIdx)) { + if (dataBlockPartialRequired(&pReader->window, pBlock) || overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo) /*|| points overlaps with data block*/) { SBlockData data = {0}; doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &data); // build composed data block - + buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo); } else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) { // data in memory that are earlier than current file block TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->endVersion}; - buildInmemDataBlock(pReader, pScanInfo, pBlock, &maxKey); + buildInmemDataBlock(pReader, pScanInfo, &maxKey); // build data block from in-memory buffer data completed. } else { // whole block is required, return it directly - SDataBlockInfo info = {0}; - info.rows = pBlock->nRow; - info.uid = pScanInfo->uid; - info.window.skey = pBlock->minKey.ts; - info.window.ekey = pBlock->maxKey.ts; + // todo check the data version + SDataBlockInfo* pInfo = &pReader->pResBlock->info; + pInfo->rows = pBlock->nRow; + pInfo->uid = pScanInfo->uid; + pInfo->window.skey = pBlock->minKey.ts; + pInfo->window.ekey = pBlock->maxKey.ts; + setComposedBlockFlag(pReader, false); } - } else { + SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); + STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); + SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); - } - } + // current block are exhausted, try the next file block + if (pDumpInfo->rowIndex >= pBlock->nRow) { + bool hasNext = blockIteratorNext(pReader, &pReader->status.blockIter); + if (!hasNext) { + // current file is exhausted, let's try the next file - return TSDB_CODE_SUCCESS; -} -// static bool doHasDataInBuffer(STsdbReader* pTsdbReadHandle) { -// size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); + } else { // try next data block in current file + // 1. check if ts in buffer is overlap with current file data block + TSDBKEY key1 = getCurrentKeyInBuf(pBlockIter, pReader); -// while (pTsdbReadHandle->activeIndex < numOfTables) { -// if (hasMoreDataInCache(pTsdbReadHandle)) { -// return true; -// } + } + } else { + buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo); + return TSDB_CODE_SUCCESS; + } -// pTsdbReadHandle->activeIndex += 1; -// } + // repeat the previous procedure. -// return false; -// } + } + } + + return TSDB_CODE_SUCCESS; +} // // todo not unref yet, since it is not support multi-group interpolation query // static UNUSED_FUNC void changeQueryHandleForInterpQuery(STsdbReader* pHandle) { @@ -2423,7 +2595,7 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) { // } TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) { - if (!hasVal) { + if (!(*hasVal)) { return NULL; } @@ -2462,7 +2634,7 @@ TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) { int32_t doLoadRowsOfIdenticalTs(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader) { while (1) { *hasVal = tsdbTbDataIterNext(pIter); - if (!*hasVal) { + if (!(*hasVal)) { break; } @@ -2478,33 +2650,47 @@ int32_t doLoadRowsOfIdenticalTs(STbDataIter *pIter, bool* hasVal, int64_t ts, SR return TSDB_CODE_SUCCESS; } -int32_t doLoadRowsOfIdenticalTsInFileBlock(SBlockData* pData, SFileBlockDumpInfo* pDumpInfo, int64_t ts, SRowMerger *pMerger, - STsdbReader* pReader, STSRow** pRow) { - int64_t key = pData->aTSKEY[pDumpInfo->rowIndex]; - if ((pDumpInfo->rowIndex < pData->nRow - 1)) { - if (pData->aTSKEY[pDumpInfo->rowIndex + 1] < key) { - SRowMerger merger = {0}; - tRowMergerInit(&merger, NULL, NULL); +int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock* pBlock, SBlockData* pBlockData, + STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger) { + SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + + int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; + if (pDumpInfo->rowIndex < pBlockData->nRow - 1) { + if (pBlockData->aTSKEY[pDumpInfo->rowIndex + 1] == key) { int32_t rowIndex = pDumpInfo->rowIndex + 1; - while (pData->aTSKEY[rowIndex] == key) { - tRowMerge(&merger, NULL); + + while (pBlockData->aTSKEY[rowIndex] == key) { + if (pBlockData->aVersion[rowIndex] > pReader->endVersion) { + continue; + } + + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex); + tRowMerge(pMerger, &fRow); + rowIndex += 1; } - tRowMergerGetRow(&merger, pRow); - tRowMergerClear(&merger); + + pDumpInfo->rowIndex = rowIndex; } - } else { + } else { // last row of current block, check if current block is overlapped with neighbor block + pDumpInfo->rowIndex += 1; + bool overlap = overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo); + if (overlap) { + // load next block + + } + } + return TSDB_CODE_SUCCESS; } int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow) { - STSchema* pSchema = NULL; // todo set the correct schema TSKEY mergeTs = TSKEY_INITIAL_VAL; SRowMerger merge = {0}; - TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader); - TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader); + TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader); + TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader); TSDBKEY k = {.ts = TSKEY_INITIAL_VAL}; TSDBKEY ik = {.ts = TSKEY_INITIAL_VAL}; @@ -2514,33 +2700,35 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR ik = TSDBROW_KEY(piRow); if (ik.ts <= k.ts) { - tRowMergerInit(&merge, piRow, pSchema); - doLoadRowsOfIdenticalTs(&pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); + tRowMergerInit(&merge, piRow, pReader->pSchema); + doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); if (k.ts == mergeTs) { - doLoadRowsOfIdenticalTs(&pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); + doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); } tRowMergerGetRow(&merge, pTSRow); return TSDB_CODE_SUCCESS; } else { // k.ts < ik.ts - tRowMergerInit(&merge, pRow, pSchema); - doLoadRowsOfIdenticalTs(&pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); + tRowMergerInit(&merge, pRow, pReader->pSchema); + doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); tRowMergerGetRow(&merge, pTSRow); return TSDB_CODE_SUCCESS; } } if (pBlockScanInfo->memHasVal) { - tRowMergerInit(&merge, pRow, pSchema); - doLoadRowsOfIdenticalTs(&pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); + k = TSDBROW_KEY(pRow); + tRowMergerInit(&merge, pRow, pReader->pSchema); + doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); tRowMergerGetRow(&merge, pTSRow); return TSDB_CODE_SUCCESS; } if (pBlockScanInfo->imemHasVal) { - tRowMergerInit(&merge, piRow, pSchema); - doLoadRowsOfIdenticalTs(&pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); + ik = TSDBROW_KEY(piRow); + tRowMergerInit(&merge, piRow, pReader->pSchema); + doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); tRowMergerGetRow(&merge, pTSRow); return TSDB_CODE_SUCCESS; } @@ -2548,22 +2736,33 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR return TSDB_CODE_SUCCESS; } -int32_t tsdbReadRowsFromCache(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader) { - int32_t numOfRows = 0; - int32_t numOfCols = (int32_t)taosArrayGetSize(pReader->pResBlock->pDataBlock); +int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow) { + int32_t numOfRows = pBlock->info.rows; + int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); + + SColVal colVal = {0}; + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + tTSRowGetVal(pTSRow, pReader->pSchema, pColInfoData->info.colId, &colVal); + colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, colVal.isNull); + } + + pBlock->info.rows += 1; + return TSDB_CODE_SUCCESS; +} + +int32_t buildInmemDataBlockImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader) { SSDataBlock* pBlock = pReader->pResBlock; int64_t st = taosGetTimestampUs(); - - STSchema* pSchema = NULL; do { STSRow* pTSRow = NULL; tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow); - // todo assign to ssdatablock + doAppendOneRow(pBlock, pReader, pTSRow); if (pBlockScanInfo->memHasVal) { - TSDBROW* pRow = tsdbTbDataIterGet(&pBlockScanInfo->iter); + TSDBROW* pRow = tsdbTbDataIterGet(pBlockScanInfo->iter); TSDBKEY k = TSDBROW_KEY(pRow); if (k.ts >= maxKey.ts) { break; @@ -2571,7 +2770,7 @@ int32_t tsdbReadRowsFromCache(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKe } if (pBlockScanInfo->imemHasVal) { - TSDBROW* pRow = tsdbTbDataIterGet(&pBlockScanInfo->iiter); + TSDBROW* pRow = tsdbTbDataIterGet(pBlockScanInfo->iiter); TSDBKEY k = TSDBROW_KEY(pRow); if (k.ts >= maxKey.ts) { break; @@ -2583,13 +2782,15 @@ int32_t tsdbReadRowsFromCache(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKe break; } - if (numOfRows >= capacity) { + if (pBlock->info.rows >= capacity) { break; } } while (1); - taosMemoryFreeClear(pSchema); - assert(numOfRows <= capacity); + ASSERT(pBlock->info.rows <= capacity); + pBlock->info.uid = pBlockScanInfo->uid; + + int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); int64_t elapsedTime = taosGetTimestampUs() - st; tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64 " us, numOfRows:%d, numOfCols:%d, %s", @@ -2810,7 +3011,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInf return TSDB_CODE_SUCCESS; } - pReader->status.pTableMap = createDataBlockScanInfo(pReader, pCond->uidList, pCond->numOfTables); + pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pTableList->pData, taosArrayGetSize(pTableList->pTableList)); if (pReader->status.pTableMap == NULL) { tsdbReaderClose(pReader); *ppReader = NULL; @@ -2908,19 +3109,44 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { int64_t stime = taosGetTimestampUs(); int64_t elapsedTime = stime; + SReaderStatus* pStatus = &pReader->status; if (pReader->type == BLOCK_LOAD_OFFSET_ORDER) { - if (pReader->status.loadFromFile) { + if (pStatus->loadFromFile) { bool exists = true; int32_t code = loadDataInFiles(pReader, &exists); - } else { // no data in files, let's try in-memory buffer + } else { // no data in files, let's try the buffer + while(1) { + if (pStatus->pTableIter == NULL) { + pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL); + if (pStatus->pTableIter == NULL) { + return false; + } + } + + STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter; + initMemIterator(pBlockScanInfo, pReader); + + TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->endVersion}; + buildInmemDataBlock(pReader, pBlockScanInfo, &maxKey); + if (pReader->pResBlock->info.rows > 0) { + return true; + } + // current table is exhausted, let's try the next table + pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, &pStatus->pTableIter); + if (pStatus->pTableIter == NULL) { + return false; + } + } } } else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) { } else if (pReader->type == BLOCK_LOAD_EXTERN_ORDER) { + } else { + ASSERT(0); } // if (pReader->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) { // return loadDataBlockFromTableSeq(pReader); @@ -2957,32 +3183,10 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { } void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) { - // SQueryFilePos* cur = &pReader->cur; - - // uint64_t uid = 0; - - // // there are data in file - // if (pReader->cur.fid != INT32_MIN) { - // SFileBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[cur->slot]; - // uid = pBlockInfo->pTableCheckInfo->tableId; - // } else { - // STableBlockScanInfo* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, pReader->activeIndex); - // uid = pCheckInfo->tableId; - // } - - // tsdbDebug("data block generated, uid:%" PRIu64 " numOfRows:%d, tsrange:%" PRId64 " - %" PRId64 " %s", uid, - // cur->rows, - // cur->win.skey, cur->win.ekey, pReader->idStr); - - // pDataBlockInfo->uid = uid; - - // #if 0 - // // for multi-group data query processing test purpose - // pDataBlockInfo->groupId = uid; - // #endif - - // pDataBlockInfo->rows = cur->rows; - // pDataBlockInfo->window = cur->win; + ASSERT(pDataBlockInfo != NULL && pReader != NULL); + pDataBlockInfo->rows = pReader->pResBlock->info.rows; + pDataBlockInfo->uid = pReader->pResBlock->info.uid; + pDataBlockInfo->window = pReader->pResBlock->info.window; } int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) { @@ -3061,6 +3265,17 @@ int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg*** } SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { + if (pReader->status.composedDataBlock) { + return pReader->pResBlock->pDataBlock; + } else { + SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); + STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); + + SBlockData data = {0}; + doLoadFileBlockData(pReader, &pReader->status.blockIter, pBlockScanInfo, &data); + + // todo convert blockData to ssdatablock + } // /** // * In the following two cases, the data has been loaded to SColumnInfoData. // * 1. data is from cache, 2. data block is not completed qualified to query time range