提交 61cd5a29 编写于 作者: H Hongze Cheng

refactor more code

上级 30c0aea7
...@@ -741,3 +741,428 @@ _err: ...@@ -741,3 +741,428 @@ _err:
taosRUnLockLatch(&(pIter->pTable->latch)); taosRUnLockLatch(&(pIter->pTable->latch));
return -1; return -1;
} }
static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) {
STsdbCfg * pCfg = &(pHelper->pRepo->config);
STable * pTable = pCommitIter->pTable;
SBlockIdx * pIdx = &(pHelper->curCompIdx);
TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter);
int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5;
SBlock compBlock = {0};
ASSERT(pIdx->len <= 0 || keyFirst > pIdx->maxKey);
if (pIdx->hasLast) { // append to with last block
ASSERT(pIdx->len > 0);
SBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1);
ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock);
tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows,
pDataCols, NULL, 0);
ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows);
if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock &&
pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) {
if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1;
} else {
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows);
if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows + pDataCols->numOfRows);
if (tsdbWriteBlockToProperFile(pHelper, pHelper->pDataCols[0], &compBlock) < 0) return -1;
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1;
}
if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false;
} else {
ASSERT(!pHelper->hasOldLastBlock);
tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0);
ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1;
if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1;
}
#ifndef NDEBUG
TSKEY keyNext = tsdbNextIterKey(pCommitIter->pIter);
ASSERT(keyNext < 0 || keyNext > pIdx->maxKey);
#endif
return 0;
}
static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey,
int *blkIdx) {
STsdbCfg * pCfg = &(pHelper->pRepo->config);
STable * pTable = pCommitIter->pTable;
SBlockIdx * pIdx = &(pHelper->curCompIdx);
SBlock compBlock = {0};
TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter);
int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5;
SDataCols *pDataCols0 = pHelper->pDataCols[0];
SSkipListIterator slIter = {0};
ASSERT(keyFirst <= pIdx->maxKey);
SBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)blockAtIdx(pHelper, *blkIdx),
pIdx->numOfBlocks - *blkIdx, sizeof(SBlock), compareKeyBlock, TD_GE);
ASSERT(pCompBlock != NULL);
int tblkIdx = (int32_t)(TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock));
if (pCompBlock->last) {
ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && tblkIdx == pIdx->numOfBlocks - 1);
int16_t colId = 0;
slIter = *(pCommitIter->pIter);
if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows);
int rows1 = defaultRowsInBlock - pCompBlock->numOfRows;
int rows2 =
tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows);
if (rows2 == 0) { // all data filtered out
*(pCommitIter->pIter) = slIter;
} else {
if (pCompBlock->numOfRows + rows2 < pCfg->minRowsPerFileBlock &&
pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) {
tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols,
pDataCols0->cols[0].pData, pDataCols0->numOfRows);
ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
tblkIdx++;
} else {
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
int round = 0;
int dIter = 0;
while (true) {
tdResetDataCols(pDataCols);
int rowsRead =
tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey, defaultRowsInBlock);
if (rowsRead == 0) break;
if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1;
if (round == 0) {
if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
} else {
if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
}
tblkIdx++;
round++;
}
}
if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false;
}
} else {
TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1);
TSKEY blkKeyFirst = pCompBlock->keyFirst;
TSKEY blkKeyLast = pCompBlock->keyLast;
if (keyFirst < blkKeyFirst) {
while (true) {
tdResetDataCols(pDataCols);
int rowsRead =
tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0);
if (rowsRead == 0) break;
ASSERT(rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1;
if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
tblkIdx++;
}
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
} else {
ASSERT(keyFirst <= blkKeyLast);
int16_t colId = 0;
if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
slIter = *(pCommitIter->pIter);
int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows);
int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData,
pDataCols0->numOfRows);
if (rows2 == 0) { // all filtered out
*(pCommitIter->pIter) = slIter;
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
} else {
int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2;
if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) {
int rows = (rows1 >= rows3) ? rows3 : rows2;
tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols,
pDataCols0->cols[0].pData, pDataCols0->numOfRows);
ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0)
return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
tblkIdx++;
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
} else {
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
int round = 0;
int dIter = 0;
while (true) {
int rowsRead =
tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock);
if (rowsRead == 0) break;
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0)
return -1;
if (round == 0) {
if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
} else {
if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
}
round++;
tblkIdx++;
}
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
}
}
}
}
*blkIdx = tblkIdx;
return 0;
}
static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows) {
int numOfRows = 0;
TSKEY key1 = INT64_MAX;
TSKEY key2 = INT64_MAX;
STSchema *pSchema = NULL;
ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey);
tdResetDataCols(pTarget);
while (true) {
key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter);
SDataRow row = tsdbNextIterRow(pCommitIter->pIter);
key2 = (row == NULL || dataRowKey(row) > maxKey) ? INT64_MAX : dataRowKey(row);
if (key1 == INT64_MAX && key2 == INT64_MAX) break;
if (key1 <= key2) {
for (int i = 0; i < pDataCols->numOfCols; i++) {
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
pTarget->maxPoints);
}
pTarget->numOfRows++;
(*iter)++;
if (key1 == key2) tSkipListIterNext(pCommitIter->pIter);
} else {
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row));
ASSERT(pSchema != NULL);
}
tdAppendDataRowToDataCol(row, pSchema, pTarget);
tSkipListIterNext(pCommitIter->pIter);
}
numOfRows++;
if (numOfRows >= maxRows) break;
ASSERT(numOfRows == pTarget->numOfRows && numOfRows <= pTarget->maxPoints);
}
return numOfRows;
}
static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SBlock *pCompBlock) {
STsdbCfg *pCfg = &(pHelper->pRepo->config);
SFile * pFile = NULL;
bool isLast = false;
ASSERT(pDataCols->numOfRows > 0);
if (pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) {
pFile = helperDataF(pHelper);
} else {
isLast = true;
pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? helperNewLastF(pHelper) : helperLastF(pHelper);
}
ASSERT(pFile->fd > 0);
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, pCompBlock, isLast, true) < 0) return -1;
return 0;
}
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx) {
SBlockIdx *pIdx = &(pHelper->curCompIdx);
ASSERT(blkIdx >= 0 && blkIdx <= (int)pIdx->numOfBlocks);
ASSERT(pCompBlock->numOfSubBlocks == 1);
// Adjust memory if no more room
if (pIdx->len == 0) pIdx->len = sizeof(SBlockInfo) + sizeof(TSCKSUM);
if (tsdbAdjustInfoSizeIfNeeded(pHelper, pIdx->len + sizeof(SBlockInfo)) < 0) goto _err;
// Change the offset
for (uint32_t i = 0; i < pIdx->numOfBlocks; i++) {
SBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i];
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SBlock);
}
// Memmove if needed
int tsize = pIdx->len - (sizeof(SBlockInfo) + sizeof(SBlock) * blkIdx);
if (tsize > 0) {
ASSERT(sizeof(SBlockInfo) + sizeof(SBlock) * (blkIdx + 1) < taosTSizeof(pHelper->pCompInfo));
ASSERT(sizeof(SBlockInfo) + sizeof(SBlock) * (blkIdx + 1) + tsize <= taosTSizeof(pHelper->pCompInfo));
memmove(POINTER_SHIFT(pHelper->pCompInfo, sizeof(SBlockInfo) + sizeof(SBlock) * (blkIdx + 1)),
POINTER_SHIFT(pHelper->pCompInfo, sizeof(SBlockInfo) + sizeof(SBlock) * blkIdx), tsize);
}
pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock;
pIdx->numOfBlocks++;
pIdx->len += sizeof(SBlock);
ASSERT(pIdx->len <= taosTSizeof(pHelper->pCompInfo));
pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast;
pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last;
if (pIdx->numOfBlocks > 1) {
ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst);
}
tsdbDebug("vgId:%d tid:%d a super block is inserted at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
blkIdx);
return 0;
_err:
return -1;
}
static int tsdbAddSubBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx, int rowsAdded) {
ASSERT(pCompBlock->numOfSubBlocks == 0);
SBlockIdx *pIdx = &(pHelper->curCompIdx);
ASSERT(blkIdx >= 0 && blkIdx < (int)pIdx->numOfBlocks);
SBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx;
ASSERT(pSCompBlock->numOfSubBlocks >= 1 && pSCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS);
size_t spaceNeeded =
(pSCompBlock->numOfSubBlocks == 1) ? pIdx->len + sizeof(SBlock) * 2 : pIdx->len + sizeof(SBlock);
if (tsdbAdjustInfoSizeIfNeeded(pHelper, spaceNeeded) < 0) goto _err;
pSCompBlock = pHelper->pCompInfo->blocks + blkIdx;
// Add the sub-block
if (pSCompBlock->numOfSubBlocks > 1) {
size_t tsize = (size_t)(pIdx->len - (pSCompBlock->offset + pSCompBlock->len));
if (tsize > 0) {
memmove((void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len + sizeof(SBlock)),
(void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len), tsize);
for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
SBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i];
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SBlock);
}
}
*(SBlock *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len) = *pCompBlock;
pSCompBlock->numOfSubBlocks++;
ASSERT(pSCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS);
pSCompBlock->len += sizeof(SBlock);
pSCompBlock->numOfRows += rowsAdded;
pSCompBlock->keyFirst = MIN(pSCompBlock->keyFirst, pCompBlock->keyFirst);
pSCompBlock->keyLast = MAX(pSCompBlock->keyLast, pCompBlock->keyLast);
pIdx->len += sizeof(SBlock);
} else { // Need to create two sub-blocks
void *ptr = NULL;
for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
SBlock *pTCompBlock = pHelper->pCompInfo->blocks + i;
if (pTCompBlock->numOfSubBlocks > 1) {
ptr = POINTER_SHIFT(pHelper->pCompInfo, pTCompBlock->offset);
break;
}
}
if (ptr == NULL) ptr = POINTER_SHIFT(pHelper->pCompInfo, pIdx->len - sizeof(TSCKSUM));
size_t tsize = pIdx->len - ((char *)ptr - (char *)(pHelper->pCompInfo));
if (tsize > 0) {
memmove(POINTER_SHIFT(ptr, sizeof(SBlock) * 2), ptr, tsize);
for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
SBlock *pTCompBlock = pHelper->pCompInfo->blocks + i;
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += (sizeof(SBlock) * 2);
}
}
((SBlock *)ptr)[0] = *pSCompBlock;
((SBlock *)ptr)[0].numOfSubBlocks = 0;
((SBlock *)ptr)[1] = *pCompBlock;
pSCompBlock->numOfSubBlocks = 2;
pSCompBlock->numOfRows += rowsAdded;
pSCompBlock->offset = ((char *)ptr) - ((char *)pHelper->pCompInfo);
pSCompBlock->len = sizeof(SBlock) * 2;
pSCompBlock->keyFirst = MIN(((SBlock *)ptr)[0].keyFirst, ((SBlock *)ptr)[1].keyFirst);
pSCompBlock->keyLast = MAX(((SBlock *)ptr)[0].keyLast, ((SBlock *)ptr)[1].keyLast);
pIdx->len += (sizeof(SBlock) * 2);
}
pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast;
pIdx->hasLast = (uint32_t)pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last;
tsdbDebug("vgId:%d tid:%d a subblock is added at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, blkIdx);
return 0;
_err:
return -1;
}
static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx) {
ASSERT(pCompBlock->numOfSubBlocks == 1);
SBlockIdx *pIdx = &(pHelper->curCompIdx);
ASSERT(blkIdx >= 0 && blkIdx < (int)pIdx->numOfBlocks);
SBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx;
ASSERT(pSCompBlock->numOfSubBlocks >= 1);
// Delete the sub blocks it has
if (pSCompBlock->numOfSubBlocks > 1) {
size_t tsize = (size_t)(pIdx->len - (pSCompBlock->offset + pSCompBlock->len));
if (tsize > 0) {
memmove(POINTER_SHIFT(pHelper->pCompInfo, pSCompBlock->offset),
POINTER_SHIFT(pHelper->pCompInfo, pSCompBlock->offset + pSCompBlock->len), tsize);
}
for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
SBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i];
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset -= (sizeof(SBlock) * pSCompBlock->numOfSubBlocks);
}
pIdx->len -= (sizeof(SBlock) * pSCompBlock->numOfSubBlocks);
}
*pSCompBlock = *pCompBlock;
pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast;
pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last;
tsdbDebug("vgId:%d tid:%d a super block is updated at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
blkIdx);
return 0;
}
\ No newline at end of file
...@@ -174,13 +174,14 @@ int tsdbLoadBlockInfo(SReadHandle *pReadH) { ...@@ -174,13 +174,14 @@ int tsdbLoadBlockInfo(SReadHandle *pReadH) {
} }
int tsdbLoadBlockData(SReadHandle *pReadH, SBlock *pBlock, SBlockInfo *pBlockInfo) { int tsdbLoadBlockData(SReadHandle *pReadH, SBlock *pBlock, SBlockInfo *pBlockInfo) {
// TODO ASSERT(pBlock->numOfSubBlocks >= 1);
SBlock *pSubBlock = pBlock; SBlock *pSubBlock = pBlock;
int nSubBlocks = pBlock->numOfSubBlocks; int nSubBlocks = pBlock->numOfSubBlocks;
if (nSubBlocks > 1) { if (nSubBlocks > 1) {
if (pBlockInfo == NULL) pBlockInfo = pReadH->pBlockInfo; if (pBlockInfo == NULL) pBlockInfo = pReadH->pBlockInfo;
pSubBlock = (SBlock *)POINTER_SHIFT((void *)pReadH->pBlockInfo, pBlock->offset); pSubBlock = (SBlock *)POINTER_SHIFT((void *)pBlockInfo, pBlock->offset);
} }
if (tsdbLoadBlockDataImpl(pReadH, pSubBlock, pReadH->pDataCols[0]) < 0) return -1; if (tsdbLoadBlockDataImpl(pReadH, pSubBlock, pReadH->pDataCols[0]) < 0) return -1;
...@@ -198,7 +199,23 @@ int tsdbLoadBlockData(SReadHandle *pReadH, SBlock *pBlock, SBlockInfo *pBlockInf ...@@ -198,7 +199,23 @@ int tsdbLoadBlockData(SReadHandle *pReadH, SBlock *pBlock, SBlockInfo *pBlockInf
} }
int tsdbLoadBlockDataCols(SReadHandle *pReadH, SBlock *pBlock, SBlockInfo *pBlockInfo, int16_t *colIds, int numOfCols) { int tsdbLoadBlockDataCols(SReadHandle *pReadH, SBlock *pBlock, SBlockInfo *pBlockInfo, int16_t *colIds, int numOfCols) {
// TODO ASSERT(pBlock->numOfSubBlocks >= 1);
SBlock *pSubBlock = pBlock;
int nSubBlock = pBlock->numOfSubBlocks;
if (nSubBlock > 1) {
if (pBlockInfo == NULL) pBlockInfo = pReadH->pBlockInfo;
pSubBlock = (SBlock *)POINTER_SHIFT((void *)pBlockInfo, pBlock->offset);
}
if (tsdbLoadBlockDataColsImpl(pReadH, pSubBlock, pReadH->pDataCols[0], colIds, numOfCols) < 0) return -1;
for (int i = 1; i < nSubBlock; i++) {
pSubBlock++;
if (tsdbLoadBlockDataColsImpl(pReadH, pSubBlock, pReadH->pDataCols[1], colIds, numOfCols) < 0) return -1;
if (tdMergeDataCols(pReadH->pDataCols[0], pReadH->pDataCols[1], pReadH->pDataCols[1]->numOfRows) < 0) goto _err;
}
return 0; return 0;
} }
...@@ -299,9 +316,83 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols ...@@ -299,9 +316,83 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols
return 0; return 0;
} }
static int tsdbLoadBlockDataColsImpl() { static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds,
// TODO int numOfColIds) {
ASSERT(pBlock->numOfSubBlocks <= 1);
ASSERT(colIds[0] == 0);
SFile *pFile =
pBlock->last ? TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_LAST) : TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_DATA);
SBlockCol compCol = {0};
// If only load timestamp column, no need to load SBlockData part
if (numOfColIds > 1 && tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) goto _err;
pDataCols->numOfRows = pCompBlock->numOfRows;
int dcol = 0;
int ccol = 0;
for (int i = 0; i < numOfColIds; i++) {
int16_t colId = colIds[i];
SDataCol *pDataCol = NULL;
SBlockCol *pCompCol = NULL;
while (true) {
if (dcol >= pDataCols->numOfCols) {
pDataCol = NULL;
break;
}
pDataCol = &pDataCols->cols[dcol];
if (pDataCol->colId > colId) {
pDataCol = NULL;
break;
} else {
dcol++;
if (pDataCol->colId == colId) break;
}
}
if (pDataCol == NULL) continue;
ASSERT(pDataCol->colId == colId);
if (colId == 0) { // load the key row
compCol.colId = colId;
compCol.len = pCompBlock->keyLen;
compCol.type = pDataCol->type;
compCol.offset = TSDB_KEY_COL_OFFSET;
pCompCol = &compCol;
} else { // load non-key rows
while (true) {
if (ccol >= pCompBlock->numOfCols) {
pCompCol = NULL;
break;
}
pCompCol = &(pHelper->pCompData->cols[ccol]);
if (pCompCol->colId > colId) {
pCompCol = NULL;
break;
} else {
ccol++;
if (pCompCol->colId == colId) break;
}
}
if (pCompCol == NULL) {
dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints);
continue;
}
ASSERT(pCompCol->colId == pDataCol->colId);
}
if (tsdbLoadColData(pHelper, pFile, pCompBlock, pCompCol, pDataCol) < 0) goto _err;
}
return 0; return 0;
_err:
return -1;
} }
static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) { static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) {
...@@ -383,3 +474,44 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32 ...@@ -383,3 +474,44 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32
} }
return 0; return 0;
} }
static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SBlock *pCompBlock, SBlockCol *pCompCol,
SDataCol *pDataCol) {
ASSERT(pDataCol->colId == pCompCol->colId);
int tsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES;
pHelper->pBuffer = taosTRealloc(pHelper->pBuffer, pCompCol->len);
if (pHelper->pBuffer == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
pHelper->compBuffer = taosTRealloc(pHelper->compBuffer, tsize);
if (pHelper->compBuffer == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
int64_t offset = pCompBlock->offset + TSDB_GET_COMPCOL_LEN(pCompBlock->numOfCols) + pCompCol->offset;
if (lseek(pFile->fd, (off_t)offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (taosTRead(pFile->fd, pHelper->pBuffer, pCompCol->len) < pCompCol->len) {
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompCol->len, pFile->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (tsdbCheckAndDecodeColumnData(pDataCol, pHelper->pBuffer, pCompCol->len, pCompBlock->algorithm,
pCompBlock->numOfRows, pHelper->pRepo->config.maxRowsPerFileBlock,
pHelper->compBuffer, (int32_t)taosTSizeof(pHelper->compBuffer)) < 0) {
tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pHelper->pRepo), pFile->fname,
pCompCol->colId, offset);
return -1;
}
return 0;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册