diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 419d63b6d5c4480e75352bf3941f1f0ab676be59..aaac9247a08b6158d81c1fe3abd9fc499dd18ea0 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -740,4 +740,429 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) { _err: taosRUnLockLatch(&(pIter->pTable->latch)); return -1; +} + +static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) { + STsdbCfg * pCfg = &(pHelper->pRepo->config); + STable * pTable = pCommitIter->pTable; + SBlockIdx * pIdx = &(pHelper->curCompIdx); + TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); + int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; + SBlock compBlock = {0}; + + ASSERT(pIdx->len <= 0 || keyFirst > pIdx->maxKey); + if (pIdx->hasLast) { // append to with last block + ASSERT(pIdx->len > 0); + SBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1); + ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock); + tdResetDataCols(pDataCols); + int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows, + pDataCols, NULL, 0); + ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); + if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && + pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { + if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; + if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1; + } else { + if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; + ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows); + + if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1; + ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows + pDataCols->numOfRows); + + if (tsdbWriteBlockToProperFile(pHelper, pHelper->pDataCols[0], &compBlock) < 0) return -1; + if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; + } + + if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; + } else { + ASSERT(!pHelper->hasOldLastBlock); + tdResetDataCols(pDataCols); + int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0); + ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); + + if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; + if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1; + } + +#ifndef NDEBUG + TSKEY keyNext = tsdbNextIterKey(pCommitIter->pIter); + ASSERT(keyNext < 0 || keyNext > pIdx->maxKey); +#endif + + return 0; +} + +static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey, + int *blkIdx) { + STsdbCfg * pCfg = &(pHelper->pRepo->config); + STable * pTable = pCommitIter->pTable; + SBlockIdx * pIdx = &(pHelper->curCompIdx); + SBlock compBlock = {0}; + TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); + int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; + SDataCols *pDataCols0 = pHelper->pDataCols[0]; + + SSkipListIterator slIter = {0}; + + ASSERT(keyFirst <= pIdx->maxKey); + + SBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)blockAtIdx(pHelper, *blkIdx), + pIdx->numOfBlocks - *blkIdx, sizeof(SBlock), compareKeyBlock, TD_GE); + ASSERT(pCompBlock != NULL); + int tblkIdx = (int32_t)(TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock)); + + if (pCompBlock->last) { + ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && tblkIdx == pIdx->numOfBlocks - 1); + int16_t colId = 0; + slIter = *(pCommitIter->pIter); + if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; + ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows); + + int rows1 = defaultRowsInBlock - pCompBlock->numOfRows; + int rows2 = + tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows); + if (rows2 == 0) { // all data filtered out + *(pCommitIter->pIter) = slIter; + } else { + if (pCompBlock->numOfRows + rows2 < pCfg->minRowsPerFileBlock && + pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { + tdResetDataCols(pDataCols); + int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols, + pDataCols0->cols[0].pData, pDataCols0->numOfRows); + ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows); + if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; + if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; + tblkIdx++; + } else { + if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; + int round = 0; + int dIter = 0; + while (true) { + tdResetDataCols(pDataCols); + int rowsRead = + tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey, defaultRowsInBlock); + if (rowsRead == 0) break; + + if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; + if (round == 0) { + if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; + } else { + if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; + } + + tblkIdx++; + round++; + } + } + if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; + } + } else { + TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1); + TSKEY blkKeyFirst = pCompBlock->keyFirst; + TSKEY blkKeyLast = pCompBlock->keyLast; + + if (keyFirst < blkKeyFirst) { + while (true) { + tdResetDataCols(pDataCols); + int rowsRead = + tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0); + if (rowsRead == 0) break; + + ASSERT(rowsRead == pDataCols->numOfRows); + if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; + if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; + tblkIdx++; + } + ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || + tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); + } else { + ASSERT(keyFirst <= blkKeyLast); + int16_t colId = 0; + if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; + + slIter = *(pCommitIter->pIter); + int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows); + int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, + pDataCols0->numOfRows); + + if (rows2 == 0) { // all filtered out + *(pCommitIter->pIter) = slIter; + ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || + tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); + } else { + int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2; + + if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) { + int rows = (rows1 >= rows3) ? rows3 : rows2; + tdResetDataCols(pDataCols); + int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols, + pDataCols0->cols[0].pData, pDataCols0->numOfRows); + ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows); + if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0) + return -1; + if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; + tblkIdx++; + ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || + tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); + } else { + if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; + int round = 0; + int dIter = 0; + while (true) { + int rowsRead = + tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock); + if (rowsRead == 0) break; + + if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) + return -1; + if (round == 0) { + if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; + } else { + if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; + } + + round++; + tblkIdx++; + } + ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || + tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); + } + } + } + } + + *blkIdx = tblkIdx; + return 0; +} + +static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, + TSKEY maxKey, int maxRows) { + int numOfRows = 0; + TSKEY key1 = INT64_MAX; + TSKEY key2 = INT64_MAX; + STSchema *pSchema = NULL; + + ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey); + tdResetDataCols(pTarget); + + while (true) { + key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); + SDataRow row = tsdbNextIterRow(pCommitIter->pIter); + key2 = (row == NULL || dataRowKey(row) > maxKey) ? INT64_MAX : dataRowKey(row); + + if (key1 == INT64_MAX && key2 == INT64_MAX) break; + + if (key1 <= key2) { + for (int i = 0; i < pDataCols->numOfCols; i++) { + dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, + pTarget->maxPoints); + } + pTarget->numOfRows++; + (*iter)++; + if (key1 == key2) tSkipListIterNext(pCommitIter->pIter); + } else { + if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row)); + ASSERT(pSchema != NULL); + } + + tdAppendDataRowToDataCol(row, pSchema, pTarget); + tSkipListIterNext(pCommitIter->pIter); + } + + numOfRows++; + if (numOfRows >= maxRows) break; + ASSERT(numOfRows == pTarget->numOfRows && numOfRows <= pTarget->maxPoints); + } + + return numOfRows; +} + +static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SBlock *pCompBlock) { + STsdbCfg *pCfg = &(pHelper->pRepo->config); + SFile * pFile = NULL; + bool isLast = false; + + ASSERT(pDataCols->numOfRows > 0); + + if (pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { + pFile = helperDataF(pHelper); + } else { + isLast = true; + pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? helperNewLastF(pHelper) : helperLastF(pHelper); + } + + ASSERT(pFile->fd > 0); + + if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, pCompBlock, isLast, true) < 0) return -1; + + return 0; +} + +static int tsdbInsertSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx) { + SBlockIdx *pIdx = &(pHelper->curCompIdx); + + ASSERT(blkIdx >= 0 && blkIdx <= (int)pIdx->numOfBlocks); + ASSERT(pCompBlock->numOfSubBlocks == 1); + + // Adjust memory if no more room + if (pIdx->len == 0) pIdx->len = sizeof(SBlockInfo) + sizeof(TSCKSUM); + if (tsdbAdjustInfoSizeIfNeeded(pHelper, pIdx->len + sizeof(SBlockInfo)) < 0) goto _err; + + // Change the offset + for (uint32_t i = 0; i < pIdx->numOfBlocks; i++) { + SBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; + if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SBlock); + } + + // Memmove if needed + int tsize = pIdx->len - (sizeof(SBlockInfo) + sizeof(SBlock) * blkIdx); + if (tsize > 0) { + ASSERT(sizeof(SBlockInfo) + sizeof(SBlock) * (blkIdx + 1) < taosTSizeof(pHelper->pCompInfo)); + ASSERT(sizeof(SBlockInfo) + sizeof(SBlock) * (blkIdx + 1) + tsize <= taosTSizeof(pHelper->pCompInfo)); + memmove(POINTER_SHIFT(pHelper->pCompInfo, sizeof(SBlockInfo) + sizeof(SBlock) * (blkIdx + 1)), + POINTER_SHIFT(pHelper->pCompInfo, sizeof(SBlockInfo) + sizeof(SBlock) * blkIdx), tsize); + } + pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock; + + pIdx->numOfBlocks++; + pIdx->len += sizeof(SBlock); + ASSERT(pIdx->len <= taosTSizeof(pHelper->pCompInfo)); + pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast; + pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last; + + if (pIdx->numOfBlocks > 1) { + ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst); + } + + tsdbDebug("vgId:%d tid:%d a super block is inserted at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, + blkIdx); + + return 0; + +_err: + return -1; +} + +static int tsdbAddSubBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx, int rowsAdded) { + ASSERT(pCompBlock->numOfSubBlocks == 0); + + SBlockIdx *pIdx = &(pHelper->curCompIdx); + ASSERT(blkIdx >= 0 && blkIdx < (int)pIdx->numOfBlocks); + + SBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; + ASSERT(pSCompBlock->numOfSubBlocks >= 1 && pSCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS); + + size_t spaceNeeded = + (pSCompBlock->numOfSubBlocks == 1) ? pIdx->len + sizeof(SBlock) * 2 : pIdx->len + sizeof(SBlock); + if (tsdbAdjustInfoSizeIfNeeded(pHelper, spaceNeeded) < 0) goto _err; + + pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; + + // Add the sub-block + if (pSCompBlock->numOfSubBlocks > 1) { + size_t tsize = (size_t)(pIdx->len - (pSCompBlock->offset + pSCompBlock->len)); + if (tsize > 0) { + memmove((void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len + sizeof(SBlock)), + (void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len), tsize); + + for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { + SBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; + if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SBlock); + } + } + + *(SBlock *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len) = *pCompBlock; + + pSCompBlock->numOfSubBlocks++; + ASSERT(pSCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS); + pSCompBlock->len += sizeof(SBlock); + pSCompBlock->numOfRows += rowsAdded; + pSCompBlock->keyFirst = MIN(pSCompBlock->keyFirst, pCompBlock->keyFirst); + pSCompBlock->keyLast = MAX(pSCompBlock->keyLast, pCompBlock->keyLast); + pIdx->len += sizeof(SBlock); + } else { // Need to create two sub-blocks + void *ptr = NULL; + for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { + SBlock *pTCompBlock = pHelper->pCompInfo->blocks + i; + if (pTCompBlock->numOfSubBlocks > 1) { + ptr = POINTER_SHIFT(pHelper->pCompInfo, pTCompBlock->offset); + break; + } + } + + if (ptr == NULL) ptr = POINTER_SHIFT(pHelper->pCompInfo, pIdx->len - sizeof(TSCKSUM)); + + size_t tsize = pIdx->len - ((char *)ptr - (char *)(pHelper->pCompInfo)); + if (tsize > 0) { + memmove(POINTER_SHIFT(ptr, sizeof(SBlock) * 2), ptr, tsize); + for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { + SBlock *pTCompBlock = pHelper->pCompInfo->blocks + i; + if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += (sizeof(SBlock) * 2); + } + } + + ((SBlock *)ptr)[0] = *pSCompBlock; + ((SBlock *)ptr)[0].numOfSubBlocks = 0; + + ((SBlock *)ptr)[1] = *pCompBlock; + + pSCompBlock->numOfSubBlocks = 2; + pSCompBlock->numOfRows += rowsAdded; + pSCompBlock->offset = ((char *)ptr) - ((char *)pHelper->pCompInfo); + pSCompBlock->len = sizeof(SBlock) * 2; + pSCompBlock->keyFirst = MIN(((SBlock *)ptr)[0].keyFirst, ((SBlock *)ptr)[1].keyFirst); + pSCompBlock->keyLast = MAX(((SBlock *)ptr)[0].keyLast, ((SBlock *)ptr)[1].keyLast); + + pIdx->len += (sizeof(SBlock) * 2); + } + + pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast; + pIdx->hasLast = (uint32_t)pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last; + + tsdbDebug("vgId:%d tid:%d a subblock is added at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, blkIdx); + + return 0; + +_err: + return -1; +} + +static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx) { + ASSERT(pCompBlock->numOfSubBlocks == 1); + + SBlockIdx *pIdx = &(pHelper->curCompIdx); + + ASSERT(blkIdx >= 0 && blkIdx < (int)pIdx->numOfBlocks); + + SBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; + + ASSERT(pSCompBlock->numOfSubBlocks >= 1); + + // Delete the sub blocks it has + if (pSCompBlock->numOfSubBlocks > 1) { + size_t tsize = (size_t)(pIdx->len - (pSCompBlock->offset + pSCompBlock->len)); + if (tsize > 0) { + memmove(POINTER_SHIFT(pHelper->pCompInfo, pSCompBlock->offset), + POINTER_SHIFT(pHelper->pCompInfo, pSCompBlock->offset + pSCompBlock->len), tsize); + } + + for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { + SBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; + if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset -= (sizeof(SBlock) * pSCompBlock->numOfSubBlocks); + } + + pIdx->len -= (sizeof(SBlock) * pSCompBlock->numOfSubBlocks); + } + + *pSCompBlock = *pCompBlock; + + pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast; + pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last; + + tsdbDebug("vgId:%d tid:%d a super block is updated at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, + blkIdx); + + return 0; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbReadUtil.c b/src/tsdb/src/tsdbReadUtil.c index d78b4e3a0c22713725b6231f5aea00bfe1768b16..acf25d1fc2c9e2653f55ec0996ae666318ec8465 100644 --- a/src/tsdb/src/tsdbReadUtil.c +++ b/src/tsdb/src/tsdbReadUtil.c @@ -174,13 +174,14 @@ int tsdbLoadBlockInfo(SReadHandle *pReadH) { } int tsdbLoadBlockData(SReadHandle *pReadH, SBlock *pBlock, SBlockInfo *pBlockInfo) { - // TODO + ASSERT(pBlock->numOfSubBlocks >= 1); + SBlock *pSubBlock = pBlock; int nSubBlocks = pBlock->numOfSubBlocks; if (nSubBlocks > 1) { if (pBlockInfo == NULL) pBlockInfo = pReadH->pBlockInfo; - pSubBlock = (SBlock *)POINTER_SHIFT((void *)pReadH->pBlockInfo, pBlock->offset); + pSubBlock = (SBlock *)POINTER_SHIFT((void *)pBlockInfo, pBlock->offset); } if (tsdbLoadBlockDataImpl(pReadH, pSubBlock, pReadH->pDataCols[0]) < 0) return -1; @@ -198,7 +199,23 @@ int tsdbLoadBlockData(SReadHandle *pReadH, SBlock *pBlock, SBlockInfo *pBlockInf } int tsdbLoadBlockDataCols(SReadHandle *pReadH, SBlock *pBlock, SBlockInfo *pBlockInfo, int16_t *colIds, int numOfCols) { - // TODO + ASSERT(pBlock->numOfSubBlocks >= 1); + + SBlock *pSubBlock = pBlock; + int nSubBlock = pBlock->numOfSubBlocks; + + if (nSubBlock > 1) { + if (pBlockInfo == NULL) pBlockInfo = pReadH->pBlockInfo; + pSubBlock = (SBlock *)POINTER_SHIFT((void *)pBlockInfo, pBlock->offset); + } + + if (tsdbLoadBlockDataColsImpl(pReadH, pSubBlock, pReadH->pDataCols[0], colIds, numOfCols) < 0) return -1; + for (int i = 1; i < nSubBlock; i++) { + pSubBlock++; + if (tsdbLoadBlockDataColsImpl(pReadH, pSubBlock, pReadH->pDataCols[1], colIds, numOfCols) < 0) return -1; + if (tdMergeDataCols(pReadH->pDataCols[0], pReadH->pDataCols[1], pReadH->pDataCols[1]->numOfRows) < 0) goto _err; + } + return 0; } @@ -299,9 +316,83 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols return 0; } -static int tsdbLoadBlockDataColsImpl() { - // TODO +static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds, + int numOfColIds) { + ASSERT(pBlock->numOfSubBlocks <= 1); + ASSERT(colIds[0] == 0); + + SFile *pFile = + pBlock->last ? TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_LAST) : TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_DATA); + SBlockCol compCol = {0}; + + // If only load timestamp column, no need to load SBlockData part + if (numOfColIds > 1 && tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) goto _err; + + pDataCols->numOfRows = pCompBlock->numOfRows; + + int dcol = 0; + int ccol = 0; + for (int i = 0; i < numOfColIds; i++) { + int16_t colId = colIds[i]; + SDataCol *pDataCol = NULL; + SBlockCol *pCompCol = NULL; + + while (true) { + if (dcol >= pDataCols->numOfCols) { + pDataCol = NULL; + break; + } + pDataCol = &pDataCols->cols[dcol]; + if (pDataCol->colId > colId) { + pDataCol = NULL; + break; + } else { + dcol++; + if (pDataCol->colId == colId) break; + } + } + + if (pDataCol == NULL) continue; + ASSERT(pDataCol->colId == colId); + + if (colId == 0) { // load the key row + compCol.colId = colId; + compCol.len = pCompBlock->keyLen; + compCol.type = pDataCol->type; + compCol.offset = TSDB_KEY_COL_OFFSET; + pCompCol = &compCol; + } else { // load non-key rows + while (true) { + if (ccol >= pCompBlock->numOfCols) { + pCompCol = NULL; + break; + } + + pCompCol = &(pHelper->pCompData->cols[ccol]); + if (pCompCol->colId > colId) { + pCompCol = NULL; + break; + } else { + ccol++; + if (pCompCol->colId == colId) break; + } + } + + if (pCompCol == NULL) { + dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints); + continue; + } + + ASSERT(pCompCol->colId == pDataCol->colId); + } + + if (tsdbLoadColData(pHelper, pFile, pCompBlock, pCompCol, pDataCol) < 0) goto _err; + } + return 0; + +_err: + return -1; } static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) { @@ -381,5 +472,46 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32 dataColSetOffset(pDataCol, numOfRows); } } + return 0; +} + +static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SBlock *pCompBlock, SBlockCol *pCompCol, + SDataCol *pDataCol) { + ASSERT(pDataCol->colId == pCompCol->colId); + int tsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES; + pHelper->pBuffer = taosTRealloc(pHelper->pBuffer, pCompCol->len); + if (pHelper->pBuffer == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + pHelper->compBuffer = taosTRealloc(pHelper->compBuffer, tsize); + if (pHelper->compBuffer == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + int64_t offset = pCompBlock->offset + TSDB_GET_COMPCOL_LEN(pCompBlock->numOfCols) + pCompCol->offset; + if (lseek(pFile->fd, (off_t)offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (taosTRead(pFile->fd, pHelper->pBuffer, pCompCol->len) < pCompCol->len) { + tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompCol->len, pFile->fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (tsdbCheckAndDecodeColumnData(pDataCol, pHelper->pBuffer, pCompCol->len, pCompBlock->algorithm, + pCompBlock->numOfRows, pHelper->pRepo->config.maxRowsPerFileBlock, + pHelper->compBuffer, (int32_t)taosTSizeof(pHelper->compBuffer)) < 0) { + tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pHelper->pRepo), pFile->fname, + pCompCol->colId, offset); + return -1; + } + return 0; } \ No newline at end of file