提交 67db73b5 编写于 作者: H Hongze Cheng

more last refact work

上级 93ca2b1f
...@@ -140,9 +140,9 @@ void tBlockDataReset(SBlockData *pBlockData); ...@@ -140,9 +140,9 @@ void tBlockDataReset(SBlockData *pBlockData);
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema, int64_t suid, int64_t uid); int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema, int64_t suid, int64_t uid);
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData); int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid);
void tBlockDataClearData(SBlockData *pBlockData);
int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom); int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom);
void tBlockDataClearData(SBlockData *pBlockData);
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData); int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData);
int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest); int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest);
SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx); SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx);
......
...@@ -299,7 +299,9 @@ static int32_t tsdbCommitterNextLastRow(SCommitter *pCommitter) { ...@@ -299,7 +299,9 @@ static int32_t tsdbCommitterNextLastRow(SCommitter *pCommitter) {
SBlockData *pBlockDatal = &pCommitter->dReader.bDatal; SBlockData *pBlockDatal = &pCommitter->dReader.bDatal;
pCommitter->dReader.iRow++; pCommitter->dReader.iRow++;
if (pCommitter->dReader.iRow < pBlockDatal->nRow) { if (pCommitter->dReader.iRow < pBlockDatal->nRow) {
if (pBlockDatal->uid == 0) { if (pBlockDatal->uid) {
pCommitter->dReader.pRowInfo->uid = pBlockDatal->uid;
} else {
pCommitter->dReader.pRowInfo->uid = pBlockDatal->aUid[pCommitter->dReader.iRow]; pCommitter->dReader.pRowInfo->uid = pBlockDatal->aUid[pCommitter->dReader.iRow];
} }
pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(pBlockDatal, pCommitter->dReader.iRow); pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(pBlockDatal, pCommitter->dReader.iRow);
...@@ -327,6 +329,28 @@ _exit: ...@@ -327,6 +329,28 @@ _exit:
return code; return code;
} }
static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) {
int32_t code = 0;
ASSERT(pCommitter->dReader.pBlockIdx);
pCommitter->dReader.iBlockIdx++;
if (pCommitter->dReader.iBlockIdx < taosArrayGetSize(pCommitter->dReader.aBlockIdx)) {
pCommitter->dReader.pBlockIdx =
(SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx);
code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock, NULL);
if (code) goto _exit;
ASSERT(pCommitter->dReader.mBlock.nItem > 0);
} else {
pCommitter->dReader.pBlockIdx = NULL;
}
_exit:
return code;
}
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
STsdb *pTsdb = pCommitter->pTsdb; STsdb *pTsdb = pCommitter->pTsdb;
...@@ -469,37 +493,18 @@ _exit: ...@@ -469,37 +493,18 @@ _exit:
return code; return code;
} }
static int32_t tsdbCommitBlockData(SCommitter *pCommitter, SBlockData *pBlockData, SBlock *pBlock, SBlockIdx *pBlockIdx, static int32_t tsdbCommitDataBlock(SCommitter *pCommitter, SBlock *pBlock) {
int8_t toDataOnly) {
int32_t code = 0;
if (pBlock->nSubBlock == 0) {
if (!toDataOnly && pBlockData->nRow < pCommitter->minRow) {
pBlock->last = 1;
} else {
pBlock->last = 0;
}
}
code =
tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock);
if (code) goto _err;
return code;
_err:
return code;
}
static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
SBlock block; SBlock block;
ASSERT(pCommitter->dWriter.bData.nRow > 0); ASSERT(pCommitter->dWriter.bData.nRow > 0);
if (pBlock) {
block = *pBlock; // as a subblock
} else {
tBlockReset(&block); // as a new block
}
code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bData, &block, NULL, NULL, code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bData, &block, NULL, NULL,
pCommitter->cmprAlg); pCommitter->cmprAlg);
if (code) goto _exit; if (code) goto _exit;
...@@ -556,14 +561,14 @@ static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter, S ...@@ -556,14 +561,14 @@ static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter, S
code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow1)); code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow1));
if (code) goto _err; if (code) goto _err;
code = tBlockDataAppendRow(pBlockDataW, pRow1, pCommitter->skmRow.pTSchema); code = tBlockDataAppendRow(pBlockDataW, pRow1, pCommitter->skmRow.pTSchema, pTbData->uid);
if (code) goto _err; if (code) goto _err;
// next // next
tsdbTbDataIterNext(pIter); tsdbTbDataIterNext(pIter);
pRow1 = tsdbTbDataIterGet(pIter); pRow1 = tsdbTbDataIterGet(pIter);
} else if (c > 0) { } else if (c > 0) {
code = tBlockDataAppendRow(pBlockDataW, pRow2, NULL); code = tBlockDataAppendRow(pBlockDataW, pRow2, NULL, pTbData->uid);
if (code) goto _err; if (code) goto _err;
iRow++; iRow++;
...@@ -578,13 +583,13 @@ static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter, S ...@@ -578,13 +583,13 @@ static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter, S
// check // check
if (pBlockDataW->nRow >= pCommitter->maxRow * 4 / 5) { if (pBlockDataW->nRow >= pCommitter->maxRow * 4 / 5) {
code = tsdbCommitDataBlock(pCommitter); code = tsdbCommitDataBlock(pCommitter, NULL);
if (code) goto _err; if (code) goto _err;
} }
} }
while (pRow2) { while (pRow2) {
code = tBlockDataAppendRow(pBlockDataW, pRow2, NULL); code = tBlockDataAppendRow(pBlockDataW, pRow2, NULL, pTbData->uid);
if (code) goto _err; if (code) goto _err;
iRow++; iRow++;
...@@ -596,14 +601,14 @@ static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter, S ...@@ -596,14 +601,14 @@ static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter, S
// check // check
if (pBlockDataW->nRow >= pCommitter->maxRow * 4 / 5) { if (pBlockDataW->nRow >= pCommitter->maxRow * 4 / 5) {
code = tsdbCommitDataBlock(pCommitter); code = tsdbCommitDataBlock(pCommitter, NULL);
if (code) goto _err; if (code) goto _err;
} }
} }
// check // check
if (pBlockDataW->nRow > 0) { if (pBlockDataW->nRow > 0) {
code = tsdbCommitDataBlock(pCommitter); code = tsdbCommitDataBlock(pCommitter, NULL);
if (code) goto _err; if (code) goto _err;
} }
...@@ -620,7 +625,6 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter ...@@ -620,7 +625,6 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
SBlockData *pBlockData = &pCommitter->dWriter.bData; SBlockData *pBlockData = &pCommitter->dWriter.bData;
tBlockDataClearData(pBlockData); tBlockDataClearData(pBlockData);
TSDBROW *pRow = tsdbTbDataIterGet(pIter); TSDBROW *pRow = tsdbTbDataIterGet(pIter);
while (true) { while (true) {
if (pRow == NULL) { if (pRow == NULL) {
...@@ -636,7 +640,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter ...@@ -636,7 +640,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
if (code) goto _err; if (code) goto _err;
// append // append
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema); code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema, pTbData->uid);
if (code) goto _err; if (code) goto _err;
tsdbTbDataIterNext(pIter); tsdbTbDataIterNext(pIter);
...@@ -650,7 +654,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter ...@@ -650,7 +654,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
_write_block: _write_block:
code = tsdbCommitDataBlock(pCommitter); code = tsdbCommitDataBlock(pCommitter, NULL);
if (code) goto _err; if (code) goto _err;
} }
} }
...@@ -662,29 +666,6 @@ _err: ...@@ -662,29 +666,6 @@ _err:
return code; return code;
} }
static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, SBlockIdx *pBlockIdx) {
int32_t code = 0;
SBlock block;
if (pBlock->last) {
code = tsdbReadBlockData(pCommitter->dReader.pReader, pBlockIdx, pBlock, &pCommitter->dReader.bData, NULL, NULL);
if (code) goto _err;
tBlockReset(&block);
code = tsdbCommitBlockData(pCommitter, &pCommitter->dReader.bData, &block, pBlockIdx, 0);
if (code) goto _err;
} else {
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock);
if (code) goto _err;
}
return code;
_err:
tsdbError("vgId:%d, tsdb commit table disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbGetNumOfRowsLessThan(STbDataIter *pIter, TSDBKEY key) { static int32_t tsdbGetNumOfRowsLessThan(STbDataIter *pIter, TSDBKEY key) {
int32_t nRow = 0; int32_t nRow = 0;
...@@ -713,31 +694,29 @@ static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, S ...@@ -713,31 +694,29 @@ static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, S
SBlockData *pBlockData = &pCommitter->dWriter.bData; SBlockData *pBlockData = &pCommitter->dWriter.bData;
tBlockDataClearData(pBlockData); tBlockDataClearData(pBlockData);
while (true) {
TSDBROW *pRow = tsdbTbDataIterGet(pIter); TSDBROW *pRow = tsdbTbDataIterGet(pIter);
while (true) {
if (pRow == NULL) break;
code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow));
if (code) goto _err;
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema, pTbData->uid);
if (code) goto _err;
tsdbTbDataIterNext(pIter);
pRow = tsdbTbDataIterGet(pIter);
if (pRow) { if (pRow) {
TSDBKEY rowKey = TSDBROW_KEY(pRow); TSDBKEY rowKey = TSDBROW_KEY(pRow);
if (tsdbKeyCmprFn(&rowKey, &pBlock->maxKey) > 0) { if (tsdbKeyCmprFn(&rowKey, &pBlock->maxKey) > 0) {
pRow = NULL; pRow = NULL;
} }
} }
if (pRow == NULL) {
break;
} }
code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow)); ASSERT(pBlockData->nRow > 0 && pBlock->nRow + pBlockData->nRow <= pCommitter->maxRow);
if (code) goto _err;
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema);
if (code) goto _err;
}
SBlock block = *pBlock; code = tsdbCommitDataBlock(pCommitter, pBlock);
code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &block, NULL, NULL, pCommitter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &block, tPutBlock);
if (code) goto _err; if (code) goto _err;
return code; return code;
...@@ -753,36 +732,51 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) { ...@@ -753,36 +732,51 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) {
int32_t nRow = tsdbGetNumOfRowsLessThan(pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}); int32_t nRow = tsdbGetNumOfRowsLessThan(pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN});
if (pCommitter->dReader.pRowInfo) { if (pCommitter->dReader.pRowInfo) {
if (pCommitter->dReader.pRowInfo->suid) {
for (int32_t iRow = pCommitter->dReader.iRow; iRow < pCommitter->dReader.bDatal.nRow; iRow++) { for (int32_t iRow = pCommitter->dReader.iRow; iRow < pCommitter->dReader.bDatal.nRow; iRow++) {
if (pTbData->uid != pCommitter->dReader.bDatal.aUid[iRow]) break; if (pTbData->uid != pCommitter->dReader.bDatal.aUid[iRow]) break;
nRow++; nRow++;
} }
} else {
ASSERT(pCommitter->dReader.iRow == 0);
nRow += pCommitter->dReader.bDatal.nRow;
}
} }
if (nRow == 0) goto _exit; if (nRow == 0) goto _exit;
SBlockData *pBlockData;
TSDBROW *pRow = tsdbTbDataIterGet(pIter); TSDBROW *pRow = tsdbTbDataIterGet(pIter);
SRowInfo *pRowInfo = pCommitter->dReader.pRowInfo;
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pRow = NULL; pRow = NULL;
} }
if (pRowInfo && (pRowInfo->suid != pTbData->suid || pRowInfo->uid != pTbData->uid)) { SRowInfo *pRowInfo = pCommitter->dReader.pRowInfo;
if (pRowInfo && pRowInfo->uid != pTbData->uid) {
pRowInfo = NULL; pRowInfo = NULL;
} }
while (nRow) { while (nRow) {
SBlockData *pBlockData;
int8_t toData;
if (nRow < pCommitter->minRow) { // to .last if (nRow < pCommitter->minRow) { // to .last
toData = 0;
pBlockData = &pCommitter->dWriter.bDatal; pBlockData = &pCommitter->dWriter.bDatal;
// check if same schema // commit and reset block data schema if need
if (pBlockData->nRow > 0) {
if (pBlockData->suid != pTbData->suid || pBlockData->suid == 0) { if (pBlockData->suid != pTbData->suid || pBlockData->suid == 0) {
code = tsdbCommitLastBlock(pCommitter); code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err; if (code) goto _err;
code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema); tBlockDataReset(pBlockData);
}
}
// set block data schema if need
if (pBlockData->suid == 0 && pBlockData->uid == 0) {
code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema, pTbData->suid,
pTbData->suid ? 0 : pTbData->uid);
if (code) goto _err; if (code) goto _err;
} }
...@@ -791,7 +785,9 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) { ...@@ -791,7 +785,9 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) {
if (code) goto _err; if (code) goto _err;
} }
} else { // to .data } else { // to .data
toData = 1;
pBlockData = &pCommitter->dWriter.bData; pBlockData = &pCommitter->dWriter.bData;
ASSERT(pBlockData->nRow == 0);
} }
while (pRow && pRowInfo) { while (pRow && pRowInfo) {
...@@ -800,7 +796,7 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) { ...@@ -800,7 +796,7 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) {
code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow)); code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow));
if (code) goto _err; if (code) goto _err;
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema); code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema, pTbData->uid);
if (code) goto _err; if (code) goto _err;
tsdbTbDataIterNext(pIter); tsdbTbDataIterNext(pIter);
...@@ -809,14 +805,14 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) { ...@@ -809,14 +805,14 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) {
pRow = NULL; pRow = NULL;
} }
} else if (c > 0) { } else if (c > 0) {
code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL); code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL, pTbData->uid);
if (code) goto _err; if (code) goto _err;
code = tsdbCommitterNextLastRow(pCommitter); code = tsdbCommitterNextLastRow(pCommitter);
if (code) goto _err; if (code) goto _err;
pRowInfo = pCommitter->dReader.pRowInfo; pRowInfo = pCommitter->dReader.pRowInfo;
if (pRowInfo && (pRowInfo->suid != pTbData->suid || pRowInfo->uid != pTbData->uid)) { if (pRowInfo && pRowInfo->uid != pTbData->uid) {
pRowInfo = NULL; pRowInfo = NULL;
} }
} else { } else {
...@@ -824,16 +820,12 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) { ...@@ -824,16 +820,12 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) {
} }
nRow--; nRow--;
if (toData) {
if (pBlockData->uid) { // .data block if (nRow == 0 || pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { code = tsdbCommitDataBlock(pCommitter, NULL);
code = tsdbCommitDataBlock(pCommitter);
if (code) goto _err; if (code) goto _err;
goto _outer_break; goto _outer_break;
} }
} else {
ASSERT(pBlockData->nRow <= pCommitter->maxRow);
} }
} }
...@@ -841,7 +833,7 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) { ...@@ -841,7 +833,7 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) {
code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow)); code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow));
if (code) goto _err; if (code) goto _err;
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema); code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema, pTbData->uid);
if (code) goto _err; if (code) goto _err;
tsdbTbDataIterNext(pIter); tsdbTbDataIterNext(pIter);
...@@ -851,40 +843,34 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) { ...@@ -851,40 +843,34 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) {
} }
nRow--; nRow--;
if (pBlockData->uid) { // .data block if (toData) {
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { if (nRow == 0 || pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
code = tsdbCommitDataBlock(pCommitter); code = tsdbCommitDataBlock(pCommitter, NULL);
if (code) goto _err; if (code) goto _err;
goto _outer_break; goto _outer_break;
} }
} else {
ASSERT(pBlockData->nRow <= pCommitter->maxRow);
} }
} }
while (pRowInfo) { while (pRowInfo) {
code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL); code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL, pTbData->uid);
if (code) goto _err; if (code) goto _err;
code = tsdbCommitterNextLastRow(pCommitter); code = tsdbCommitterNextLastRow(pCommitter);
if (code) goto _err; if (code) goto _err;
pRowInfo = pCommitter->dReader.pRowInfo; pRowInfo = pCommitter->dReader.pRowInfo;
if (pRowInfo && (pRowInfo->suid != pTbData->suid || pRowInfo->uid != pTbData->uid)) { if (pRowInfo && pRowInfo->uid != pTbData->uid) {
pRowInfo = NULL; pRowInfo = NULL;
} }
nRow--; nRow--;
if (pBlockData->uid) { // .data block if (toData) {
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { if (nRow == 0 || pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
code = tsdbCommitDataBlock(pCommitter); code = tsdbCommitDataBlock(pCommitter, NULL);
if (code) goto _err; if (code) goto _err;
goto _outer_break; goto _outer_break;
} }
} else {
ASSERT(pBlockData->nRow <= pCommitter->maxRow);
} }
} }
...@@ -922,8 +908,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { ...@@ -922,8 +908,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
int32_t iBlock = 0; int32_t iBlock = 0;
SBlock block; SBlock block;
SBlock *pBlock = &block; SBlock *pBlock = &block;
if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pTbData, pCommitter->dReader.pBlockIdx) == 0 && if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pTbData, pCommitter->dReader.pBlockIdx) == 0) {
iBlock < pCommitter->dReader.mBlock.nItem) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else { } else {
pBlock = NULL; pBlock = NULL;
...@@ -933,8 +918,10 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { ...@@ -933,8 +918,10 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
if (code) goto _err; if (code) goto _err;
tMapDataReset(&pCommitter->dWriter.mBlock); tMapDataReset(&pCommitter->dWriter.mBlock);
code = tBlockDataSetSchema(&pCommitter->dWriter.bData, pCommitter->skmTable.pTSchema); code = tBlockDataSetSchema(&pCommitter->dWriter.bData, pCommitter->skmTable.pTSchema, pTbData->suid, pTbData->uid);
if (code) goto _err; if (code) goto _err;
// .data merge
while (pBlock && pRow) { while (pBlock && pRow) {
int32_t c = tBlockCmprFn(pBlock, &(SBlock){.minKey = TSDBROW_KEY(pRow), .maxKey = TSDBROW_KEY(pRow)}); int32_t c = tBlockCmprFn(pBlock, &(SBlock){.minKey = TSDBROW_KEY(pRow), .maxKey = TSDBROW_KEY(pRow)});
if (c < 0) { // disk if (c < 0) { // disk
...@@ -997,25 +984,9 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { ...@@ -997,25 +984,9 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
} }
} }
// merge with last // .data append and .last merge
code = tsdbMergeCommitLast(pCommitter, pIter); code = tsdbMergeCommitLast(pCommitter, pIter);
if (code) goto _err; if (code) goto _err;
#if 0
int32_t nRowLeft = tsdbGetNumOfRowsLessThan(pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version =
VERSION_MIN}); if (pCommitter->dReader.pRowInfo) {
for (int32_t iRow = pCommitter->dReader.iRow; iRow < pCommitter->dReader.bDatal.nRow; iRow++) {
int64_t uid = pCommitter->dReader.bDatal.aUid[iRow];
if (uid == pTbData->uid) {
nRowLeft++;
}
}
}
while (nRowLeft) {
code = tsdbMergeCommitLast(pCommitter, pIter, &nRowLeft);
if (code) goto _err;
}
#endif
// end // end
if (pCommitter->dWriter.mBlock.nItem > 0) { if (pCommitter->dWriter.mBlock.nItem > 0) {
...@@ -1081,7 +1052,7 @@ _err: ...@@ -1081,7 +1052,7 @@ _err:
static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
int32_t code = 0; int32_t code = 0;
// data // .data
while (true) { while (true) {
if (pCommitter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) >= 0) break; if (pCommitter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) >= 0) break;
...@@ -1094,61 +1065,64 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { ...@@ -1094,61 +1065,64 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
goto _err; goto _err;
} }
pCommitter->dReader.iBlockIdx++; code = tsdbCommitterNextTableData(pCommitter);
if (pCommitter->dReader.iBlockIdx < taosArrayGetSize(pCommitter->dReader.aBlockIdx)) {
pCommitter->dReader.pBlockIdx =
(SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx);
code =
tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock, NULL);
if (code) goto _err; if (code) goto _err;
} else {
pCommitter->dReader.pBlockIdx = NULL;
}
} }
// last // .last
while (true) { while (true) {
if (pCommitter->dReader.pRowInfo == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRowInfo, &toTable) >= 0) break; if (pCommitter->dReader.pRowInfo == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRowInfo, &toTable) >= 0) break;
// commit if not same schema SBlockData *pBlockDataR = &pCommitter->dReader.bDatal;
if (pCommitter->dWriter.bDatal.nRow > 0) { SBlockData *pBlockDataW = &pCommitter->dWriter.bDatal;
if (pCommitter->dWriter.bDatal.suid != pCommitter->dReader.pRowInfo->suid || tb_uid_t suid = pCommitter->dReader.pRowInfo->suid;
pCommitter->dWriter.bDatal.suid == 0) { tb_uid_t uid = pCommitter->dReader.pRowInfo->uid;
ASSERT((pBlockDataR->suid && !pBlockDataR->uid) || (!pBlockDataR->suid && pBlockDataR->uid));
ASSERT(pBlockDataR->nRow > 0);
// commit and reset block data schema if need
if (pBlockDataW->nRow > 0) {
if (pBlockDataW->suid != pCommitter->dReader.pRowInfo->suid || pBlockDataW->suid == 0) {
code = tsdbCommitLastBlock(pCommitter); code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err; if (code) goto _err;
tBlockDataReset(pBlockDataW);
} }
} }
if (pCommitter->dWriter.bDatal.nRow == 0) { // set block data schema if need
code = tsdbCommitterUpdateTableSchema(pCommitter, pCommitter->dReader.pRowInfo->suid, if (pBlockDataW->suid == 0 && pBlockDataW->uid == 0) {
pCommitter->dReader.pRowInfo->suid, 1 /*TODO*/); code = tsdbCommitterUpdateTableSchema(pCommitter, suid, uid, 1 /*TOOD*/);
if (code) goto _err; if (code) goto _err;
pCommitter->dWriter.bDatal.suid = pCommitter->dReader.pRowInfo->suid; code = tBlockDataSetSchema(pBlockDataW, pCommitter->skmTable.pTSchema, suid, suid ? 0 : uid);
code = tBlockDataSetSchema(&pCommitter->dWriter.bDatal, pCommitter->skmTable.pTSchema);
if (code) goto _err; if (code) goto _err;
} }
// check if it can make sure that one table data in one block // check if it can make sure that one table data in one block
int64_t uid = pCommitter->dReader.pRowInfo->uid;
int32_t nRow = 0; int32_t nRow = 0;
for (int32_t iRow = pCommitter->dReader.iRow; if (pBlockDataR->suid) {
(iRow < pCommitter->dReader.bDatal.nRow) && (pCommitter->dReader.bDatal.aUid[iRow] == uid); iRow++) { for (int32_t iRow = pCommitter->dReader.iRow; (iRow < pBlockDataR->nRow) && (pBlockDataR->aUid[iRow] == uid);
iRow++) {
nRow++; nRow++;
} }
} else {
ASSERT(pCommitter->dReader.iRow == 0);
nRow = pBlockDataR->nRow;
}
ASSERT(nRow > 0 && nRow < pCommitter->minRow); ASSERT(nRow > 0 && nRow < pCommitter->minRow);
if (pCommitter->dWriter.bDatal.nRow + nRow > pCommitter->maxRow) { if (pBlockDataW->nRow + nRow > pCommitter->maxRow) {
ASSERT(pCommitter->dWriter.bDatal.nRow > 0); ASSERT(pBlockDataW->nRow > 0);
code = tsdbCommitLastBlock(pCommitter); code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err; if (code) goto _err;
} }
while (nRow > 0) { while (nRow > 0) {
code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &pCommitter->dReader.pRowInfo->row, NULL); code = tBlockDataAppendRow(pBlockDataW, &pCommitter->dReader.pRowInfo->row, NULL, uid);
if (code) goto _err; if (code) goto _err;
code = tsdbCommitterNextLastRow(pCommitter); code = tsdbCommitterNextLastRow(pCommitter);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册