提交 eb45b021 编写于 作者: H Hongze Cheng

finish more code

上级 482476e4
......@@ -25,35 +25,41 @@
#define TSDB_KEY_COL_OFFSET 0
#define TSDB_GET_COMPBLOCK_IDX(h, b) (POINTER_DISTANCE(b, (h)->pCompInfo->blocks)/sizeof(SCompBlock))
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper);
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SCompBlock *pCompBlock,
bool isLast, bool isSuperBlock);
static int compareKeyBlock(const void *arg1, const void *arg2);
static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize);
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded);
static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
static void tsdbResetHelperFileImpl(SRWHelper *pHelper);
static int tsdbInitHelperFile(SRWHelper *pHelper);
static void tsdbDestroyHelperFile(SRWHelper *pHelper);
static void tsdbResetHelperTableImpl(SRWHelper *pHelper);
static void tsdbResetHelperTable(SRWHelper *pHelper);
static void tsdbInitHelperTable(SRWHelper *pHelper);
static void tsdbDestroyHelperTable(SRWHelper *pHelper);
static void tsdbResetHelperBlockImpl(SRWHelper *pHelper);
static void tsdbResetHelperBlock(SRWHelper *pHelper);
static int tsdbInitHelperBlock(SRWHelper *pHelper);
static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type);
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows,
int maxPoints, char *buffer, int bufferSize);
static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds,
int numOfColIds);
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols);
static int tsdbEncodeSCompIdx(void **buf, SCompIdx *pIdx);
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper);
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SCompBlock *pCompBlock,
bool isLast, bool isSuperBlock);
static int compareKeyBlock(const void *arg1, const void *arg2);
static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize);
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded);
static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
static void tsdbResetHelperFileImpl(SRWHelper *pHelper);
static int tsdbInitHelperFile(SRWHelper *pHelper);
static void tsdbDestroyHelperFile(SRWHelper *pHelper);
static void tsdbResetHelperTableImpl(SRWHelper *pHelper);
static void tsdbResetHelperTable(SRWHelper *pHelper);
static void tsdbInitHelperTable(SRWHelper *pHelper);
static void tsdbDestroyHelperTable(SRWHelper *pHelper);
static void tsdbResetHelperBlockImpl(SRWHelper *pHelper);
static void tsdbResetHelperBlock(SRWHelper *pHelper);
static int tsdbInitHelperBlock(SRWHelper *pHelper);
static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type);
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows,
int maxPoints, char *buffer, int bufferSize);
static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds,
int numOfColIds);
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols);
static int tsdbEncodeSCompIdx(void **buf, SCompIdx *pIdx);
static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx);
static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey);
static void tsdbDestroyHelperBlock(SRWHelper *pHelper);
static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBlock, SCompCol *pCompCol,
SDataCol *pDataCol);
static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SCompBlock *pCompBlock);
static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey,
int *blkIdx);
static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows);
// ---------------------- INTERNAL FUNCTIONS ----------------------
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
......@@ -236,12 +242,7 @@ int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
SCompIdx * pIdx = &(pHelper->pCompIdx[TABLE_TID(pCommitIter->pTable)]);
STsdbCfg * pCfg = &(pHelper->pRepo->config);
int blkIdx = 0;
int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5;
SCompBlock compBlock = {0};
SSkipListIterator sIter = {0};
ASSERT(TABLE_TID(pCommitIter->pTable) == pHelper->tableInfo.tid);
if (TABLE_UID(pCommitIter->pTable) != pIdx->uid) memset((void *)pIdx, 0, sizeof(*pIdx));
......@@ -252,166 +253,10 @@ int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols
if (keyFirst < 0 || keyFirst > maxKey) break; // iter over
if (pIdx->offset <= 0 || keyFirst > pIdx->maxKey) {
if (pIdx->hasLast) {
ASSERT(pIdx->offset > 0);
SCompBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1);
ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock);
tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pCommitIter->pTable, pCommitIter->pIter, maxKey,
defaultRowsInBlock - pCompBlock->numOfRows, pDataCols, NULL, 0);
ASSERT((rowsRead > 0) && (rowsRead == pDataCols->numOfRows));
if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock &&
pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) {
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0)
return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks-1, rowsRead) < 0) return -1;
} else {
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows);
if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows + rowsRead);
SFile *pFile = NULL;
bool isLast = false;
if (pHelper->pDataCols[0]->numOfRows >= pCfg->minRowsPerFileBlock) {
pFile = &(pHelper->files.dataF);
} else {
isLast = true;
pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
}
if (tsdbWriteBlockToFile(pHelper, pFile, pHelper->pDataCols[0], &compBlock,
isLast, true) < 0)
return -1;
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1;
}
} else { // last block is not in .last file
tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pCommitIter->pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0);
ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows);
SFile *pFile = NULL;
bool isLast = false;
if (rowsRead >= pCfg->minRowsPerFileBlock) {
pFile = &(pHelper->files.dataF);
} else {
isLast = true;
pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
}
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, &compBlock, isLast, true) < 0) return -1;
if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1;
}
if (tsdbProcessAppendCommit(pHelper, pCommitIter, pDataCols, maxKey) < 0) return -1;
blkIdx = pIdx->numOfBlocks;
} else {
SCompBlock *pCompBlock = taosbsearch(&keyFirst, (void *)blockAtIdx(pHelper, blkIdx), pIdx->numOfBlocks - blkIdx,
sizeof(SCompBlock), compareKeyBlock, TD_GE);
ASSERT(pCompBlock != NULL);
if (pCompBlock->last) {
ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock);
int16_t colId = 0;
sIter = *(pCommitIter->pIter);
if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
int rows1 = defaultRowsInBlock - pCompBlock->numOfRows;
int rows2 = tsdbLoadDataFromCache(pCommitIter->pTable, &sIter, maxKey, rows1, NULL,
pHelper->pDataCols[0]->cols[0].pData, pHelper->pDataCols[0]->numOfRows);
if (rows2 == 0) { // all data are filtered
*pCommitIter->pIter = sIter;
} else {
if (rows2 + rows1 < pCfg->minRowsPerFileBlock && pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS &&
!TSDB_NLAST_FILE_OPENED(pHelper)) {
tdResetDataCols(pDataCols);
int rows = tsdbLoadDataFromCache(pCommitIter->pTable, pCommitIter->pIter, maxKey, rows1, pDataCols,
pHelper->pDataCols[0]->cols[0].pData, pHelper->pDataCols[0]->numOfRows);
ASSERT(rows == rows2 && pDataCols->numOfRows == rows);
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0)
return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rows) < 0) return -1;
} else {
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
// TODO: merge pHelper->pDataCols[0] with pCommitIter->pIter
int round = 0;
// int iter1 = 0;
while (true) {
tdResetDataCols(pDataCols);
int rowsRead = 0;
// tsdbTwoLeveIterMerge(pHelper->pDataCols[0], &iter1, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols);
if (rowsRead == 0) break;
SFile *pFile = NULL;
bool isLast = false;
if (rowsRead >= pCfg->minRowsPerFileBlock) {
pFile = &(pHelper->files.dataF);
} else {
isLast = true;
pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
}
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, &compBlock, isLast, true) < 0) return -1;
if (round == 0) {
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks-1) < 0) return -1;
} else {
if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks-1) < 0) return -1;
}
round++;
}
}
}
} else {
int tblkIdx = TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock);
TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : pCompBlock[1].keyFirst - 1;
if (keyFirst < pCompBlock->keyFirst) {
while (true) {
tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pCommitIter->pTable, pCommitIter->pIter, keyLimit, defaultRowsInBlock, pDataCols, NULL, 0);
if (rowsRead == 0) break;
ASSERT(rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0)
return -1;
if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
tblkIdx++;
}
} else {
ASSERT(keyFirst <= pCompBlock->keyLast);
int16_t colId = 0;
if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows);
sIter = *(pCommitIter->pIter);
int rows1 = pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows;
int rows2 = tsdbLoadDataFromCache(pCommitIter->pTable, &sIter, pCompBlock->keyLast, INT_MAX /*TODO*/, NULL,
pHelper->pDataCols[0]->cols[0].pData, pHelper->pDataCols[0]->numOfRows);
int rows3 = 0;
if (rows2 == 0) { // All filtered out
*(pCommitIter->pIter) = sIter;
} else {
rows3 = tsdbLoadDataFromCache(pCommitIter->pTable, &sIter, keyLimit, INT_MAX /* TODO*/, NULL, NULL, 0) + rows2;
ASSERT(rows3 >= rows2);
if (rows1 >= rows2) {
int rows = (rows1 >= rows3) ? rows3 : rows2;
tdResetDataCols(pDataCols);
int rowsRead =
tsdbLoadDataFromCache(pCommitIter->pTable, pCommitIter->pIter, keyLimit, rows, pDataCols,
pHelper->pDataCols[0]->cols[0].pData, pHelper->pDataCols[0]->numOfRows);
ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, false) < 0)
return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
} else {
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows);
int round = 0;
while (true) {
// TODO
round++;
}
}
}
}
}
if (tsdbProcessMergeCommit(pHelper, pCommitIter, pDataCols, maxKey, &blkIdx) < 0) return -1;
}
}
......@@ -1450,3 +1295,233 @@ static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) {
return buf;
}
static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) {
STsdbCfg * pCfg = &(pHelper->pRepo->config);
STable * pTable = pCommitIter->pTable;
SCompIdx * pIdx = pHelper->pCompIdx + TABLE_TID(pTable);
TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter);
int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5;
SCompBlock compBlock = {0};
ASSERT(pIdx->offset <= 0 || keyFirst > pIdx->maxKey);
if (pIdx->hasLast) { // append to with last block
ASSERT(pIdx->offset > 0);
SCompBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1);
ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock);
tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows,
pDataCols, NULL, 0);
ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows);
if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock &&
pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) {
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1;
} else {
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows);
if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows + pDataCols->numOfRows);
if (tsdbWriteBlockToProperFile(pHelper, pHelper->pDataCols[0], &compBlock) < 0) return -1;
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1;
}
} else {
tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0);
ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1;
if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1;
}
return 0;
}
static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey,
int *blkIdx) {
STsdbCfg * pCfg = &(pHelper->pRepo->config);
STable * pTable = pCommitIter->pTable;
SCompIdx * pIdx = pHelper->pCompIdx + TABLE_TID(pTable);
SCompBlock compBlock = {0};
TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter);
int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5;
SDataCols *pDataCols0 = pHelper->pDataCols[0];
SSkipListIterator slIter = {0};
ASSERT(keyFirst <= pIdx->maxKey);
SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)blockAtIdx(pHelper, *blkIdx),
pIdx->numOfBlocks - *blkIdx, sizeof(SCompBlock), compareKeyBlock, TD_GE);
ASSERT(pCompBlock != NULL);
if (pCompBlock->last) {
ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock);
int16_t colId = 0;
slIter = *(pCommitIter->pIter);
if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows);
int rows1 = defaultRowsInBlock - pCompBlock->numOfRows;
int rows2 = tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows);
if (rows2 == 0) { // all data filtered out
*(pCommitIter->pIter) = slIter;
} else {
if (rows1 + rows2 < pCfg->minRowsPerFileBlock && pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) {
tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols,
pDataCols0->cols[0].pData, pDataCols0->numOfRows);
ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1;
} else {
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
int round = 0;
int dIter = 0;
while (true) {
int rowsRead =
tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey, defaultRowsInBlock);
if (rowsRead == 0) break;
if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1;
if (round == 0) {
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1;
} else {
if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1;
}
round++;
}
}
}
*blkIdx = pIdx->numOfBlocks;
} else {
int tblkIdx = TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock);
TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1);
if (keyFirst < pCompBlock->keyFirst) {
while (true) {
tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, defaultRowsInBlock, pDataCols, NULL, 0);
if (rowsRead == 0) break;
ASSERT(rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0) return -1;
if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
tblkIdx++;
}
*blkIdx = tblkIdx;
} else {
ASSERT(keyFirst <= pCompBlock->keyLast);
int16_t colId =0;
if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows);
slIter = *(pCommitIter->pIter);
int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows);
int rows2 = tsdbLoadDataFromCache(pTable, &slIter, pCompBlock->keyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows);
if (rows2 == 0) { // all filtered out
*(pCommitIter->pIter) = slIter;
} else {
int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2;
ASSERT(rows3 >= rows2);
if (rows1 >= rows2) {
int rows = (rows1 >= rows3) ? rows3 : rows2;
tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols,
pDataCols0->cols[0].pData, pDataCols0->numOfRows);
ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, false) < 0) return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
*blkIdx = tblkIdx + 1;
} else {
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
int round = 0;
int dIter = 0;
while (true) {
int rowsRead = tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock);
if (rowsRead == 0) break;
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0)
return -1;
if (round == 0) {
if (tsdbUpdateSuperBlock(pHelper, &compBlock, 0 /*TODO*/) < 0) return -1;
} else {
if (tsdbInsertSuperBlock(pHelper, &compBlock, 0 /*TODO */) < 0) return -1;
}
round++;
}
}
}
}
}
return 0;
}
static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows) {
int numOfRows = 0;
TSKEY key1 = INT64_MAX;
TSKEY key2 = INT64_MAX;
STSchema *pSchema = NULL;
ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey);
tdResetDataCols(pTarget);
while (true) {
key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter);
SDataRow row = tsdbNextIterRow(pCommitIter->pIter);
key2 = (row == NULL || dataRowKey(row) > maxKey) ? INT64_MAX : dataRowKey(row);
if (key1 == INT64_MAX && key2 == INT64_MAX) break;
if (key1 <= key2) {
for (int i = 0; i < pDataCols->numOfCols; i++) {
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
pTarget->maxPoints);
}
(*iter)++;
if (key1 == key2) tSkipListIterNext(pCommitIter->pIter);
} else {
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row));
ASSERT(pSchema != NULL);
tdAppendDataRowToDataCol(row, pSchema, pTarget);
}
tSkipListIterNext(pCommitIter->pIter);
}
numOfRows++;
if (numOfRows >= maxRows) break;
ASSERT(numOfRows == pTarget->numOfRows && numOfRows <= pTarget->maxPoints);
}
return numOfRows;
}
static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SCompBlock *pCompBlock) {
STsdbCfg *pCfg = &(pHelper->pRepo->config);
SFile * pFile = NULL;
bool isLast = false;
ASSERT(pDataCols->numOfRows > 0);
if (pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) {
pFile = &(pHelper->files.dataF);
} else {
isLast = true;
pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
}
ASSERT(pFile->fd > 0);
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, pCompBlock, isLast, true) < 0) return -1;
return 0;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册