diff --git a/include/common/tcommon.h b/include/common/tcommon.h index ffbd9bce4bd35e1cc9c699c7dd86f0852f6005fc..f668fcf4a990931b5b31c35af73f6f9eaf4a6579 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -119,8 +119,6 @@ typedef struct SQueryTableDataCond { STimeWindow* twindows; int64_t startVersion; int64_t endVersion; - int32_t numOfTables; // number of tables - uint64_t* uidList; // table uid list } SQueryTableDataCond; void* blockDataDestroy(SSDataBlock* pBlock); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index a91c60ea5c7edb84d11725e34ead42fbcfb3ae92..82d33a82a74c35c66e9604a88ce7045e6cea542f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -45,10 +45,6 @@ typedef struct STableBlockScanInfo { TSKEY lastKey; SBlockIdx blockIdx; SArray* pBlockList; // block data index list - // SBlockInfo* pCompInfo; -// int32_t compSize; -// int32_t numOfBlocks : 29; // number of qualified data blocks not the original blocks - uint8_t chosen : 2; // indicate which iterator should move forward bool iterInit; // whether to initialize the in-memory skip list iterator or not STbDataIter* iter; // mem buffer skip list iterator STbDataIter* iiter; // imem buffer skip list iterator @@ -79,6 +75,7 @@ typedef struct SIOCostSummary { typedef struct SBlockLoadSuppInfo { SColumnDataAgg* pstatis; SColumnDataAgg** plist; + int16_t* colIds; // column ids for loading file block data int32_t* slotIds; // colId to slotId char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated. } SBlockLoadSuppInfo; @@ -133,11 +130,11 @@ struct STsdbReader { SSDataBlock* pResBlock; int32_t capacity; SReaderStatus status; - char* idStr; // query info handle, for debug purpose - int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows + char* idStr; // query info handle, for debug purpose + int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows SBlockLoadSuppInfo suppInfo; - SArray* prev; // previous row which is before than time window - SArray* next; // next row which is after the query time window + SArray* prev; // previous row which is before than time window + SArray* next; // next row which is after the query time window SIOCostSummary cost; STSchema* pSchema; @@ -180,27 +177,31 @@ static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) { SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; - size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - pSupInfo->slotIds = taosMemoryCalloc(numOfCols, sizeof(int16_t)); - if (pSupInfo->slotIds == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } + size_t numOfCols = blockDataGetNumOfCols(pBlock); +// pSupInfo->slotIds = taosMemoryCalloc(numOfCols, sizeof(int16_t)); +// if (pSupInfo->slotIds == NULL) { +// return TSDB_CODE_OUT_OF_MEMORY; +// } + pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t)); pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES); - if (pSupInfo->buildBuf == NULL) { + if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) { + taosMemoryFree(pSupInfo->colIds); + taosMemoryFree(pSupInfo->buildBuf); return TSDB_CODE_OUT_OF_MEMORY; } - STSchema* pSchema = pReader->pSchema; +// STSchema* pSchema = pReader->pSchema; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + pSupInfo->colIds[i] = pCol->info.colId; - for (int32_t j = 0; j < pSchema->numOfCols; ++j) { - if (pCol->info.colId == pSchema->columns[j].colId) { - pSupInfo->slotIds[i] = j; - break; - } - } +// for (int32_t j = 0; j < pSchema->numOfCols; ++j) { +// if (pCol->info.colId == pSchema->columns[j].colId) { +// pSupInfo->slotIds[i] = j; +// break; +// } +// } if (IS_VAR_DATA_TYPE(pCol->info.type)) { pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes); @@ -402,7 +403,10 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd pReader->type = pCond->type; pReader->window = *pCond->twindows; - pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1); + if (pReader->suid != 0) { + pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1); + ASSERT(pReader->pSchema); + } // todo remove this setQueryTimewindow(pReader, pCond, 0); @@ -653,48 +657,6 @@ _end: // return hasNext; // } -// static bool hasMoreDataInCache(STsdbReader* pHandle) { -// STsdbCfg* pCfg = REPO_CFG(pHandle->pTsdb); -// size_t size = taosArrayGetSize(pHandle->pTableCheckInfo); -// assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1); -// pHandle->cur.fid = INT32_MIN; - -// STableBlockScanInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); -// if (!pCheckInfo->initBuf) { -// initTableMemIterator(pHandle, pCheckInfo); -// } - -// STSRow* row = getSRowInTableMem(pCheckInfo, pHandle->order, pCfg->update, NULL, TD_VER_MAX); -// if (row == NULL) { -// return false; -// } - -// pCheckInfo->lastKey = TD_ROW_KEY(row); // first timestamp in buffer -// tsdbDebug("%p uid:%" PRId64 ", check data in buffer from skey:%" PRId64 ", order:%d, %s", pHandle, -// pCheckInfo->tableId, pCheckInfo->lastKey, pHandle->order, pHandle->idStr); - -// // all data in mem are checked already. -// if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_TRAVERSE(pHandle->order)) || -// (pCheckInfo->lastKey < pHandle->window.ekey && !ASCENDING_TRAVERSE(pHandle->order))) { -// return false; -// } - -// int32_t step = ASCENDING_TRAVERSE(pHandle->order) ? 1 : -1; -// STimeWindow* win = &pHandle->cur.win; -// pHandle->cur.rows = buildInmemDataBlockImpl(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity, win, pHandle); - -// // update the last key value -// pCheckInfo->lastKey = win->ekey + step; -// pHandle->cur.lastKey = win->ekey + step; -// pHandle->cur.mixBlock = true; - -// if (!ASCENDING_TRAVERSE(pHandle->order)) { -// TSWAP(win->skey, win->ekey); -// } - -// return true; -// } - // static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precision) { // assert(precision >= TSDB_TIME_PRECISION_MICRO || precision <= TSDB_TIME_PRECISION_NANO); // if (key == TSKEY_INITIAL_VAL) { @@ -829,9 +791,9 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_ } // 2. version range check -// if (block.minVersion > pReader->startVersion || block.maxVersion < pReader->endVersion) { -// continue; -// } + if (block.minVersion > pReader->verRange.maxVer || block.maxVersion < pReader->verRange.minVer) { + continue; + } void* p = taosArrayPush(pScanInfo->pBlockList, &block); if (p == NULL) { @@ -877,16 +839,16 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx); SSDataBlock* pResBlock = pReader->pResBlock; + int32_t numOfCols = blockDataGetNumOfCols(pResBlock); SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; uint8_t *pb = NULL, *pb1 = NULL; - int32_t code = tsdbReadBlockData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pBlockData, &pb, &pb1); + int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols, pBlockData, &pb, &pb1); if (code != TSDB_CODE_SUCCESS) { goto _error; } - int32_t numOfCols = blockDataGetNumOfCols(pResBlock); SColVal cv = {0}; for (int32_t i = 0; i < numOfCols; ++i) { @@ -2126,22 +2088,6 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) { // return getDataBlock(pTsdbReadHandle, pBlockInfo, exists); // } -// static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) { -// assert(cur != NULL && numOfBlocks > 0); -// return (cur->slot == numOfBlocks - 1 && ascTrav) || (cur->slot == 0 && !ascTrav); -// } - -// static void moveToNextDataBlockInCurrentFile(STsdbReader* pTsdbReadHandle) { -// int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; - -// SQueryFilePos* cur = &pTsdbReadHandle->cur; -// assert(cur->slot < pTsdbReadHandle->numOfBlocks && cur->slot >= 0); - -// cur->slot += step; -// cur->mixBlock = false; -// cur->blockCompleted = false; -// } - // static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) { // return (numOfRows - startRow) / bucketRange; // } @@ -2184,26 +2130,22 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBloc } static int32_t buildInmemDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBKEY *key) { - if (pBlockScanInfo->iter != NULL) { - pBlockScanInfo->memHasVal = tsdbTbDataIterNext(pBlockScanInfo->iter); - } else { - pBlockScanInfo->memHasVal = false; - } - - if (pBlockScanInfo->iiter != NULL) { - pBlockScanInfo->imemHasVal = tsdbTbDataIterNext(pBlockScanInfo->iiter); - } else { - pBlockScanInfo->imemHasVal = false; - } - if (!(pBlockScanInfo->imemHasVal || pBlockScanInfo->memHasVal)) { return TSDB_CODE_SUCCESS; } + SSDataBlock* pBlock = pReader->pResBlock; + + int64_t st = taosGetTimestampUs(); int32_t code = buildInmemDataBlockImpl(pBlockScanInfo, *key, pReader->capacity, pReader); - setComposedBlockFlag(pReader, true); - // set the correct block data info + int64_t elapsedTime = taosGetTimestampUs() - st; + + tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64 " us, numOfRows:%d, numOfCols:%d, %s", + pReader, elapsedTime, pBlock->info.rows, (int32_t)blockDataGetNumOfCols(pBlock), pReader->idStr); + + pBlock->info.uid = pBlockScanInfo->uid; + setComposedBlockFlag(pReader, true); return code; } @@ -2384,15 +2326,17 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d); if (d != NULL) { code = tsdbTbDataIterCreate(d, &startKey, 0, &pBlockScanInfo->iter); - if (code != TSDB_CODE_SUCCESS) { - tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s", - pReader, pBlockScanInfo->uid, tstrerror(code), pReader->idStr); - return code; - } else { + if (code == TSDB_CODE_SUCCESS) { + pBlockScanInfo->memHasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter) != NULL); + tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 - "-%" PRId64 " %s", + "-%" PRId64 " %s", pReader, pBlockScanInfo->uid, pReader->window.skey, pReader->order, d->minKey, d->maxKey, pReader->idStr); + } else { + tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s", + pReader, pBlockScanInfo->uid, tstrerror(code), pReader->idStr); + return code; } } } else { @@ -2404,15 +2348,17 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di); if (di != NULL) { code = tsdbTbDataIterCreate(di, &startKey, 0, &pBlockScanInfo->iiter); - if (code != TSDB_CODE_SUCCESS) { - tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s", - pReader, pBlockScanInfo->uid, tstrerror(code), pReader->idStr); - return code; - } else { + if (code == TSDB_CODE_SUCCESS) { + pBlockScanInfo->imemHasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter) != NULL); + tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 - "-%" PRId64 " %s", + "-%" PRId64 " %s", pReader, pBlockScanInfo->uid, pReader->window.skey, pReader->order, di->minKey, di->maxKey, pReader->idStr); + } else { + tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s", + pReader, pBlockScanInfo->uid, tstrerror(code), pReader->idStr); + return code; } } } else { @@ -2558,6 +2504,24 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { } } +static int32_t initForFirstBlockOfFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) { + int32_t numOfBlocks = 0; + int32_t code = moveToNextFile(pReader, &numOfBlocks); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + // all data files are consumed, try data in buffer + if (numOfBlocks == 0) { + pReader->status.loadFromFile = false; + return code; + } + + // initialize the block iterator for a new fileset + code = initBlockIterator(pReader, pBlockIter, numOfBlocks); + return code; +} + static int32_t buildBlockFromFiles(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; @@ -2565,71 +2529,42 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { SFileSetIter* pFIter = &pStatus->fileIter; SDataBlockIter* pBlockIter = &pReader->status.blockIter; + ASSERT (pFIter->index < pFIter->numOfFiles); - if (pFIter->index < pFIter->numOfFiles) { - if (pReader->status.blockIter.index == -1) { - int32_t numOfBlocks = 0; - code = moveToNextFile(pReader, &numOfBlocks); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - // initialize the block iterator for a new fileset - code = initBlockIterator(pReader, pBlockIter, numOfBlocks); - if (code != TSDB_CODE_SUCCESS) { - return code; + if (pReader->status.blockIter.index == -1) { + code = initForFirstBlockOfFile(pReader, pBlockIter); + if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { + return code; + } + + code = doBuildDataBlock(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } else { + SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); + STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); + SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); + + // current block are exhausted, try the next file block + if (pDumpInfo->rowIndex >= pBlock->nRow) { + // try next data block in current file + bool hasNext = blockIteratorNext(&pReader->status.blockIter); + if (!hasNext) { // current file is exhausted, let's try the next file + code = initForFirstBlockOfFile(pReader, pBlockIter); + if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { + return code; + } } - + code = doBuildDataBlock(pReader); if (code != TSDB_CODE_SUCCESS) { return code; } } else { - SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); - STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); - SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); - - // current block are exhausted, try the next file block - if (pDumpInfo->rowIndex >= pBlock->nRow) { - bool hasNext = blockIteratorNext(&pReader->status.blockIter); - if (!hasNext) { // current file is exhausted, let's try the next file - int32_t numOfBlocks = 0; - code = moveToNextFile(pReader, &numOfBlocks); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - // all data files are consumed, try data in buffer - if (numOfBlocks == 0) { - pReader->status.loadFromFile = false; - return code; - } else { - // initialize the block iterator for a new fileset - code = initBlockIterator(pReader, pBlockIter, numOfBlocks); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - code = doBuildDataBlock(pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - } else { // try next data block in current file - blockIteratorNext(pBlockIter); - code = doBuildDataBlock(pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - } else { - code = buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo); - return code; - } - - // repeat the previous procedure. - + code = buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo); + return code; } } @@ -2759,6 +2694,17 @@ int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock* return TSDB_CODE_SUCCESS; } +static void checkUpdateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) { + int32_t sversion = TSDBROW_SVERSION(pRow); + + if (pReader->pSchema == NULL) { + pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, sversion); + } else if (pReader->pSchema->version != sversion) { + taosMemoryFreeClear(pReader->pSchema); + pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, sversion); + } +} + int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow) { TSKEY mergeTs = TSKEY_INITIAL_VAL; @@ -2767,7 +2713,7 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader); TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader); - TSDBKEY k = {.ts = TSKEY_INITIAL_VAL}; + TSDBKEY k = {.ts = TSKEY_INITIAL_VAL}; TSDBKEY ik = {.ts = TSKEY_INITIAL_VAL}; if (pBlockScanInfo->memHasVal && pBlockScanInfo->imemHasVal) { @@ -2775,6 +2721,8 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR ik = TSDBROW_KEY(piRow); if (ik.ts <= k.ts) { + checkUpdateSchema(piRow, pBlockScanInfo->uid, pReader); + tRowMergerInit(&merge, piRow, pReader->pSchema); doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); @@ -2785,6 +2733,8 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR tRowMergerGetRow(&merge, pTSRow); return TSDB_CODE_SUCCESS; } else { // k.ts < ik.ts + checkUpdateSchema(pRow, pBlockScanInfo->uid, pReader); + tRowMergerInit(&merge, pRow, pReader->pSchema); doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); tRowMergerGetRow(&merge, pTSRow); @@ -2792,8 +2742,11 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR } } + if (pBlockScanInfo->memHasVal) { k = TSDBROW_KEY(pRow); + checkUpdateSchema(pRow, pBlockScanInfo->uid, pReader); + tRowMergerInit(&merge, pRow, pReader->pSchema); doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); tRowMergerGetRow(&merge, pTSRow); @@ -2802,6 +2755,8 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR if (pBlockScanInfo->imemHasVal) { ik = TSDBROW_KEY(piRow); + checkUpdateSchema(piRow, pBlockScanInfo->uid, pReader); + tRowMergerInit(&merge, piRow, pReader->pSchema); doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); tRowMergerGetRow(&merge, pTSRow); @@ -2816,17 +2771,31 @@ int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; + STSchema *pSchema = pReader->pSchema; SColVal colVal = {0}; - for(int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - int32_t slotId = pSupInfo->slotIds[i]; + int32_t i = 0, j = 0; - if (pColInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID && slotId == 0) { - colDataAppend(pColInfoData, numOfRows, (const char*) &pTSRow->ts, false); - } else { - tTSRowGetVal(pTSRow, pReader->pSchema, slotId, &colVal); - doCopyColVal(pColInfoData, i, numOfRows, &colVal, pSupInfo); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + if (pColInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + colDataAppend(pColInfoData, numOfRows, (const char*)&pTSRow->ts, false); + i += 1; + } + + while (i < numOfCols && j < pSchema->numOfCols) { + pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + col_id_t colId = pColInfoData->info.colId; + + if (colId == pSchema->columns[j].colId) { + tTSRowGetVal(pTSRow, pReader->pSchema, j, &colVal); + doCopyColVal(pColInfoData, numOfRows, i, &colVal, pSupInfo); + i += 1; + j += 1; + } else if (colId < pSchema->columns[j].colId) { + colDataAppendNULL(pColInfoData, numOfRows); + i += 1; + } else if (colId > pSchema->columns[j].colId) { + j += 1; } } @@ -2837,7 +2806,6 @@ int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow int32_t buildInmemDataBlockImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader) { SSDataBlock* pBlock = pReader->pResBlock; - int64_t st = taosGetTimestampUs(); do { STSRow* pTSRow = NULL; tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow); @@ -2870,14 +2838,6 @@ int32_t buildInmemDataBlockImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY max } while (1); ASSERT(pBlock->info.rows <= capacity); - pBlock->info.uid = pBlockScanInfo->uid; - - int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); - - int64_t elapsedTime = taosGetTimestampUs() - st; - tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64 " us, numOfRows:%d, numOfCols:%d, %s", - pReader, elapsedTime, pBlock->info.rows, numOfCols, pReader->idStr); - return TSDB_CODE_SUCCESS; } @@ -2976,46 +2936,6 @@ int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) { // return false; // } -// static bool loadCachedLastRow(STsdbReader* pTsdbReadHandle) { -// // the last row is cached in buffer, return it directly. -// // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER -// int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)); -// size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); -// assert(numOfTables > 0 && numOfCols > 0); - -// SQueryFilePos* cur = &pTsdbReadHandle->cur; - -// STSRow* pRow = NULL; -// TSKEY key = TSKEY_INITIAL_VAL; -// int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; -// TSKEY lastRowKey = TSKEY_INITIAL_VAL; -// int32_t curRow = 0; - -// if (++pTsdbReadHandle->activeIndex < numOfTables) { -// STableBlockScanInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex); -// // int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key); -// // if (ret != TSDB_CODE_SUCCESS) { -// // return false; -// // } -// mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, pRow, NULL, numOfCols, -// pCheckInfo->tableId, NULL, NULL, true, &lastRowKey); -// taosMemoryFreeClear(pRow); - -// // update the last key value -// pCheckInfo->lastKey = key + step; - -// cur->rows = 1; // only one row -// cur->lastKey = key + step; -// cur->mixBlock = true; -// cur->win.skey = key; -// cur->win.ekey = key; - -// return true; -// } - -// return false; -// } - // static bool loadDataBlockFromTableSeq(STsdbReader* pTsdbReadHandle) { // size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); // assert(numOfTables > 0); @@ -3117,7 +3037,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl return TSDB_CODE_SUCCESS; } - pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pData, taosArrayGetSize(pTableList)); + int32_t numOfTables = taosArrayGetSize(pTableList); + pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pData, numOfTables); if (pReader->status.pTableMap == NULL) { tsdbReaderClose(pReader); *ppReader = NULL; @@ -3154,7 +3075,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl // } #endif - tsdbDebug("%p total numOfTable:%d in this query %s", pReader, pCond->numOfTables, pReader->idStr); + tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); return code; _err: @@ -3529,26 +3450,33 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa } int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { - int64_t rows = 0; - SMemTable* pMemTable = NULL; // pTsdbReadHandle->pMemTable; + int64_t rows = 0; - // if (pMemTable == NULL) { - // return rows; - // } + SReaderStatus* pStatus = &pReader->status; + pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL); - // size_t size = taosArrayGetSize(pReader->pTableCheckInfo); - // for (int32_t i = 0; i < size; ++i) { - // STableBlockScanInfo* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, i); - - // // if (pMemT && pCheckInfo->tableId < pMemT->maxTables) { - // // pMem = pMemT->tData[pCheckInfo->tableId]; - // // rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0; - // // } - // // if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) { - // // pIMem = pIMemT->tData[pCheckInfo->tableId]; - // // rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0; - // // } - // } + while (pStatus->pTableIter != NULL) { + STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter; + + STbData* d = NULL; + if (pReader->pTsdb->mem != NULL) { + tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d); + if (d != NULL) { + rows += tsdbGetNRowsInTbData(d); + } + } + + STbData* di = NULL; + if (pReader->pTsdb->imem != NULL) { + tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di); + if (di != NULL) { + rows += tsdbGetNRowsInTbData(di); + } + } + + // current table is exhausted, let's try the next table + pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); + } return rows; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 12f6f3918acc768abb1d1ca6308fc952c5bc97de..08d10196eb575ef70e4e5ebadc2ecf66417e45a1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -849,7 +849,7 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl uint8_t *pBuf1 = NULL; uint8_t *pBuf2 = NULL; - ASSERT(nCol == 0 || aColId[0] != PRIMARYKEY_TIMESTAMP_COL_ID); + ASSERT(aColId[0] == PRIMARYKEY_TIMESTAMP_COL_ID); if (!ppBuf1) ppBuf1 = &pBuf1; if (!ppBuf2) ppBuf2 = &pBuf2; diff --git a/tests/script/tsim/testsuit.sim b/tests/script/tsim/testsuit.sim index 1636bb38c800350a2b39a26d5dcd9a25ab56dc23..e0c46ae5fe59f62cdf0c3a152a83fef7ceb76c02 100644 --- a/tests/script/tsim/testsuit.sim +++ b/tests/script/tsim/testsuit.sim @@ -1,9 +1,7 @@ -run tsim/user/pass_alter.sim -run tsim/user/basic1.sim -run tsim/user/privilege2.sim -run tsim/user/user_len.sim -run tsim/user/privilege1.sim -run tsim/user/pass_len.sim +run tsim/user/password.sim +run tsim/user/privilege_db.sim +run tsim/user/privilege_sysinfo.sim +run tsim/user/basic.sim run tsim/table/basic1.sim run tsim/trans/lossdata1.sim run tsim/trans/create_db.sim @@ -26,18 +24,23 @@ run tsim/stable/values.sim run tsim/stable/dnode3.sim run tsim/stable/alter_insert1.sim run tsim/stable/refcount.sim +run tsim/stable/tag_filter.sim run tsim/stable/disk.sim run tsim/db/basic1.sim run tsim/db/basic3.sim run tsim/db/basic7.sim run tsim/db/basic6.sim +run tsim/db/alter_replica_13.sim run tsim/db/create_all_options.sim run tsim/db/basic2.sim run tsim/db/error1.sim +run tsim/db/alter_replica_31.sim run tsim/db/taosdlog.sim run tsim/db/alter_option.sim run tsim/mnode/basic1.sim -#run tsim/mnode/basic3.sim +run tsim/mnode/basic4.sim +run tsim/mnode/basic3.sim +run tsim/mnode/basic5.sim run tsim/mnode/basic2.sim run tsim/parser/fourArithmetic-basic.sim run tsim/parser/groupby-basic.sim @@ -57,11 +60,12 @@ run tsim/query/complex_group.sim run tsim/query/interval.sim run tsim/query/session.sim run tsim/query/scalarFunction.sim -#run tsim/query/scalarNull.sim +run tsim/query/scalarNull.sim run tsim/query/complex_where.sim run tsim/tmq/basic1.sim run tsim/tmq/basic4.sim run tsim/tmq/basic1Of2Cons.sim +run tsim/tmq/snapshot.sim run tsim/tmq/prepareBasicEnv-1vgrp.sim run tsim/tmq/topic.sim run tsim/tmq/basic4Of2Cons.sim @@ -73,14 +77,36 @@ run tsim/tmq/basic3Of2Cons.sim run tsim/tmq/basic2Of2ConsOverlap.sim run tsim/tmq/clearConsume.sim run tsim/qnode/basic1.sim -run tsim/dnode/basic1.sim +run tsim/dnode/redistribute_vgroup_replica3_v3.sim +run tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim +run tsim/dnode/redistribute_vgroup_replica3_v2.sim +run tsim/dnode/drop_dnode_has_mnode.sim +run tsim/dnode/drop_dnode_has_multi_vnode_replica1.sim +run tsim/dnode/drop_dnode_has_vnode_replica1.sim +run tsim/dnode/balance_replica3.sim +run tsim/dnode/redistribute_vgroup_replica1.sim +run tsim/dnode/drop_dnode_has_vnode_replica3.sim +run tsim/dnode/balance_replica1.sim +run tsim/dnode/drop_dnode_has_multi_vnode_replica3.sim +run tsim/dnode/drop_dnode_has_qnode_snode.sim +run tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim +run tsim/dnode/create_dnode.sim +run tsim/testsuit.sim run tsim/show/basic.sim run tsim/stream/basic1.sim +run tsim/stream/windowClose.sim +run tsim/stream/partitionby1.sim run tsim/stream/triggerInterval0.sim run tsim/stream/triggerSession0.sim +run tsim/stream/distributeIntervalRetrive0.sim run tsim/stream/basic0.sim run tsim/stream/session0.sim +run tsim/stream/schedSnode.sim +run tsim/stream/partitionby.sim run tsim/stream/session1.sim +run tsim/stream/distributeInterval0.sim +run tsim/stream/distributeSession0.sim +run tsim/stream/state0.sim run tsim/stream/basic2.sim run tsim/insert/basic1.sim run tsim/insert/commit-merge0.sim @@ -88,15 +114,18 @@ run tsim/insert/basic0.sim run tsim/insert/update0.sim run tsim/insert/backquote.sim run tsim/insert/null.sim +run tsim/catalog/alterInCurrent.sim run tsim/sync/oneReplica1VgElectWithInsert.sim run tsim/sync/threeReplica1VgElect.sim run tsim/sync/oneReplica1VgElect.sim run tsim/sync/3Replica5VgElect.sim +run tsim/sync/3Replica5VgElect3mnodedrop.sim +run tsim/sync/3Replica5VgElect3mnode.sim run tsim/sync/insertDataByRunBack.sim run tsim/sync/oneReplica5VgElect.sim run tsim/sync/3Replica1VgElect.sim run tsim/sync/threeReplica1VgElectWihtInsert.sim -run tsim/sma/tsmaCreateInsertData.sim +run tsim/sma/tsmaCreateInsertQuery.sim run tsim/sma/rsmaCreateInsertQuery.sim run tsim/valgrind/basic.sim run tsim/valgrind/checkError.sim