提交 758c9e63 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/feat/tsdb_refact' into feat/tsdb_refact

......@@ -121,6 +121,7 @@ int32_t tGetBlockCol(uint8_t *p, void *ph);
#define tBlockInit() ((SBlock){0})
void tBlockReset(SBlock *pBlock);
void tBlockClear(SBlock *pBlock);
int32_t tBlockCopy(SBlock *pBlockSrc, SBlock *pBlockDest);
int32_t tPutBlock(uint8_t *p, void *ph);
int32_t tGetBlock(uint8_t *p, void *ph);
int32_t tBlockCmprFn(const void *p1, const void *p2);
......@@ -164,6 +165,7 @@ void tsdbFree(uint8_t *pBuf);
#define tMapDataInit() ((SMapData){0})
void tMapDataReset(SMapData *pMapData);
void tMapDataClear(SMapData *pMapData);
int32_t tMapDataCopy(SMapData *pMapDataSrc, SMapData *pMapDataDest);
int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *));
int32_t tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *));
int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *),
......
......@@ -342,33 +342,6 @@ _err:
return code;
}
// static int32_t tsdbGetOverlapRowNumber(STbDataIter *pIter, SBlock *pBlock) {
// int32_t nRow = 0;
// TSDBROW *pRow;
// TSDBKEY key;
// int32_t c = 0;
// STbDataIter iter = *pIter;
// iter.pRow = NULL;
// while (true) {
// pRow = tsdbTbDataIterGet(pIter);
// if (pRow == NULL) break;
// key = tsdbRowKey(pRow);
// c = tBlockCmprFn(&(SBlock){.info.maxKey = key, .info.minKey = key}, pBlock);
// if (c == 0) {
// nRow++;
// } else if (c > 0) {
// break;
// } else {
// ASSERT(0);
// }
// }
// return nRow;
// }
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
int32_t code = 0;
STsdb *pTsdb = pCommitter->pTsdb;
......@@ -636,7 +609,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
// loop to merge
pRow1 = tsdbTbDataIterGet(pIter);
*pRow2 = tsdbRowFromBlockData(pBlockDataMerge, 0);
ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0);
ASSERT(pRow1 && tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0);
ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow2), &toKey) < 0);
code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
if (code) goto _err;
......@@ -668,7 +641,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
pRow1 = NULL;
}
}
} else if (tsdbRowCmprFn(pRow1, pRow2) < 0) {
} else if (tsdbRowCmprFn(pRow1, pRow2) > 0) {
*pRow = *pRow2;
if (pRow2->iRow + 1 < pBlockDataMerge->nRow) {
......@@ -702,7 +675,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
}
}
code = tBlockDataAppendRow(pBlockData, &row, pCommitter->pTSchema);
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema);
if (code) goto _err;
pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow));
......@@ -754,8 +727,9 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
tBlockReset(pBlock);
tBlockDataReset(pBlockData);
pRow = tsdbTbDataIterGet(pIter);
ASSERT(pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) < 0);
while (true) {
if (pRow == NULL || tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) {
if (pRow == NULL) {
if (pBlockData->nRow > 0) {
goto _write_block;
} else {
......@@ -783,6 +757,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
tsdbTbDataIterNext(pIter);
pRow = tsdbTbDataIterGet(pIter);
if (pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) pRow = NULL;
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block;
continue;
......@@ -979,11 +954,87 @@ _err:
return code;
}
static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) {
int32_t nRow = 0;
TSDBROW *pRow;
TSDBKEY key;
int32_t c = 0;
STbDataIter iter = *pIter;
iter.pRow = NULL;
while (true) {
pRow = tsdbTbDataIterGet(pIter);
if (pRow == NULL) break;
key = TSDBROW_KEY(pRow);
c = tBlockCmprFn(&(SBlock){.maxKey = key, .minKey = key}, pBlock);
if (c == 0) {
nRow++;
} else if (c > 0) {
break;
} else {
ASSERT(0);
}
}
return nRow;
}
static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) {
int32_t code = 0;
SBlockData *pBlockData = &pCommitter->nBlockData;
SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
TSDBROW *pRow;
tBlockDataReset(pBlockData);
pRow = tsdbTbDataIterGet(pIter);
code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow));
if (code) goto _err;
while (true) {
if (pRow) break;
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema);
if (code) goto _err;
tsdbTbDataIterNext(pIter);
pRow = tsdbTbDataIterGet(pIter);
if (pRow) {
int32_t c = tBlockCmprFn(&(SBlock){}, pBlock);
if (c == 0) {
code = tsdbCommitterUpdateSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow));
if (code) goto _err;
} else if (c > 0) {
pRow = NULL;
} else {
ASSERT(0);
}
}
}
// write as a subblock
code = tBlockCopy(pBlock, &pCommitter->nBlock);
if (code) goto _err;
code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, &pCommitter->nBlock,
pCommitter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->nBlockMap, &pCommitter->nBlock, tPutBlock);
if (code) goto _err;
return code;
_err:
tsdbError("vgId:%d tsdb merge as subblock failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
int32_t code = 0;
STbDataIter *pIter = &(STbDataIter){0};
TSDBROW *pRow;
int32_t iBlock = 0;
int32_t iBlock;
int32_t nBlock;
int64_t suid;
int64_t uid;
......@@ -991,6 +1042,8 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
if (pTbData) {
tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter);
pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
suid = pTbData->suid;
uid = pTbData->uid;
} else {
......@@ -1004,18 +1057,20 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
nBlock = pCommitter->oBlockMap.nItem;
ASSERT(nBlock > 0);
suid = pBlockIdx->suid;
uid = pBlockIdx->uid;
} else {
nBlock = 0;
}
if ((pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) && nBlock == 0) goto _exit;
if (pRow == NULL && nBlock == 0) goto _exit;
// start ===========
tMapDataReset(&pCommitter->nBlockMap);
SBlock *pBlock = &pCommitter->oBlock;
iBlock = 0;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
} else {
......@@ -1024,24 +1079,28 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
// merge ===========
while (true) {
if (((pRow == NULL) || TSDBROW_TS(pRow) > pCommitter->maxKey) && pBlock == NULL) break;
if (pRow == NULL && pBlock == NULL) break;
if (pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey && pBlock) {
if (pRow && pBlock) {
if (pBlock->last) {
code = tsdbMergeTableData(pCommitter, pIter, pBlock,
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
iBlock++;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
ASSERT(pRow == NULL && pBlock == NULL);
} else {
int32_t c = tBlockCmprFn(&(SBlock){.maxKey = TSDBROW_KEY(pRow), .minKey = TSDBROW_KEY(pRow)}, pBlock);
if (c > 0) {
// only disk data
code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
if (code) goto _err;
......@@ -1052,24 +1111,43 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
pBlock = NULL;
}
} else if (c < 0) {
// only memory data
code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1);
if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
} else {
int64_t nOvlp = 0; // (todo)
// merge memory and disk
int32_t nOvlp = tsdbGetOvlpNRow(pIter, pBlock);
ASSERT(nOvlp);
if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) {
// add as a subblock
code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock);
if (code) goto _err;
} else {
if (iBlock == nBlock - 1) {
code = tsdbMergeTableData(pCommitter, pIter, pBlock,
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
if (code) goto _err;
} else {
// code = tsdbMergeTableData(pCommitter, pIter, pBlock, pBlock[1].minKey, 1);
if (code) goto _err;
TSDBKEY toKey = {.ts = pCommitter->maxKey + 1, .version = VERSION_MIN};
int8_t toDataOnly = 0;
if (iBlock < nBlock - 1) {
toDataOnly = 1;
SBlock nextBlock = {0};
tBlockReset(&nextBlock);
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock + 1, &nextBlock, tGetBlock);
toKey = nextBlock.minKey;
}
code = tsdbMergeTableData(pCommitter, pIter, pBlock, toKey, toDataOnly);
if (code) goto _err;
}
pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
iBlock++;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
}
}
......@@ -1089,7 +1167,8 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
ASSERT(pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
ASSERT(pRow == NULL);
}
}
......
......@@ -35,6 +35,39 @@ void tMapDataClear(SMapData *pMapData) {
}
}
int32_t tMapDataCopy(SMapData *pMapDataSrc, SMapData *pMapDataDest) {
int32_t code = 0;
int32_t size;
pMapDataDest->nItem = pMapDataSrc->nItem;
pMapDataDest->flag = pMapDataSrc->flag;
switch (pMapDataDest->flag) {
case TSDB_OFFSET_I32:
size = sizeof(int32_t) * pMapDataDest->nItem;
break;
case TSDB_OFFSET_I16:
size = sizeof(int16_t) * pMapDataDest->nItem;
break;
case TSDB_OFFSET_I8:
size = sizeof(int8_t) * pMapDataDest->nItem;
break;
default:
ASSERT(0);
}
code = tsdbRealloc(&pMapDataDest->pOfst, size);
if (code) goto _exit;
memcpy(pMapDataDest->pOfst, pMapDataSrc->pOfst, size);
pMapDataDest->nData = pMapDataSrc->nData;
code = tsdbRealloc(&pMapDataDest->pData, pMapDataDest->nData);
if (code) goto _exit;
memcpy(pMapDataDest->pData, pMapDataSrc->pData, pMapDataDest->nData);
_exit:
return code;
}
int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) {
int32_t code = 0;
int32_t offset = pMapData->nData;
......@@ -369,6 +402,32 @@ void tBlockClear(SBlock *pBlock) {
}
}
int32_t tBlockCopy(SBlock *pBlockSrc, SBlock *pBlockDest) {
int32_t code = 0;
pBlockDest->minKey = pBlockSrc->minKey;
pBlockDest->maxKey = pBlockSrc->maxKey;
pBlockDest->minVersion = pBlockSrc->minVersion;
pBlockDest->maxVersion = pBlockSrc->maxVersion;
pBlockDest->nRow = pBlockSrc->nRow;
pBlockDest->last = pBlockSrc->last;
pBlockDest->hasDup = pBlockSrc->hasDup;
pBlockDest->nSubBlock = pBlockSrc->nSubBlock;
for (int32_t iSubBlock = 0; iSubBlock < pBlockSrc->nSubBlock; iSubBlock++) {
pBlockDest->aSubBlock[iSubBlock].nRow = pBlockSrc->aSubBlock[iSubBlock].nRow;
pBlockDest->aSubBlock[iSubBlock].cmprAlg = pBlockSrc->aSubBlock[iSubBlock].cmprAlg;
pBlockDest->aSubBlock[iSubBlock].offset = pBlockSrc->aSubBlock[iSubBlock].offset;
pBlockDest->aSubBlock[iSubBlock].vsize = pBlockSrc->aSubBlock[iSubBlock].vsize;
pBlockDest->aSubBlock[iSubBlock].ksize = pBlockSrc->aSubBlock[iSubBlock].ksize;
pBlockDest->aSubBlock[iSubBlock].bsize = pBlockSrc->aSubBlock[iSubBlock].bsize;
code = tMapDataCopy(&pBlockSrc->aSubBlock[iSubBlock].mBlockCol, &pBlockDest->aSubBlock[iSubBlock].mBlockCol);
if (code) goto _exit;
}
_exit:
return code;
}
int32_t tPutBlock(uint8_t *p, void *ph) {
int32_t n = 0;
SBlock *pBlock = (SBlock *)ph;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册