提交 f9fd39eb 编写于 作者: H Hongze Cheng

more work

上级 d814016e
......@@ -91,6 +91,7 @@ typedef struct STsdbFSState STsdbFSState;
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow);
int32_t tsdbRowCmprFn(const void *p1, const void *p2);
// SRowIter
void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
SColVal *tRowIterNext(SRowIter *pIter);
......
......@@ -369,208 +369,6 @@ _err:
// return nRow;
// }
// static int32_t tsdbMergeCommitImpl(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock,
// int8_t toDataOnly) {
// int32_t code = 0;
// int32_t iRow = 0;
// int32_t nRow = 0;
// int32_t c;
// TSDBROW *pRow;
// SBlock block = tBlockInit();
// TSDBKEY key1;
// TSDBKEY key2;
// tBlockDataReset(&pCommitter->nBlockData);
// // load last and merge until {pCommitter->maxKey, INT64_MAX}
// code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->oBlockData, NULL, 0, NULL, NULL);
// if (code) goto _err;
// iRow = 0;
// nRow = pCommitter->oBlockData.nRow;
// pRow = tsdbTbDataIterGet(pIter);
// while (true) {
// if ((pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) && (iRow >= nRow)) {
// if (pCommitter->nBlockData.nRow > 0) {
// goto _write_block_data;
// } else {
// break;
// }
// }
// // TODO
// _write_block_data:
// block.last = pCommitter->nBlockData.nRow < pCommitter->minRow ? 1 : 0;
// code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->nBlockData, NULL, NULL, pBlockIdx, &block);
// if (code) goto _err;
// code = tMapDataPutItem(&pCommitter->nBlockMap, &block, tPutBlock);
// if (code) goto _err;
// }
// tBlockReset(&block);
// tBlockDataReset(&pCommitter->nBlockData);
// return code;
// _err:
// tsdbError("vgId:%d merge commit impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
// return code;
// }
// static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock,
// int8_t isLastBlock) {
// int32_t code = 0;
// TSDBROW *pRow;
// TSDBKEY key;
// int32_t c;
// if (pBlock == NULL) { // (pIter && pBlock == NULL)
// key.ts = pCommitter->maxKey;
// key.version = INT64_MAX;
// code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 0);
// if (code) goto _err;
// } else if (pBlock->last) {
// // merge
// code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock, 0);
// if (code) goto _err;
// } else { // pBlock && pBlock->last == 0 && (pIter == NULL || pIter)
// // memory
// if (pIter) {
// key.ts = pBlock->info.minKey.ts;
// key.version = pBlock->info.minKey.version - 1;
// code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 1);
// if (code) goto _err;
// }
// // merge or move block
// pRow = tsdbTbDataIterGet(pIter);
// key.ts = pRow->pTSRow->ts;
// key.version = pRow->version;
// c = tBlockCmprFn(&(SBlock){.info.maxKey = key, .info.minKey = key}, pBlock);
// if (c > 0) {
// // move block
// code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
// if (code) goto _err;
// } else if (c == 0) {
// int32_t nOverlap = tsdbGetOverlapRowNumber(pIter, pBlock);
// if (pBlock->nRow + nOverlap > pCommitter->maxRow || pBlock->nSubBlock == TSDB_MAX_SUBBLOCKS) {
// code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock, 1);
// if (code) goto _err;
// } else {
// // add as a subblock
// }
// } else {
// ASSERT(0);
// }
// }
// return code;
// _err:
// tsdbError("vgId:%d merge commit failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
// return code;
// }
// static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
// int32_t code = 0;
// STbDataIter iter;
// STbDataIter *pIter = &iter;
// TSDBROW *pRow;
// int64_t suid;
// int64_t uid;
// SBlockIdx blockIdx;
// // create iter
// if (pTbData) {
// suid = pTbData->suid;
// uid = pTbData->uid;
// tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, pIter);
// } else {
// suid = pBlockIdx->suid;
// uid = pBlockIdx->uid;
// pIter = NULL;
// }
// // check
// pRow = tsdbTbDataIterGet(pIter);
// if (ROW_END(pRow, pCommitter->maxKey) && pBlockIdx == NULL) goto _exit;
// // start ================================
// tMapDataReset(&pCommitter->oBlockMap);
// tBlockReset(&pCommitter->oBlock);
// tBlockDataReset(&pCommitter->oBlockData);
// if (pBlockIdx) {
// code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlockMap, NULL);
// if (code) goto _err;
// }
// blockIdx = tBlockIdxInit(suid, uid);
// tMapDataReset(&pCommitter->nBlockMap);
// tBlockReset(&pCommitter->nBlock);
// tBlockDataReset(&pCommitter->nBlockData);
// // impl ===============================
// int32_t iBlock = 0;
// int32_t nBlock = pCommitter->oBlockMap.nItem;
// // merge
// pRow = tsdbTbDataIterGet(pIter);
// while (!ROW_END(pRow, pCommitter->maxKey) && iBlock < nBlock) {
// tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, &pCommitter->oBlock, tGetBlock);
// code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, &pCommitter->oBlock, iBlock == (nBlock - 1));
// if (code) goto _err;
// pRow = tsdbTbDataIterGet(pIter);
// iBlock++;
// }
// // mem
// pRow = tsdbTbDataIterGet(pIter);
// while (!ROW_END(pRow, pCommitter->maxKey)) {
// code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, NULL, 0);
// if (code) goto _err;
// pRow = tsdbTbDataIterGet(pIter);
// }
// // disk
// while (iBlock < nBlock) {
// tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, &pCommitter->oBlock, tGetBlock);
// code = tsdbMergeCommit(pCommitter, &blockIdx, NULL, &pCommitter->oBlock, 0);
// if (code) goto _err;
// iBlock++;
// }
// // end ===============================
// code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, &blockIdx);
// if (code) goto _err;
// code = tMapDataPutItem(&pCommitter->nBlockIdxMap, &blockIdx, tPutBlockIdx);
// if (code) goto _err;
// _exit:
// pRow = tsdbTbDataIterGet(pIter);
// if (pRow) {
// ASSERT(pRow->pTSRow->ts > pCommitter->maxKey);
// if (pCommitter->nextKey > pRow->pTSRow->ts) {
// pCommitter->nextKey = pRow->pTSRow->ts;
// }
// }
// return code;
// _err:
// tsdbError("vgId:%d commit Table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
// return code;
// }
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
int32_t code = 0;
STsdb *pTsdb = pCommitter->pTsdb;
......@@ -816,6 +614,127 @@ _err:
return code;
}
static int32_t tsdbMergeBlockAndMem(SCommitter *pCommitter, STbDataIter *pIter, SBlockIdx *pBlockIdx, SBlock *pBlock,
TSDBKEY toKey /*not included*/, int8_t toDataOnly) {
int32_t code = 0;
SBlockData *pBlockDataFrom = &pCommitter->oBlockData;
SBlockData *pBlockDataTo = &pCommitter->nBlockData;
TSDBROW *pRow1;
TSDBROW *pRow2;
int32_t c = 0;
// read SBlockData
code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, pBlockDataFrom, NULL, NULL);
if (code) goto _err;
// loop to merge
tBlockDataReset(pBlockDataTo);
pRow1 = tsdbTbDataIterGet(pIter);
pRow2 = &tsdbRowFromBlockData(pBlockDataFrom, 0);
ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0);
ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow2), &toKey) < 0);
code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
if (code) goto _err;
while (true) {
if (pRow1 == NULL && pRow2 == NULL) {
if (pBlockDataTo->nRow == 0) {
break;
} else {
goto _write_block;
}
}
if (pRow1 && pRow2) {
c = tsdbRowCmprFn(pRow1, pRow2);
if (c < 0) {
code = tBlockDataAppendRow(pBlockDataTo, pRow1, pCommitter->pTSchema);
if (code) goto _err;
tsdbTbDataIterNext(pIter);
pRow1 = tsdbTbDataIterGet(pIter);
// TODO
} else if (c > 0) {
code = tBlockDataAppendRow(pBlockDataTo, pRow2, NULL);
if (code) goto _err;
pRow2 = pRow2->iRow + 1 < pBlockDataFrom->nRow ? &tsdbRowFromBlockData(pBlockDataFrom, pRow2->iRow + 1) : NULL;
} else {
ASSERT(0);
}
} else if (pRow1) {
code = tBlockDataAppendRow(pBlockDataTo, pRow1, pCommitter->pTSchema);
tsdbTbDataIterNext(pIter);
pRow1 = tsdbTbDataIterGet(pIter);
// TODO
} else {
code = tBlockDataAppendRow(pBlockDataTo, pRow2, NULL);
if (code) goto _err;
pRow2 = pRow2->iRow + 1 < pBlockDataFrom->nRow ? &tsdbRowFromBlockData(pBlockDataFrom, pRow2->iRow + 1) : NULL;
}
if (pBlockDataTo->nRow >= pCommitter->maxRow * 4 / 5) {
goto _write_block;
} else {
continue;
}
_write_block:
tBlockDataReset(pBlockDataTo);
// TODO
}
return code;
_err:
tsdbError("vgId:%d tsdb merge block and mem failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbCommitMemoryDataImpl(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) {
int32_t code = 0;
TSDBROW *pRow;
SBlockData *pBlockData = &pCommitter->nBlockData;
pRow = tsdbTbDataIterGet(pIter);
tBlockDataReset(pBlockData);
while (true) {
if (pRow == NULL || tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) {
if (pBlockData->nRow > 0) {
goto _write_block;
} else {
break;
}
}
code = tsdbCommitterUpdateSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow));
if (code) goto _err;
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema);
if (code) goto _err;
tsdbTbDataIterNext(pIter);
pRow = tsdbTbDataIterGet(pIter);
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block;
_write_block:
tBlockDataReset(pBlockData);
}
return code;
_err:
return code;
}
static int32_t tsdbCommitMoveDiskBlock(SCommitter *pCommitter, SBlock *pBlock) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbMergeMemDisk(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *oBlockIdx) {
int32_t code = 0;
STbDataIter *pIter = &(STbDataIter){0};
......@@ -825,34 +744,106 @@ static int32_t tsdbMergeMemDisk(SCommitter *pCommitter, STbData *pTbData, SBlock
tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter);
pRow == tsdbTbDataIterGet(pIter);
if (pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) {
return tsdbCommitDiskData(pCommitter, oBlockIdx);
code = tsdbCommitDiskData(pCommitter, oBlockIdx);
if (code) {
goto _err;
} else {
goto _exit;
}
}
// start ==================
// read
code = tsdbReadBlock(pCommitter->pReader, oBlockIdx, &pCommitter->oBlockMap, NULL);
if (code) goto _err;
// loop to merge
SBlockData *pBlockData = &pCommitter->nBlockData;
int32_t iBlock = 0;
int32_t nBlock = pCommitter->oBlockMap.nItem;
tBlockDataReset(pBlockData);
// SBlockData *pBlockData = &pCommitter->nBlockData;
int32_t iBlock = 0;
int32_t nBlock = pCommitter->oBlockMap.nItem;
// SBlock *pBlockO = &pCommitter->oBlock;
SBlock *pBlock;
int32_t c;
// merge ===================
while (true) {
if ((pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) && iBlock >= nBlock) break;
}
if ((pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) && pBlock == NULL) break;
while (iBlock < nBlock) {
/* code */
}
if ((pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey) && pBlock) {
if (pBlock->last) {
// merge memory data and disk data to write to .data/.last (todo)
code = tsdbMergeBlockAndMem(pCommitter, pIter, oBlockIdx, pBlock,
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
if (code) goto _err;
//
while (pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey) {
/* code */
pRow = tsdbTbDataIterGet(pIter);
iBlock++;
} else {
c = tBlockCmprFn(&(SBlock){}, pBlock);
if (c < 0) {
// commit memory data until pBlock->minKey (not included) only to .data file (todo)
code = tsdbCommitMemoryDataImpl(pCommitter, pIter, pBlock->minKey, 1);
if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
} else if (c > 0) {
// just move the block (todo)
code = tsdbCommitMoveDiskBlock(pCommitter, pBlock);
if (code) goto _err;
iBlock++;
// TODO
} else {
int64_t nOvlp = 0; // = tsdbOvlpRows();
if (nOvlp + pBlock->nRow <= pCommitter->maxRow) {
// add as a subblock
} else {
if (iBlock == nBlock - 1) {
// merge memory data and disk data to .data/.last file
code = tsdbMergeBlockAndMem(pCommitter, pIter, oBlockIdx, pBlock,
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
if (code) goto _err;
} else {
// merge memory data and disk data to .data file only until pBlock[1].
code = tsdbMergeBlockAndMem(pCommitter, pIter, oBlockIdx, pBlock, (TSDBKEY){0} /*TODO*/, 1);
}
}
pRow = tsdbTbDataIterGet(pIter);
iBlock++;
}
}
} else if (pBlock) {
code = tsdbCommitMoveDiskBlock(pCommitter, pBlock);
if (code) goto _err;
iBlock++;
// next block
} else {
// commit only memory data until (pCommitter->maxKey, VERSION_MAX)
code = tsdbCommitMemoryDataImpl(pCommitter, pIter,
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
}
}
// end =====================
// // SBlock
// code = tsdbWriteBlock(pCommitter->pWriter, mBlock, NULL, pBlockIdx);
// if (code) goto _err;
// // SBlockIdx
// code = tMapDataPutItem(&pCommitter->nBlockIdxMap, pBlockIdx, tPutBlockIdx);
// if (code) goto _err;
_exit:
if (pRow) pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
pRow = tsdbTbDataIterGet(pIter);
if (pRow) {
pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
}
return code;
_err:
......
......@@ -634,6 +634,10 @@ int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow) {
return n;
}
int32_t tsdbRowCmprFn(const void *p1, const void *p2) {
return tsdbKeyCmprFn(&TSDBROW_KEY((TSDBROW *)p1), &TSDBROW_KEY((TSDBROW *)p2));
}
// SRowIter ======================================================
void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) {
pIter->pRow = pRow;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册