diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index af256c0e19856957e73d946273cbb56f17a41b5a..4c31b3e07d488fc4441092461fbd45e2ab4bc86b 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -469,7 +469,8 @@ struct SColData { }; struct SBlockData { - int64_t suid; + int64_t suid; // 0 means normal table data block + int64_t uid; // 0 means block data in .last file, others in .data file int32_t nRow; int64_t *aUid; int64_t *aVersion; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 70892acb0aa796fc741f35cad93d3f22f20ca7e1..63aa5f3ec81f982958ff420cfb609622fecd5de0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -290,7 +290,7 @@ _err: return code; } -static int32_t tsdbCommitNextLastRow(SCommitter *pCommitter) { +static int32_t tsdbCommitterNextLastRow(SCommitter *pCommitter) { int32_t code = 0; ASSERT(pCommitter->dReader.pReader); @@ -363,7 +363,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { pCommitter->dReader.bDatal.nRow = 0; pCommitter->dReader.iRow = -1; pCommitter->dReader.pRowInfo = &pCommitter->dReader.rowInfo; - code = tsdbCommitNextLastRow(pCommitter); + code = tsdbCommitterNextLastRow(pCommitter); if (code) goto _err; } else { pCommitter->dReader.pBlockIdx = NULL; @@ -527,128 +527,94 @@ _exit: return code; } -static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlockMerge, TSDBKEY toKey, - int8_t toDataOnly) { +static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) { int32_t code = 0; - SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid}; - SBlockData *pBlockDataMerge = &pCommitter->dReader.bData; - SBlockData *pBlockData = &pCommitter->dWriter.bData; - SBlock block; - SBlock *pBlock = █ - TSDBROW *pRow1; - TSDBROW row2; - TSDBROW *pRow2 = &row2; - - // read SBlockData - code = tsdbReadBlockData(pCommitter->dReader.pReader, pBlockIdx, pBlockMerge, pBlockDataMerge, NULL, NULL); - if (code) goto _err; + STbData *pTbData = pIter->pTbData; + SBlockData *pBlockDataR = &pCommitter->dReader.bData; + SBlockData *pBlockDataW = &pCommitter->dWriter.bData; - code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema); + code = tsdbReadDataBlock(pCommitter->dReader.pReader, pBlock, pBlockDataR, NULL, NULL); if (code) goto _err; - // loop to merge - pRow1 = tsdbTbDataIterGet(pIter); - *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, 0); - ASSERT(pRow1 && tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0); - ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow2), &toKey) < 0); - code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1)); - if (code) goto _err; + tBlockDataClearData(pBlockDataW); + int32_t iRow = 0; + TSDBROW row; + TSDBROW *pRow1 = tsdbTbDataIterGet(pIter); + TSDBROW *pRow2 = &row; + *pRow2 = tsdbRowFromBlockData(pBlockDataR, iRow); + while (pRow1 && pRow2) { + int32_t c = tsdbRowCmprFn(pRow1, pRow2); - tBlockReset(pBlock); - tBlockDataClearData(pBlockData); - while (true) { - if (pRow1 == NULL && pRow2 == NULL) { - if (pBlockData->nRow == 0) { - break; - } else { - goto _write_block; - } - } + if (c < 0) { + code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow1)); + if (code) goto _err; - if (pRow1 && pRow2) { - int32_t c = tsdbRowCmprFn(pRow1, pRow2); - if (c < 0) { - goto _append_mem_row; - } else if (c > 0) { - goto _append_block_row; - } else { - ASSERT(0); - } - } else if (pRow1) { - goto _append_mem_row; - } else { - goto _append_block_row; - } + code = tBlockDataAppendRow(pBlockDataW, pRow1, pCommitter->skmRow.pTSchema); + if (code) goto _err; - _append_mem_row: - code = tBlockDataAppendRow(pBlockData, pRow1, pCommitter->skmRow.pTSchema); - if (code) goto _err; + // next + tsdbTbDataIterNext(pIter); + pRow1 = tsdbTbDataIterGet(pIter); + } else if (c > 0) { + code = tBlockDataAppendRow(pBlockDataW, pRow2, NULL); + if (code) goto _err; - tsdbTbDataIterNext(pIter); - pRow1 = tsdbTbDataIterGet(pIter); - if (pRow1) { - if (tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0) { - code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1)); - if (code) goto _err; + iRow++; + if (iRow < pBlockDataR->nRow) { + *pRow2 = tsdbRowFromBlockData(pBlockDataR, iRow); } else { - pRow1 = NULL; + pRow2 = NULL; } + } else { + ASSERT(0); } - if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { - goto _write_block; - } else { - continue; + // check + if (pBlockDataW->nRow >= pCommitter->maxRow * 4 / 5) { + code = tsdbCommitDataBlock(pCommitter); + if (code) goto _err; } + } - _append_block_row: - code = tBlockDataAppendRow(pBlockData, pRow2, NULL); + while (pRow2) { + code = tBlockDataAppendRow(pBlockDataW, pRow2, NULL); if (code) goto _err; - if (pRow2->iRow + 1 < pBlockDataMerge->nRow) { - *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, pRow2->iRow + 1); + iRow++; + if (iRow < pBlockDataR->nRow) { + *pRow2 = tsdbRowFromBlockData(pBlockDataR, iRow); } else { pRow2 = NULL; } - if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { - goto _write_block; - } else { - continue; + // check + if (pBlockDataW->nRow >= pCommitter->maxRow * 4 / 5) { + code = tsdbCommitDataBlock(pCommitter); + if (code) goto _err; } + } - _write_block: - code = tsdbCommitBlockData(pCommitter, pBlockData, pBlock, pBlockIdx, toDataOnly); + // check + if (pBlockDataW->nRow > 0) { + code = tsdbCommitDataBlock(pCommitter); if (code) goto _err; - - tBlockReset(pBlock); - tBlockDataClearData(pBlockData); } return code; _err: - tsdbError("vgId:%d, tsdb merge block and mem failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d, tsdb merge commit data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); return code; } -static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) { - int32_t code = 0; -#if 0 - TSDBROW *pRow; - SBlock block; - SBlock *pBlock = █ +static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey) { + int32_t code = 0; + STbData *pTbData = pIter->pTbData; SBlockData *pBlockData = &pCommitter->dWriter.bData; - int64_t suid = pIter->pTbData->suid; - int64_t uid = pIter->pTbData->uid; - - code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema); - if (code) goto _err; - tBlockReset(pBlock); tBlockDataClearData(pBlockData); - pRow = tsdbTbDataIterGet(pIter); - ASSERT(pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) < 0); + + TSDBROW *pRow = tsdbTbDataIterGet(pIter); while (true) { if (pRow == NULL) { if (pBlockData->nRow > 0) { @@ -659,7 +625,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter } // update schema - code = tsdbCommitterUpdateRowSchema(pCommitter, suid, uid, TSDBROW_SVERSION(pRow)); + code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow)); if (code) goto _err; // append @@ -668,27 +634,20 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter tsdbTbDataIterNext(pIter); pRow = tsdbTbDataIterGet(pIter); - // if (pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) pRow = NULL; - // crash on CI, use the block following if (pRow) { - TSDBKEY tmpKey = TSDBROW_KEY(pRow); - if (tsdbKeyCmprFn(&tmpKey, &toKey) >= 0) { + TSDBKEY rowKey = TSDBROW_KEY(pRow); + if (tsdbKeyCmprFn(&rowKey, &toKey) >= 0) { pRow = NULL; } } - if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block; - continue; - - _write_block: - code = tsdbCommitBlockData(pCommitter, pBlockData, pBlock, &(SBlockIdx){.suid = suid, .uid = uid}, toDataOnly); - if (code) goto _err; - - tBlockReset(pBlock); - tBlockDataClearData(pBlockData); + if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { + _write_block: + code = tsdbCommitDataBlock(pCommitter); + if (code) goto _err; + } } -#endif return code; _err: @@ -719,26 +678,6 @@ _err: return code; } -// static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int64_t uid) { -// int32_t code = 0; -// SBlockIdx blockIdx = {.suid = suid, .uid = uid}; -// SBlockIdx *pBlockIdx = &blockIdx; - -// code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, NULL, pBlockIdx); -// if (code) goto _err; - -// if (taosArrayPush(pCommitter->dWriter.aBlockIdx, pBlockIdx) == NULL) { -// code = TSDB_CODE_OUT_OF_MEMORY; -// goto _err; -// } - -// return code; - -// _err: -// tsdbError("vgId:%d, commit table data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); -// return code; -// } - static int32_t tsdbGetNumOfRowsLessThan(STbDataIter *pIter, TSDBKEY key) { int32_t nRow = 0; @@ -763,42 +702,35 @@ static int32_t tsdbGetNumOfRowsLessThan(STbDataIter *pIter, TSDBKEY key) { static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) { int32_t code = 0; + STbData *pTbData = pIter->pTbData; SBlockData *pBlockData = &pCommitter->dWriter.bData; - SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid}; - SBlock block; - TSDBROW *pRow; - - code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema); - if (code) goto _err; - pRow = tsdbTbDataIterGet(pIter); - code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow)); - if (code) goto _err; + tBlockDataClearData(pBlockData); while (true) { - if (pRow == NULL) break; - code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema); - if (code) goto _err; - - tsdbTbDataIterNext(pIter); - pRow = tsdbTbDataIterGet(pIter); + TSDBROW *pRow = tsdbTbDataIterGet(pIter); if (pRow) { - TSDBKEY key = TSDBROW_KEY(pRow); - int32_t c = tBlockCmprFn(&(SBlock){.minKey = key, .maxKey = key}, pBlock); - - if (c == 0) { - code = - tsdbCommitterUpdateRowSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow)); - if (code) goto _err; - } else if (c > 0) { + TSDBKEY rowKey = TSDBROW_KEY(pRow); + if (tsdbKeyCmprFn(&rowKey, &pBlock->maxKey) > 0) { pRow = NULL; - } else { - ASSERT(0); } } + + if (pRow == NULL) { + break; + } + + code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow)); + if (code) goto _err; + + code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema); + if (code) goto _err; } - block = *pBlock; - code = tsdbCommitBlockData(pCommitter, pBlockData, &block, pBlockIdx, 0); + SBlock block = *pBlock; + code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &block, NULL, NULL, pCommitter->cmprAlg); + if (code) goto _err; + + code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &block, tPutBlock); if (code) goto _err; return code; @@ -808,16 +740,101 @@ _err: return code; } -static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter, int32_t *nRow) { - int32_t code = 0; - STbData *pTbData = pIter->pTbData; - TSDBROW *pRow = tsdbTbDataIterGet(pIter); - SRowInfo *pRowInfo = pCommitter->dReader.pRowInfo; +static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) { + int32_t code = 0; + STbData *pTbData = pIter->pTbData; + int32_t nRow = tsdbGetNumOfRowsLessThan(pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}); - while (pRow && pRowInfo) { - int32_t c = tsdbRowCmprFn(pRow, &pRowInfo->row); - if (c < 0) { - code = tBlockDataAppendRow(&pCommitter->dWriter.bData, pRow, NULL); + if (pCommitter->dReader.pRowInfo) { + for (int32_t iRow = pCommitter->dReader.iRow; iRow < pCommitter->dReader.bDatal.nRow; iRow++) { + if (pTbData->uid != pCommitter->dReader.bDatal.aUid[iRow]) break; + nRow++; + } + } + + if (nRow == 0) goto _exit; + + SBlockData *pBlockData; + TSDBROW *pRow = tsdbTbDataIterGet(pIter); + SRowInfo *pRowInfo = pCommitter->dReader.pRowInfo; + + if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { + pRow = NULL; + } + + if (pRowInfo && (pRowInfo->suid != pTbData->suid || pRowInfo->uid != pTbData->uid)) { + pRowInfo = NULL; + } + + while (nRow) { + if (nRow < pCommitter->minRow) { // to .last + pBlockData = &pCommitter->dWriter.bDatal; + + // check if same schema + if (pBlockData->suid != pTbData->suid || pBlockData->suid == 0) { + code = tsdbCommitLastBlock(pCommitter); + if (code) goto _err; + + code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema); + if (code) goto _err; + } + + if (pBlockData->nRow + nRow > pCommitter->maxRow) { + code = tsdbCommitLastBlock(pCommitter); + if (code) goto _err; + } + } else { // to .data + pBlockData = &pCommitter->dWriter.bData; + } + + while (pRow && pRowInfo) { + int32_t c = tsdbRowCmprFn(pRow, &pRowInfo->row); + if (c < 0) { + code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow)); + if (code) goto _err; + + code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema); + if (code) goto _err; + + tsdbTbDataIterNext(pIter); + pRow = tsdbTbDataIterGet(pIter); + if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { + pRow = NULL; + } + } else if (c > 0) { + code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL); + if (code) goto _err; + + code = tsdbCommitterNextLastRow(pCommitter); + if (code) goto _err; + + pRowInfo = pCommitter->dReader.pRowInfo; + if (pRowInfo && (pRowInfo->suid != pTbData->suid || pRowInfo->uid != pTbData->uid)) { + pRowInfo = NULL; + } + } else { + ASSERT(0); + } + + nRow--; + + if (pBlockData->uid) { // .data block + if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { + code = tsdbCommitDataBlock(pCommitter); + if (code) goto _err; + + goto _outer_break; + } + } else { + ASSERT(pBlockData->nRow <= pCommitter->maxRow); + } + } + + while (pRow) { + code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow)); + if (code) goto _err; + + code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema); if (code) goto _err; tsdbTbDataIterNext(pIter); @@ -825,81 +842,54 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter, i if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { pRow = NULL; } - } else if (c > 0) { - code = tBlockDataAppendRow(&pCommitter->dWriter.bData, &pRowInfo->row, NULL); - if (code) goto _err; - pCommitter->dReader.iRow++; - if (pCommitter->dReader.iRow < pCommitter->dReader.bDatal.nRow) { - // todo - } else { - pCommitter->dReader.iBlockL++; - if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) { - // todo - } else { - // todo + nRow--; + if (pBlockData->uid) { // .data block + if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { + code = tsdbCommitDataBlock(pCommitter); + if (code) goto _err; + + goto _outer_break; } + } else { + ASSERT(pBlockData->nRow <= pCommitter->maxRow); } + } + + while (pRowInfo) { + code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL); + if (code) goto _err; + + code = tsdbCommitterNextLastRow(pCommitter); + if (code) goto _err; pRowInfo = pCommitter->dReader.pRowInfo; if (pRowInfo && (pRowInfo->suid != pTbData->suid || pRowInfo->uid != pTbData->uid)) { pRowInfo = NULL; } - } else { - ASSERT(0); - } - - (*nRow)--; - ASSERT(*nRow >= 0); - if (pCommitter->dWriter.bData.nRow >= pCommitter->maxRow * 4 / 5) goto _write_block_data; - } - while (pRow) { - code = tBlockDataAppendRow(&pCommitter->dWriter.bData, pRow, NULL); - if (code) goto _err; - - tsdbTbDataIterNext(pIter); - pRow = tsdbTbDataIterGet(pIter); - if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { - pRow = NULL; - } - - (*nRow)--; - ASSERT(*nRow >= 0); - if (pCommitter->dWriter.bData.nRow >= pCommitter->maxRow * 4 / 5) goto _write_block_data; - } - - while (pRowInfo) { - code = tBlockDataAppendRow(&pCommitter->dWriter.bData, &pRowInfo->row, NULL); - if (code) goto _err; + nRow--; + if (pBlockData->uid) { // .data block + if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { + code = tsdbCommitDataBlock(pCommitter); + if (code) goto _err; - pCommitter->dReader.iRow++; - if (pCommitter->dReader.iRow < pCommitter->dReader.bDatal.nRow) { - // todo - } else { - pCommitter->dReader.iBlockL++; - if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) { - // todo + goto _outer_break; + } } else { - // todo + ASSERT(pBlockData->nRow <= pCommitter->maxRow); } } - pRowInfo = pCommitter->dReader.pRowInfo; - if (pRowInfo && (pRowInfo->suid != pTbData->suid || pRowInfo->uid != pTbData->uid)) { - pRowInfo = NULL; - } - - (*nRow)--; - ASSERT(*nRow >= 0); - if (pCommitter->dWriter.bData.nRow >= pCommitter->maxRow * 4 / 5) goto _write_block_data; + _outer_break: + ASSERT(nRow >= 0); } - SBlock block; -_write_block_data: +_exit: return code; _err: + tsdbError("vgId:%d tsdb merge commit last failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); return code; } @@ -909,14 +899,6 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { ASSERT(pCommitter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, pTbData) >= 0); ASSERT(pCommitter->dReader.pRowInfo == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRowInfo, pTbData) >= 0); - // end last if need - if (pTbData->suid == 0 || pTbData->suid != 0 /*todo*/) { - if (pCommitter->dWriter.bDatal.nRow > 0) { - // TODO: code = tsdbCommitBlockDataL(pCommitter); - if (code) goto _err; - } - } - // merge commit table data STbDataIter iter = {0}; STbDataIter *pIter = &iter; @@ -930,22 +912,22 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { if (pRow == NULL) goto _exit; - SBlockIdx *pBlockIdx = NULL; - int32_t iBlock = 0; - SBlock block; - SBlock *pBlock = █ - - if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pTbData, pCommitter->dReader.pBlockIdx) == 0) { - pBlockIdx = pCommitter->dReader.pBlockIdx; - } - - if (pBlockIdx && iBlock < pCommitter->dReader.mBlock.nItem) { + int32_t iBlock = 0; + SBlock block; + SBlock *pBlock = █ + if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pTbData, pCommitter->dReader.pBlockIdx) == 0 && + iBlock < pCommitter->dReader.mBlock.nItem) { tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); } else { pBlock = NULL; } + code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid, pTbData->maxSkmVer); + if (code) goto _err; + tMapDataReset(&pCommitter->dWriter.mBlock); + code = tBlockDataSetSchema(&pCommitter->dWriter.bData, pCommitter->skmTable.pTSchema); + if (code) goto _err; while (pBlock && pRow) { int32_t c = tBlockCmprFn(pBlock, &(SBlock){.minKey = TSDBROW_KEY(pRow), .maxKey = TSDBROW_KEY(pRow)}); if (c < 0) { // disk @@ -959,8 +941,8 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { } else { pBlock = NULL; } - } else if (c < 0) { // memory - code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1); + } else if (c > 0) { // memory + code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey); if (code) goto _err; // next @@ -977,7 +959,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock); if (code) goto _err; } else { - // code = tsdbMergeTableData(pCommitter, pIter, pBlock, NULL, 1); + code = tsdbMergeCommitData(pCommitter, pIter, pBlock); if (code) goto _err; } @@ -999,6 +981,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock); if (code) goto _err; + // next iBlock++; if (iBlock < pCommitter->dReader.mBlock.nItem) { tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); @@ -1008,8 +991,11 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { } // merge with last - int32_t nRowLeft = tsdbGetNumOfRowsLessThan(pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}); - if (pCommitter->dReader.pRowInfo) { + code = tsdbMergeCommitLast(pCommitter, pIter); + if (code) goto _err; +#if 0 + int32_t nRowLeft = tsdbGetNumOfRowsLessThan(pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = + VERSION_MIN}); if (pCommitter->dReader.pRowInfo) { for (int32_t iRow = pCommitter->dReader.iRow; iRow < pCommitter->dReader.bDatal.nRow; iRow++) { int64_t uid = pCommitter->dReader.bDatal.aUid[iRow]; if (uid == pTbData->uid) { @@ -1022,6 +1008,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { code = tsdbMergeCommitLast(pCommitter, pIter, &nRowLeft); if (code) goto _err; } +#endif // end if (pCommitter->dWriter.mBlock.nItem > 0) { @@ -1051,14 +1038,14 @@ _err: static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { int32_t code = 0; - // write aBlockL - code = tsdbWriteBlockL(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockL, NULL); - if (code) goto _err; - // write aBlockIdx code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx, NULL); if (code) goto _err; + // write aBlockL + code = tsdbWriteBlockL(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockL, NULL); + if (code) goto _err; + // update file header code = tsdbUpdateDFileSetHeader(pCommitter->dWriter.pWriter); if (code) goto _err; @@ -1157,7 +1144,7 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &pCommitter->dReader.pRowInfo->row, NULL); if (code) goto _err; - code = tsdbCommitNextLastRow(pCommitter); + code = tsdbCommitterNextLastRow(pCommitter); if (code) goto _err; nRow--; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 79929c44a0e6ea2f70651349b48d650e831bdc34..9634b14bf7773a8badf9f9077eadd0ab00df2f24 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -537,7 +537,7 @@ _err: int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppBuf) { int32_t code = 0; int64_t offset = pReader->pSet->pHeadF->offset; - int64_t size = pReader->pSet->pHeadF->size - offset; + int64_t size = pReader->pSet->pHeadF->loffset - offset; uint8_t *pBuf = NULL; int64_t n; uint32_t delimiter; @@ -604,7 +604,7 @@ _err: int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf) { int32_t code = 0; int64_t offset = pReader->pSet->pHeadF->loffset; - int64_t size = pReader->pSet->pHeadF->offset - offset; + int64_t size = pReader->pSet->pHeadF->size - offset; int64_t n; uint32_t delimiter; uint8_t *pBuf = NULL;