提交 c9ab455b 编写于 作者: H Hongze Cheng

more work

上级 38b7cfe1
......@@ -614,8 +614,8 @@ _err:
return code;
}
static int32_t tsdbMergeBlockAndMem(SCommitter *pCommitter, STbDataIter *pIter, SBlockIdx *pBlockIdx, SBlock *pBlock,
TSDBKEY toKey /*not included*/, int8_t toDataOnly) {
static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SBlockIdx *pBlockIdx, SBlock *pBlock,
TSDBKEY toKey /*not included*/, int8_t toDataOnly) {
int32_t code = 0;
SBlockData *pBlockDataFrom = &pCommitter->oBlockData;
SBlockData *pBlockDataTo = &pCommitter->nBlockData;
......@@ -692,7 +692,7 @@ _err:
return code;
}
static int32_t tsdbCommitMemoryDataImpl(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) {
static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) {
int32_t code = 0;
TSDBROW *pRow;
SBlockData *pBlockData = &pCommitter->nBlockData;
......@@ -729,16 +729,22 @@ _err:
return code;
}
static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock) {
static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, SBlockIdx *pBlockIdx) {
int32_t code = 0;
if (pBlock->last) {
// TODO
code = tsdbReadBlockData(pCommitter->pReader, NULL, pBlock, &pCommitter->oBlockData, NULL, NULL);
code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->oBlockData, NULL, NULL);
if (code) goto _err;
code =
tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->oBlockData, NULL, NULL, NULL, NULL, pCommitter->cmprAlg);
tBlockReset(&pCommitter->nBlock);
pCommitter->nBlock.minKey = pBlock->minKey;
pCommitter->nBlock.maxKey = pBlock->maxKey;
pCommitter->nBlock.minVersion = pBlock->minVersion;
pCommitter->nBlock.nRow = pBlock->nRow;
pCommitter->nBlock.last = pBlock->last;
pCommitter->nBlock.hasDup = pBlock->hasDup;
code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->oBlockData, NULL, NULL, pBlockIdx, &pCommitter->nBlock,
pCommitter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->nBlockMap, &pCommitter->nBlock, tPutBlock);
......@@ -792,8 +798,8 @@ static int32_t tsdbMergeMemDisk(SCommitter *pCommitter, STbData *pTbData, SBlock
if ((pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey) && pBlock) {
if (pBlock->last) {
// merge memory data and disk data to write to .data/.last (todo)
code = tsdbMergeBlockAndMem(pCommitter, pIter, oBlockIdx, pBlock,
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock,
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
......@@ -803,13 +809,13 @@ static int32_t tsdbMergeMemDisk(SCommitter *pCommitter, STbData *pTbData, SBlock
if (c < 0) {
// commit memory data until pBlock->minKey (not included) only to .data file (todo)
code = tsdbCommitMemoryDataImpl(pCommitter, pIter, pBlock->minKey, 1);
code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1);
if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
} else if (c > 0) {
// just move the block (todo)
code = tsdbCommitTableDiskData(pCommitter, pBlock);
// code = tsdbCommitTableDiskData(pCommitter, pBlock);
if (code) goto _err;
iBlock++;
......@@ -821,12 +827,12 @@ static int32_t tsdbMergeMemDisk(SCommitter *pCommitter, STbData *pTbData, SBlock
} else {
if (iBlock == nBlock - 1) {
// merge memory data and disk data to .data/.last file
code = tsdbMergeBlockAndMem(pCommitter, pIter, oBlockIdx, pBlock,
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock,
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
if (code) goto _err;
} else {
// merge memory data and disk data to .data file only until pBlock[1].
code = tsdbMergeBlockAndMem(pCommitter, pIter, oBlockIdx, pBlock, (TSDBKEY){0} /*TODO*/, 1);
code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock, (TSDBKEY){0} /*TODO*/, 1);
}
}
......@@ -835,15 +841,15 @@ static int32_t tsdbMergeMemDisk(SCommitter *pCommitter, STbData *pTbData, SBlock
}
}
} else if (pBlock) {
code = tsdbCommitTableDiskData(pCommitter, pBlock);
// code = tsdbCommitTableDiskData(pCommitter, pBlock);
if (code) goto _err;
iBlock++;
// next block
} else {
// commit only memory data until (pCommitter->maxKey, VERSION_MAX)
code = tsdbCommitMemoryDataImpl(pCommitter, pIter,
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
code =
tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
......@@ -851,8 +857,8 @@ static int32_t tsdbMergeMemDisk(SCommitter *pCommitter, STbData *pTbData, SBlock
}
// end =====================
// // SBlock
// code = tsdbWriteBlock(pCommitter->pWriter, mBlock, NULL, pBlockIdx);
// SBlock
// code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, pBlockIdx);
// if (code) goto _err;
// // SBlockIdx
......@@ -871,16 +877,37 @@ _err:
return code;
}
static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int64_t uid) {
int32_t code = 0;
SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = suid, .uid = uid};
code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, pBlockIdx);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->nBlockIdxMap, pBlockIdx, tPutBlockIdx);
if (code) goto _err;
return code;
_err:
tsdbError("vgId:%d commit table data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
int32_t code = 0;
STbDataIter *pIter = &(STbDataIter){0};
TSDBROW *pRow;
int32_t iBlock = 0;
int32_t nBlock;
int64_t suid;
int64_t uid;
if (pTbData) {
tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter);
pRow = tsdbTbDataIterGet(pIter);
suid = pTbData->suid;
uid = pTbData->uid;
} else {
pIter = NULL;
pRow = NULL;
......@@ -892,6 +919,8 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
nBlock = pCommitter->oBlockMap.nItem;
ASSERT(nBlock > 0);
suid = pBlockIdx->suid;
uid = pBlockIdx->uid;
} else {
nBlock = 0;
}
......@@ -900,17 +929,23 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
// start ===========
tMapDataReset(&pCommitter->nBlockMap);
SBlock *pBlock = NULL; // (todo)
SBlock *pBlock = &pCommitter->oBlock;
int32_t c;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
// merge ===========
while (true) {
if (((pRow == NULL) || TSDBROW_TS(pRow) > pCommitter->maxKey) && pBlock == NULL) break;
if (pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey && pBlock) {
if (pBlock->last) {
code = tsdbMergeBlockAndMem(pCommitter, pIter, pBlockIdx, pBlock,
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
code = tsdbMergeTableData(pCommitter, pIter, pBlockIdx, pBlock,
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
......@@ -918,12 +953,18 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
} else {
c = tBlockCmprFn(&(SBlock){}, pBlock);
if (c > 0) {
code = tsdbCommitTableDiskData(pCommitter, pBlock);
code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
if (code) goto _err;
iBlock++;
iBlock++;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
} else if (c < 0) {
code = tsdbCommitMemoryDataImpl(pCommitter, pIter, pBlock->minKey, 1);
code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1);
if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
......@@ -933,25 +974,30 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
// add as a subblock
} else {
if (iBlock == nBlock - 1) {
code = tsdbMergeBlockAndMem(pCommitter, pIter, pBlockIdx, pBlock,
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
code = tsdbMergeTableData(pCommitter, pIter, pBlockIdx, pBlock,
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
if (code) goto _err;
} else {
// code = tsdbMergeBlockAndMem(pCommitter, pIter, pBlockIdx, pBlock, pBlock[1].minKey, 1);
// code = tsdbMergeTableData(pCommitter, pIter, pBlockIdx, pBlock, pBlock[1].minKey, 1);
if (code) goto _err;
}
}
}
}
} else if (pBlock) {
code = tsdbCommitTableDiskData(pCommitter, pBlock);
code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
if (code) goto _err;
// move to next block (todo)
iBlock++;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
} else {
code = tsdbCommitMemoryDataImpl(pCommitter, pIter,
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
code =
tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
......@@ -959,11 +1005,8 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
}
}
// end
// code = tsdbWriteBlock();
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->nBlockIdxMap, NULL, tPutBlockIdx);
// end =====================
code = tsdbCommitTableDataEnd(pCommitter, suid, uid);
if (code) goto _err;
_exit:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册