提交 f41b47e1 编写于 作者: H Hongze Cheng

refactor more code

上级 62386223
......@@ -609,6 +609,8 @@ int tsdbLoadBlockDataInfo(SReadHandle* pReadH, SBlock* pBlock);
#define TSDB_FILE_IN_FGROUP(pGroup, type) (&((pGroup)->files[(type)]))
int tsdbAllocBuf(void** ppBuf, int size);
#ifdef __cplusplus
}
#endif
......
......@@ -636,200 +636,6 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) {
return 0;
}
// static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) {
// STsdbCfg * pCfg = &(pHelper->pRepo->config);
// STable * pTable = pCommitIter->pTable;
// SBlockIdx * pIdx = &(pHelper->curCompIdx);
// TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter);
// int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5;
// SBlock compBlock = {0};
// ASSERT(pIdx->len <= 0 || keyFirst > pIdx->maxKey);
// if (pIdx->hasLast) { // append to with last block
// ASSERT(pIdx->len > 0);
// SBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1);
// ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock);
// tdResetDataCols(pDataCols);
// int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows,
// pDataCols, NULL, 0);
// ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows);
// if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock &&
// pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) {
// if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1;
// if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1;
// } else {
// if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
// ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows);
// if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1;
// ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows + pDataCols->numOfRows);
// if (tsdbWriteBlockToProperFile(pHelper, pHelper->pDataCols[0], &compBlock) < 0) return -1;
// if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1;
// }
// if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false;
// } else {
// ASSERT(!pHelper->hasOldLastBlock);
// tdResetDataCols(pDataCols);
// int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0);
// ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows);
// if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1;
// if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1;
// }
// #ifndef NDEBUG
// TSKEY keyNext = tsdbNextIterKey(pCommitIter->pIter);
// ASSERT(keyNext < 0 || keyNext > pIdx->maxKey);
// #endif
// return 0;
// }
// static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey,
// int *blkIdx) {
// STsdbCfg * pCfg = &(pHelper->pRepo->config);
// STable * pTable = pCommitIter->pTable;
// SBlockIdx * pIdx = &(pHelper->curCompIdx);
// SBlock compBlock = {0};
// TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter);
// int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5;
// SDataCols *pDataCols0 = pHelper->pDataCols[0];
// SSkipListIterator slIter = {0};
// ASSERT(keyFirst <= pIdx->maxKey);
// SBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)blockAtIdx(pHelper, *blkIdx),
// pIdx->numOfBlocks - *blkIdx, sizeof(SBlock), compareKeyBlock, TD_GE);
// ASSERT(pCompBlock != NULL);
// int tblkIdx = (int32_t)(TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock));
// if (pCompBlock->last) {
// ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && tblkIdx == pIdx->numOfBlocks - 1);
// int16_t colId = 0;
// slIter = *(pCommitIter->pIter);
// if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
// ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows);
// int rows1 = defaultRowsInBlock - pCompBlock->numOfRows;
// int rows2 =
// tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows);
// if (rows2 == 0) { // all data filtered out
// *(pCommitIter->pIter) = slIter;
// } else {
// if (pCompBlock->numOfRows + rows2 < pCfg->minRowsPerFileBlock &&
// pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) {
// tdResetDataCols(pDataCols);
// int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols,
// pDataCols0->cols[0].pData, pDataCols0->numOfRows);
// ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows);
// if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1;
// if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
// tblkIdx++;
// } else {
// if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
// int round = 0;
// int dIter = 0;
// while (true) {
// tdResetDataCols(pDataCols);
// int rowsRead =
// tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey, defaultRowsInBlock);
// if (rowsRead == 0) break;
// if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1;
// if (round == 0) {
// if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
// } else {
// if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
// }
// tblkIdx++;
// round++;
// }
// }
// if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false;
// }
// } else {
// TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1);
// TSKEY blkKeyFirst = pCompBlock->keyFirst;
// TSKEY blkKeyLast = pCompBlock->keyLast;
// if (keyFirst < blkKeyFirst) {
// while (true) {
// tdResetDataCols(pDataCols);
// int rowsRead =
// tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0);
// if (rowsRead == 0) break;
// ASSERT(rowsRead == pDataCols->numOfRows);
// if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1;
// if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
// tblkIdx++;
// }
// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
// } else {
// ASSERT(keyFirst <= blkKeyLast);
// int16_t colId = 0;
// if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
// slIter = *(pCommitIter->pIter);
// int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows);
// int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData,
// pDataCols0->numOfRows);
// if (rows2 == 0) { // all filtered out
// *(pCommitIter->pIter) = slIter;
// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
// } else {
// int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2;
// if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) {
// int rows = (rows1 >= rows3) ? rows3 : rows2;
// tdResetDataCols(pDataCols);
// int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols,
// pDataCols0->cols[0].pData, pDataCols0->numOfRows);
// ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows);
// if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0)
// return -1;
// if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
// tblkIdx++;
// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
// } else {
// if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
// int round = 0;
// int dIter = 0;
// while (true) {
// int rowsRead =
// tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock);
// if (rowsRead == 0) break;
// if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0)
// return -1;
// if (round == 0) {
// if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
// } else {
// if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
// }
// round++;
// tblkIdx++;
// }
// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
// }
// }
// }
// }
// *blkIdx = tblkIdx;
// return 0;
// }
static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows) {
int numOfRows = 0;
......@@ -1123,7 +929,7 @@ static int tsdbWriteBlockInfo(STSCommitHandle *pTSCh) {
}
static int tsdbWriteBlockIdx(STSCommitHandle *pTSCh) {
// TODO
SFile *pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_HEAD);
return 0;
}
......@@ -1221,62 +1027,66 @@ static int tsdbMergeCommit(STSCommitHandle *pTSCh, SBlock *pBlock) {
return 0;
}
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SBlock *pCompBlock,
bool isLast, bool isSuperBlock) {
STsdbCfg * pCfg = &(pHelper->pRepo->config);
SBlockData *pCompData = (SBlockData *)(pHelper->pBuffer);
static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
bool isSuperBlock) {
SReadHandle *pReadH = pTSCh->pReadH;
STsdbRepo * pRepo = pReadH->pRepo;
STsdbCfg * pCfg = &(pRepo->config);
int64_t offset = 0;
int rowsToWrite = pDataCols->numOfRows;
SBlockData * pBlockData = NULL;
int nColsNotAllNull = 0;
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock);
ASSERT(isLast ? rowsToWrite < pCfg->minRowsPerFileBlock : true);
ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_END));
ASSERT(pDataCols->numOfRows > 0 && pDataCols->numOfRows <= pCfg->maxRowsPerFileBlock);
ASSERT(isLast ? pDataCols->numOfRows < pCfg->minRowsPerFileBlock : true);
offset = lseek(pFile->fd, 0, SEEK_END);
if (offset < 0) {
tsdbError("vgId:%d failed to write block to file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
offset = pFile->info.size;
int32_t bsize = TSDB_BLOCK_DATA_LEN(0); // total block size
int32_t csize = 0; // SBlockCol part size
int nColsNotAllNull = 0;
for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column
SDataCol *pDataCol = pDataCols->cols + ncol;
SBlockCol *pCompCol = pCompData->cols + nColsNotAllNull;
SDataCol * pDataCol = pDataCols->cols + ncol;
if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it
if (isNEleNull(pDataCol, pDataCols->numOfRows)) { // all data to commit are NULL, just ignore it
continue;
}
memset(pCompCol, 0, sizeof(*pCompCol));
nColsNotAllNull++;
bsize = TSDB_BLOCK_DATA_LEN(nColsNotAllNull);
if (tsdbAllocBuf(&(pReadH->pBuf), bsize) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
pBlockData = (SBlockData *)(pReadH->pBuf);
SBlockCol *pBlockCol = pBlockData->cols + (nColsNotAllNull - 1);
pCompCol->colId = pDataCol->colId;
pCompCol->type = pDataCol->type;
memset(pBlockCol, 0, sizeof(*pBlockCol));
pBlockCol->colId = pDataCol->colId;
pBlockCol->type = pDataCol->type;
if (tDataTypeDesc[pDataCol->type].getStatisFunc) {
(*tDataTypeDesc[pDataCol->type].getStatisFunc)(
(TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pCompCol->min), &(pCompCol->max),
&(pCompCol->sum), &(pCompCol->minIndex), &(pCompCol->maxIndex), &(pCompCol->numOfNull));
(TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max),
&(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), &(pBlockCol->numOfNull));
}
nColsNotAllNull++;
}
csize = bsize;
ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols);
// Compress the data if neccessary
int tcol = 0;
int32_t toffset = 0;
int32_t tsize = TSDB_GET_COMPCOL_LEN(nColsNotAllNull);
int32_t lsize = tsize;
int32_t keyLen = 0;
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
if (ncol != 0 && tcol >= nColsNotAllNull) break;
SDataCol *pDataCol = pDataCols->cols + ncol;
SBlockCol *pCompCol = pCompData->cols + tcol;
SDataCol * pDataCol = pDataCols->cols + ncol;
SBlockCol *pBlockCol = pBlockData->cols + tcol;
if (ncol != 0 && (pDataCol->colId != pCompCol->colId)) continue;
void *tptr = POINTER_SHIFT(pCompData, lsize);
if (ncol != 0 && (pDataCol->colId != pBlockCol->colId)) continue;
void *tptr = POINTER_SHIFT(pBlockData, lsize);
int32_t flen = 0; // final length
int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite);
......@@ -1290,9 +1100,9 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
}
}
flen = (*(tDataTypeDesc[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr,
(int32_t)taosTSizeof(pHelper->pBuffer) - lsize, pCfg->compression,
pHelper->compBuffer, (int32_t)taosTSizeof(pHelper->compBuffer));
flen = (*(tDataTypeDesc[pDataCol->type].compFunc))(
(char *)pDataCol->pData, tlen, rowsToWrite, tptr, (int32_t)taosTSizeof(pHelper->pBuffer) - lsize,
pCfg->compression, pHelper->compBuffer, (int32_t)taosTSizeof(pHelper->compBuffer));
} else {
flen = tlen;
memcpy(tptr, pDataCol->pData, flen);
......@@ -1306,8 +1116,8 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM)), sizeof(TSCKSUM));
if (ncol != 0) {
pCompCol->offset = toffset;
pCompCol->len = flen;
pBlockCol->offset = toffset;
pBlockCol->len = flen;
tcol++;
} else {
keyLen = flen;
......@@ -1317,44 +1127,41 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
lsize += flen;
}
pCompData->delimiter = TSDB_FILE_DELIMITER;
pCompData->uid = pHelper->tableInfo.uid;
pCompData->numOfCols = nColsNotAllNull;
pBlockData->delimiter = TSDB_FILE_DELIMITER;
pBlockData->uid = pHelper->tableInfo.uid;
pBlockData->numOfCols = nColsNotAllNull;
taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize);
pFile->info.magic = taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(pCompData, tsize - sizeof(TSCKSUM)),
taosCalcChecksumAppend(0, (uint8_t *)pBlockData, tsize);
pFile->info.magic = taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM)),
sizeof(TSCKSUM));
// Write the whole block to file
if (taosTWrite(pFile->fd, (void *)pCompData, lsize) < lsize) {
if (taosTWrite(pFile->fd, (void *)pBlockData, lsize) < lsize) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(helperRepo(pHelper)), lsize, pFile->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
return -1;
}
// Update pCompBlock membership vairables
pCompBlock->last = isLast;
pCompBlock->offset = offset;
pCompBlock->algorithm = pCfg->compression;
pCompBlock->numOfRows = rowsToWrite;
pCompBlock->len = lsize;
pCompBlock->keyLen = keyLen;
pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0;
pCompBlock->numOfCols = nColsNotAllNull;
pCompBlock->keyFirst = dataColsKeyFirst(pDataCols);
pCompBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1);
// Update pBlock membership vairables
pBlock->last = isLast;
pBlock->offset = offset;
pBlock->algorithm = pCfg->compression;
pBlock->numOfRows = rowsToWrite;
pBlock->len = lsize;
pBlock->keyLen = keyLen;
pBlock->numOfSubBlocks = isSuperBlock ? 1 : 0;
pBlock->numOfCols = nColsNotAllNull;
pBlock->keyFirst = dataColsKeyFirst(pDataCols);
pBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1);
tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
REPO_ID(helperRepo(pHelper)), pHelper->tableInfo.tid, pFile->fname, (int64_t)(pCompBlock->offset),
(int)(pCompBlock->numOfRows), pCompBlock->len, pCompBlock->numOfCols, pCompBlock->keyFirst,
pCompBlock->keyLast);
REPO_ID(helperRepo(pHelper)), pHelper->tableInfo.tid, pFile->fname, (int64_t)(pBlock->offset),
(int)(pBlock->numOfRows), pBlock->len, pBlock->numOfCols, pBlock->keyFirst,
pBlock->keyLast);
pFile->info.size += pCompBlock->len;
pFile->info.size += pBlock->len;
return 0;
_err:
return -1;
}
\ No newline at end of file
......@@ -519,7 +519,7 @@ static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) {
return 0;
}
static int tsdbAllocBuf(void **ppBuf, int size) {
int tsdbAllocBuf(void **ppBuf, int size) {
void *pBuf = *pBuf;
int tsize = taosTSizeof(pBuf);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册