提交 d35ca7c2 编写于 作者: C Cary Xu

code optimization

上级 44ad2279
......@@ -557,6 +557,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
pRepo->state = TSDB_STATE_OK;
pRepo->code = TSDB_CODE_SUCCESS;
pRepo->compactState = 0;
pRepo->truncateState = 0;
pRepo->config = *pCfg;
if (pAppH) {
pRepo->appH = *pAppH;
......
......@@ -682,7 +682,6 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
uint32_t toffset = TSDB_KEY_COL_OFFSET;
int32_t tlen = pBlock->keyLen;
if (dcol != 0) {
tsdbGetSBlockCol(pBlock, &pBlockCol, pBlockData->cols, ccol);
tcolId = pBlockCol->colId;
......
......@@ -24,7 +24,7 @@ typedef struct {
typedef struct {
SRtn rtn;
SFSIter fsIter;
SArray * tbArray; // STableTruncateH, table array to cache table obj and block indexes
SArray * tblArray; // STableTruncateH, table array to cache table obj and block indexes
SReadH readh;
SDFileSet wSet;
SArray * aBlkIdx;
......@@ -51,8 +51,8 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param);
static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet);
static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo);
static void tsdbDestroyTruncateH(STruncateH *pTruncateH);
static int tsdbInitCompTbArray(STruncateH *pTruncateH);
static void tsdbDestroyCompTbArray(STruncateH *pTruncateH);
static int tsdbInitTruncateTblArray(STruncateH *pTruncateH);
static void tsdbDestroyTruncateTblArray(STruncateH *pTruncateH);
static int tsdbCacheFSetIndex(STruncateH *pTruncateH);
static int tsdbTruncateCache(STsdbRepo *pRepo, void *param);
static int tsdbTruncateFSetInit(STruncateH *pTruncateH, SDFileSet *pSet);
......@@ -250,6 +250,7 @@ static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet) {
tsdbInitDFileSet(TSDB_TRUNCATE_WSET(pTruncateH), did, REPO_ID(pRepo), TSDB_FSET_FID(pSet),
FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_LATEST_FSET_VER);
if (tsdbCreateDFileSet(TSDB_TRUNCATE_WSET(pTruncateH), true) < 0) {
tsdbError("vgId:%d failed to truncate FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbTruncateFSetEnd(pTruncateH);
......@@ -285,7 +286,7 @@ static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo) {
return -1;
}
if (tsdbInitCompTbArray(pTruncateH) < 0) {
if (tsdbInitTruncateTblArray(pTruncateH) < 0) {
tsdbDestroyTruncateH(pTruncateH);
return -1;
}
......@@ -318,33 +319,33 @@ static void tsdbDestroyTruncateH(STruncateH *pTruncateH) {
pTruncateH->pDataCols = tdFreeDataCols(pTruncateH->pDataCols);
pTruncateH->aSupBlk = taosArrayDestroy(pTruncateH->aSupBlk);
pTruncateH->aBlkIdx = taosArrayDestroy(pTruncateH->aBlkIdx);
tsdbDestroyCompTbArray(pTruncateH);
tsdbDestroyTruncateTblArray(pTruncateH);
tsdbDestroyReadH(&(pTruncateH->readh));
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(pTruncateH));
}
static int tsdbInitCompTbArray(STruncateH *pTruncateH) { // Init pComp->tbArray
static int tsdbInitTruncateTblArray(STruncateH *pTruncateH) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
STsdbMeta *pMeta = pRepo->tsdbMeta;
if (tsdbRLockRepoMeta(pRepo) < 0) return -1;
pTruncateH->tbArray = taosArrayInit(pMeta->maxTables, sizeof(STableTruncateH));
if (pTruncateH->tbArray == NULL) {
pTruncateH->tblArray = taosArrayInit(pMeta->maxTables, sizeof(STableTruncateH));
if (pTruncateH->tblArray == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbUnlockRepoMeta(pRepo);
return -1;
}
// Note here must start from 0
for (int i = 0; i < pMeta->maxTables; i++) {
for (int i = 0; i < pMeta->maxTables; ++i) {
STableTruncateH ch = {0};
if (pMeta->tables[i] != NULL) {
tsdbRefTable(pMeta->tables[i]);
ch.pTable = pMeta->tables[i];
}
if (taosArrayPush(pTruncateH->tbArray, &ch) == NULL) {
if (taosArrayPush(pTruncateH->tblArray, &ch) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbUnlockRepoMeta(pRepo);
return -1;
......@@ -355,22 +356,21 @@ static int tsdbInitCompTbArray(STruncateH *pTruncateH) { // Init pComp->tbArray
return 0;
}
static void tsdbDestroyCompTbArray(STruncateH *pTruncateH) {
STableTruncateH *pTh;
static void tsdbDestroyTruncateTblArray(STruncateH *pTruncateH) {
STableTruncateH *pTblHandle = NULL;
if (pTruncateH->tbArray == NULL) return;
if (pTruncateH->tblArray == NULL) return;
for (size_t i = 0; i < taosArrayGetSize(pTruncateH->tbArray); i++) {
pTh = (STableTruncateH *)taosArrayGet(pTruncateH->tbArray, i);
if (pTh->pTable) {
tsdbUnRefTable(pTh->pTable);
for (size_t i = 0; i < taosArrayGetSize(pTruncateH->tblArray); ++i) {
pTblHandle = (STableTruncateH *)taosArrayGet(pTruncateH->tblArray, i);
if (pTblHandle->pTable) {
tsdbUnRefTable(pTblHandle->pTable);
}
// pTh->pInfo = taosTZfree(pTh->pInfo);
tfree(pTh->pInfo);
tfree(pTblHandle->pInfo);
}
pTruncateH->tbArray = taosArrayDestroy(pTruncateH->tbArray);
pTruncateH->tblArray = taosArrayDestroy(pTruncateH->tblArray);
}
static int tsdbCacheFSetIndex(STruncateH *pTruncateH) {
......@@ -380,21 +380,22 @@ static int tsdbCacheFSetIndex(STruncateH *pTruncateH) {
return -1;
}
for (int tid = 1; tid < taosArrayGetSize(pTruncateH->tbArray); tid++) {
STableTruncateH *pTh = (STableTruncateH *)taosArrayGet(pTruncateH->tbArray, tid);
pTh->pBlkIdx = NULL;
size_t tblArraySize = taosArrayGetSize(pTruncateH->tblArray);
for (size_t tid = 1; tid < tblArraySize; ++tid) {
STableTruncateH *pTblHandle = (STableTruncateH *)taosArrayGet(pTruncateH->tblArray, tid);
pTblHandle->pBlkIdx = NULL;
if (pTh->pTable == NULL) continue;
if (tsdbSetReadTable(pReadH, pTh->pTable) < 0) {
if (pTblHandle->pTable == NULL) continue;
if (tsdbSetReadTable(pReadH, pTblHandle->pTable) < 0) {
return -1;
}
if (pReadH->pBlkIdx == NULL) continue;
pTh->bIndex = *(pReadH->pBlkIdx);
pTh->pBlkIdx = &(pTh->bIndex);
pTblHandle->bIndex = *(pReadH->pBlkIdx);
pTblHandle->pBlkIdx = &(pTblHandle->bIndex);
uint32_t originLen = 0;
if (tsdbLoadBlockInfo(pReadH, (void **)(&(pTh->pInfo)), &originLen) < 0) {
if (tsdbLoadBlockInfo(pReadH, (void **)(&(pTblHandle->pInfo)), &originLen) < 0) {
return -1;
}
}
......@@ -424,7 +425,7 @@ static int tsdbTruncateFSetImpl(STruncateH *pTruncateH) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SReadH * pReadh = &(pTruncateH->readh);
SBlockIdx blkIdx;
SBlockIdx blkIdx = {0};
void ** ppBuf = &(TSDB_TRUNCATE_BUF(pTruncateH));
void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(pTruncateH));
void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(pTruncateH));
......@@ -432,51 +433,60 @@ static int tsdbTruncateFSetImpl(STruncateH *pTruncateH) {
taosArrayClear(pTruncateH->aBlkIdx);
for (int tid = 1; tid < taosArrayGetSize(pTruncateH->tbArray); tid++) {
STableTruncateH *pTh = (STableTruncateH *)taosArrayGet(pTruncateH->tbArray, tid);
STSchema * pSchema;
for (size_t tid = 1; tid < taosArrayGetSize(pTruncateH->tblArray); ++tid) {
STableTruncateH *pTblHandle = (STableTruncateH *)taosArrayGet(pTruncateH->tblArray, tid);
STSchema * pSchema = NULL;
if (pTblHandle->pTable == NULL || pTblHandle->pBlkIdx == NULL) continue;
if (pTh->pTable == NULL || pTh->pBlkIdx == NULL) continue;
if ((pSchema = tsdbGetTableSchemaImpl(pTblHandle->pTable, true, true, -1, -1)) == NULL) {
return -1;
}
pSchema = tsdbGetTableSchemaImpl(pTh->pTable, true, true, -1, -1);
taosArrayClear(pTruncateH->aSupBlk);
if ((tdInitDataCols(pTruncateH->pDataCols, pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) ||
(tdInitDataCols(pReadh->pDCols[1], pSchema) < 0)) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tdFreeSchema(pSchema);
return -1;
}
tdFreeSchema(pSchema);
// Loop to truncate each block data
for (int i = 0; i < pTh->pBlkIdx->numOfBlocks; i++) {
SBlock *pBlock = pTh->pInfo->blocks + i;
for (int i = 0; i < pTblHandle->pBlkIdx->numOfBlocks; ++i) {
SBlock *pBlock = pTblHandle->pInfo->blocks + i;
// Load the block data
if (tsdbLoadBlockData(pReadh, pBlock, pTh->pInfo) < 0) {
if (tsdbLoadBlockData(pReadh, pBlock, pTblHandle->pInfo) < 0) {
return -1;
}
// Merge pTruncateH->pDataCols and pReadh->pDCols[0] and write data to file
if (pTruncateH->pDataCols->numOfRows == 0 && pBlock->numOfRows >= defaultRows) {
if (tsdbWriteBlockToRightFile(pTruncateH, pTh->pTable, pReadh->pDCols[0], ppBuf, ppCBuf, ppExBuf) < 0) {
if ((pTruncateH->pDataCols->numOfRows == 0) && (pBlock->numOfRows >= defaultRows)) {
// if pTruncateH->pDataCols has no data and pBlock->numOfRows >= defaultRows, write data directly
if (tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pReadh->pDCols[0], ppBuf, ppCBuf, ppExBuf) < 0) {
return -1;
}
} else {
int ridx = 0;
while (true) {
if (pReadh->pDCols[0]->numOfRows - ridx == 0) break;
// no data left to merge anymore
if ((pReadh->pDCols[0]->numOfRows - ridx) == 0) break;
// min(data left, target space left)
int rowsToMerge = MIN(pReadh->pDCols[0]->numOfRows - ridx, defaultRows - pTruncateH->pDataCols->numOfRows);
tdMergeDataCols(pTruncateH->pDataCols, pReadh->pDCols[0], rowsToMerge, &ridx,
pCfg->update != TD_ROW_PARTIAL_UPDATE);
if (pTruncateH->pDataCols->numOfRows < defaultRows) {
// continue to read more blocks
break;
}
if (tsdbWriteBlockToRightFile(pTruncateH, pTh->pTable, pTruncateH->pDataCols, ppBuf, ppCBuf, ppExBuf) < 0) {
if (tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pTruncateH->pDataCols, ppBuf, ppCBuf, ppExBuf) < 0) {
return -1;
}
tdResetDataCols(pTruncateH->pDataCols);
......@@ -485,16 +495,16 @@ static int tsdbTruncateFSetImpl(STruncateH *pTruncateH) {
}
if (pTruncateH->pDataCols->numOfRows > 0 &&
tsdbWriteBlockToRightFile(pTruncateH, pTh->pTable, pTruncateH->pDataCols, ppBuf, ppCBuf, ppExBuf) < 0) {
tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pTruncateH->pDataCols, ppBuf, ppCBuf, ppExBuf) < 0) {
return -1;
}
if (tsdbWriteBlockInfoImpl(TSDB_TRUNCATE_HEAD_FILE(pTruncateH), pTh->pTable, pTruncateH->aSupBlk, NULL, ppBuf,
if (tsdbWriteBlockInfoImpl(TSDB_TRUNCATE_HEAD_FILE(pTruncateH), pTblHandle->pTable, pTruncateH->aSupBlk, NULL, ppBuf,
&blkIdx) < 0) {
return -1;
}
if ((blkIdx.numOfBlocks > 0) && (taosArrayPush(pTruncateH->aBlkIdx, (void *)(&blkIdx)) == NULL)) {
if ((blkIdx.numOfBlocks > 0) && (taosArrayPush(pTruncateH->aBlkIdx, (const void *)(&blkIdx)) == NULL)) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
......@@ -511,9 +521,9 @@ static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDa
void **ppCBuf, void **ppExBuf) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SDFile * pDFile;
bool isLast;
SBlock block;
SDFile * pDFile = NULL;
bool isLast = false;
SBlock block = {0};
ASSERT(pDataCols->numOfRows > 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册